xmtrock
发布于 2021-06-26 / 159 阅读
0

RabbitMQ高级特性

消息确认confirmCallback:

一般消息投递路径:producer->rabbitmq broker->exchange->queue->consumer
confirm确认模式(从producer到exchange会返回一个confirmCallback)
return退回模式(从exchange到queue失败则会返回returnCallback)

# 应用名称
spring.application.name=springbootproducer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=flowerpower
spring.rabbitmq.virtual-host=/soulike
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.publisher-confirm-type=correlated
@Configuration
public class RabbitMQConfig {
    public final static String EXCHANGE_NAME = "test_exchange_confirm";
    public final static String QUEUE_NAME = "test_queue_confirm";

    //交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }

    //队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //队列和交换机绑定关系
    @Bean//不需要被注入所以不用给名了
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();
    }
}
@SpringBootTest
class ProducerApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 确认模式开启(在properties中)
     */
    @Test
    public void testConfirm() throws{

        //在rabbitTemplate定义confirmCallBack回调函数
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 相关设置信息,在发送时设置的
             * @param ack 发送成功与否
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) System.out.println("发送成功");
                else System.out.println("接受状态:" + cause);
            }
        });

        //发送消息
        rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm");
    }
}

20210626171713416

消息抵达队列returnCallback:

/**
 * 回退模式,在消息发送给Exhange后,Exchange路由到Queue失败了才会返回
 * 处理模式:如果消息没有路由到Queue,则丢弃消息(默认)或返回给发送方
 */
@Test
public void testReturn(){
    //设置处理失败的模式,注意只有失败时才会看到消息
    rabbitTemplate.setMandatory(true); 【也可以在配置文件里设置】
    //设置ReturnCallback
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return执行了");
            System.out.println(message);
            System.out.println(replyCode);
            System.out.println(replyText);
            System.out.println(exchange);
            System.out.println(routingKey);
        }
    });

    rabbitTemplate.convertAndSend("test_exchange_confirm", "confsdfairm", "message confirm");
}

20210626172625716


@Configuration
public class RabbitMQConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 订制RabbitTemplate
     */
    @PostConstruct//创建完RabbitMQConfig后,再执行此来设置自定义template
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 设置确认回调[成功时生效]
             * @param correlationData 当前消息的唯一关联数据(有消息的唯一id)
             * @param ack  消息是否成功收到
             * @param cause 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("========setConfirmCallback========");
                System.out.println("confirm..." + correlationData); // convertAndSend时可以自定义new一个correlationData
                System.out.println("ack..." + ack);
                System.out.println("cause..." + cause);
                System.out.println("========setConfirmCallback========");
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 设置抵达回调[失败时才生效],消息未抵达队列这里才会生效!!!
             * @param message 投递时标详细信息
             * @param replyCode 回复状态码
             * @param replyText 回复文本内容
             * @param exchange 消息发往的交换机
             * @param routingkey 消息使用的路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingkey) {
                System.out.println("========returnedMessage========");
                System.out.println(message, replyCode, replyText, exchange, routingkey);
                System.out.println("========returnedMessage========");
            }
        });
    }
}
// @RabbitListener(queues = {"hello-java-queue"}) //实验不接收
@Service("orderReturnReasonService")
public class OrderReturnReasonServiceImpl extends ServiceImpl<OrderReturnReasonDao, OrderReturnReasonEntity> implements OrderReturnReasonService {
    public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
        //实验不接收
    }
    public void receiveMessage(Message message, OrderEntity content, Channel channel) throws InterruptedException {
        //实验不接收
    }
}
@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMq")
public String sendEntityMessageTest(@RequestParam(value = "num", defaultValue = "10") Integer num) {
    OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
    OrderEntity orderEntity = new OrderEntity();
    for (int i = 1; i <= num; i++) {
        if (i % 2 == 0) {
            orderReturnReasonEntity.setId(1L);
            orderReturnReasonEntity.setCreateTime(new Date());
            orderReturnReasonEntity.setName("第" + i + "号");
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity, new CorrelationData(UUID.randomUUID().toString()));
        } else {
            orderEntity.setOrderSn(UUID.randomUUID().toString());
            orderEntity.setNote("第" + i + "号");
            orderEntity.setCreateTime(new Date());
            orderEntity.setId(2L);
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello2.java", orderEntity, new CorrelationData(UUID.randomUUID().toString()));
        }
    }
    log.info("消息发送完成");
    return "OK";
}

Consumer Ack,确认:

表示消费者端收到消息后的确认方式
自动确认acknowledge=“none”、手动确认acknowledge=“manual”
根据异常类型确认acknowledge=“auto”(较难)

spring.rabbitmq.listener.simple.acknowledge-mode=manual
/**
 * Consumer ACK机制
 * 1、手动签收(properties设置)
 * 2、实现ChannelAwareMessageListener接口
 * 3、如果消息成功处理,则调用channel的basicAck()签收
 * 4、如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
 */
