【资料图】
大家好,我是指北君。
今天指北君带领大家接着学习RabbitMQ,了解RabbitMQ的五大通信模型之一的发布订阅模型;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~
发布订阅模型上一篇文章中,简单的介绍了一下RabbitMQ的work模型。这篇文章来学习一下RabbitMQ中的发布订阅模型。
发布订阅模型(Publish/Subscribe):简单的说就是队列里面的消息会被多个消费者同时接受到,消费者接收到的信息一致。
发布订阅模型适合于做模块之间的异步通信。
适用场景发送并记录日志信息springcloud的config组件里面通知配置自动更新缓存同步微信订阅号演示生产者public class Producer { private static final String EXCHANGE_NAME = "exchange_publish_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 发送消息到交换机 for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE_NAME, "", null, ("发布订阅模型的第 " + i + " 条消息").getBytes()); } // 关闭资源 channel.close(); connection.close(); }}消费者
// 消费者1public class Consumer { private static final String QUEUE_NAME = "queue_publish_1"; private static final String EXCHANGE_NAME = "exchange_publish_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 将队列绑定到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("队列1接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }}
// 消费者2public class Consumer2 { private static final String QUEUE_NAME = "queue_publish_2"; private static final String EXCHANGE_NAME = "exchange_publish_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 将队列绑定到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("队列2接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }}测试
先启动2个消费者,再启动生产者
可以看出来消费者1和消费者2接收到的消息是一模一样的,每个消费者都收到了生产者发送的消息;
发布订阅模型,用到了一个新的东西-交换机,这里也解释一下相关方法的参数:
// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 该方法的最多参数的重载方法是:Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map // 将队列绑定到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");/** * param1:destination,目的地,队列的名字 * param2:source,资源,交换机的名字 * param3:routingKey,路由键(目前没有用到routingKey,填 "" 即可) */小结 本文到这里就结束了,介绍了RabbitMQ通信模型中的发布订阅模型,适合于做模块之间的异步通信。
X 关闭
X 关闭
- 15G资费不大降!三大运营商谁提供的5G网速最快?中国信通院给出答案
- 2联想拯救者Y70发布最新预告:售价2970元起 迄今最便宜的骁龙8+旗舰
- 3亚马逊开始大规模推广掌纹支付技术 顾客可使用“挥手付”结账
- 4现代和起亚上半年出口20万辆新能源汽车同比增长30.6%
- 5如何让居民5分钟使用到各种设施?沙特“线性城市”来了
- 6AMD实现连续8个季度的增长 季度营收首次突破60亿美元利润更是翻倍
- 7转转集团发布2022年二季度手机行情报告:二手市场“飘香”
- 8充电宝100Wh等于多少毫安?铁路旅客禁止、限制携带和托运物品目录
- 9好消息!京东与腾讯续签三年战略合作协议 加强技术创新与供应链服务
- 10名创优品拟通过香港IPO全球发售4100万股 全球发售所得款项有什么用处?