Return Listener用于处理一些不可路由的消息!
正常情况:我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
异常情况:在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就需要使用Return Listener!
在基础API中有一个关键的配置项
Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么Broker端自动删除该消息!
消费端代码
package com.javaxl.rabbitmqapi.returnlistener; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * @author 小李飞刀 * @site www.javaxl.com * @company * @create 2019-11-20 9:58 */ public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.147.146"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); } } }
生产端代码
package com.javaxl.rabbitmqapi.returnlistener; import com.rabbitmq.client.*; import java.io.IOException; /** * @author 小李飞刀 * @site www.javaxl.com * @company * @create 2019-11-20 9:44 */ public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.147.146"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //消息投递成功,会被消费者所消费 // channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); //消息不可达,将触发ReturnListener channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
over.......
备案号:湘ICP备19000029号
Copyright © 2018-2019 javaxl晓码阁 版权所有