当前位置 博文首页 > 军大君的博客:SPringBoot集成RabbitMQ实现30秒过期删除功能

    军大君的博客:SPringBoot集成RabbitMQ实现30秒过期删除功能

    作者:[db:作者] 时间:2021-07-16 09:38

    1.引入RabbitMQ 包

     		<!--RabbitMQ-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    2.编写配置文件

    spring:
      rabbitmq:
        host: 66.666.66.66    #IP地址
        port: 5672            #端口号
        username: guest
        password: guest
    

    3.编写配置类

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitConfig {
        //定义延时队列
        @Bean("delayQueue")
        public Queue delayQueue(){
            //设置死信交换机和路由key
            return QueueBuilder.durable("delayQueue")
                    //如果消息过时,则会被投递到当前对应的my-dlx-exchange
                    .withArgument("x-dead-letter-exchange","my-dlx-exchange")
                    //如果消息过时,my-dlx-exchange会更具routing-key-delay投递消息到对应的队列
                    .withArgument("x-dead-letter-routing-key","routing-key-delay").build();
        }
        //定义死信队列
        @Bean("dlxQueue")
        public Queue dlxQueue(){
            return QueueBuilder.durable("my-dlx-queue").build();
        }
        //定义死信交换机
        @Bean("dlxExchange")
        public Exchange dlxExchange(){
            return ExchangeBuilder.directExchange("my-dlx-exchange").build();
        }
        //绑定死信队列与交换机
        @Bean("dlxBinding")
        public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,@Qualifier("dlxQueue") Queue queue){
            return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
        }
    }
    

    4.编写生产者

    @Override
    	//这里的carid是我刚刚添加物品到购物车后的id,把car存到队列里面去
        public void sendCarToQueue(Integer carid) {
            Car car = carDao.findCarById(carid);
            Map<String,Car> map = new HashMap<>();
            map.put("msg",car);
            rabbitTemplate.convertAndSend("delayQueue", map, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //延迟队列延迟30秒后,发消息给死信交换机,
                    message.getMessageProperties().setExpiration("30000");
                    return message;
                }
            });
        }
    

    5.编写消费者

    
    import com.example.pojo.Car;
    import com.example.service.CarService;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    import java.util.List;
    
    @Component
    @RabbitListener(queues = "my-dlx-queue")
    public class CarAddListener {
        @Autowired
        CarService carService;
        @RabbitHandler
        public void receive(HashMap<String,Car> map){
            //c是rabbitMQ里面是购物车数据car
            for (Car c : map.values()){
                //我在这里根据用户id获取了购物车表里面的数据
                List<Car> carlist = carService.findByUserid(c.getUserid());
                for (Car car1 : carlist) {
                //通过遍历查看购物车里面的数据和rabbitMQ队列里面的数据是否一致[就是30秒内,购物车某一条数据的购买数量是否变化]
                    if(c.getOrdernumber().equals(car1.getOrdernumber()) && c.getUserid().equals(car1.getUserid())){
                        carService.deleteById(c.getId());
                    }
                }
            }   
        }    
    }
    
    cs