@Component
public class AckListener implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "test_queue_confirm")
    public void onMessage(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag:" + deliveryTag);

        try {
            //接受消息
            System.out.println(new String(message.getBody()));
            //处理业务逻辑,模拟出错
            System.out.println("处理业务逻辑...");
            int i = 3 / 0;

            //手动签收,正确签收
            /**
             * long deliveryTag: the tag from the received (Basic.GetOk) or (Basic.Deliver)
             * boolean multiple: true to acknowledge all message up to and including the supplied delivery tag
             * false to acknowledge just the supplied delivery tag
             */
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            try {
                //参数3是重回队列:消息回到队列,broker会重新发送直到正确处理为止
                channel.basicNack(deliveryTag, true, true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
}

20210626215653001

消费端限流:

MQ中接收多个请求后,消费者端从MQ中每秒拉取定量请求(1000个?)。这样的做法是为了避免消费端一次性处理过多请求导致宕机

spring.rabbitmq.listener.simple.prefetch=10
/**
 * Consumer 限流机制
 * 1、确保ack机制是手动确认的
 * 2、配置属性perfectch,值是每次拉取消息的最大值
 */
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "test_queue_confirm")
    public void onMessage(Message message, Channel channel) throws IOException {
        //获取消息
        System.out.println(new String(message.getBody()));
        //处理业务逻辑

        //签收。假设不签收,控制台只会有10条消息
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }
}

20210626221251267
20210626221549967

TTL,全程Time To Live(存活时间/过期时间):

当消息到达存活时间还未被消费,就会被自动清除的。
统一过期:

@Configuration
public class RabbitMQConfig {
    public final static String EXCHANGE_NAME = "test_exchange_ttl";
    public final static String QUEUE_NAME = "test_queue_ttl";

    //交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        //这里新增了x-message-ttl参数,用来设置过期时间(单位:毫秒)。或者单独用ttl()也是可以的
        //return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 10000).build();
        return QueueBuilder.durable(QUEUE_NAME).ttl(10000).build();
    }

    //队列和交换机绑定关系
    @Bean//不需要被注入所以不用给名了
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
    }
}
@Test
public void testTtl() {
    for (int i = 1; i <= 200; i++) {
        rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.vivi", "message ttl...");
    }
}

观察可见10秒钟后队列的所有消息自动过期
单独过期:
注意单独的过期时间可以不同于队列的过期时间,时间以两者中最短的为准
如果多条不同过期时间的消息,会从最顶端的开始判断是否过期再进行清除
比如,顶端是10秒过期,第二个是5秒过期,那么第二个过期后第一个仍未过期,那么第二个不会被清除

@Test
public void testTtl() {
    //消息后处理对象,可以设置消息的参数信息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //设置message信息
            message.getMessageProperties().setExpiration("10000");//过期时间
            //返回消息
            return message;
        }
    };

    rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.vivi", "message ttl...", messagePostProcessor);
}

死信队列:

Dead Letter Exchange,死信交换机。当消息成为Dead message后,可以被重新发送到另一个交换机,这个就是死信交换机
比如上面,过期了的消息可以被发送到死信交换机去,由死信交换机送到其他消费者去
20210627001056329

@Configuration
public class RabbitMQConfig {
    private static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
    private static final String TOPIC_QUEUE_NAME = "topic_queue";

    public static final String DLX_EXCHANGE = "dlx_exchange";
    public static final String DLX_QUEUE = "dlx_queue";

    public static final String DLX_ROUTING_KEY = "error";

    //交换机
    @Bean("bootExchange")
    public Exchange itemTopicExchange() {
        return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE_NAME).build();
    }

    //死信交换机
    @Bean("deadExchange")
    public Exchange DlxExchange() {
        return ExchangeBuilder.directExchange(DLX_EXCHANGE).build();
    }

    //队列
    @Bean("bootQueue")
    public Queue itemQueue() {
        //与死信交换机绑定
        return QueueBuilder
                .durable(TOPIC_QUEUE_NAME)
                .ttl(10000) //过期算死信
                .maxLength(10) //过长算死信
                .deadLetterExchange(DLX_EXCHANGE) //死信交换机
                .deadLetterRoutingKey(DLX_ROUTING_KEY).build(); //死信路由
    }

    //死信队列
    @Bean("deadQueue")
    public Queue dlxQueue() {
        return QueueBuilder.durable(DLX_QUEUE).build();
    }

    //队列和交换机绑定关系
    @Bean//不需要被注入所以不用给名了
    public Binding itemQueueExchange(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
    }

    @Bean
    public Binding deadQueueExchange(@Qualifier("deadExchange") Exchange exchange, @Qualifier("deadQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(DLX_ROUTING_KEY).noargs();
    }
}
@Test
public void testDlx() {
    //测试过期时间的,10秒钟后会从topic_queue到dlx_queue
    rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");

    //测试长度问题,第50条之后都是死信,再过10秒100条全死信
    for (int i = 0; i < 100; i++) {
        rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");
    }

    //测试拒收
    rabbitTemplate.convertAndSend("topic_exchange", "item.fuck", "我是一条有期限的消息,我会死掉吗?");
}

注意拒收需要修改消费者,不要让其重回队列里

@Component
public class DlxListener implements ChannelAwareMessageListener {
    @Override
    @RabbitListener(queues = "topic_queue")
    public void onMessage(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag:" + deliveryTag);

        try {
            //接受消息
            System.out.println(new String(message.getBody()));
            //处理业务逻辑,模拟出错
            System.out.println("处理业务逻辑...");
            int i = 3 / 0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            try {
                //拒收。参数3是重回队列。但是如果是死信,应该要false,让其归队到死信里
                System.out.println("异常,拒绝接受");
                channel.basicNack(deliveryTag, true, false);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
}

延迟队列:

消息进入队列后不会立即被消费,只有到了指定时间了才会被消费。
典型:抢单后30分钟内为付款就取消订单
一般的延迟队列,是使用TTL+死信来完成的,rabbitmq本身未提供这样的功能
和上面的不同在于,消费者监听的是死信队列而不是正常队列

日志监控:

日志默认存放在/var/log/rabbitmq@xx.log