RabbitMQ 基础(二)

发布于 22 天前  67 次阅读


核心组成部分

RabbitMQ 基础(二)插图
  • server:又称 broker,接受客户端连接,实现 AMQP 实体服务。
  • connection:连接和具体 broker 网络连接。
  • channel:网络信道,几乎所有操作都在 channel 中进行,channel 是消息读写的通道。客户端可以建立多个 channel,每个 channel 表示一个会话任务。
  • message:消息,服务器和应用程序之间传递的数据,由 properties 和 body 组成。properties 可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body 是消息实体内容。
  • Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个 Virtual host 可以若干个 Exchange 和 Queue,同一个 Virtual host 不能有同名的 Exchange 或 Queue。
  • Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。
  • binding:Exchange 和 Queue 之间的虚拟连接,binding 中可以包括 routing key。
  • routing key:一个路由规则,虚拟机根据他来确定如何路由一条消息。
  • Queue:消息队列,用来存放消息的队列。

运行流程

RabbitMQ 基础(二)插图1

支持的消息模型

  • 简单模式
    RabbitMQ 基础(二)插图2
  • 工作模式
    RabbitMQ 基础(二)插图3
  • 发布订阅模式
    RabbitMQ 基础(二)插图4
  • 路由模式
    RabbitMQ 基础(二)插图5
  • 主题模式
    RabbitMQ 基础(二)插图6

简单模式案例

添加依赖

新建 Maven 项目,导入 rabbitmq 依赖。

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.13.1</version>
    </dependency>
</dependencies>

生产者

package ml.guest997.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("生产者");
            //通过连接创建通道,所有的操作都是基于通道的,至于为什么不用连接,因为通道是长连接。
            channel = connection.createChannel();
            //通过通道创建消息队列
            String queueName = "queue1";
            /*
             * @params1 队列的名称
             * @params2 是否持久化,rabbitmq 非持久化会存盘,但是会随着重启服务会丢失。
             * @params3 排它性,是否是独占队列。
             * @params4 是否自动删除,消费完毕消息以后是否把队列自动删除。
             * @params5 附属参数
             */
            channel.queueDeclare(queueName, true, false, false, null);
            //消息内容
            String message = "Hello RabbitMQ!";
            //发送消息给队列
            /*
             * @params1: 交换机
             * @params2 队列、路由 key
             * @params3 消息的状态控制
             * @params4 消息主体
             */
            //虽然这里没有设置交换机,但是是有默认的交换机的。
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

运行上面的代码后,访问 ip:15672 下的 Queues 页面会发现出现了新的消息。

RabbitMQ 基础(二)插图7

消费者

package ml.guest997.simple;

