消息队列-RabbitMQ
安装rabbitMQ
双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格
他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持
配置环境变量,在path中配置: 自己安装erlang的路径\bin
双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录
在该目录下打开dos窗口,输入以下运行命令
1
| rabbitmq-plugins enable rabbitmq_management
|
启动结束后,访问:http://localhost:15672
用户名和密码都是:guest
rabbitMQ的基本概念
Exchange
接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。
Message Queue
消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。
Binding Key
它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。
Routing Key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里
RabbitMQ六种工作模式
创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> </dependency>
|
简单模式
只有一个消费者
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package cn.tedu.rabbitmq.simple;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { public static void main(String[] args)throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel();
cc.queueDeclare("helloworld",false,false,false,null); String msg = "hello world" + System.currentTimeMillis();
cc.basicPublish("","helloworld",null,msg.getBytes()); nc.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package cn.tedu.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.queueDeclare("helloworld",false,false,false,null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); System.out.println("==========================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume("helloworld",true,deliverCallback,cancelCallback); } }
|
工作队列模式
多个消费者,从同一个队列中接受消息
负载均衡,消息会轮询发送给所有消费者
合理的分发消息
-
手动ack
消息回执
向服务器发送一个通知,告诉服务器一条消息已经处理完毕
服务器可以通过ack,知道消费者是空闲还是繁忙
-
qos=1
每次抓取的消息数量
消息处理完毕之前,不会抓取新消息
手动ack模式下才有效
消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失
- 队列持久化: cc.queueDeclare(“队列名”,true,…)
- 消息持久化: cc.basocPublish(“”,“队列名”,MessageProperties.PERSISTENT_TEXT_PLAIN,消息)
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package cn.tedu.rabbitmq.work;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer { public static void main(String[] args)throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel();
cc.queueDeclare("work_queue",false,false,false,null); while(true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("","work_queue",null,msg.getBytes()); } nc.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package cn.tedu.rabbitmq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.queueDeclare("work_queue",false,false,false,null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); for (int i = 0; i < msg.length(); i++) { if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume("work_queue",true,deliverCallback,cancelCallback); } }
|
合理分发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package cn.tedu.rabbitmq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.queueDeclare("work_queue",false,false,false,null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); for (int i = 0; i < msg.length(); i++) { if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); cc.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicQos(1); cc.basicConsume("work_queue",false,deliverCallback,cancelCallback); } }
|
持久化
如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失
1 2
| 停止rabbitmq服务:rabbitmq-service stop或者rabbitmqctl stop 启动rabbitmq服务:rabbitmq-service start
|
消息数据持久化、消息队列持久化
但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改
可以选择新建一个别名队列或者删除该队列重新创建
队列持久化:cc.queueDeclare(“task_queue”,true,false,false,null);
数据持久化:cc.basicPublish(“”,“task_queue”, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
发布订阅模式
把消息群发给所有消费者,同一条消息所有消费者都可以收到
fanout类型的交换机
生产者:定义交换机,向交换机发送数据
消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package cn.tedu.rabbitmq.publishsubscribe;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("ps_exchange","fanout"); while (true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("ps_exchange","",null,msg.getBytes()); } nc.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package cn.tedu.rabbitmq.publishsubscribe;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("ps_exchange","fanout"); String queue = cc.queueDeclare().getQueue();
cc.queueBind(queue,"ps_exchange","");
DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到:"+msg); System.out.println("消息处理完毕"); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
|
路由模式
通过关键字匹配,来决定把消息发送到哪个队列
生产者:定义direct类型的交换机,向交换机发送数据并携带路由键
消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.tedu.rabbitmq.route;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("route_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package cn.tedu.rabbitmq.route;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Scanner;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); String queue = cc.queueDeclare().getQueue(); System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"route_exchange",bindingKey); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
|
主题模式
和路由模式相同,具有特殊的关键字规则
topic类型的交换机实现这种特殊路由规则
aaa.bbb.ccc.ddd
*.ccc.ddd.eee
#.ddd
"*"可以通配单个单词
"#"可以通配零个或多个单词
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.tedu.rabbitmq.topic;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class Producer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("topic_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package cn.tedu.rabbitmq.topic;
import com.rabbitmq.client.*;
import java.io.IOException; import java.util.Scanner;
public class Consumer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); String queue = cc.queueDeclare().getQueue(); System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"topic_exchange",bindingKey); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
|
RPC模式
实现原理
-
两个队列
调用队列
返回队列:每个客户端,都需要有自己的返回队列
-
返回队列的队列名
在调用消息数据中,携带返回队列名
根据返回队列名,向正确的返回队列来发送计算结果
-
关联ID
用来匹配计算结果和调用
如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果
客户端发送调用时,携带一个关联ID
服务器端返回结果时,也携带这个关联ID
客户端:多线程异步处理结果
-
主线程
发送调用信息
需要计算结果时,要取结果
-
次线程:等待接受结果
-
线程之间传递数据,可以使用BlockingQueue
集合工具
这个集合中,添加了线程的等待和通知
如果没有数据,取数据时会暂停等待
有多个子类:比如ArrayBlockingQueue
服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package cn.tedu.rabbitmq.rpc;
import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException;
public class RPCServer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.queueDeclare("rpc_queue",false,false,false,null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String replyTo = delivery.getProperties().getReplyTo(); String correlationId = delivery.getProperties().getCorrelationId(); long fbnqs = fbnqs(Integer.parseInt(msg)); BasicProperties basicProperties = new BasicProperties.Builder().correlationId(correlationId).build(); cc.basicPublish("",replyTo,basicProperties,(""+fbnqs).getBytes()); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume("rpc_queue",true,deliverCallback,cancelCallback);
} static long fbnqs(int n){ if(n==1 || n==2) return 1; return fbnqs(n-1)+fbnqs(n-2); } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package cn.tedu.rabbitmq.rpc;
import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue;
public class RPCClient { static BlockingQueue<Long> q = new ArrayBlockingQueue<Long>(10); public static void main(String[] args) throws Exception{ System.out.print("输入求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); long fbnqs = fbnqs(n); System.out.println("第"+n+"个的斐波那契数是:"+fbnqs); } private static long fbnqs(int n) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); cc.queueDeclare("rpc_queue",false,false,false,null); String replyTo = cc.queueDeclare().getQueue(); String cid = UUID.randomUUID().toString(); BasicProperties basicProperties = new BasicProperties.Builder() .replyTo(replyTo) .correlationId(cid) .build(); cc.basicPublish("","rpc_queue",basicProperties,(""+n).getBytes()); System.out.println("调用消息已经发送"); System.out.println("模拟执行其他运算,不立即等待计算结果"); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { if(cid.equals(delivery.getProperties().getCorrelationId())){ String msg = new String(delivery.getBody()); long fbnqs = Integer.parseInt(msg); q.offer(fbnqs); cc.getConnection().close(); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException {
} }; cc.basicConsume(replyTo,true,deliverCallback,cancelCallback); return q.take(); } }
|