xmtrock
发布于 2021-06-26 / 335 阅读
0

RabbitMq学习

20220418163022907

1、Erlang的版本选择必须对应好

wget http://erlang.org/download/otp_src_22.3.tar.gz
https://www.erlang-solutions.com/downloads/
rpm -ivh esl-erlang_22.3.4-1_centos_7_amd64.rpm
实际版本自己补全

2、安装Erlang

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.14/rabbitmq-server-3.7.14-1.el7.noarch.rpm
yum install -y rabbitmq-server-3.8.17-1.el7.noarch.rpm

3、启动停止

service rabbitmq-server start
service rabbitmq-server restart
service rabbitmq-server stop

4、开启管理界面

管理界面命令:
rabbitmq-plugins enable rabbitmq_management
添加远程用户:
rabbitmqctl add_user happylayga flowerpower
加tag:
rabbitmqctl set_user_tags happylayga administrator
授权:
rabbitmqctl set_permissions -p "/" happylayga ".*" ".*" ".*"
通过ip:15672就可以访问后台,服务端口是5672

java整合的某案例(单通)

分别创建producer和consumer项目,都引入相关mq依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.12.0</version>
    </dependency>
</dependencies>

生产者:

/**
 * 发送消息
 */
public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2、设置参数
        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        //3、创建连接Connection
        Connection connection = factory.newConnection();

        //4、创建channel
        Channel channel = connection.createChannel();

        //5、创建队列
        /**
         * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
         * 队列名,持久化,是否独占(一般false),自动删除,参数信息
         */
        channel.queueDeclare("hello_world", true, false, false, null);

        String body = "hello rabbitmq~~~";

        //6、发送消息
        /**
         * String exchange, String routingKey, BasicProperties props, byte[] body
         * 交换机名称、路由名称、配置信息,发送的消息数据(字节信息)
         */
        channel.basicPublish("", "hello_world", null, body.getBytes(StandardCharsets.UTF_8));

        //7、释放资源
        channel.close();
        connection.close();
    }
}

20210626022434016
消费者:

public class Consumer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2、设置参数
        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        //3、创建连接Connection
        Connection connection = factory.newConnection();

        //4、创建channel
        Channel channel = connection.createChannel();

        //5、消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override //回调方法。消息表示,获取信息(交换机、路由),配置类信息,真是数据结果
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };

        //6、接受消息
        /**
         * String queue, boolean autoAck, Consumer callback
         * 队列名,自动确认机制,消费者回调
         */
        channel.basicConsume("hello_world",true,consumer);

        //7、不要关闭资源,因为消费者是监听者,消息不是只有一个,关闭了后续消息就无法接受了
        channel.close();
        connection.close();
    }
}

20210626024154570

java整合的某案例(Work queues 消息分发)

20210626025532843
对比之前,生产者是循环生成多条,而消费者两个接受同一个生产者产出的消息队列。代码方面变化不大

for (int i = 1; i <= 20; i++) {
    String body = "No." + i + ": hello rabbitmq~~~";
    channel.basicPublish("", "work_queues", null, body.getBytes(StandardCharsets.UTF_8));
}

最终效果是两个消费者分别获得各一半的生产到的消息

java整合的某案例(PubSub订阅模式)带交换机Exchange

20210626030938768
交换机exchange的作用是处理消息,例如递交给某个特别队列、递交给所有队列、丢弃等。
exchange三种类型:Fanout广播(所有)、Direct定向(符合routingkey的队列)、Topic通配符(符合routingpattern的队列)
这个例子是Fanout(全体广播)
生产者:

public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_fanout";
        //创建交换机
        /**
         * String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments
         * 交换机名称
         * 交换机类型:DIRECT定向、FANOUT扇形广播、TOPIC通配符、HEADERS参数匹配
         * 持久化、自动删除、内部使用(一般false)、参数
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);

        //创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);

        //绑定队列和交换机
        /**
         * String queue, String exchange, String routingKey
         * 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
         */
        channel.queueBind(queue1Name, exchangeName, "");
        channel.queueBind(queue2Name, exchangeName, "");

        //发送消息,释放资源
        channel.basicPublish(exchangeName, "", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
        channel.close();
        connection.close();
    }
}

消费者区别在queueName不同以及回调

public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        Consumer consumer = new DefaultConsumer(channel){
            @Override //回调方法。消息表示,获取信息(交换机、路由),配置类信息,真是数据结果
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                System.out.println("讲日志信息打印到了控制台/数据库");
            }
        };

        channel.basicConsume(queue1Name,true,consumer);

        //7、不要关闭资源,因为消费者是监听者,消息不是只有一个,关闭了后续消息就无法接受了
    }
}

20210626033640230
20210626033640238
这样子,两边都会收到相同的信息,但是可以做到不同的操作

java整合的某案例(Routing路由模式)

20210626034124295
对比之上,实现了不同的操作

public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_direct";
        //创建交换机
        /**
         * String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String, Object> arguments
         * 交换机名称
         * 交换机类型:DIRECT定向、FANOUT扇形广播、TOPIC通配符、HEADERS参数匹配
         * 持久化、自动删除、内部使用(一般false)、参数
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType., true, false, false, null);

        //创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);

        //绑定队列和交换机
        /**
         * String queue, String exchange, String routingKey
         * 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
         */
        channel.queueBind(queue1Name, exchangeName, "error");
        channel.queueBind(queue2Name, exchangeName, "info");//注意queue1和queue2绑定的级别不同
        channel.queueBind(queue2Name, exchangeName, "warning");
        channel.queueBind(queue2Name, exchangeName, "error");

        //发送消息,释放资源
        channel.basicPublish(exchangeName, "info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "error", null, "日志信息:调用了findall方法...日志级别error".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
        channel.close();
        connection.close();
    }
}

20210626035435870
20210626040010876
20210626040010886
注意4条消息,其中消费者1只获得error级别1条消息,消费者2获得了全部

java整合的某案例(Topics通配符模式)

20210626040803595

public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("47.115.203.188");//默认是localhost
        factory.setPort(5672);//默认5672
        factory.setVirtualHost("/soulike");//虚拟机,默认是"/"
        factory.setUsername("happylayga");//默认guest
        factory.setPassword("flowerpower");//默认guest

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_topics";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

        //创建队列
        String queue1Name = "test_topics_queue1";
        String queue2Name = "test_topics_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);

        //绑定队列和交换机
        /**
         * String queue, String exchange, String routingKey
         * 队列名、交换机名、路由键(绑定规则):如果交换机类型是fanout则""
         */
        channel.queueBind(queue1Name, exchangeName, "#.error");
        channel.queueBind(queue2Name, exchangeName, "order.#");
        channel.queueBind(queue2Name, exchangeName, "#.#");

        //发送消息,释放资源
        channel.basicPublish(exchangeName, "order.info", null, "日志信息:调用了findall方法...日志级别info".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "order.warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "order.warning", null, "日志信息:调用了findall方法...日志级别warning".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, "order.error", null, "日志信息:调用了findall方法...日志级别error".getBytes(StandardCharsets.UTF_8));
        channel.close();
        connection.close();
    }
}

20210626042144963
20210626042247425
20210626042247420