RabbitMQ 代码示例

Web 管理页面

RabbitMQ

通过代码 debug 来对 Web 管理页面有个更直观的认识
生产者代码如下

导入依赖

<dependencies>
  <!--rabbitmq 原生客户端依赖-->
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
  </dependency>
</dependencies>
public class Producer {
    public static void main(String[] args) {
        // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
        // ip port
        // 1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2 创建连接 Connection
            connection = connectionFactory.newConnection("生产者");
            // 3 通过连接获取通道 Channel
            channel = connection.createChannel();
            // 4 通过创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息
            String queueName = "queue1";
            /*
             * 队列名字
             * 是否具有持久化 durable=false 所谓持久化消息是否存盘,如果是false 非持久化 true 持久化?非持久化会存盘么,会存盘,但是会随着服务器的重启丢失
             * 排他性,是否具有独立线程
             * 是否自动删除,随着最后一个消费者消息消费完毕之后,是否把队列自动删除
             * 携带一些附属参数
             */
            channel.queueDeclare(queueName,false,false,false,null);
            // 5 准备消息内容
            String message = "hello,world";
            // 6 发送消息给队列 queue
            // 交换机,队列、路由key,消息是否持久化,消息内容主体
            channel.basicPublish("",queueName,null,message.getBytes());
            System.out.println("发送消息成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7 关闭通道
            if (channel!= null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

当代码执行到创建连接 Connection 时,可以在 Connections 一栏看到连接信息。

RabbitMQ

当代码执行到创建通道时,看到 Channels 一栏信息

RabbitMQ

当代码执行到队列时候,看到 queues 一栏的信息
RabbitMQ

执行结束之后可以看到队列中的消息增加了一个

RabbitMQ

消费着代码如下

public class Consumer {
    public static void main(String[] args) {
        // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
        // ip port
        // 1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2 创建连接 Connection
            connection = connectionFactory.newConnection("生产者");
            // 3 通过连接获取通道 Channel
            channel = connection.createChannel();
            // 4 通过创建交换机,声明队列,绑定关系,路由 key,发送消息和接收消息
            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("收到消息是:\t" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("接收失败");
                }
            });

            System.out.println("开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7 关闭通道
            if (channel!= null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

当代码执行到 System.in.read() 时,此时消息都被消费了

RabbitMQ

当服务重启后,消息队列消失。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商