RabbitMQ 进阶(二)

发布于 11 天前  56 次阅读


Docker 搭建 RabbitMQ 集群

#创建三个 rabbitmq 目录,存储三个节点配置信息。(root 目录下创建的)
mkdir rabbitmq01 rabbitmq02 rabbitmq03

#创建 rabbitmq01 节点容器
docker run -d --hostname rabbitmq01 --name rabbitmq01 -v ~/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

#创建 rabbitmq02 节点容器
docker run -d --hostname rabbitmq02 --name rabbitmq02 -v ~/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 --link rabbitmq01:rabbitmq01 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

#创建 rabbitmq03 节点容器
docker run -d --hostname rabbitmq03 --name rabbitmq03 -v ~/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 --link rabbitmq01:rabbitmq01 --link rabbitmq02:rabbitmq02 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management

#进入 rabbitmq02 容器,将02节点加入到集群中。
docker exec -it rabbitmq02 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

#进入 rabbitmq03 容器,将03节点加入到集群中。
docker exec -it rabbitmq03 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

浏览器访问 ip:15672,出现如下页面就说明成功了。

RabbitMQ 进阶(二)插图

分布式事务

分布式事务是指事务的各个操作分布在不同的节点上,但仍然需要保证事务的 ACID 特性。

RabbitMQ 进阶(二)插图1

如上图所示,两个服务位于不同的节点上,使用的是不同的数据库。如果其中一个服务出现了问题,虽然它自己的数据可以通过本地事务回滚,但是另一个服务已经提交的数据就无法回滚了。

准备工作

