深圳幻海软件技术有限公司 欢迎您!

RabbitMQ Bridge后台系统开发

2023-02-28

​​想了解更多关于开源的内容,请访问:​​​​51CTO 开源基础软件社区​​​​https://ost.51cto.com​​前言前面几篇文章已经简单写了关于RabbitMQ安装,使用,结合SpringBoot使用流程,有了前面的基础知识了,我们现在开始开发一个完整,可以直接使用到生产上

​​想了解更多关于开源的内容,请访问:​​

​​51CTO 开源基础软件社区​​

​​https://ost.51cto.com​​

前言

前面几篇文章已经简单写了关于RabbitMQ安装,使用,结合SpringBoot使用流程,有了前面的基础知识了,我们现在开始开发一个完整,可以直接使用到生产上的MQBridge后台系统,创建SpringBoot项目,这里就不详细说了,主要讲解MQBridge项目的开发过程,我画了一个流程图,整体说明MQBridge的功能和流程。

第一步项 目依赖

项目依赖哪些第三方包,项目结构说明:

<dependencies>
        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.

第二步 公共实体类和工具类

参数DefaultMessage说明:

@Data
@ToString
public class DefaultMessage implements Serializable {
    // 标识不同功能模块, 如: 订单、物流
    private String source;
    // 标识不同路由信息,如: 订单,物流
    private String action;
    // 参数
    private Map<String, Object> data;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.

返回类StandardResponse说明:

@Data
public class StandardResponse<T> implements Serializable {
    // 状态码
    private String statusCode;
    // 状态描述
    private String statusDesc;
    // 响应结果
    private Boolean success;
    // 返回内容
    private T data;
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

第三步 发送Controller

