RabbitMQ 入门案例 - fanout 模式

发布订阅模式

图解

RabbitMQ 入门案例 - fanout 模式

具体实现

  • 类型 fanout
  • 特点 Fanout-发布订阅模式,是一种广播机制,它是没有路由 key 的模式

生产者代码

public class Producer {
    public static void main(String[] args) {
        // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
        // ip port
        // 1 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2 设置连接属性
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3 从连接工厂中获取连接 Connection
            connection = connectionFactory.newConnection("生产者");
            // 4 通过连接获取通道 Channel
            channel = connection.createChannel();

            // 5 准备发送消息的内容
            String message = "This is a routing message";

            // 6 准备交换机
            String exchangeName = "fanout-exchange";
            // 7 定义路由 key
            String routingKey = "";
            // 8 指定交换机类型
            String type = "fanout";


            // 7 发送消息给队列 queue
            /*
             * @param1:交换机名称
             * @param2:队列名称/routingKey
             * @param3:属性配置
             * @param4:发送消息的内容
             * #.course.* queue3...
             */
            channel.basicPublish(exchangeName,routingKey,null,message.getBytes());
            System.out.println("发送消息成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7 关闭通道
            if (channel!= null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            // 8 关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者代码

public class Consumer {
    public static Runnable runnable = new Runnable() {
        public void run() {
            // 所有的中间件技术都是基于 TCP/IP 协议基础之上构建的协议规范,只不过 rabbitmq 遵循的是 AMQP 协议
            // ip port
            // 1 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 2 设置连接属性
            connectionFactory.setHost("192.168.33.110");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("test");
            connectionFactory.setVirtualHost("/");
            // 获取队列的名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // 3 从连接工厂中获取连接
                connection = connectionFactory.newConnection("生产者");
                // 4 通过连接获取通道 Channel
                channel = connection.createChannel();
                // 5 声明队列 queue 存储消息
                /*
                 * 如果队列不存在,则会创建
                 * RabbitMQ 不允许创建两个相同的队列名称,否则会报错
                 *
                 * @param1:queue 队列的名称
                 * @param2:durable 队列是否持久化
                 * @param3:exclusive 是否排他,即是否私有,如果为 true,会对当前队列加锁,其它通道不能访问,并且连接自动关闭
                 * @param4:autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息
                 * @param5:arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
                 */
                Channel finalChannel = channel;
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(delivery.getEnvelope().getDeliveryTag());
                        System.out.println(queueName+":收到消息是:\t" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    public void handle(String s) throws IOException {
                        System.out.println("接收失败");
                    }
                });

                System.out.println(queueName+":开始接收消息");
//                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 7 关闭通道
                if (channel!= null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
                // 8 关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable,"queue1").start();
        new Thread(runnable,"queue2").start();
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商