简介
使用SpringBoot + RabbitMQ 实现需求: 通过 在HTML页面中填写表单并提交 发起 HTTP 请求来调用生产者方法发送消息 格式如 test.orange.test 并将Message输出打印到控制台上。
Topic 程序
配置类
在 spring框架的amcq中,提供了 队列,交换机和绑定关系的构建方法(Builder) 在使用 SpringBoot 提供 RabbitMQ 服务时,活用构建器可以简化开发流程。
Spring框架的amqp.core包提供了如下构建器:
QueueBuilder
ExchangeBuilder
BindindBuilder
以点分隔接收参数,队列与交换机以 build()
结尾
绑定关系Binding以 noargs()
或者 and()
结尾
在构建过程中可以查看后续方法的类型。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* */
@Configuration
public class TopicConfig {
// 1.交换机 标识使用@Qualify注解调用这个bean
@Bean("topicExchange")
public Exchange topicExchange(){
// 创建一个 topic类型 的交换机
return ExchangeBuilder.topicExchange("topicExchange").durable(false).build();
}
// 2.队列
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable("topicQueue1").build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable("topicQueue2").build();
}
// 3.绑定Topic关系
@Bean
public Binding Binding1(@Qualifier("topicQueue1") Queue queue,@Qualifier("topicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("*.orange.*").noargs();
}
@Bean
public Binding binding2(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit").noargs();
}
@Bean
public Binding binding3(@Qualifier("topicQueue2") Queue queue,@Qualifier("topicExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("lazy.#").noargs();
}
}
HTML 页面
准备一个 HTML 页面,包含routingKey
Message
的表单。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Topic Test</title>
</head>
<body>
<form action="http://localhost:8080/topic" method="get" >
routing key: <input type="text" name="routingKey">
message : <input type="text" name="message"></input>
<input type="submit" value="提交">
</form></body>
</html>
<!--
form 表单
action发送表单中的数据到action的url里,
url为空时发送到当前页面
method
get 有url长度限制
post发送到f12中网络请求中,无网络限制
input
type
text 文本类型,可操作
submit 按钮类型,可点击
name
是必须的属性
value
框框中的值
-->
在浏览器中打开的效果如下
生产者
新建生产者类 ropicProducer
先创建一个Spring框架中的 RabbitTemplate
实现类
该类已装载在 Bean 容器中,设置AutoWired自动获取该对象
然后提供一个 sendMessage
方法,包含 routingKey
和 message
形参,方便外部传参调用。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
// 创建一个rabbitTemplate对象
private RabbitTemplate rabbitTemplate;
private String exchangeName = "topicExchange";
@Autowired
public TopicProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
// sendMessage 方法
public void sendMessage(String routingKey,String message) {
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
控制器
新建控制器 TopicController
用于响应HTTP请求,并执行生产者发送消息的 sendMessage
方法
创建一个生产者对象 topicProducer
用以调用发送消息的方法
并提供一个 GetMapping
方法, 在接收到HTTP页面发来的请求后, 将表单中包含的 routingKey
和 Message
传给 sendMessage()
方法
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TopicController {
private final TopicProducer topicProducer;
public TopicController(TopicProducer topicProducer) {
this.topicProducer = topicProducer;
}
@GetMapping("/topic")
public String sendMessage(String routingKey,String message){
topicProducer.sendMessage(routingKey,message);
return "routingKey: "+routingKey+"\nmessage: "+message;
}
}
测试效果
将主程序运行起来, 打开 html 页面, 通过表单提交三条测试信息和一条无关信息.
在控制台中可以大致确认信息达到了队列
消费者
新建两个消费者类, 分别用以处理两个队列的消息 并通过@RabbitListener 注解监听队列, 当取到队列的信息时, 会将参数传入注解下的方法并自动调用
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer1 {
private final String queueName = "topicQueue1";
@RabbitListener(queues = queueName)
public void recv(String message) {
System.out.println("C1 receive message : " + message);
}
}
消费者2中的 QueueName
改为 topicQueue2
控制台输出信息中的 C1 改为 C2 加以区分
运行
运行代码,拿到并处理了队列中积压的信息
打开 html 页面再测试几组数据,返回了正确的结果