依赖和配置:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
rabbitmq:
host: 47.115.203.188
port: 5672
username: xmtrock
password: *****
virtual-host: /learn_gulimall
创建:
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
//声明交换机
DirectExchange directExchange = new DirectExchange("hello-java-exchange",false,false);
amqpAdmin.declareExchange(directExchange);//无返回
log.info("Exchange[{}]创建成功了","hello-java-exchange");
}
@Test
public void createQueue(){
//参数3是排他[只能被已声明的连接使用],一般多人使用最好false
Queue queue = new Queue("hello-java-queue",false,false,false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功了","hello-java-exchange");
}
@Test
public void createBinding(){
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
}
发送:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
String msg = "hello_world!!!";
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);
log.info("消息发送完成:[{}]", msg);
}
@Test
public void sendEntityMessageTest(){
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setCreateTime(new Date());
orderReturnReasonEntity.setName("操你妈");
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity);
log.info("消息发送完成:[{}]", orderReturnReasonEntity.toString());
}

如果发送的是个对象,最好还是序列化一下,以便存储和传输,implements Serializable
如果不想生成如上序列化文,最好是让其生成json,配置如下
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
监听:
直接在service里听
/**
* 1. Message message是原生的消息类型,头+体
* 2. T<发送的消息类型>
* 3. import com.rabbitmq.client.Channel; 当前传输数据的通道
*
* Queue: 可以很多人来监听。只要收到消息,队列删除消息,而且只有一个人收到此消息
* 场景:
* 1> 订单服务启动多个: 同一个消息,只有一个客户端能收到,既交替
* 2> 只有一个消息完全处理完,才会接受下一个
*/
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
byte[] body = message.getBody();//消息体内容,也就是json数据
MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
Thread.sleep(3000);
System.out.println("message:" + message);
System.out.println("content:" + content.toString());
System.out.println("channel:" + channel.toString());
System.out.println("消息处理完成...");
}
丰富场景:发送不同的对象,实现重载,用到@RabbitListener
@RestController
@Slf4j
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMq")
public String sendEntityMessageTest(@RequestParam(value = "num", defaultValue = "10") Integer num) {
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
OrderEntity orderEntity = new OrderEntity();
for (int i = 1; i <= num; i++) {
if (i % 2 == 0) {
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setCreateTime(new Date());
orderReturnReasonEntity.setName("第" + i + "号");
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderReturnReasonEntity);
} else {
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setNote("第" + i + "号");
orderEntity.setCreateTime(new Date());
orderEntity.setId(2L);
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", orderEntity);
}
}
log.info("消息发送完成");
return "OK";
}
}
@RabbitListener(queues = {"hello-java-queue"})
public class OrderReturnReasonServiceImpl extends ServiceImpl<OrderReturnReasonDao, OrderReturnReasonEntity> implements OrderReturnReasonService {
/**
* 1. Message message是原生的消息类型,头+体
* 2. T<发送的消息类型>
* 3. import com.rabbitmq.client.Channel; 当前传输数据的通道
*
* Queue: 可以很多人来监听。只要收到消息,队列删除消息,而且只有一个人收到此消息
* 场景:
* 1> 订单服务启动多个: 同一个消息,只有一个客户端能收到,既交替
* 2> 只有一个消息完全处理完,才会接受下一个
*/
// @RabbitListener(queues = {"hello-java-queue"})
@RabbitHandler //??类似重载
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException {
byte[] body = message.getBody();//消息体内容,也就是json数据
MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
Thread.sleep(1000);
System.out.println("message:" + message);
System.out.println("content:" + content.toString());
System.out.println("channel:" + channel.toString());
System.out.println("消息处理完成...");
}
@RabbitHandler //??类似重载
public void receiveMessage(Message message, OrderEntity content, Channel channel) throws InterruptedException {
byte[] body = message.getBody();//消息体内容,也就是json数据
MessageProperties messageProperties = message.getMessageProperties();//消息头属性信息
Thread.sleep(1000);
System.out.println("message:" + message);
System.out.println("content:" + content.toString());
System.out.println("channel:" + channel.toString());
System.out.println("消息处理完成...");
}
}
# 应用名称
spring.application.name=springbootproducer
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=f*****
spring.rabbitmq.virtual-host=/soulike
生产者配置类:
/**
* 交换机和队列需要手动创建,且需要手动绑定!!!
* 然后交换机和队列的持久化也必须和这里的一致!!!
*/
@Configuration
public class RabbitMQConfig {
public final static String EXCHANGE_NAME = "boot_topic_exchange";
public final static String QUEUE_NAME = "boot_queue";
//交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
//队列和交换机绑定关系
@Bean//不需要被注入所以不用给名了
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
使用生产者:
@SpringBootTest
class ProducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","boot mq hello~~~");
}
}
消费者监听器
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message) {
System.out.println(message);
}
}
消费者直接启动主Application就可以看到控制台打印消息
=实际cloud工程中=
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
封装Service(生产者):
@Service
public class RabbitService {
@Autowired
private RabbitTemplate rabbitTemplate;
public boolean sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
return true;
}
}
配置消息转换器(默认是字符串转换器):
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@Configuration
public class MQConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
cloud中生产者模块引入
<dependency>
<groupId>com.soulike</groupId>
<artifactId>rabbitmq_util</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
生产者配置properties
spring.rabbitmq.host=47.115.203.188
spring.rabbitmq.port=5672
spring.rabbitmq.username=happylayga
spring.rabbitmq.password=flowerpower
spring.rabbitmq.virtual-host=/yygh
util下配置对应的常量参数(交换机、队列,键)
public class MqConst {
//预约下单
public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
public static final String ROUTING_ORDER = "order";
//队列
public static final String QUEUE_ORDER = "queue.order";
//短信
public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
public static final String ROUTING_MSM_ITEM = "msm.item";
//队列
public static final String QUEUE_MSM_ITEM = "queue.msm.item";
//定时任务
public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";
public static final String ROUTING_TASK_8 = "task.8";
//队列
public static final String QUEUE_TASK_8 = "queue.task.8";
}
配置时交换机和队列键的自动生成
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
private RabbitTemplate rabbitTemplate;
//实现交换机和队列自动创建的超级关键点
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
/**
* 1、做好消息确认机制(publisher、consumer【手动ack】
* 2、每一个发送的消息都做好数据库记录,并定期将失败消息重新发送
*/
//服务器收到---持久化
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
//未达到队列,这里才报错了
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
封装监听器(消费者)【监听到了对应的队列后才发送短信】
@Component
public class MsmReceiver{
@Autowired
private MailService mailService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = MqConst.QUEUE_MSM_ITEM, durable = "true"), //队列
exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM), //交换机
key = {MqConst.ROUTING_MSM_ITEM} //读取键
))
public void send(MsmVo msmVo, Message message, Channel channel) {
mailService.send(msmVo);
}
}
生产者
rabbitService.sendMessage(MqConst.EXCHANGE_DIRECT_ORDER, MqConst.ROUTING_ORDER, orderMqVo);