简介:
使用简单模式完成消息传递,作为RabbitMQ的入门程序。
步骤
创建工程 (生产者、消费者)
在项目中创建两个Maven模块(Modules)

添加依赖
<!-- RabbitMQ Java 客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
两个模块都需要在 pom.xml 中添加该依赖
模拟发送消息代码
1、建立连接:
ConnectionFactory factory = new ConnectionFactory();
2、设置参数:
//2.设置参数
factory.setHost("192.168.8.202");// ip 默认值localhost
factory.setPort(5672);// 端口,默认值5672
factory.setVirtualHost("vhost1");// 虚拟机,默认值/
factory.setUsername("hso");// 用户名,默认guest
factory.setPassword("123");// 密码,默认 guest
3、建立连接 Connection
//3.创建连接
ConnectionConnection connection = factory.newConnection();
4、建立Channel
//4.创建Channel
Channel channel = connection.createChannel();
5、创建队列:
//5.创建队列Queue
// 如果没有一个名字叫 hello_comsumer 的队列,则会创建一个
channel.queueDeclare("hello_comsumer",true,false,false,null);
队列创建有很多带参数构造器
进入到所在的类中,点击此处可以查看源码

查看开发者的注释信息

queue:队列名称 durable:是否持久化 exclusive: exclusive 为 true 时。只能有一个消费者监听该队列。 且Connection关闭时,会删除队列 autodelete:为 true 时,当没有Consumer时,会自动删除掉。 arguments:其他参数
6、发送消息:
//6.发送消息
String msg = "hello comsumer!";
channel.basicPublish("","hello_comsumer",null,msg.getBytes());
使用Channel类中的 用同样的方法查看源码中的参数
exchange:交换机名称。简单模式下交换机会使用默认的”“
routingKey:路由名称
props:配置信息
body:字节数组(发送的消息)
7、释放资源
//7.释放资源
channel.close();
connection.close();
测试生产者代码

运行程序,然后打开RabbitMQ管理台
查看服务器中的Queues,可以看到队列的信息
如果注释掉第七步释放资源的部分,在Connections和Channels中也可以看到相关内容。

查看和打开Consumer Count

可以发现当前队列存在已准备消息,但还没有消费者(还没有写消费者的代码)

模拟接受消息代码
1~5 同上
从第1步到第4步建立连接,代码可以复用。 不过要注意的是队列和生产者创建的队列是不同的。
6、接受消息
接收消息的实现,使用 channel 中的 basicConsume 方法
查看源码查看参数的注释
queue:队列名称
autoAck:是否自动确认
callback:回调的函数,可以自动执行一些方法
需要创建一个 Consumer 对象,Consumer 是一个接口,包含一个接收 Channel 对象作为参数的 DefaultConsumer 实现类,是空的实现类。
需要重写其中的回调函数方法。

//6.接收消息
Consumer consumer = new DefaultConsumer(channel){
/**
* 回调方法,接收到消息后,会自动执行此方法
* @param consumerTag 标识
* @param envelope 获取一些信息,交换机,路由key...
* @param properties 配置信息
* @param body 数据(内容)
* @throws IOException
*/ @Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag : "+consumerTag);
System.out.println("envelope.getExchange() : "+envelope.getExchange());
System.out.println("envelope.getRoutingKey() : "+envelope.getRoutingKey());
System.out.println("properties" + properties);
System.out.println("body : "+new String(body));
}
};
channel.basicConsume("Q2",true,consumer);
消费者不需要关闭资源
消费者代码如下

测试消费者代码
运行消费者主函数 可以看到回调函数会被调用,并得到 Q1 队列中留存的 Message 结果

同时在 RabbitMQ中可以看到Q1队列中增加了一个消费者,并处理掉了消息

小结:
使用了简单模式,Producer 生产出要发送给 Consumer 处理的消息,Queue队列就相当于一个邮箱,存放着 P -> C 的资源。