消息确认confirmCallback:
一般消息投递路径:producer->rabbitmq broker->exchange->queue->consumer
confirm确认模式(从producer到exchange会返回一个confirmCallback)
return退回模式(从exchange到queue失败则会返回returnCallback)
# 应用名称
spring.application.name=springbootproducer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=flowerpower
spring.rabbitmq.virtual-host=/soulike
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
@Configuration
public class RabbitMQConfig {
public final static String EXCHANGE_NAME = "test_exchange_confirm";
public final static String QUEUE_NAME = "test_queue_confirm";
//交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
}
//队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
//队列和交换机绑定关系
@Bean//不需要被注入所以不用给名了
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
}
}
@SpringBootTest
class ProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 确认模式开启(在properties中)
*/
@Test
public void testConfirm() throws{
//在rabbitTemplate定义confirmCallBack回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关设置信息,在发送时设置的
* @param ack 发送成功与否
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) System.out.println("发送成功");
else System.out.println("接受状态:" + cause);
}
});
//发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm");
}
}

消息抵达队列returnCallback:
/**
* 回退模式,在消息发送给Exhange后,Exchange路由到Queue失败了才会返回
* 处理模式:如果消息没有路由到Queue,则丢弃消息(默认)或返回给发送方
*/
@Test
public void testReturn(){
//设置处理失败的模式,注意只有失败时才会看到消息
rabbitTemplate.setMandatory(true); 【也可以在配置文件里设置】
//设置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return执行了");
System.out.println(message);
System.out.println(replyCode);
System.out.println(replyText);
System.out.println(exchange);
System.out.println(routingKey);
}
});
rabbitTemplate.convertAndSend("test_exchange_confirm", "confsdfairm", "message confirm");
}

