标签搜索

目 录CONTENT

文章目录

RabbitMQ整合SpringBoot

陈铭
2021-02-13 / 0 评论 / 1 点赞 / 276 阅读 / 589 字 / 正在检测是否收录...

依赖和配置

pom依赖

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

server:
  port: 80
spring:
  #给项目来个名字
  application: 
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #虚拟host 可以不设置,使用server默认host
    virtual-host: rabbit

RabbitTemplate

常用API

这些get、set是对当前要操作的交换机、队列、路由键进行设置,不做赘述。

    public void setExchange(@Nullable String exchange) {
        this.exchange = exchange != null ? exchange : "";
    }

    public String getExchange() {
        return this.exchange;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public void setDefaultReceiveQueue(String queue) {
        this.defaultReceiveQueue = queue;
    }

    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    public String getEncoding() {
        return this.encoding;
    }

比较重要的是下面这个方法,也就是发送Message对象,并收取对应队列的Message。如果只用发送不收取,可以用send方法。如果我们要发送自定义的类,还可以用convertAndSend方法,该方法会在send前将参数转成Message类。

@Nullable
    public Message sendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) throws AmqpException {
        return this.doSendAndReceive(exchange, routingKey, message, correlationData);
    }

@Nullable
    protected Message doSendAndReceive(String exchange, String routingKey, Message message, @Nullable CorrelationData correlationData) {
        if (!this.evaluatedFastReplyTo) {
            synchronized(this) {
                if (!this.evaluatedFastReplyTo) {
                    this.evaluateFastReplyTo();
                }
            }
        }

        if (this.usingFastReplyTo && this.useDirectReplyToContainer) {
            return this.doSendAndReceiveWithDirect(exchange, routingKey, message, correlationData);
        } else {
            return this.replyAddress != null && !this.usingFastReplyTo ? this.doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData) : this.doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
        }
    }

对于收取信息,可以用receive,参数是队列名

@Nullable
    public Message receive() throws AmqpException {
        return this.receive(this.getRequiredQueue());
    }

RabbitAdmin

创建该对象

不像RabbitTemplate可以直接@Autowired,该对象只能在配置文件中配置好注入。

@Bean
    public RabbitAdmin getRabbitAdmin(CachingConnectionFactory cachingConnectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(cachingConnectionFactory);
        rabbitAdmin.setIgnoreDeclarationExceptions(true);
        return rabbitAdmin;
    }

常用API

declare开头的API可以声明各类交换机、队列,绑定。

    public void declareBinding(Binding binding) {
        try {
            this.rabbitTemplate.execute((channel) -> {
                this.declareBindings(channel, binding);
                return null;
            });
        } catch (AmqpException var3) {
            this.logOrRethrowDeclarationException(binding, "binding", var3);
        }

    }

    public void declareExchange(Exchange exchange) {
        try {
            this.rabbitTemplate.execute((channel) -> {
                this.declareExchanges(channel, exchange);
                return null;
            });
        } catch (AmqpException var3) {
            this.logOrRethrowDeclarationException(exchange, "exchange", var3);
        }

    }


    public Queue declareQueue() {
        try {
            DeclareOk declareOk = (DeclareOk)this.rabbitTemplate.execute(Channel::queueDeclare);
            return new Queue(declareOk.getQueue(), false, true, true);
        } catch (AmqpException var2) {
            this.logOrRethrowDeclarationException((Declarable)null, "queue", var2);
            return null;
        }
    }

因此这个类可以灵活的创建各种模式的交换机,灵活进行绑定。当然还有一些purge方法,用来清除队列中的积压消息。这里不做赘述了。

1

评论区