-- Schema for the order (producer) and dispatch (consumer) demo applications;
-- both applications point their datasource at this database.
-- NOTE(review): none of the tables declares a primary key or unique constraint.
CREATE DATABASE `order_distribution`;
USE `order_distribution`;
-- Orders inserted by the producer (OrderMapper.saveOrder).
CREATE TABLE `order_service` (
    `order_id` int(50) DEFAULT NULL,
    `user_id` int(50) DEFAULT NULL,
    `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
    `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Dispatch records inserted by the consumer (DispatcherMapper.insertUser).
CREATE TABLE `dispacher_service` (
    `dispacher_id` int(50) DEFAULT NULL,
    `order_id` int(50) DEFAULT NULL,
    `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
    `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL,
    `user_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
-- Local message table tracking whether each order message reached the broker.
-- `status` is inserted as 0 by the producer and set to 1 by OrderMapper.updateStatus.
CREATE TABLE `order_service_message` (
    `order_id` int(50) DEFAULT NULL,
    `status` int(50) DEFAULT NULL,
    `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
    `unique_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

可靠生产

RabbitMQ 进阶(二)插图2

添加依赖

新建 SpringBoot 项目,导入 Spring Web、Lombok、MySQL Driver、MyBatis Framework 和 Spring for RabbitMQ 依赖。最后再手动添加 fastjson 依赖。

<!-- fastjson 1.2.83 is the 1.x release that fixes CVE-2022-25845 (autoType bypass affecting <= 1.2.80). -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

配置文件

# Producer application settings: HTTP port, MySQL datasource and RabbitMQ connection.
server.port=8080
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/order_distribution?characterEncoding=utf8&useSSL=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Enable publisher confirms (correlated mode) so the ConfirmCallback registered on RabbitTemplate is invoked
spring.rabbitmq.publisher-confirm-type=correlated

配置类

package ml.guest997.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * Declares the RabbitMQ topology for the order demo: a durable fanout exchange and queue
 * for orders, plus a durable fanout exchange and queue that receive dead-lettered orders.
 */
@Configuration
public class RabbitMQConfiguration {
    private static final String DEAD_EXCHANGE_NAME = "dead_order_fanout_exchange";
    private static final String DEAD_QUEUE_NAME = "dead.order.queue";
    private static final String ORDER_EXCHANGE_NAME = "order_fanout_exchange";
    private static final String ORDER_QUEUE_NAME = "order.queue";
    private static final String DEAD_LETTER_EXCHANGE_ARG = "x-dead-letter-exchange";

    /** Durable, non-auto-delete fanout exchange that dead-lettered order messages are routed to. */
    @Bean
    public FanoutExchange deadExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new FanoutExchange(DEAD_EXCHANGE_NAME, durable, autoDelete);
    }

    /** Durable queue that stores dead-lettered order messages. */
    @Bean
    public Queue deadOrderQueue() {
        boolean durable = true;
        return new Queue(DEAD_QUEUE_NAME, durable);
    }

    /** Binds the dead-letter queue to the dead-letter exchange. */
    @Bean
    public Binding bindDeadOrder() {
        Queue deadQueue = deadOrderQueue();
        FanoutExchange deadLetterExchange = deadExchange();
        return BindingBuilder.bind(deadQueue).to(deadLetterExchange);
    }

    /** Durable, non-auto-delete fanout exchange that order messages are published to. */
    @Bean
    public FanoutExchange fanoutExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new FanoutExchange(ORDER_EXCHANGE_NAME, durable, autoDelete);
    }

    /**
     * Durable, non-exclusive, non-auto-delete order queue whose rejected messages are
     * forwarded to the dead-letter exchange via the x-dead-letter-exchange argument.
     */
    @Bean
    public Queue orderQueue() {
        Map<String, Object> queueArguments = new HashMap<>();
        queueArguments.put(DEAD_LETTER_EXCHANGE_ARG, DEAD_EXCHANGE_NAME);
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue(ORDER_QUEUE_NAME, durable, exclusive, autoDelete, queueArguments);
    }

    /** Binds the order queue to the order exchange. */
    @Bean
    public Binding bindorder() {
        Queue orders = orderQueue();
        FanoutExchange orderExchange = fanoutExchange();
        return BindingBuilder.bind(orders).to(orderExchange);
    }

}

POJO 层

package ml.guest997.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Order row as written to the order_service table and serialized to JSON for the MQ message.
 * Field order matters: it defines the parameter order of the Lombok all-args constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /** Order identifier; also used as the publisher-confirm correlation id. */
    private int orderId;
    /** Identifier of the user who placed the order. */
    private int userId;
    /** Free-text description of the order. */
    private String orderContent;
    /** Creation time kept as a string (the column is a varchar). */
    private String createTime;
}
package ml.guest997.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Row of the local message table (order_service_message) that records the delivery
 * state of an order message.
 * Field order matters: it defines the parameter order of the Lombok all-args constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageOrder {
    /** Identifier of the order the message belongs to. */
    private int orderId;
    /** Delivery status: inserted as 0, set to 1 once the broker confirms the message. */
    private int status;
    /** Copy of the order description. */
    private String orderContent;
    /** NOTE(review): OrderDataBaseService.saveOrder fills this with the order's user id - confirm intent. */
    private int uniqueId;
}

Mapper 层

package ml.guest997.mapper;

import ml.guest997.pojo.MessageOrder;
import ml.guest997.pojo.Order;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;

/**
 * MyBatis mapper for the producer side: the order table and the local message table.
 * The INSERT statements carry no column list, so values are bound by table column position.
 */
@Repository
@Mapper
public interface OrderMapper {
    /**
     * Inserts one row into order_service.
     *
     * @param order the order to persist
     * @return number of rows affected
     */
    @Insert("insert into order_service values(#{orderId},#{userId},#{orderContent},#{createTime})")
    int saveOrder(Order order);

    /**
     * Inserts one row into order_service_message.
     *
     * @param order the message record to persist
     * @return number of rows affected
     */
    @Insert("insert into order_service_message values(#{orderId},#{status},#{orderContent},#{uniqueId})")
    int saveOrderMessage(MessageOrder order);

    /**
     * Sets status to 1 on every order_service_message row with the given order id.
     *
     * @param orderId order identifier to match
     * @return number of rows affected
     */
    @Update("update order_service_message set status = 1 where order_id=#{orderId}")
    int updateStatus(int orderId);
}

Service 层

package ml.guest997.service;

import ml.guest997.mapper.OrderMapper;
import ml.guest997.pojo.MessageOrder;
import ml.guest997.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Database operations of the order producer: persisting an order together with its
 * local message record, and marking that record as delivered.
 */
@Service
public class OrderDataBaseService {
    @Autowired
    private OrderMapper orderMapper;

    /**
     * Saves the order and its local message record (status 0) atomically.
     *
     * <p>Both inserts run in one transaction. {@code rollbackFor = Exception.class} is required
     * because this method signals failure with a checked {@link Exception}, which would not
     * trigger a rollback under the default rules.
     *
     * @param orderInfo the order to persist
     * @return number of order rows inserted (always 1 on success)
     * @throws Exception if either insert does not affect exactly one row
     */
    @Transactional(rollbackFor = Exception.class)
    public int saveOrder(Order orderInfo) throws Exception {
        int orderRows = orderMapper.saveOrder(orderInfo);
        // Fail before touching the message table so no message row exists without its order.
        if (orderRows != 1) {
            throw new Exception("保存订单失败");
        }
        // The local message table records the delivery status of the order message (0 = not confirmed yet).
        int messageRows = orderMapper.saveOrderMessage(
                new MessageOrder(orderInfo.getOrderId(), 0, orderInfo.getOrderContent(), orderInfo.getUserId()));
        if (messageRows != 1) {
            throw new Exception("保存订单消息记录失败");
        }
        return orderRows;
    }

    /**
     * Marks the local message record of the given order as delivered (status 1).
     *
     * @param orderId identifier of the order whose message was confirmed
     * @return number of rows updated (always 1 on success)
     * @throws Exception if the update does not affect exactly one row
     */
    public int updateStatus(int orderId) throws Exception {
        int updatedRows = orderMapper.updateStatus(orderId);
        if (updatedRows != 1) {
            throw new Exception("更新订单状态失败");
        }
        return updatedRows;
    }
}
package ml.guest997.service;

import com.alibaba.fastjson.JSON;
import ml.guest997.mapper.OrderMapper;
import ml.guest997.pojo.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;

/**
 * Publishes order messages to RabbitMQ and records broker confirmations in the
 * local message table.
 */
@Service
public class OrderMQService {
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private OrderDataBaseService orderDataBaseService;

    /**
     * Registers the publisher-confirm callback on the container-created RabbitTemplate.
     * Runs after dependency injection, i.e. before {@link #sendMessage(Order)} can be called.
     */
    @PostConstruct
    public void regCallback() {
        rabbitTemplate.setConfirmCallback(this::handleConfirm);
    }

    /**
     * Handles one publisher confirm.
     *
     * @param correlationData correlation data supplied when the message was sent; may be null
     *                        when a message was published without it
     * @param ack             whether the message reached the exchange
     * @param cause           failure reason, if any
     */
    private void handleConfirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("cause:" + cause);
        // ack == false means the broker did not accept the message; the status stays 0.
        if (!ack) {
            System.out.println("订单信息投递到 mq 失败");
            return;
        }
        // Without correlation data there is no order id to update; do not fail with an NPE.
        if (correlationData == null || correlationData.getId() == null) {
            System.out.println("收到没有关联数据的确认,无法更新订单状态");
            return;
        }
        int orderId;
        try {
            orderId = Integer.parseInt(correlationData.getId());
        } catch (NumberFormatException e) {
            // Ids not produced by sendMessage (e.g. generated ones) are not order ids.
            System.out.println("确认回调的关联 id 不是订单 id:" + correlationData.getId());
            return;
        }
        try {
            // Mark the local message record as delivered.
            int updatedRows = orderDataBaseService.updateStatus(orderId);
            if (updatedRows == 1) {
                System.out.println("修改状态成功,成功投递到 mq。");
            }
        } catch (Exception e) {
            // Report the failure reason instead of silently dropping it.
            System.out.println("本地消息状态修改异常:" + e);
        }
    }

    /**
     * Serializes the order to JSON and publishes it to the order fanout exchange, using the
     * order id as the correlation id so the confirm callback can find the message record.
     *
     * @param orderInfo the order to publish
     */
    public void sendMessage(Order orderInfo) {
        String orderJson = JSON.toJSONString(orderInfo);
        String correlationId = String.valueOf(orderInfo.getOrderId());
        rabbitTemplate.convertAndSend("order_fanout_exchange", "", orderJson, new CorrelationData(correlationId));
    }
}
package ml.guest997.service;

import ml.guest997.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Entry point for creating an order: persists it locally, then publishes it to RabbitMQ.
 */
@Service
public class MQOrderService {
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    @Autowired
    private OrderMQService orderMQService;

    /**
     * Saves the order (and its local message record) and then sends the order message.
     *
     * @param orderInfo the order to create
     * @throws Exception if saving the order fails; in that case no message is sent
     */
    public void createOrder(Order orderInfo) throws Exception {
        // A real system would also have a scheduled retry task: delivery to MQ may not
        // succeed on the first attempt, for example because of network jitter.
        orderDataBaseService.saveOrder(orderInfo);
        orderMQService.sendMessage(orderInfo);
    }

}

测试

package ml.guest997;

import ml.guest997.pojo.Order;
import ml.guest997.service.MQOrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Integration test that creates one sample order through {@link MQOrderService}.
 */
@SpringBootTest
class OrderserviceApplicationTests {

    @Autowired
    private MQOrderService mqOrderService;

    /** Creates the sample order and prints a success line once the call returns. */
    @Test
    void contextLoads() throws Exception {
        mqOrderService.createOrder(sampleOrder());
        System.out.println("订单创建成功.......");
    }

    /** Builds the fixed sample order used by the test; createTime is left unset. */
    private static Order sampleOrder() {
        Order sample = new Order();
        sample.setOrderId(1000001);
        sample.setUserId(1);
        sample.setOrderContent("买了一盒方便面");
        return sample;
    }

}
RabbitMQ 进阶(二)插图3

可靠消费

RabbitMQ 进阶(二)插图4

添加依赖

新建 SpringBoot 项目,导入 Spring Web、Lombok、MySQL Driver、MyBatis Framework 和 Spring for RabbitMQ 依赖。最后再手动添加 fastjson 依赖。

<!-- fastjson 1.2.83 is the 1.x release that fixes CVE-2022-25845 (autoType bypass affecting <= 1.2.80). -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>

配置文件

# Consumer application settings: HTTP port, MySQL datasource and RabbitMQ listener behaviour.
server.port=8081
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/order_distribution?characterEncoding=utf8&useSSL=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Manual acknowledgement: the listener calls basicAck / basicNack itself
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Enable listener retry
# NOTE(review): OrderConsumer catches every exception and nacks the message itself,
# so these retry settings presumably never take effect - confirm.
spring.rabbitmq.listener.simple.retry.enabled=true
# Maximum number of attempts
spring.rabbitmq.listener.simple.retry.max-attempts=3
# Interval between attempts
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms

POJO 层

package ml.guest997.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Order payload on the consumer side; the JSON message body is parsed into this type.
 * Field order matters: it defines the parameter order of the Lombok all-args constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /** Order identifier. */
    private int orderId;
    /** Identifier of the user who placed the order. */
    private int userId;
    /** Free-text description of the order. */
    private String orderContent;
    /** Creation time kept as a string. */
    private String createTime;
}
package ml.guest997.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Dispatch record written to the dispacher_service table.
 * Field order matters: it defines the parameter order of the Lombok all-args constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Dispacher  {
    /** Identifier of the dispatch record. */
    private int dispacherId;
    /** Identifier of the order being dispatched. */
    private int orderId;
    /** Copy of the order description. */
    private String orderContent;
    /** Creation time kept as a string (the column is a varchar). */
    private String createTime;
    /** Identifier of the user the order belongs to. */
    private int userId;
}

Mapper 层

package ml.guest997.mapper;

import ml.guest997.pojo.Dispacher;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;

/**
 * MyBatis mapper for the dispatch table on the consumer side.
 */
@Repository
@Mapper
public interface DispatcherMapper {
    /**
     * Inserts one row into dispacher_service. The statement has no column list,
     * so values are bound by table column position.
     *
     * @param dispacher the dispatch record to persist
     * @return number of rows affected
     */
    @Insert("insert into dispacher_service values(#{dispacherId},#{orderId},#{orderContent},#{createTime},#{userId})")
    int insertUser(Dispacher dispacher);
}

Service 层

package ml.guest997.service;

import ml.guest997.mapper.DispatcherMapper;
import ml.guest997.pojo.Dispacher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Creates dispatch records for orders received by the consumer.
 */
@Service
public class DispatcherService {
    @Autowired
    private DispatcherMapper dispatcherMapper;

    /**
     * Inserts a dispatch record for the given order.
     *
     * @param orderId      identifier of the order to dispatch
     * @param orderContent description of the order
     * @throws Exception if the insert does not affect exactly one row
     */
    public void dispatcher(int orderId, String orderContent) throws Exception {
        // NOTE(review): dispacherId and userId are both filled with orderId and createTime
        // stays null, exactly as before - confirm whether a real user id should be passed in.
        Dispacher dispatchRecord = new Dispacher(orderId, orderId, orderContent, null, orderId);
        int insertedRows = dispatcherMapper.insertUser(dispatchRecord);
        if (insertedRows == 1) {
            return;
        }
        throw new Exception("配送订单创建失败");
    }
}
package ml.guest997.service;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import ml.guest997.pojo.Order;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.io.IOException;

/**
 * Listens on the order queue, creates a dispatch record for each order and acknowledges
 * the message manually. Any exception leads to a nack without requeue.
 */
@Service
public class OrderConsumer {
    @Autowired
    private DispatcherService dispatcherService;
    // Running number printed with each received message.
    // NOTE(review): plain int incremented without synchronization - confirm a single consumer thread.
    private int count = 1;

    /**
     * Handles one message from order.queue.
     *
     * @param orderMsg        JSON message body, parsed into an {@code Order}
     * @param channel         channel used for the manual ack / nack
     * @param correlationData never read in this method
     *                        NOTE(review): confirm the listener container can resolve this argument
     * @param tag             delivery tag of the message
     * @throws IOException if the nack itself fails
     */
    @RabbitListener(queues = "order.queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            System.out.println("收到 mq 的消息是:" + orderMsg + ",count = " + count++);
            Order order = JSONObject.parseObject(orderMsg, Order.class);
            int orderId = order.getOrderId();
            String orderContent = order.getOrderContent();
            dispatcherService.dispatcher(orderId, orderContent);
            System.out.println(1 / 0);              // deliberately throws to demonstrate the failure path
            channel.basicAck(tag, false);           // manual ack: tells the broker the message was consumed successfully
        } catch (Exception e) {
            /*
             * tag      delivery tag of the message
             * false    (multiple) only this delivery is rejected
             * false    (requeue) the message is not redelivered; it is dead-lettered to the dead-letter
             *          queue. A consumer of that queue would use the same logic as this class.
             * With requeue = true the message would be redelivered forever; in that case do not
             * combine it with try/catch, otherwise it loops endlessly.
             */
            channel.basicNack(tag, false, false);
        }
    }
}

测试

启动 SpringBoot 项目,访问 ip:15672 下的 Queues 页面会发现死信队列出现了新的消息。

RabbitMQ 进阶(二)插图5