RabbitMQ 基础(三)

发布于 17 天前  65 次阅读


整合 SpringBoot 框架

添加依赖

新建 SpringBoot 项目,导入 Spring Web 和 Spring for RabbitMQ 依赖。

配置文件

server:
  port: 8080

spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 192.168.64.136
    port: 5672

模拟订单业务(发布订阅模式)

配置类

package ml.guest997.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfiguration {

    //声明交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //声明队列
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    //声明队列
    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    //确定绑定关系
    @Bean
    public Binding bindSms() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    //确定绑定关系
    @Bean
    public Binding bindEmail() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

}

生产者

package ml.guest997.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId, String productId, int num) {
        System.out.println("保存订单成功,用户 id 是:" + userId + ",订单 id 是:" + productId + ",产品数量是:" + num);
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, productId);
    }

}

消费者

package ml.guest997.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues ={"email.fanout.queue"})     //消费者监听相应的队列
public class EmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email 接收到了订单消息,订单 id 是: " + message);
    }
}
package ml.guest997.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(queues ={"sms.fanout.queue"})     //消费者监听相应的队列
public class SmsConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms 接收到了订单消息,订单 id 是: " + message);
    }
}

发送订单

package ml.guest997;

import ml.guest997.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqSpringbootApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        orderService.makeOrder("1", "2", 3);
    }

}
//结果为 保存订单成功,用户 id 是:1,订单 id 是:2,产品数量是:3

测试

启动 SpringBoot 项目,发现控制台输出了下面的结果。

email 接收到了订单消息,订单 id 是: 2
sms 接收到了订单消息,订单 id 是: 2

由于其它模式与上面的代码大部分一样,就不再赘述了。

注解实现

将上面的配置类删掉,使用注解实现上面的业务。与上面不同的是,就消费者的代码改变了。

package ml.guest997.service;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "email.fanout.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "fanout_order_exchange",type = "ExchangeTypes.FANOUT")
))
public class EmailConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("email 接收到了订单消息,订单 id 是: " + message);
    }
}
package ml.guest997.service;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Service;

@Service
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "sms.fanout.queue",durable = "true",autoDelete = "false"),
        exchange = @Exchange(value = "fanout_order_exchange",type = "ExchangeTypes.FANOUT")
))
public class SmsConsumer {
    @RabbitHandler
    public void reviceMessage(String message){
        System.out.println("sms 接收到了订单消息,订单 id 是: " + message);
    }
}