
1、Erlang的版本选择必须对应好
wget http://erlang.org/download/otp_src_22.3.tar.gz
https://www.erlang-solutions.com/downloads/
rpm -ivh esl-erlang_22.3.4-1_centos_7_amd64.rpm
实际版本自己补全
2、安装Erlang
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.14/rabbitmq-server-3.7.14-1.el7.noarch.rpm
yum install -y rabbitmq-server-3.8.17-1.el7.noarch.rpm
3、启动停止
service rabbitmq-server start
service rabbitmq-server restart
service rabbitmq-server stop
4、开启管理界面
管理界面命令:
rabbitmq-plugins enable rabbitmq_management
添加远程用户:
rabbitmqctl add_user happylayga flowerpower
加tag:
rabbitmqctl set_user_tags happylayga administrator
授权:
rabbitmqctl set_permissions -p "/" happylayga ".*" ".*" ".*"
通过ip:15672就可以访问后台,服务端口是5672
java整合的某案例(单通)
分别创建producer和consumer项目,都引入相关mq依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
</dependencies>
生产者:
/**
* 发送消息
*/
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
//3、创建连接Connection
Connection connection = factory.newConnection();
//4、创建channel
Channel channel = connection.createChannel();
//5、创建队列
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 队列名,持久化,是否独占(一般false),自动删除,参数信息
*/
channel.queueDeclare("hello_world", true, false, false, null);
String body = "hello rabbitmq~~~";
//6、发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
* 交换机名称、路由名称、配置信息,发送的消息数据(字节信息)
*/
channel.basicPublish("", "hello_world", null, body.getBytes(StandardCharsets.UTF_8));
//7、释放资源
channel.close();
connection.close();
}
}

消费者:
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2、设置参数
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
//3、创建连接Connection
Connection connection = factory.newConnection();
//4、创建channel
Channel channel = connection.createChannel();
//5、消费者
Consumer consumer = new DefaultConsumer(channel){
@Override //回调方法。消息表示,获取信息(交换机、路由),配置类信息,真是数据结果
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
//6、接受消息
/**
* String queue, boolean autoAck, Consumer callback
* 队列名,自动确认机制,消费者回调
*/
channel.basicConsume("hello_world",true,consumer);
//7、不要关闭资源,因为消费者是监听者,消息不是只有一个,关闭了后续消息就无法接受了
channel.close();
connection.close();
}
}

java整合的某案例(Work queues 消息分发)

对比之前,生产者是循环生成多条,而消费者两个接受同一个生产者产出的消息队列。代码方面变化不大
for (int i = 1; i <= 20; i++) {
String body = "No." + i + ": hello rabbitmq~~~";
channel.basicPublish("", "work_queues", null, body.getBytes(StandardCharsets.UTF_8));
}
最终效果是两个消费者分别获得各一半的生产到的消息
java整合的某案例(PubSub订阅模式)带交换机Exchange

交换机exchange的作用是处理消息,例如递交给某个特别队列、递交给所有队列、丢弃等。
exchange三种类型:Fanout广播(所有)、Direct定向(符合routingkey的队列)、Topic通配符(符合routingpattern的队列)
这个例子是Fanout(全体广播)
生产者:
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_fanout";
//创建交换机
/**
* String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments
* 交换机名称
* 交换机类型:DIRECT定向、FANOUT扇形广播、TOPIC通配符、HEADERS参数匹配
* 持久化、自动删除、内部使用(一般false)、参数
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
//发送消息,释放资源
channel.basicPublish(exchangeName, "", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
}
消费者区别在queueName不同以及回调
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
Consumer consumer = new DefaultConsumer(channel){
@Override //回调方法。消息表示,获取信息(交换机、路由),配置类信息,真是数据结果
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("讲日志信息打印到了控制台/数据库");
}
};
channel.basicConsume(queue1Name,true,consumer);
//7、不要关闭资源,因为消费者是监听者,消息不是只有一个,关闭了后续消息就无法接受了
}
}


这样子,两边都会收到相同的信息,但是可以做到不同的操作
java整合的某案例(Routing路由模式)

对比之上,实现了不同的操作
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
//创建交换机
/**
* String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments
* 交换机名称
* 交换机类型:DIRECT定向、FANOUT扇形广播、TOPIC通配符、HEADERS参数匹配
* 持久化、自动删除、内部使用(一般false)、参数
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType., true, false, false, null);
//创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
*/
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");//注意queue1和queue2绑定的级别不同
channel.queueBind(queue2Name, exchangeName, "warning");
channel.queueBind(queue2Name, exchangeName, "error");
//发送消息,释放资源
channel.basicPublish(exchangeName, "info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "error", null, "日志信息:调用了findall方法...日志级别error".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
}



注意4条消息,其中消费者1只获得error级别1条消息,消费者2获得了全部
java整合的某案例(Topics通配符模式)

public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.115.203.188");//默认是localhost
factory.setPort(5672);//默认5672
factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
factory.setUsername("happylayga");//默认guest
factory.setPassword("flowerpower");//默认guest
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topics";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
//创建队列
String queue1Name = "test_topics_queue1";
String queue2Name = "test_topics_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//绑定队列和交换机
/**
* String queue, String exchange, String routingKey
* 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
*/
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue2Name, exchangeName, "order.#");
channel.queueBind(queue2Name, exchangeName, "#.#");
//发送消息,释放资源
channel.basicPublish(exchangeName, "order.info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "order.warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "order.warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
channel.basicPublish(exchangeName, "order.error", null, "日志信息:调用了findall方法...日志级别error".getBytes(StandardCharsets.UTF_8));
channel.close();
connection.close();
}
}


