当前位置 博文首页 > 军大君的博客:SPringBoot集成RabbitMQ实现30秒过期删除功能
1.引入RabbitMQ 包
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
host: 66.666.66.66 #IP地址
port: 5672 #端口号
username: guest
password: guest
3.编写配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
//定义延时队列
@Bean("delayQueue")
public Queue delayQueue(){
//设置死信交换机和路由key
return QueueBuilder.durable("delayQueue")
//如果消息过时,则会被投递到当前对应的my-dlx-exchange
.withArgument("x-dead-letter-exchange","my-dlx-exchange")
//如果消息过时,my-dlx-exchange会更具routing-key-delay投递消息到对应的队列
.withArgument("x-dead-letter-routing-key","routing-key-delay").build();
}
//定义死信队列
@Bean("dlxQueue")
public Queue dlxQueue(){
return QueueBuilder.durable("my-dlx-queue").build();
}
//定义死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange(){
return ExchangeBuilder.directExchange("my-dlx-exchange").build();
}
//绑定死信队列与交换机
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,@Qualifier("dlxQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
}
}
4.编写生产者
@Override
//这里的carid是我刚刚添加物品到购物车后的id,把car存到队列里面去
public void sendCarToQueue(Integer carid) {
Car car = carDao.findCarById(carid);
Map<String,Car> map = new HashMap<>();
map.put("msg",car);
rabbitTemplate.convertAndSend("delayQueue", map, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//延迟队列延迟30秒后,发消息给死信交换机,
message.getMessageProperties().setExpiration("30000");
return message;
}
});
}
5.编写消费者
import com.example.pojo.Car;
import com.example.service.CarService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
@Component
@RabbitListener(queues = "my-dlx-queue")
public class CarAddListener {
@Autowired
CarService carService;
@RabbitHandler
public void receive(HashMap<String,Car> map){
//c是rabbitMQ里面是购物车数据car
for (Car c : map.values()){
//我在这里根据用户id获取了购物车表里面的数据
List<Car> carlist = carService.findByUserid(c.getUserid());
for (Car car1 : carlist) {
//通过遍历查看购物车里面的数据和rabbitMQ队列里面的数据是否一致[就是30秒内,购物车某一条数据的购买数量是否变化]
if(c.getOrdernumber().equals(car1.getOrdernumber()) && c.getUserid().equals(car1.getUserid())){
carService.deleteById(c.getId());
}
}
}
}
}
cs