@Configuration
public class RabbitMQConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 订制RabbitTemplate
*/
@PostConstruct//创建完RabbitMQConfig后,再执行此来设置自定义template
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 设置确认回调[成功时生效]
* @param correlationData 当前消息的唯一关联数据(有消息的唯一id)
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("========setConfirmCallback========");
System.out.println("confirm..." + correlationData); // convertAndSend时可以自定义new一个correlationData
System.out.println("ack..." + ack);
System.out.println("cause..." + cause);
System.out.println("========setConfirmCallback========");
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 设置抵达回调[失败时才生效],消息未抵达队列这里才会生效!!!
* @param message 投递时标详细信息
* @param replyCode 回复状态码
* @param replyText 回复文本内容
* @param exchange 消息发往的交换机
* @param routingkey 消息使用的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
System.out.println("========returnedMessage========");
System.out.println(message, replyCode, replyText, exchange, routingkey);
System.out.println("========returnedMessage========");
}
});
}
}
// @RabbitListener(queues = {"hello-java-queue"}) //实验不接收
@Service("orderReturnReasonService")
public class OrderReturnReasonServiceImpl extends ServiceImpl<OrderReturnReasonDao, OrderReturnReasonEntity> implements OrderReturnReasonService {
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
//实验不接收
}
public void receiveMessage(Message message, OrderEntity content, Channel channel) throws InterruptedException {
//实验不接收
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMq")
public String sendEntityMessageTest(@RequestParam(value = "num", defaultValue = "10") Integer num) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
OrderEntity orderEntity = new OrderEntity();
for (int i = 1; i <= num; i++) {
if (i % 2 == 0) {
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setCreateTime(new Date());
orderReturnReasonEntity.setName("第" + i + "号");
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));
} else {
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setNote("第" + i + "号");
orderEntity.setCreateTime(new Date());
orderEntity.setId(2L);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello2.java", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
}
}
log.info("消息发送完成");
return "OK";
}
Consumer Ack,确认:
表示消费者端收到消息后的确认方式
自动确认acknowledge=“none”、手动确认acknowledge=“manual”、
根据异常类型确认acknowledge=“auto”(较难)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
/**
* Consumer ACK机制
* 1、手动签收(properties设置)
* 2、实现ChannelAwareMessageListener接口
* 3、如果消息成功处理,则调用channel的basicAck()签收
* 4、如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
@RabbitListener(queues = "test_queue_confirm")
public void onMessage(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag:" + deliveryTag);
try {
//接受消息
System.out.println(new String(message.getBody()));
//处理业务逻辑,模拟出错
System.out.println("处理业务逻辑...");
int i = 3 / 0;
//手动签收,正确签收
/**
* long deliveryTag: the tag from the received (Basic.GetOk) or (Basic.Deliver)
* boolean multiple: true to acknowledge all message up to and including the supplied delivery tag
* false to acknowledge just the supplied delivery tag
*/
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
try {
//参数3是重回队列:消息回到队列,broker会重新发送直到正确处理为止
channel.basicNack(deliveryTag, true, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}

消费端限流:
MQ中接收多个请求后,消费者端从MQ中每秒拉取定量请求(1000个?)。这样的做法是为了避免消费端一次性处理过多请求导致宕机
spring.rabbitmq.listener.simple.prefetch=10
/**
* Consumer 限流机制
* 1、确保ack机制是手动确认的
* 2、配置属性perfectch,值是每次拉取消息的最大值
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
@RabbitListener(queues = "test_queue_confirm")
public void onMessage(Message message, Channel channel) throws IOException {
//获取消息
System.out.println(new String(message.getBody()));
//处理业务逻辑
//签收。假设不签收,控制台只会有10条消息
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}


TTL,全程Time To Live(存活时间/过期时间):
当消息到达存活时间还未被消费,就会被自动清除的。
统一过期:
@Configuration
public class RabbitMQConfig {
public final static String EXCHANGE_NAME = "test_exchange_ttl";
public final static String QUEUE_NAME = "test_queue_ttl";
//交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//队列
@Bean("bootQueue")
public Queue bootQueue() {
//这里新增了x-message-ttl参数,用来设置过期时间(单位:毫秒)。或者单独用ttl()也是可以的
//return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 10000).build();
return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
}
//队列和交换机绑定关系
@Bean//不需要被注入所以不用给名了
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
}
}
@Test
public void testTtl() {
for (int i = 1; i <= 200; i++) {
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.vivi", "message ttl...");
}
}
观察可见10秒钟后队列的所有消息自动过期
单独过期:
注意单独的过期时间可以不同于队列的过期时间,时间以两者中最短的为准
如果多条不同过期时间的消息,会从最顶端的开始判断是否过期再进行清除
比如,顶端是10秒过期,第二个是5秒过期,那么第二个过期后第一个仍未过期,那么第二个不会被清除
@Test
public void testTtl() {
//消息后处理对象,可以设置消息的参数信息
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置message信息
message.getMessageProperties().setExpiration("10000");//过期时间
//返回消息
return message;
}
};
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.vivi", "message ttl...", messagePostProcessor);
}
死信队列:
Dead Letter Exchange,死信交换机。当消息成为Dead message后,可以被重新发送到另一个交换机,这个就是死信交换机
比如上面,过期了的消息可以被发送到死信交换机去,由死信交换机送到其他消费者去

@Configuration
public class RabbitMQConfig {
private static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
private static final String TOPIC_QUEUE_NAME = "topic_queue";
public static final String DLX_EXCHANGE = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String DLX_ROUTING_KEY = "error";
//交换机
@Bean("bootExchange")
public Exchange itemTopicExchange() {
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
}
//死信交换机
@Bean("deadExchange")
public Exchange DlxExchange() {
return ExchangeBuilder.directExchange(DLX_EXCHANGE).build();
}
//队列
@Bean("bootQueue")
public Queue itemQueue() {
//与死信交换机绑定
return QueueBuilder
.durable(TOPIC_QUEUE_NAME)
.ttl(10000) //过期算死信
.maxLength(10) //过长算死信
.deadLetterExchange(DLX_EXCHANGE) //死信交换机
.deadLetterRoutingKey(DLX_ROUTING_KEY).build(); //死信路由
}
//死信队列
@Bean("deadQueue")
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE).build();
}
//队列和交换机绑定关系
@Bean//不需要被注入所以不用给名了
public Binding itemQueueExchange(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
@Bean
public Binding deadQueueExchange(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(DLX_ROUTING_KEY).noargs();
}
}
@Test
public void testDlx() {
//测试过期时间的,10秒钟后会从topic_queue到dlx_queue
rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");
//测试长度问题,第50条之后都是死信,再过10秒100条全死信
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");
}
//测试拒收
rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");
}
注意拒收需要修改消费者,不要让其重回队列里
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
@RabbitListener(queues = "topic_queue")
public void onMessage(Message message, Channel channel) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag:" + deliveryTag);
try {
//接受消息
System.out.println(new String(message.getBody()));
//处理业务逻辑,模拟出错
System.out.println("处理业务逻辑...");
int i = 3 / 0;
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
try {
//拒收。参数3是重回队列。但是如果是死信,应该要false,让其归队到死信里
System.out.println("异常,拒绝接受");
channel.basicNack(deliveryTag, true, false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
延迟队列:
消息进入队列后不会立即被消费,只有到了指定时间了才会被消费。
典型:抢单后30分钟内为付款就取消订单
一般的延迟队列,是使用TTL+死信来完成的,rabbitmq本身未提供这样的功能
和上面的不同在于,消费者监听的是死信队列而不是正常队列
日志监控:
日志默认存放在/var/log/rabbitmq@xx.log