依赖和配置
pom依赖
<!-- Spring Boot starter that pulls in Spring AMQP / RabbitMQ support.
     No <version> is given here; presumably it is managed by the Spring Boot
     parent POM or BOM (confirm against the full pom.xml). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
server:
  port: 80
spring:
  # Give the application a name
  application:
    name: rabbitmq-provider
  # RabbitMQ server connection settings
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    # Virtual host; optional. When omitted, the server's default virtual host is used.
    virtual-host: rabbit
RabbitTemplate
常用API
这些 getter/setter 用于设置或读取 RabbitTemplate 默认使用的交换机、路由键、接收队列以及编码,不做赘述。
/**
 * Stores the exchange name on this object.
 * A {@code null} argument is stored as the empty string, so the field is
 * never left {@code null} by this setter.
 *
 * @param exchange the exchange name; may be {@code null}
 */
public void setExchange(@Nullable String exchange) {
    if (exchange == null) {
        this.exchange = "";
    } else {
        this.exchange = exchange;
    }
}
/**
 * Returns the exchange name currently stored on this object.
 *
 * @return the value of the {@code exchange} field
 */
public String getExchange() {
    return exchange;
}
/**
 * Stores the routing key on this object.
 * The value is assigned as-is; no null check or normalization is applied.
 *
 * @param routingKey the routing key to store
 */
public void setRoutingKey(final String routingKey) {
    this.routingKey = routingKey;
}
/**
 * Returns the routing key currently stored on this object.
 *
 * @return the value of the {@code routingKey} field
 */
public String getRoutingKey() {
    return routingKey;
}
/**
 * Stores the given queue name in the {@code defaultReceiveQueue} field.
 * The value is assigned as-is.
 *
 * @param queue the queue name to store
 */
public void setDefaultReceiveQueue(final String queue) {
    this.defaultReceiveQueue = queue;
}
/**
 * Stores the encoding name on this object.
 * The value is assigned as-is; it is not validated here.
 *
 * @param encoding the encoding name to store
 */
public void setEncoding(final String encoding) {
    this.encoding = encoding;
}
/**
 * Returns the encoding name currently stored on this object.
 *
 * @return the value of the {@code encoding} field
 */
public String getEncoding() {
    return encoding;
}
比较重要的是下面这个方法:sendAndReceive 发送 Message 对象,并等待、返回对方回复的 Message(即请求/应答模式)。如果只需发送而不需要接收回复,可以用 send 方法。如果要发送自定义类的对象,还可以用 convertAndSend 方法,该方法会在发送前通过消息转换器(MessageConverter)将参数转换成 Message 对象。
/**
 * Public entry point for a send-and-receive exchange of messages.
 * Delegates unchanged to {@code doSendAndReceive} and returns its result.
 *
 * @param exchange        the exchange name passed through to the delegate
 * @param routingKey      the routing key passed through to the delegate
 * @param message         the message to send
 * @param correlationData correlation data; may be {@code null}
 * @return whatever {@code doSendAndReceive} returns; may be {@code null}
 * @throws AmqpException declared by this method's signature
 */
@Nullable
public Message sendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) throws AmqpException {
    Message reply = doSendAndReceive(exchange, routingKey, message, correlationData);
    return reply;
}
/**
 * Picks one of three send-and-receive strategies and delegates to it.
 * <ul>
 *   <li>{@code doSendAndReceiveWithDirect} when both {@code usingFastReplyTo}
 *       and {@code useDirectReplyToContainer} are set;</li>
 *   <li>{@code doSendAndReceiveWithFixed} when {@code replyAddress} is non-null
 *       and {@code usingFastReplyTo} is not set;</li>
 *   <li>{@code doSendAndReceiveWithTemporary} otherwise.</li>
 * </ul>
 * Before choosing, {@code evaluateFastReplyTo()} is invoked if
 * {@code evaluatedFastReplyTo} is still false.
 *
 * @param exchange        the exchange name passed to the chosen strategy
 * @param routingKey      the routing key passed to the chosen strategy
 * @param message         the message to send
 * @param correlationData correlation data; may be {@code null}
 * @return the result of the chosen strategy; may be {@code null}
 */
@Nullable
protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
    if (!this.evaluatedFastReplyTo) {
        // The flag is re-checked while holding the lock on this object,
        // presumably so the evaluation runs only once under concurrent callers.
        synchronized (this) {
            if (!this.evaluatedFastReplyTo) {
                this.evaluateFastReplyTo();
            }
        }
    }

    if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
        return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
    }
    if (this.replyAddress != null && !this.usingFastReplyTo) {
        return this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData);
    }
    return this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
}
对于收取消息,可以用 receive:无参版本(如下)从默认接收队列(通过 setDefaultReceiveQueue 设置)收取,也可以调用带参数的重载 receive(String queueName) 直接指定队列名。
/**
 * Receives a message without an explicit queue argument.
 * The queue is obtained from {@code getRequiredQueue()} and passed to the
 * single-argument {@code receive} overload.
 *
 * @return the result of the delegated {@code receive} call; may be {@code null}
 * @throws AmqpException declared by this method's signature
 */
@Nullable
public Message receive() throws AmqpException {
    String queueName = this.getRequiredQueue();
    return this.receive(queueName);
}
RabbitAdmin
创建该对象
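RabbitTemplate 可以直接 @Autowired;Spring Boot 默认也会自动配置一个 AmqpAdmin 类型的 bean(其实现就是 RabbitAdmin),同样可以直接注入。如果需要自定义设置(例如下面的 ignoreDeclarationExceptions),则可以像下面这样在配置类中自行声明该 bean。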
/**
 * Declares a {@code RabbitAdmin} bean built on the injected connection factory.
 * {@code setIgnoreDeclarationExceptions(true)} is applied before the instance
 * is returned.
 *
 * @param cachingConnectionFactory the connection factory handed to the admin
 * @return the configured {@code RabbitAdmin}
 */
@Bean
public RabbitAdmin getRabbitAdmin(CachingConnectionFactory cachingConnectionFactory){
    final RabbitAdmin admin = new RabbitAdmin(cachingConnectionFactory);
    admin.setIgnoreDeclarationExceptions(true);
    return admin;
}
常用API
declare开头的API可以声明各类交换机、队列,绑定。
/**
 * Declares a single binding.
 * The work runs inside {@code rabbitTemplate.execute}, which supplies the
 * channel passed to {@code declareBindings}. An {@code AmqpException} is
 * handed to {@code logOrRethrowDeclarationException} together with the
 * binding and the label {@code "binding"}.
 *
 * @param binding the binding to declare
 */
public void declareBinding(Binding binding) {
    try {
        this.rabbitTemplate.execute(ch -> {
            this.declareBindings(ch, binding);
            return null;
        });
    } catch (AmqpException ex) {
        this.logOrRethrowDeclarationException(binding, "binding", ex);
    }
}
/**
 * Declares a single exchange.
 * The work runs inside {@code rabbitTemplate.execute}, which supplies the
 * channel passed to {@code declareExchanges}. An {@code AmqpException} is
 * handed to {@code logOrRethrowDeclarationException} together with the
 * exchange and the label {@code "exchange"}.
 *
 * @param exchange the exchange to declare
 */
public void declareExchange(Exchange exchange) {
    try {
        this.rabbitTemplate.execute(ch -> {
            this.declareExchanges(ch, exchange);
            return null;
        });
    } catch (AmqpException ex) {
        this.logOrRethrowDeclarationException(exchange, "exchange", ex);
    }
}
/**
 * Declares a queue via the no-argument {@code Channel.queueDeclare} and wraps
 * the name reported by the declare result in a new {@code Queue}.
 * If an {@code AmqpException} is caught, it is handed to
 * {@code logOrRethrowDeclarationException}; when that call returns normally,
 * this method returns {@code null}.
 *
 * @return a {@code Queue} carrying the declared name, or {@code null} after a
 *         handled {@code AmqpException}
 */
public Queue declareQueue() {
    try {
        DeclareOk result = (DeclareOk) this.rabbitTemplate.execute(Channel::queueDeclare);
        String declaredName = result.getQueue();
        // NOTE(review): the flags (false, true, true) presumably mean non-durable,
        // exclusive, auto-delete; confirm against the Queue constructor.
        return new Queue(declaredName, false, true, true);
    } catch (AmqpException ex) {
        this.logOrRethrowDeclarationException((Declarable) null, "queue", ex);
        return null;
    }
}
因此这个类可以灵活地创建各种类型的交换机和队列,并灵活地进行绑定。当然还有一些 purge 方法(如 purgeQueue),用来清除队列中积压的消息,这里不再赘述。
评论区