想了解更多关于开源的内容,请访问:
51CTO 开源基础软件社区
https://ost.51cto.com
前言
前面几篇文章已经简单写了关于RabbitMQ安装,使用,结合SpringBoot使用流程,有了前面的基础知识了,我们现在开始开发一个完整,可以直接使用到生产上的MQBridge后台系统,创建SpringBoot项目,这里就不详细说了,主要讲解MQBridge项目的开发过程,我画了一个流程图,整体说明MQBridge的功能和流程。
第一步项 目依赖
项目依赖哪些第三方包,项目结构说明:
<dependencies>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
第二步 公共实体类和工具类
参数DefaultMessage说明:
@Data
@ToString
public class DefaultMessage implements Serializable {
// 标识不同功能模块, 如: 订单、物流
private String source;
// 标识不同路由信息,如: 订单,物流
private String action;
// 参数
private Map<String, Object> data;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
返回类StandardResponse说明:
public class StandardResponse<T> implements Serializable {
// 状态码
private String statusCode;
// 状态描述
private String statusDesc;
// 响应结果
private Boolean success;
// 返回内容
private T data;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
第三步 发送Controller
- 对外提供发送消息Controller:
public class BridgeController {
BridgeService bridgeService;
("/sendMessage")
public StandardResponse sendMessage( DefaultMessage message) {
log.info("[sendMessage] params: " + message);
return bridgeService.sendMessage(message);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
2. MQBridge服务接口与实现类:
/**
* Bridge桥接发送消息接口
*/
public interface BridgeService {
// MQ桥接发送消息
StandardResponse sendMessage(DefaultMessage message);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
@Slf4j
@Service
public class BridgeServiceImpl implements BridgeService {
/**
* 缓存RabbitMQ消息生产者
*/
@Autowired
private Map<String, MessageSender> senderMap;
@Override
public StandardResponse sendMessage(DefaultMessage message) {
log.info("[sendMessage] params: " + message);
try {
// 根据不同source调用不同的消息生产者
return senderMap.get(message.getSource()).send(message);
}catch (Exception e) {
e.printStackTrace();
log.info("[sendMessage] Error: " + e.getMessage());
StandardResponse standardResponse = new StandardResponse();
standardResponse.setSuccess(false);
standardResponse.setStatusCode("101");
standardResponse.setStatusDesc("Not Find Service.");
return standardResponse;
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
第四步 消息生产者接口与父类实现
/**
* RabbitMQ消息生产接口
*/
public interface MessageSender {
// 发送RabbitMQ消息
StandardResponse send(DefaultMessage defaultMessage);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
/**
* RabbitMQ消息生产父类
*/
public class BaseMessageSender implements MessageSender {
RabbitTemplate rabbitTemplate;
// 子类注入交换机名称
protected String exchange;
public StandardResponse send(DefaultMessage defaultMessage) {
StandardResponse standardResponse = new StandardResponse();
try {
log.info("{} Sender...",defaultMessage.getAction());
// 根据参数Action,发送消息(交换机,路由主键, 参数)
rabbitTemplate.convertAndSend(exchange, defaultMessage.getAction(), defaultMessage);
standardResponse.setSuccess(true);
standardResponse.setStatusCode("100");
standardResponse.setStatusDesc("Send Success");
return standardResponse;
}catch (Exception e) {
e.printStackTrace();
log.error("convertAndSend Error: " + e.getMessage());
standardResponse.setSuccess(false);
standardResponse.setStatusCode("102");
standardResponse.setStatusDesc("Send Error.");
return standardResponse;
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
消息消费者接口:
/**
* RabbitMQ消息消费接口
*/
public interface MessageHandler {
// 处理RabbitMQ消息
StandardResponse handle(DefaultMessage message);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
第五步 订单消息
订单交换机、队列、路由主键之间关系使用:
1. 创建一个订单交换机。
/**
* 订单交换机
*/
public class OrderTopicRabbitConfig {
// 通过application.yml资源文件定义订单交换机名称,并引入
("${rabbitmq.exchange.order}")
private String orderExchange;
// 创建订单交换机,在生产者,消费者使用
(name = "${rabbitmq.exchange.order}")
public TopicExchange OrderTopicExchange() {
log.info("[MQBridge.Order]主题交换机{}", orderExchange);
return new TopicExchange(orderExchange,true,false);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
2. 订单消息生产者类,实例消息生产者类(测试使用)。
/**
* 订单消息生产者
*/
("MQBridge.Order")
public class OrderSender extends BaseMessageSender {
// 通过application.yml资源文件定义订单交换机名称,并引入
("${rabbitmq.exchange.order}")
private String orderExchange;
/**
* 服务启动时,把订单交换机名称注入到父类变量
*/
public void init() {
this.exchange = orderExchange;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
/**
* 实例消息生产者
*/
("MQBridge.Sample")
public class SampleSender extends BaseMessageSender {
// 通过application.yml资源文件定义订单交换机名称,并引入
("${rabbitmq.exchange.order}")
private String orderExchange;
/**
* 服务启动时,把订单交换机名称注入到父类变量
*/
public void init() {
this.exchange = orderExchange;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
3. 订单Email消费者,订单公共消费者(此消费在这里说明通配符交换机其中一使用)。
/**
* 订单消息消费者
*/
(queues = "MQBridge_Order_Email")
public class OrderEmailHandler implements MessageHandler {
// 定义订单队列名
private static final String QUEUE_NAME = "MQBridge_Order_Email";
// 创建订单队列对象
(QUEUE_NAME)
public Queue OrderTopicQueueEmail() {
log.info("Order主题队列名{}", QUEUE_NAME);
return new Queue(QUEUE_NAME,true);
}
// 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
Binding bindingOrderTopicEmail( (QUEUE_NAME) Queue queue,
("${rabbitmq.exchange.order}") TopicExchange exchange) {
log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.Email");
}
/**
* 消费订单Email通知消息
* @param message
* @return
*/
public StandardResponse handle(DefaultMessage message) {
log.info("[OrderEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.Email");
log.info("[MQBridge_Order]-[MQBridge.Order.Email]消费参数:{}", message);
return null;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
/**
* 订单消息消费者
*/
(queues = "MQBridge_Order_Common")
public class OrderCommonlHandler implements MessageHandler {
// 定义订单队列名
private static final String QUEUE_NAME = "MQBridge_Order_Common";
// 创建订单队列对象
(QUEUE_NAME)
public Queue OrderTopicQueueCommon() {
log.info("Order主题队列名{}", QUEUE_NAME);
return new Queue(QUEUE_NAME,true);
}
// 订单队列对象与订单交换机绑定,路由主键为MQBridge.Order.Email
Binding bindingOrderTopicCommon( (QUEUE_NAME) Queue queue,
("${rabbitmq.exchange.order}") TopicExchange exchange) {
log.info("Order主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Order.#");
}
/**
* 消费订单公共通知消息
* @param message
* @return
*/
public StandardResponse handle(DefaultMessage message) {
log.info("[OrderCommonlHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Order.#");
log.info("[MQBridge_Order]-[MQBridge.Order.#]消费参数:{}", message);
return null;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
第六步 物流消息
物流交换机、队列、路由主键之间关系使用:
1. 创建一个物流交换机。
/**
* 物流交换机
*/
public class LogisticsTopicRabbitConfig {
// 通过application.yml资源文件定义物流交换机名称,并引入
("${rabbitmq.exchange.logistics}")
private String logisticsExchange;
// 创建物流交换机,在生产者,消费者使用
(name = "${rabbitmq.exchange.logistics}")
public TopicExchange LogisticsTopicExchange() {
log.info("[MQBridge.Logistics]主题交换机{}", logisticsExchange);
return new TopicExchange(logisticsExchange,true,false);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
2. 物流消息生产者:
/**
* 物流消息生产者
*/
("MQBridge.Logistics")
public class LogisticsSender extends BaseMessageSender {
// 通过application.yml资源文件定义物流交换机名称,并引入
("${rabbitmq.exchange.logistics}")
private String logisticsExchange;
/**
* 服务启动时,把物流交换机名称注入到父类变量
*/
public void init() {
this.exchange = logisticsExchange;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
3. 物流消息消费者:
/**
* 物流消息消费者
*/
(queues = "MQBridge_Logistics_Email")
public class LogisticsEmailHandler implements MessageHandler {
// 定义物流队列名
private static final String QUEUE_NAME = "MQBridge_Logistics_Email";
// 创建物流队列对象
(QUEUE_NAME)
public Queue LogisticsTopicQueue() {
log.info("Logistics主题队列名{}", QUEUE_NAME);
return new Queue(QUEUE_NAME,true);
}
// 物流队列对象与物流交换机绑定,路由主键为MQBridge.Logistics.Email
Binding bindingLogisticsTopic( (QUEUE_NAME) Queue queue,
("${rabbitmq.exchange.logistics}") TopicExchange exchange) {
log.info("Logistics主题队列绑定到上交换机,队列为{}", QUEUE_NAME);
return BindingBuilder.bind(queue).to(exchange).with("MQBridge.Logistics.#");
}
/**
* 消费物流Email通知消息
* @param message
* @return
*/
public StandardResponse handle(DefaultMessage message) {
log.info("[LogisticsEmailHandler] Queue: {}, RoutingKey: {}", QUEUE_NAME, "MQBridge.Logistics.#");
log.info("[MQBridge_Logistics_Email]消费参数:{}", message);
return null;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
第七步 小结
从项目结构和代码可以看出,第五步和第六步是不同的业务功能,我们通过传递参数Source不同,调用不同的业务逻辑功能,之后如果添加新的模块,就可以参考第五步或第六步就可以实现,然后调用时,参数Source指定为新值就可以;这里再说一下Source参数值与Service是怎么对应的,在LogisticsSender类,OrderSender类,SampleSender类上都有一个注解@Service(“MQBridge.Order”), 括号里的字符串就是对应参数的Source值了。
第八步 调试
1. Source为MQBridge.Order调用。
2. Source为MQBridge.Sample调用 。
3. Source为MQBridge.Logistics调用 。
最后总结
从调试和打印日志可以看出MQBridge项目,可以很方便添加新功能,下图是调试的三次日志。
想了解更多关于开源的内容,请访问:
51CTO 开源基础软件社区
https://ost.51cto.com。