import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class Consumer {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("消费者");
            //通过连接创建通道,所有的操作都是基于通道的,至于为什么不用连接,因为通道是长连接。
            channel = connection.createChannel();
            final Channel channel2 = channel;       //为什么要重新定义一个通道为 final,是因为 lambda 函数只能访问 final 修饰的局部变量。
            channel2.basicConsume("queue1", false, (consumerTag, message) -> {
                try {
                    System.out.println("收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("接收消息失败..."));
            System.out.println("开始接收消息...");
            System.in.read();       //这行代码是为了防止 channel 和 connection 提前被关闭。
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

运行上面的代码后,访问 ip:15672 下的 Queues 页面会发现消息被消费完了。

RabbitMQ 基础(二)插图8

发布订阅模式案例

生产者

package ml.guest997.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();
            String message = "Hello RabbitMQ!";
            //创建交换机
            String exchangeName = "fanout_message_exchange";
            String exchangeType = "fanout";
            channel.exchangeDeclare(exchangeName, exchangeType, true);
            //创建队列
            channel.queueDeclare("queue2", true, false, false, null);
            channel.queueDeclare("queue3", true, false, false, null);
            //绑定队列和交换机的关系
            channel.queueBind("queue2", exchangeName, "");
            channel.queueBind("queue3", exchangeName, "");
            channel.basicPublish(exchangeName, "", null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
RabbitMQ 基础(二)插图9

消费者

package ml.guest997.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class Consumer {
    private static Runnable runnable = () -> {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        final String queueName = Thread.currentThread().getName();      //获取线程队列名
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者");
            channel = connection.createChannel();
            final Channel channel2 = channel;
            channel2.basicConsume(queueName, false, (consumerTag, message) -> {
                try {
                    System.out.println(queueName + "收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("接收消息失败..."));
            System.out.println(queueName + "开始接收消息...");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };

    public static void main(String[] args) {
        //启动多个线程创建多个队列
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }

}
/*结果为
queue2开始接收消息...
queue3开始接收消息...
queue3收到的消息是:Hello RabbitMQ!
queue2收到的消息是:Hello RabbitMQ!
*/

路由模式案例

生产者

package ml.guest997.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();
            String message = "Hello RabbitMQ!";
            //创建交换机
            String exchangeName = "direct_message_exchange";
            String exchangeType = "direct";
            channel.exchangeDeclare(exchangeName, exchangeType, true);
            //创建队列
            channel.queueDeclare("queue4", true, false, false, null);
            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            //绑定队列和交换机的关系
            channel.queueBind("queue4", exchangeName, "email");     //指定路由 key
            channel.queueBind("queue5", exchangeName, "wechat");
            channel.queueBind("queue6", exchangeName, "email");
            channel.basicPublish(exchangeName, "email", null, message.getBytes());      //往指定的路由 key 队列发送消息
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
RabbitMQ 基础(二)插图10

消费者

由于几乎与发布订阅模式中的消费者代码几乎一样,只不过改了线程名和线程数量,故不再放代码在这了。

/*结果为
queue5开始接收消息...
queue4开始接收消息...
queue6开始接收消息...
queue4收到的消息是:Hello RabbitMQ!
queue6收到的消息是:Hello RabbitMQ!
*/

可以从结果看出,只有符合指定路由 key 的队列才能接收到消息。

主题模式案例

生产者

package ml.guest997.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();
            String message = "Hello RabbitMQ!";
            //创建交换机
            String exchangeName = "topic_message_exchange";
            String exchangeType = "topic";
            channel.exchangeDeclare(exchangeName, exchangeType, true);
            //创建队列
            channel.queueDeclare("queue7", true, false, false, null);
            channel.queueDeclare("queue8", true, false, false, null);
            channel.queueDeclare("queue9", true, false, false, null);
            //绑定队列和交换机的关系
            channel.queueBind("queue7", exchangeName, "guest997.#");     //指定路由 key
            channel.queueBind("queue8", exchangeName, "*.course.*");
            channel.queueBind("queue9", exchangeName, "*.order");
            channel.basicPublish(exchangeName, "guest997.course.order", null, message.getBytes());      //往指定的路由 key 队列发送消息
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
RabbitMQ 基础(二)插图11

消费者

由于几乎与发布订阅模式中的消费者代码几乎一样,只不过改了线程名和线程数量,故不再放代码在这了。

/**结果为
queue8开始接收消息...
queue9开始接收消息...
queue7开始接收消息...
queue7收到的消息是:Hello RabbitMQ!
queue8收到的消息是:Hello RabbitMQ!
/

可以从结果看出,只有符合路由 key 规则的队列才能接收到消息。# 表示可以有零级、一级或多级,* 则表示必须有且只能有一级。

工作模式

主要有两种模式:

  • 轮询分发:一个消费者一条,按均分配
  • 公平分发:根据消费者的消费能力进行分发,处理快的处理的多,处理慢的处理的少,按劳分配

轮询模式案例

注意:需要先运行消费者的代码!

生产者

package ml.guest997.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();
            for (int i = 1; i <= 20; i++) {
                String msg = "Hello RabbitMQ!:" + i;
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("消息发送失败!");
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者1

package ml.guest997.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ConsumerOne {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者1");
            channel = connection.createChannel();
            final Channel channel2 = channel;
            channel2.basicConsume("queue1", false, (consumerTag, message) -> {
                try {
                    System.out.println("消费者1收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("消费者1接收消息失败..."));
            System.out.println("消费者1开始接收消息...");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
/*结果为
消费者1开始接收消息...
消费者1收到的消息是:Hello RabbitMQ!:1
消费者1收到的消息是:Hello RabbitMQ!:3
消费者1收到的消息是:Hello RabbitMQ!:5
消费者1收到的消息是:Hello RabbitMQ!:7
消费者1收到的消息是:Hello RabbitMQ!:9
消费者1收到的消息是:Hello RabbitMQ!:11
消费者1收到的消息是:Hello RabbitMQ!:13
消费者1收到的消息是:Hello RabbitMQ!:15
消费者1收到的消息是:Hello RabbitMQ!:17
消费者1收到的消息是:Hello RabbitMQ!:19
*/

消费者2

package ml.guest997.work.polling;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ConsumerTwo {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者2");
            channel = connection.createChannel();
            final Channel channel2 = channel;
            channel2.basicConsume("queue1", false, (consumerTag, message) -> {
                try {
                    System.out.println("消费者2收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("消费者2接收消息失败..."));
            System.out.println("消费者2开始接收消息...");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
/*结果为
消费者2开始接收消息...
消费者2收到的消息是:Hello RabbitMQ!:2
消费者2收到的消息是:Hello RabbitMQ!:4
消费者2收到的消息是:Hello RabbitMQ!:6
消费者2收到的消息是:Hello RabbitMQ!:8
消费者2收到的消息是:Hello RabbitMQ!:10
消费者2收到的消息是:Hello RabbitMQ!:12
消费者2收到的消息是:Hello RabbitMQ!:14
消费者2收到的消息是:Hello RabbitMQ!:16
消费者2收到的消息是:Hello RabbitMQ!:18
消费者2收到的消息是:Hello RabbitMQ!:20
*/

可以从结果看出,消费者1和消费者2收到的消息都是十条,是按均分配的。但如果是消息并不能均分,自然就会是其中一个消费者多收到一条消息。

公平模式案例

生产者

由于几乎与轮询模式中的生产者代码一样,故不再放代码在这了。

消费者1

package ml.guest997.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ConsumerOne {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者1");
            channel = connection.createChannel();
            final Channel channel2 = channel;
            channel2.basicQos(1);       // 同一时刻,服务器只会推送一条消息给消费者。
            channel2.basicConsume("queue1", false, (consumerTag, message) -> {
                try {
                    System.out.println("消费者1收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    Thread.sleep(200);
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);       //一定要使用手动应答
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("消费者1接收消息失败..."));
            System.out.println("消费者1开始接收消息...");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
/*结果为
消费者1开始接收消息...
消费者1收到的消息是:Hello RabbitMQ!:1
消费者1收到的消息是:Hello RabbitMQ!:3
消费者1收到的消息是:Hello RabbitMQ!:4
消费者1收到的消息是:Hello RabbitMQ!:5
消费者1收到的消息是:Hello RabbitMQ!:6
消费者1收到的消息是:Hello RabbitMQ!:8
消费者1收到的消息是:Hello RabbitMQ!:9
消费者1收到的消息是:Hello RabbitMQ!:10
消费者1收到的消息是:Hello RabbitMQ!:11
消费者1收到的消息是:Hello RabbitMQ!:12
消费者1收到的消息是:Hello RabbitMQ!:14
消费者1收到的消息是:Hello RabbitMQ!:15
消费者1收到的消息是:Hello RabbitMQ!:16
消费者1收到的消息是:Hello RabbitMQ!:17
消费者1收到的消息是:Hello RabbitMQ!:18
*/

消费者2

package ml.guest997.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class ConsumerTwo {
    public static void main(String[] args) {
        //连接服务
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.64.136");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者2");
            channel = connection.createChannel();
            final Channel channel2 = channel;
            channel2.basicQos(1);       // 同一时刻,服务器只会推送一条消息给消费者。
            channel2.basicConsume("queue1", false, (consumerTag, message) -> {
                try {
                    System.out.println("消费者2收到的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                    Thread.sleep(1000);
                    channel2.basicAck(message.getEnvelope().getDeliveryTag(), false);       //一定要使用手动应答
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }, consumerTag -> System.out.println("消费者2接收消息失败..."));
            System.out.println("消费者2开始接收消息...");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
/*结果为
消费者2开始接收消息...
消费者2收到的消息是:Hello RabbitMQ!:2
消费者2收到的消息是:Hello RabbitMQ!:7
消费者2收到的消息是:Hello RabbitMQ!:13
消费者2收到的消息是:Hello RabbitMQ!:19
*/

可以从结果看出,由于消费者1的线程休眠时间较短,也就是应答时间较快,就使得其消费的消息更多,是按劳分配的。