Spring Boot 环境搭建
pom.xml
<!--
Maven dependencies for the delayed-message (RabbitMQ) demo.
NOTE(review): every entry except commons-lang3 omits <version>; presumably the
versions come from Spring Boot dependency management declared outside this
snippet - confirm against the full pom.xml.
-->
<dependencies>
<!-- Apache Commons Lang utilities; the only dependency here that pins its own version. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Spring AMQP / RabbitMQ starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot test support. -->
<!-- NOTE(review): no <scope>test</scope> is declared here; confirm whether that is intentional. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- Spring Boot logging starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<!-- Lombok annotations. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
QueueEnum
消息队列配置枚举
import lombok.Getter;
/**
 * Message-queue configuration constants.
 *
 * <p>Each constant bundles the exchange name, queue name and routing key of
 * one queue. The fields are immutable; Lombok's {@code @Getter} generates the
 * accessors for them.
 */
@Getter
public enum QueueEnum {

    /**
     * Order-cancel notification queue.
     */
    QUEUE_ORDER_CANCEL("ego.order.cancel.queue.exchange", "ego.order.cancel.queue", "ego.order.cancel"),

    /**
     * Order-cancel notification TTL (time-to-live) queue.
     */
    QUEUE_TTL_ORDER_CANCEL("ego.order.cancel.exchange.ttl", "ego.order.cancel.queue.ttl", "ego.order.cancel.ttl");

    /**
     * Exchange name.
     */
    private final String exchange;

    /**
     * Queue name.
     */
    private final String name;

    /**
     * Routing key.
     */
    private final String routeKey;

    /**
     * Creates a queue configuration entry.
     *
     * @param exchange exchange name
     * @param name     queue name
     * @param routeKey routing key
     */
    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}
DelayedConfig
延迟消息配置
package com.yaorange.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchanges, queues and bindings used for delayed order-cancel
 * messages.
 *
 * <p>Messages are published to the TTL queue; that queue is declared with
 * dead-letter arguments pointing at the order-cancel exchange and routing key,
 * which is bound to the queue that is actually consumed.
 */
@Configuration
public class DelayedConfig {

    /** Names of the queue that is actually consumed. */
    private static final QueueEnum CANCEL = QueueEnum.QUEUE_ORDER_CANCEL;

    /** Names of the TTL (delay) queue. */
    private static final QueueEnum CANCEL_TTL = QueueEnum.QUEUE_TTL_ORDER_CANCEL;

    /**
     * Durable direct exchange that the consumed order queue is bound to.
     */
    @Bean
    DirectExchange orderExchange() {
        String exchangeName = CANCEL.getExchange();
        return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
    }

    /**
     * Durable direct exchange that the order TTL queue is bound to.
     */
    @Bean
    DirectExchange orderTtlExchange() {
        String exchangeName = CANCEL_TTL.getExchange();
        return ExchangeBuilder.directExchange(exchangeName).durable(true).build();
    }

    /**
     * Queue that order-cancel messages are actually consumed from.
     */
    @Bean
    public Queue orderQueue() {
        String queueName = CANCEL.getName();
        return new Queue(queueName);
    }

    /**
     * Order TTL queue, declared with dead-letter arguments.
     */
    @Bean
    public Queue orderTtlQueue() {
        // Exchange and routing key that messages are forwarded to once they expire.
        String deadLetterExchange = CANCEL.getExchange();
        String deadLetterRouteKey = CANCEL.getRouteKey();

        QueueBuilder ttlQueue = QueueBuilder.durable(CANCEL_TTL.getName());
        ttlQueue.withArgument("x-dead-letter-exchange", deadLetterExchange);
        ttlQueue.withArgument("x-dead-letter-routing-key", deadLetterRouteKey);
        return ttlQueue.build();
    }

    /**
     * Binds the order queue to its exchange with the order-cancel routing key.
     */
    @Bean
    Binding orderBinding(DirectExchange orderExchange, Queue orderQueue) {
        String routeKey = CANCEL.getRouteKey();
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(routeKey);
    }

    /**
     * Binds the order TTL queue to its exchange with the TTL routing key.
     */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlExchange, Queue orderTtlQueue) {
        String routeKey = CANCEL_TTL.getRouteKey();
        return BindingBuilder.bind(orderTtlQueue).to(orderTtlExchange).with(routeKey);
    }
}
生产者
/**
 * Producer, written as a Spring Boot test.
 *
 * <p>Publishes the same order id to the TTL exchange 100 times, one message
 * per second, each with a per-message expiration of 5000.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = CancelOrderApplication.class)
public class Send {
    private static Logger LOGGER = LoggerFactory.getLogger(Send.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Sends the delayed messages, logs once, then sleeps.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void send() throws InterruptedException {
        final Long orderId = 1L;
        final String ttlExchange = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange();
        final String ttlRouteKey = QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey();

        // Stamps the delay (in milliseconds) onto every outgoing message.
        final MessagePostProcessor withExpiration = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(String.valueOf(5000));
                return message;
            }
        };

        int sent = 0;
        while (sent < 100) {
            Thread.sleep(1000);
            // Publish to the delay (TTL) queue.
            amqpTemplate.convertAndSend(ttlExchange, ttlRouteKey, orderId, withExpiration);
            sent++;
        }

        LOGGER.info("发送延时消息 orderId:{}",orderId);
        // NOTE(review): presumably keeps the test (and its application context) alive
        // while the messages expire and are consumed - confirm.
        Thread.sleep(1000000);
    }
}
消费者
开启手动确认
# Switch the simple listener container to manual acknowledgement.
# The keys must be nested: without indentation they parse as unrelated
# top-level keys and the acknowledge mode is never applied.
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # manual acknowledgement
/**
 * Receiver for the {@code ego.order.cancel.queue} queue.
 *
 * <p>Each message is acknowledged explicitly through the channel.
 */
@Component
@RabbitListener(queues = "ego.order.cancel.queue")
public class Receiver {
    // Logger is bound to this class so log lines carry the right category.
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    /**
     * Handles one order-cancel message and acknowledges it.
     *
     * @param orderId id of the order to cancel (the message payload)
     * @param message the raw message, used to read the delivery tag
     * @param channel the channel the message was delivered on, used to ack
     */
    @RabbitHandler
    public void handle(Long orderId, Message message, Channel channel) {
        LOGGER.info("开始取消订单 orderId:{}", orderId);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            // Keep the original best-effort behaviour (no rethrow), but report the
            // failure through the logger instead of printing to stderr.
            LOGGER.error("Failed to ack order-cancel message, orderId:{}, deliveryTag:{}",
                    orderId, deliveryTag, e);
        }
    }
}