xmtrock
发布于 2021-06-26 / 264 阅读
0

SpringBoot、SpringCloud整合RabbitMq

依赖和配置:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
rabbitmq:
  host: 47.115.203.188
  port: 5672
  username: xmtrock
  password: *****
  virtual-host: /learn_gulimall

创建:

@Autowired
private AmqpAdmin amqpAdmin;

@Test
public void createExchange(){
    //声明交换机
    DirectExchange directExchange = new DirectExchange("hello-java-exchange",false,false);
    amqpAdmin.declareExchange(directExchange);//无返回
    log.info("Exchange[{}]创建成功了","hello-java-exchange");
}
@Test
public void createQueue(){
    //参数3是排他[只能被已声明的连接使用],一般多人使用最好false
    Queue queue = new Queue("hello-java-queue",false,false,false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[{}]创建成功了","hello-java-exchange");
}
@Test
public void createBinding(){
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
    amqpAdmin.declareBinding(binding);
}

发送:

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void sendMessageTest() {
    String msg = "hello_world!!!";
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);
    log.info("消息发送完成:[{}]", msg);
}
@Test
public void sendEntityMessageTest(){
    OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
    orderReturnReasonEntity.setId(1L);
    orderReturnReasonEntity.setCreateTime(new Date());
    orderReturnReasonEntity.setName("操你妈");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity);
    log.info("消息发送完成:[{}]", orderReturnReasonEntity.toString());
}

20220419005724812
如果发送的是个对象,最好还是序列化一下,以便存储和传输,implements Serializable
如果不想生成如上序列化文,最好是让其生成json,配置如下

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

监听:
直接在service里听

/**
 * 1. Message message是原生的消息类型,头+体
 * 2. T<发送的消息类型>
 * 3. import com.rabbitmq.client.Channel; 当前传输数据的通道
 *
 * Queue: 可以很多人来监听。只要收到消息,队列删除消息,而且只有一个人收到此消息
 * 场景:
 *      1> 订单服务启动多个: 同一个消息,只有一个客户端能收到,既交替
 *      2> 只有一个消息完全处理完,才会接受下一个
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
    byte[] body = message.getBody();//消息体内容,也就是json数据
    MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
    Thread.sleep(3000);
    System.out.println("message:" + message);
    System.out.println("content:" + content.toString());
    System.out.println("channel:" + channel.toString());
    System.out.println("消息处理完成...");
}

丰富场景:发送不同的对象,实现重载,用到@RabbitListener

@RestController
@Slf4j
public class RabbitController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendEntityMessageTest(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
        OrderEntity orderEntity = new OrderEntity();
        for (int i = 1; i <= num; i++) {
            if (i % 2 == 0) {
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setCreateTime(new Date());
                orderReturnReasonEntity.setName("第" + i + "号");
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity);
            } else {
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                orderEntity.setNote("第" + i + "号");
                orderEntity.setCreateTime(new Date());
                orderEntity.setId(2L);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity);
            }
        }
        log.info("消息发送完成");
        return "OK";
    }
}
@RabbitListener(queues = {"hello-java-queue"})
public class OrderReturnReasonServiceImpl extends ServiceImpl<OrderReturnReasonDao, OrderReturnReasonEntity> implements OrderReturnReasonService {
    /**
     * 1. Message message是原生的消息类型,头+体
     * 2. T<发送的消息类型>
     * 3. import com.rabbitmq.client.Channel; 当前传输数据的通道
     *
     * Queue: 可以很多人来监听。只要收到消息,队列删除消息,而且只有一个人收到此消息
     * 场景:
     *      1> 订单服务启动多个: 同一个消息,只有一个客户端能收到,既交替
     *      2> 只有一个消息完全处理完,才会接受下一个
     */
    // @RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler //??类似重载
    public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
        byte[] body = message.getBody();//消息体内容,也就是json数据
        MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
        Thread.sleep(1000);
        System.out.println("message:" + message);
        System.out.println("content:" + content.toString());
        System.out.println("channel:" + channel.toString());
        System.out.println("消息处理完成...");
    }

    @RabbitHandler //??类似重载
    public void receiveMessage(Message message, OrderEntity content, Channel channel) throws InterruptedException {
        byte[] body = message.getBody();//消息体内容,也就是json数据
        MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
        Thread.sleep(1000);
        System.out.println("message:" + message);
        System.out.println("content:" + content.toString());
        System.out.println("channel:" + channel.toString());
        System.out.println("消息处理完成...");
    }
}

# 应用名称
spring.application.name=springbootproducer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=f*****
spring.rabbitmq.virtual-host=/soulike

生产者配置类:

/**
 * 交换机和队列需要手动创建,且需要手动绑定!!!
 * 然后交换机和队列的持久化也必须和这里的一致!!!
 */
@Configuration
public class RabbitMQConfig {
    public final static String EXCHANGE_NAME = "boot_topic_exchange";
    public final static String QUEUE_NAME = "boot_queue";

    //交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    //队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //队列和交换机绑定关系
    @Bean//不需要被注入所以不用给名了
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

使用生产者:

@SpringBootTest
class ProducerApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
    }
}

消费者监听器

@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message) {
        System.out.println(message);
    }
}

消费者直接启动主Application就可以看到控制台打印消息


=实际cloud工程中=

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

封装Service(生产者):

@Service
public class RabbitService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public boolean sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
        return true;
    }
}

配置消息转换器(默认是字符串转换器):

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

@Configuration
public class MQConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

cloud中生产者模块引入

<dependency>
    <groupId>com.soulike</groupId>
    <artifactId>rabbitmq_util</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

生产者配置properties

spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=flowerpower
spring.rabbitmq.virtual-host=/yygh

util下配置对应的常量参数(交换机、队列,键)

public class MqConst {
    //预约下单
    public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
    public static final String ROUTING_ORDER = "order";
    //队列
    public static final String QUEUE_ORDER  = "queue.order";

    //短信
    public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
    public static final String ROUTING_MSM_ITEM = "msm.item";
    //队列
    public static final String QUEUE_MSM_ITEM  = "queue.msm.item";

    //定时任务
    public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";
    public static final String ROUTING_TASK_8 = "task.8";
    //队列
    public static final String QUEUE_TASK_8 = "queue.task.8";
}

配置时交换机和队列键的自动生成

@Configuration
public class RabbitMQConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    private RabbitTemplate rabbitTemplate;

    //实现交换机和队列自动创建的超级关键点
    @Primary
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setMessageConverter(messageConverter());
        initRabbitTemplate();
        return rabbitTemplate;
    }

    public void initRabbitTemplate() {
        /**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            /**
             * 1、做好消息确认机制(publisher、consumer【手动ack】
             * 2、每一个发送的消息都做好数据库记录,并定期将失败消息重新发送
             */
            //服务器收到---持久化
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            //未达到队列,这里才报错了
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }
}

封装监听器(消费者)【监听到了对应的队列后才发送短信】

@Component
public class MsmReceiver{
    @Autowired
    private MailService mailService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"), //队列
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM), //交换机
            key = {MqConst.ROUTING_MSM_ITEM} //读取键
    ))
    public void send(MsmVo msmVo, Message message, Channel channel) {
        mailService.send(msmVo);
    }
}

生产者

rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_ORDER, MqConst.ROUTING_ORDER, orderMqVo);