  1. 对外提供发送消息Controller:
@RestController
@Slf4j
public class BridgeController {
    @Autowired
    BridgeService bridgeService;
    @PostMapping("/sendMessage")
    public StandardResponse sendMessage(@RequestBody DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        return bridgeService.sendMessage(message);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

2. MQBridge服务接口与实现类:

/**
 * Bridge桥接发送消息接口
 */
public interface BridgeService {
    // MQ桥接发送消息
    StandardResponse sendMessage(DefaultMessage message);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
@Slf4j
@Service
public class BridgeServiceImpl implements BridgeService {
    /**
     * 缓存RabbitMQ消息生产者
     */
    @Autowired
    private Map<String, MessageSender> senderMap;

    @Override
    public StandardResponse sendMessage(DefaultMessage message) {
        log.info("[sendMessage] params: " + message);
        try {
            // 根据不同source调用不同的消息生产者
            return senderMap.get(message.getSource()).send(message);
        }catch (Exception e) {
            e.printStackTrace();
            log.info("[sendMessage] Error: " + e.getMessage());
            StandardResponse standardResponse = new StandardResponse();
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("101");
            standardResponse.setStatusDesc("Not Find Service.");
            return standardResponse;
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.

第四步 消息生产者接口与父类实现

/**
 * RabbitMQ消息生产接口
 */
public interface MessageSender {
    // 发送RabbitMQ消息
    StandardResponse send(DefaultMessage defaultMessage);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
/**
 * RabbitMQ消息生产父类
 */
@Slf4j
public class BaseMessageSender implements MessageSender {
    @Autowired
    RabbitTemplate rabbitTemplate;
    // 子类注入交换机名称
    protected String exchange;
    @Override
    public StandardResponse send(DefaultMessage defaultMessage) {
        StandardResponse standardResponse = new StandardResponse();
        try {
            log.info("{} Sender...",defaultMessage.getAction());
            // 根据参数Action,发送消息(交换机,路由主键, 参数)
            rabbitTemplate.convertAndSend(exchange, defaultMessage.getAction(), defaultMessage);
            standardResponse.setSuccess(true);
            standardResponse.setStatusCode("100");
            standardResponse.setStatusDesc("Send Success");
            return standardResponse;
        }catch (Exception e) {
            e.printStackTrace();
            log.error("convertAndSend Error: " + e.getMessage());
            standardResponse.setSuccess(false);
            standardResponse.setStatusCode("102");
            standardResponse.setStatusDesc("Send Error.");
            return standardResponse;
        }
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.

消息消费者接口:

/**
 * RabbitMQ消息消费接口
 */
public interface MessageHandler {
    // 处理RabbitMQ消息
    StandardResponse handle(DefaultMessage message);
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.

第五步 订单消息

订单交换机、队列、路由主键之间关系使用:

1. 创建一个订单交换机。

/**
 * 订单交换机
 */
@Slf4j
@Configuration
public class OrderTopicRabbitConfig {
    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    // 创建订单交换机,在生产者,消费者使用
    @Bean(name = "${rabbitmq.exchange.order}")
    public TopicExchange OrderTopicExchange() {
        log.info("[MQBridge.Order]主题交换机{}", orderExchange);
        return new TopicExchange(orderExchange,true,false);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

2. 订单消息生产者类,实例消息生产者类(测试使用)。

/**
 * 订单消息生产者
 */
@Slf4j
@Service("MQBridge.Order")
public class OrderSender extends BaseMessageSender {
    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    /**
     * 服务启动时,把订单交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
/**
 * 实例消息生产者
 */
@Slf4j
@Service("MQBridge.Sample")
public class SampleSender extends BaseMessageSender {
    // 通过application.yml资源文件定义订单交换机名称,并引入
    @Value("${rabbitmq.exchange.order}")
    private String orderExchange;
    /**
     * 服务启动时,把订单交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = orderExchange;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

3. 订单Email消费者,订单公共消费者(此消费在这里说明通配符交换机其中一使用)。

/**
 * 订单消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Email")
public class OrderEmailHandler implements MessageHandler {
    // 定义订单队列名
    private static final String QUEUE_NAME = "MQBridge_Order_Email";
    // 创建订单队列对象
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueEmail() {
        log.info("Order主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicEmail(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.Email");
    }
    /**
     * 消费订单Email通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.Email");
        log.info("[MQBridge_Order]-[MQBridge.Order.Email]消费参数:{}", message);
        return null;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
/**
 * 订单消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Order_Common")
public class OrderCommonlHandler implements MessageHandler {
    // 定义订单队列名
    private static final String QUEUE_NAME = "MQBridge_Order_Common";
    // 创建订单队列对象
    @Bean(QUEUE_NAME)
    public Queue OrderTopicQueueCommon() {
        log.info("Order主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
    @Bean
    Binding bindingOrderTopicCommon(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.order}") TopicExchange exchange) {
        log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.#");
    }
    /**
     * 消费订单公共通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[OrderCommonlHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.#");
        log.info("[MQBridge_Order]-[MQBridge.Order.#]消费参数:{}", message);
        return null;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.

第六步 物流消息

物流交换机、队列、路由主键之间关系使用:

1. 创建一个物流交换机。

/**
 * 物流交换机
 */
@Slf4j
@Configuration
public class LogisticsTopicRabbitConfig {
    // 通过application.yml资源文件定义物流交换机名称,并引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;
    // 创建物流交换机,在生产者,消费者使用
    @Bean(name = "${rabbitmq.exchange.logistics}")
    public TopicExchange LogisticsTopicExchange() {
        log.info("[MQBridge.Logistics]主题交换机{}", logisticsExchange);
        return new TopicExchange(logisticsExchange,true,false);
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

2. 物流消息生产者:

/**
 * 物流消息生产者
 */
@Slf4j
@Service("MQBridge.Logistics")
public class LogisticsSender extends BaseMessageSender {
    // 通过application.yml资源文件定义物流交换机名称,并引入
    @Value("${rabbitmq.exchange.logistics}")
    private String logisticsExchange;
    /**
     * 服务启动时,把物流交换机名称注入到父类变量
     */
    @PostConstruct
    public void init() {
        this.exchange = logisticsExchange;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

3. 物流消息消费者:

/**
 * 物流消息消费者
 */
@Slf4j
@Service
@RabbitListener(queues = "MQBridge_Logistics_Email")
public class LogisticsEmailHandler implements MessageHandler {
    // 定义物流队列名
    private static final String QUEUE_NAME = "MQBridge_Logistics_Email";
    // 创建物流队列对象
    @Bean(QUEUE_NAME)
    public Queue LogisticsTopicQueue() {
        log.info("Logistics主题队列名{}", QUEUE_NAME);
        return new Queue(QUEUE_NAME,true);
    }
    // 物流队列对象与物流交换机绑定,路由主键为MQBridge.Logistics.Email
    @Bean
    Binding bindingLogisticsTopic(@Qualifier(QUEUE_NAME) Queue queue,
                                   @Qualifier("${rabbitmq.exchange.logistics}") TopicExchange exchange) {
        log.info("Logistics主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
        return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Logistics.#");
    }
    /**
     * 消费物流Email通知消息
     * @param message
     * @return
     */
    @RabbitHandler
    @Override
    public StandardResponse handle(DefaultMessage message) {
        log.info("[LogisticsEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Logistics.#");
        log.info("[MQBridge_Logistics_Email]消费参数:{}", message);
        return null;
    }
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.

第七步 小结

从项目结构和代码可以看出,第五步和第六步是不同的业务功能,我们通过传递参数Source不同,调用不同的业务逻辑功能,之后如果添加新的模块,就可以参考第五步或第六步就可以实现,然后调用时,参数Source指定为新值就可以;这里再说一下Source参数值与Service是怎么对应的,在LogisticsSender类,OrderSender类,SampleSender类上都有一个注解@Service(“MQBridge.Order”), 括号里的字符串就是对应参数的Source值了。

第八步  调试

1.  Source为MQBridge.Order调用。

2.  Source为MQBridge.Sample调用 。

3.  Source为MQBridge.Logistics调用 。

最后总结

从调试和打印日志可以看出MQBridge项目,可以很方便添加新功能,下图是调试的三次日志。

​​想了解更多关于开源的内容,请访问:​​

​​51CTO 开源基础软件社区​​

​​https://ost.51cto.com​​。