自动监听
@RabbitListener
在一个类上注解@RabbitListener,声明为一个监听器,那么对于任何获取到的信息都会被消费,并执行@RabbitHandler方法的逻辑。但是这种方法不够灵活,也不允许手动确认消费。
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Message m) {
System.out.println("消费者收到消息 : " +m);
}
}
手动监听
配置类以及监听处理类
首先配置类需要注入自定义的监听器容器,相当于监听器存活在环境。然后对这个环境设置自定义的监听器,更改消费的规则。详细见代码
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;//消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//设置一个队列
container.setQueueNames("TestDirectQueue");
//如果同时设置多个如下: 前提是队列都是必须已经创建存在的
// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
//另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
//container.setQueues(new Queue("TestDirectQueue",true));
//container.addQueues(new Queue("TestDirectQueue2",true));
//container.addQueues(new Queue("TestDirectQueue3",true));
container.setMessageListener(myAckReceiver);
return container;
}
}
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
String msg = message.toString();
String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号之间的数据就是我们的消息数据
System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
//或者
channel.basicReject(deliveryTag, true);//第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
评论区