RabbitMQ+Java入门

简介:

使用简单模式完成消息传递,作为RabbitMQ的入门程序。

步骤

创建工程 (生产者、消费者)

在项目中创建两个Maven模块(Modules) image.png

添加依赖

<!--  RabbitMQ Java 客户端  -->  
<dependency>  
    <groupId>com.rabbitmq</groupId>  
    <artifactId>amqp-client</artifactId>  
    <version>5.6.0</version>  
</dependency>

两个模块都需要在 pom.xml 中添加该依赖

模拟发送消息代码

1、建立连接:

ConnectionFactory factory = new ConnectionFactory();

2、设置参数:

//2.设置参数  
factory.setHost("192.168.8.202");// ip 默认值localhost  
factory.setPort(5672);// 端口,默认值5672  
factory.setVirtualHost("vhost1");// 虚拟机,默认值/  
factory.setUsername("hso");// 用户名,默认guest
factory.setPassword("123");// 密码,默认 guest

3、建立连接 Connection

//3.创建连接 
ConnectionConnection connection = factory.newConnection();

4、建立Channel

//4.创建Channel
Channel channel = connection.createChannel();

5、创建队列:

//5.创建队列Queue  
// 如果没有一个名字叫 hello_comsumer 的队列,则会创建一个  
channel.queueDeclare("hello_comsumer",true,false,false,null);

队列创建有很多带参数构造器 进入到所在的类中,点击此处可以查看源码 image.png

查看开发者的注释信息 image.png

queue:队列名称 durable:是否持久化 exclusive: exclusive 为 true 时。只能有一个消费者监听该队列。 且Connection关闭时,会删除队列 autodelete:为 true 时,当没有Consumer时,会自动删除掉。 arguments:其他参数

6、发送消息:

//6.发送消息  
String msg = "hello comsumer!";  
channel.basicPublish("","hello_comsumer",null,msg.getBytes());

使用Channel类中的 用同样的方法查看源码中的参数

image.png exchange:交换机名称。简单模式下交换机会使用默认的”“ routingKey:路由名称 props:配置信息 body:字节数组(发送的消息)

7、释放资源

//7.释放资源
channel.close();
connection.close();

测试生产者代码

image.png

运行程序,然后打开RabbitMQ管理台 查看服务器中的Queues,可以看到队列的信息 如果注释掉第七步释放资源的部分,在Connections和Channels中也可以看到相关内容。 image.png

查看和打开Consumer Count image.png

可以发现当前队列存在已准备消息,但还没有消费者(还没有写消费者的代码) image.png

模拟接受消息代码

1~5 同上

从第1步到第4步建立连接,代码可以复用。 不过要注意的是队列和生产者创建的队列是不同的。

6、接受消息

接收消息的实现,使用 channel 中的 basicConsume 方法

查看源码查看参数的注释 image.png queue:队列名称 autoAck:是否自动确认 callback:回调的函数,可以自动执行一些方法

需要创建一个 Consumer 对象,Consumer 是一个接口,包含一个接收 Channel 对象作为参数的 DefaultConsumer 实现类,是空的实现类。 需要重写其中的回调函数方法。 image.png

//6.接收消息  
Consumer consumer = new DefaultConsumer(channel){  
    /**  
     * 回调方法,接收到消息后,会自动执行此方法  
     * @param consumerTag 标识  
     * @param envelope 获取一些信息,交换机,路由key...  
     * @param properties 配置信息  
     * @param body 数据(内容)  
     * @throws IOException  
     */    @Override  
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
        System.out.println("consumerTag : "+consumerTag);  
        System.out.println("envelope.getExchange() : "+envelope.getExchange());  
        System.out.println("envelope.getRoutingKey() : "+envelope.getRoutingKey());  
        System.out.println("properties" + properties);  
        System.out.println("body : "+new String(body));  
    }  
};  
channel.basicConsume("Q2",true,consumer);

消费者不需要关闭资源

消费者代码如下

image.png

测试消费者代码

运行消费者主函数 可以看到回调函数会被调用,并得到 Q1 队列中留存的 Message 结果

image.png

同时在 RabbitMQ中可以看到Q1队列中增加了一个消费者,并处理掉了消息 image.png

小结:

使用了简单模式,Producer 生产出要发送给 Consumer 处理的消息,Queue队列就相当于一个邮箱,存放着 P -> C 的资源。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