RabbitMQ 进阶(一)

发布于 12 天前  50 次阅读


TTL(Time To Live)

TTL 表示可以对消息设置预期的时间,在这个时间内可以被消费者接收获取,过了之后消息将自动被删除。RabbitMQ 目前有两种方法可以设置。

  • 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 对消息进行单独设置,每条消息 TTL 可以不同。

队列 TTL 配置类

package ml.guest997.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlConfiguration {

    @Bean
    public DirectExchange ttlExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

由于发送消息类以及测试类跟之前的几乎一样,就不在这赘述了。可以从界面看到不用编写消费者,5秒之后队列中的消息就消失了。

RabbitMQ 进阶(一)插图

设置消息 TTL

package ml.guest997.service;

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class TtlMessageOrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId, String productId, int num) {
        System.out.println("保存订单成功,用户 id 是:" + userId + ",订单 id 是:" + productId + ",产品数量是:" + num);
        String exchangeName = "ttlMessage_direct_exchange";
        String routingKey = "ttlMessage";
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setExpiration("5000");
            message.getMessageProperties().setContentEncoding("UTF-8");
            return message;
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, productId, messagePostProcessor);
    }
}

由于配置类以及测试类跟之前的几乎一样,就不在这赘述了。可以从界面看到不用编写消费者,5秒之后队列中的消息就消失了。

RabbitMQ 进阶(一)插图1

死信队列

DLX 全称 Dead-Letter-Exchange,称之为死信交换机。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列。消息变成死信,可能是由于以下原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX 也是一个普通的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

死信队列配置类

package ml.guest997.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadQueueConfiguration {

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange("dead_direct_exchange", true, false);
    }

    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }

    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
    }

}

其它队列配置类

package ml.guest997.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlConfiguration {

    @Bean
    public DirectExchange ttlExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        args.put("x-dead-letter-routing-key","dead");
        //注意:由于与之前的队列名一样,但是参数不一样,而 RabbitMQ 不会覆盖之前的队列,所有需要手动删除同名队列,否则会报错。
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

由于发送消息类以及测试类跟之前的几乎一样,就不在这赘述了。可以从界面看到5秒之后其它队列中的消息消失,而死信队列中的消息则加了一条。

内存与磁盘监控

RabbitMQ 进阶(一)插图2

当 RabbitMQ 的使用内存超过给定的内存阈值或磁盘剩余空间低于给定的阈值时,就会报红并且会阻塞所有连接。

内存调整

命令

rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 1GB        #绝对值

fraction/value 为内存阈值。默认情况是:0.4/物理内存,代表的含义是:当 RabbitMQ 的内存超过物理内存的40%时,就会报红并且会阻塞所有连接。通过命令修改阈值在 Broker 重启以后将会失效。

配置文件(rabbitmq.conf)

vm_memory_high_watermark.relative = 0.4        #建议不要超过0.7
vm_memory_high_watermark.absolute = 1GB

通过修改配置文件设置的阈值不会随着重启而消失,但修改了配置文件一样要重启 Broker 才会生效。

磁盘调整

rabbitmqctl set_disk_free_limit 100MB        #绝对值
rabbitmqctl set_disk_free_limit memory_limit <fraction>

内存换页

在某个 Broker 节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间。持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一份副本,这里会将持久化的消息从内存中清除掉。

默认情况下,在内存到达内存阈值的50%时会进行换页动作。也就是说,在默认的内存阈值为0.4的情况下,当内存超过0.4x0.5=0.2时会进行换页操作。

可以通过在配置文件中配置 vm_memory_high_watermark_paging_ratio 项来修改此值。