RabbitMQ 实战教程
https://gitee.com/fakerlove/rabbitmq
RabbitMQ 实战教程
1.MQ引言
1.1 什么是MQ
MQ
(Message Quene) : 翻译为 消息队列
,通过典型的 生产者
和消费者
模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件
通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 MQ有哪些
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ
、RabbitMQ
,炙手可热的Kafka
,阿里巴巴自主开发RocketMQ
等。
1.3 不同MQ特点
# 1.ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
# 2.Kafka
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,
追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,
适合产生大量数据的互联网服务的数据收集业务。
# 3.RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起
源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消
息推送、日志流式处理、binglog分发等场景。
# 4.RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和
发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在
其次。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
2.RabbitMQ 的安装
2.1 RabbitMQ
简介
基于
AMQP
协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
官网
: https://www.rabbitmq.com/
官方教程
: https://www.rabbitmq.com/#getstarted
# AMQP 协议
AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
应用场景
RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式、返回模式
- 与Spring AMQP完美整合,API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
MQ典型应用场景:
- 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
- 日志处理
- 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
AMQP协议
提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息队列,用来保存消息,供消费者消费。
我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?
试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。
下图是AMQP的协议模型:
正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。
接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
最后还要关闭信道和连接。
RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样。
常用交换器
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。
Direct Exchange
该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。
Topic Exchange
该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。
Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"login."只能匹配到“com.rabbitmq”。
Fanout Exchange
该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。
Headers Exchange
该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。
2.2 RabbitMQ 的安装
2.2.1 下载
rabbitmq 是基于 erlang 编程语言的,所以需要环境的
首先注意系统版本,自己的服务器是linux 还是windows
下面的网址是下载erlang 的
https://www.erlang-solutions.com/resources/download.html
下面的比较慢
https://www.erlang.org/downloads
下面是下载rabbitmq 的
https://www.rabbitmq.com/download.html
最新版本
: 3.7.18
rabbit 如果能够运行,需要两个东西 erlang 和socket 的包 2.2.2 下载的安装包
注意
:这里的安装包是centos7安装的包
2.2.3 安装步骤
1.将rabbitmq安装包上传到linux系统中
erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm
2.安装依赖包
rpm安装方式
就需要三个rpm 格式的东西
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
yum 安装方式
- 安装erlang需要的依赖环境
# 添加仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
Detected operating system as centos/8.
# 安装erlang
dnf install erlang
- 安装socat
wget http://www.dest-unreach.org/socat/download/socat-1.7.0.1.tar.gz
tar -zxvf socat-1.7.0.1.tar.gz
cd socat-1.7.0.1
./configure --disable-fips
make && make install
# 如果是centos 8
http://www.dest-unreach.org/socat/download/socat-1.7.4.0.tar.gz
- 安装 logrotate
yum -y install logrotate
问题
centos 7 的socat.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
二、导入密钥
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
三、在/etc/yum.repos.d目录下添加rabbitmq.repo文件,内容如下:
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/8/
gpgcheck=0
repo_gpgcheck=0
enabled=1
centos7 的配置
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
3.安装RabbitMQ安装包(需要联网)
yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm
4.复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config
5.查看配置文件位置
ls /etc/rabbitmq/rabbitmq.config
6.修改配置文件(参见下图:)
vim /etc/rabbitmq/rabbitmq.config
将上图中配置文件中红色部分去掉%%
,以及最后的,
逗号 修改为下图:
7.执行如下命令,启动rabbitmq中的插件管理
rabbitmq-plugins enable rabbitmq_management
出现如下说明: Enabling plugins on node rabbit@localhost: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@localhost... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch
set 3 plugins. Offline change; changes will take effect at broker restart.
8.启动RabbitMQ的服务
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server
9.查看服务状态(见下图:)
systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
Main PID: 2904 (beam.smp)
Status: "Initialized"
CGroup: /system.slice/rabbitmq-server.service
├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
MBlmbcs...
├─3220 erl_child_setup 32768
├─3243 inet_gethost 4
└─3244 inet_gethost 4
.........
10.关闭防火墙服务
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld
11.访问web管理界面
http://10.15.0.8:15672/
12.登录管理界面
username: guest
password: guest
2.3 docker 安装 rabbitmq
下载镜像,并且运行
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
最新版本的话
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
访问网址
http://你的主机名(比如localhost):15672/#/
3. 配置
3.1 rabbitmq 所有命令
rabbitmq-defaults rabbitmq-env rabbitmq-queues rabbitmq-upgrade rabbitmqctl
rabbitmq-diagnostics rabbitmq-plugins rabbitmq-server rabbitmqadmin
有这么多命令
3.2 命令介绍
启动
rabbitmq-server
后台启动
rabbitmq-server -detached
查看状态
rabbitmqctl status
关闭
rabbitmqctl stop
所有ctl 指令
Help:
autocomplete Provides command name autocomplete variants
help Displays usage information for a command
version Displays CLI tools version
Nodes:
await_startup Waits for the RabbitMQ application to start on the target node
reset Instructs a RabbitMQ node to leave the cluster and return to its virgin state
rotate_logs Instructs the RabbitMQ node to perform internal log rotation
shutdown Stops RabbitMQ and its runtime (Erlang VM). Monitors progress for local nodes. Does not require a PID file path.
start_app Starts the RabbitMQ application but leaves the runtime (Erlang VM) running
stop Stops RabbitMQ and its runtime (Erlang VM). Requires a local node pid file path to monitor progress.
stop_app Stops the RabbitMQ application, leaving the runtime (Erlang VM) running
wait Waits for RabbitMQ node startup by monitoring a local PID file. See also 'rabbitmqctl await_online_nodes'
Cluster:
await_online_nodes Waits for <count> nodes to join the cluster
change_cluster_node_type Changes the type of the cluster node
cluster_status Displays all the nodes in the cluster grouped by node type, together with the currently running nodes
force_boot Forces node to start even if it cannot contact or rejoin any of its previously known peers
force_reset Forcefully returns a RabbitMQ node to its virgin state
forget_cluster_node Removes a node from the cluster
join_cluster Instructs the node to become a member of the cluster that the specified node is in
rename_cluster_node Renames cluster nodes in the local database
update_cluster_nodes Instructs a cluster member node to sync the list of known cluster members from <seed_node>
Replication:
cancel_sync_queue Instructs a synchronising mirrored queue to stop synchronising itself
sync_queue Instructs a mirrored queue with unsynchronised mirrors (follower replicas) to synchronise them
Users:
add_user Creates a new user in the internal database
authenticate_user Attempts to authenticate a user. Exits with a non-zero code if authentication fails.
change_password Changes the user password
clear_password Clears (resets) password and disables password login for a user
delete_user Removes a user from the internal database. Has no effect on users provided by external backends such as LDAP
list_users List user names and tags
set_user_tags Sets user tags
Access Control:
clear_permissions Revokes user permissions for a vhost
clear_topic_permissions Clears user topic permissions for a vhost or exchange
list_permissions Lists user permissions in a virtual host
list_topic_permissions Lists topic permissions in a virtual host
list_user_permissions Lists permissions of a user across all virtual hosts
list_user_topic_permissions Lists user topic permissions
list_vhosts Lists virtual hosts
set_permissions Sets user permissions for a vhost
set_topic_permissions Sets user topic permissions for an exchange
Monitoring, observability and health checks:
list_bindings Lists all bindings on a vhost
list_channels Lists all channels in the node
list_ciphers Lists cipher suites supported by encoding commands
list_connections Lists AMQP 0.9.1 connections for the node
list_consumers Lists all consumers for a vhost
list_exchanges Lists exchanges
list_hashes Lists hash functions supported by encoding commands
list_queues Lists queues and their properties
list_unresponsive_queues Tests queues to respond within timeout. Lists those which did not respond
ping Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
report Generate a server status report containing a concatenation of all server status information for support purposes
schema_info Lists schema database tables and their properties
status Displays status of a node
Parameters:
clear_global_parameter Clears a global runtime parameter
clear_parameter Clears a runtime parameter.
list_global_parameters Lists global runtime parameters
list_parameters Lists runtime parameters for a virtual host
set_global_parameter Sets a runtime parameter.
set_parameter Sets a runtime parameter.
Policies:
clear_operator_policy Clears an operator policy
clear_policy Clears (removes) a policy
list_operator_policies Lists operator policy overrides for a virtual host
list_policies Lists all policies in a virtual host
set_operator_policy Sets an operator policy that overrides a subset of arguments in user policies
set_policy Sets or updates a policy
Virtual hosts:
add_vhost Creates a virtual host
clear_vhost_limits Clears virtual host limits
delete_vhost Deletes a virtual host
list_vhost_limits Displays configured virtual host limits
restart_vhost Restarts a failed vhost data stores and queues
set_vhost_limits Sets virtual host limits
trace_off
trace_on
Configuration and Environment:
decode Decrypts an encrypted configuration value
encode Encrypts a sensitive configuration value
environment Displays the name and value of each variable in the application environment for each running application
set_cluster_name Sets the cluster name
set_disk_free_limit Sets the disk_free_limit setting
set_log_level Sets log level in the running node
set_vm_memory_high_watermark Sets the vm_memory_high_watermark setting
Definitions:
export_definitions Exports definitions in JSON or compressed Erlang Term Format.
import_definitions Imports definitions in JSON or compressed Erlang Term Format.
Feature flags:
enable_feature_flag Enables a feature flag on target node
list_feature_flags Lists feature flags
Operations:
close_all_connections Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node
close_connection Instructs the broker to close the connection associated with the Erlang process id
eval Evaluates a snippet of Erlang code on the target node
eval_file Evaluates a file that contains a snippet of Erlang code on the target node
exec Evaluates a snippet of Elixir code on the CLI node
force_gc Makes all Erlang processes on the target node perform/schedule a full sweep garbage collection
resume_listeners Resumes client connection listeners making them accept client connections again
suspend_listeners Suspends client connection listeners so that no new client connections are accepted
Queues:
delete_queue Deletes a queue
purge_queue Purges a queue (removes all messages in it)
Deprecated:
hipe_compile DEPRECATED. This command is a no-op. HiPE is no longer supported by modern Erlang versions
node_health_check DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. See https://www.rabbitmq.com/monitoring.html#health-checks instead
Use 'rabbitmqctl help <command>' to learn more about a specific command
所有插件
rabbitmq-plugins list
3.3 Web 页面
3.3.1 页面介绍
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
3.3.2 Admin用户和虚拟主机管理
添加用户
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
创建虚拟主机
为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
绑定虚拟主机和用户
4. Java 使用rabbitmq
4.1 直连模型--Helloword
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>helloword</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
</project>
创建开发生产者
package com.ak.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
@Test
public void testSendMessage(){
ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置主机名
connectionFactory.setHost("47.100.104.187");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接的虚拟主机的名字
connectionFactory.setVirtualHost("/joker");
// 设置虚拟机的用户名和密码
connectionFactory.setUsername("joker");
connectionFactory.setPassword("123456");
// 获取连接对象 生产者----> 队列
try {
// 获取连接对象
Connection connection=connectionFactory.newConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
// 通道绑定对应消息队列
/**
* 参数一:队列名字,队列不存在自动创建
* 参数二,是否持久化
* 参数三:是否独占队列 true 是独占队列 ,false 不独占
* 参数四: 是否在消费完成后删除队列
* 参数五:额外附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
// 发布消息
/**
* 参数一:交换机名称
* 参数二:队列名称
* 参数三: 传递消息额外设置
* 参数四:消息的内容
*
*/
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
}
发布成功
建立消费者
package com.ak.test;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MyConsumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置主机名
connectionFactory.setHost("47.100.104.187");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接的虚拟主机的名字
connectionFactory.setVirtualHost("/joker");
// 设置虚拟机的用户名和密码
connectionFactory.setUsername("joker");
connectionFactory.setPassword("123456");
// 获取连接对象 生产者----> 队列
try {
// 获取连接对象
Connection connection=connectionFactory.newConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
// 通道绑定对应消息队列
/**
* 参数一:队列名字,队列不存在自动创建
* 参数二,是否持久化
* 参数三:是否独占队列 true 是独占队列 ,false 不独占
* 参数四: 是否在消费完成后删除队列
* 参数五:额外附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
// 发布消息
/**
* 参数一:队列名称
* 参数二:开始消费的自动确认机制
* 参数三: 消费时的回调接口
*
*/
channel.basicConsume("",true, new DefaultConsumer(channel){
/**
* 参数回调
* @param consumerTag
* @param envelope
* @param properties
* @param body 消息队列中取出的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
}
}
查看是否被消费
工具类
package com.ak.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
static {
connectionFactory=new ConnectionFactory();
connectionFactory.setHost("47.100.104.187");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接的虚拟主机的名字
connectionFactory.setVirtualHost("/joker");
// 设置虚拟机的用户名和密码
connectionFactory.setUsername("joker");
connectionFactory.setPassword("123456");
}
public static Connection getConnection(){
// ConnectionFactory connectionFactory=new ConnectionFactory();
// 设置主机名
try {
// 获取连接对象
Connection connection=connectionFactory.newConnection();
return connection;
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
}
return null;
}
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if(channel!=null){
channel.close();
}
if(connection!=null){
connection.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
使用测试
package com.ak;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Provider {
public static void main(String[] args) {
// 获取连接对象 生产者----> 队列
try {
Connection connection= RabbitMQUtils.getConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
项目结构
4.2 work quene 任务模型
Work queues
,也被称为(Task queues
),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
- P:生产者:任务的发布者
- C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
- C2:消费者-2:领取任务并完成任务,假设完成速度快
创建生产者
package com.ak.demo_2;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
public static void main(String[] args) {
Connection connection= RabbitMQUtils.getConnection();
try {
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
for(int i=0;i<1000;i++){
channel.basicPublish("","work",null,("hello"+i).getBytes());
}
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
} catch (IOException e) {
e.printStackTrace();
}
}
}
创建消费者
package com.ak.demo_2;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_1 {
public static void main(String[] args) {
try {
// 获取连接对象
Connection connection=RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
// RabbitMQUtils.closeConnectionAndChanel(channel,connection);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
package com.ak.demo_2;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_2 {
public static void main(String[] args) {
try {
// 获取连接对象
Connection connection=RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("",true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
// RabbitMQUtils.closeConnectionAndChanel(channel,connection);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
消息自动确认机制
如何实现能者多劳的任务模型。需要手动确认信息
package com.ak.demo_2;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_1_Information {
public static void main(String[] args) {
try {
// 获取连接对象
Connection connection=RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
channel.basicQos(1);
channel.queueDeclare("workquene",true,false,false,null);
channel.basicConsume("",false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
// 手动确认 ,参数1:手动确认信息标识,参数2:false 每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
// RabbitMQUtils.closeConnectionAndChanel(channel,connection);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
4.3 fanout 模型
在广播模式下,消息发送流程是这样的:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
开发生产者
package com.ak.fanout;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
Channel channel=connection.createChannel();
// 参数一:为交换机名称,参数二:fanout 为交换机
channel.exchangeDeclare("joker","fanout");
// 发送信息
channel.basicPublish("joker","",null,"fanout type message".getBytes());
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
开发消费者
package com.ak.fanout;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer_3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel=connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("joker","fanout");
// 临时队列
String queneName=channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,"joker","");
// 消费信息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
4.4 Routing
4.4.1 直连
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
流程:
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
我希望只有错误日志的时候,才能存储到磁盘
其他日志在控制台打印
开发生产者
package com.ak.routedirect;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Random;
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
Channel channel=connection.createChannel();
// 参数一:为交换机名称,参数二:fanout 为交换机
channel.exchangeDeclare("log_router","direct");
String []routeKey={"error","info","waring","debug"};
// 发送信息
for(int i=0;i<10;i++){
int temp=new Random().nextInt(100)%4;
channel.basicPublish("log_router",routeKey[temp],null,("发送的信息为 "+routeKey[temp]).getBytes());
}
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
开发消费者
package com.ak.routedirect;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_1 {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
channel.exchangeDeclare("log_router","direct");
// 临时队列
String queneName=channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,"log_router","error");
// 消费信息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 "+new String(body));
}
});
}
}
package com.ak.routedirect;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_2 {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
channel.exchangeDeclare("log_router","direct");
// 临时队列
String queneName=channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,"log_router","error");
channel.queueBind(queneName,"log_router","info");
channel.queueBind(queneName,"log_router","waring");
channel.queueBind(queneName,"log_router","debug");
// 消费信息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 "+new String(body));
}
});
}
}
检验
4.4.2 Routing 之订阅模型-Topic
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!这种模型Routingkey
一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 统配符
* (star) can substitute for exactly one word. 匹配不多不少恰好1个词
# (hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
audit.# 匹配audit.irs.corporate或者 audit.irs 等
audit.* 只能匹配 audit.irs
创建生产者
package com.ak.routeTopic;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Random;
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
Channel channel=connection.createChannel();
// 参数一:为交换机名称,参数二:fanout 为交换机
String channelName="log_top";
channel.exchangeDeclare(channelName,"topic");
String []routeKey={"user.save","user.add","admin.add","admin.save"};
for (int i=0;i<10;i++){
int temp=new Random().nextInt(100)%4;
channel.basicPublish(channelName,routeKey[temp],null,("这个是topics 发布的信息"+routeKey[temp]).getBytes());
}
RabbitMQUtils.closeConnectionAndChanel(channel,connection);
}
}
创建消费者
package com.ak.routeTopic;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_1 {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
String channelName="log_top";
channel.exchangeDeclare(channelName,"topic");
// 临时队列
String queneName=channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,channelName,"user.*");
// 消费信息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者user 的信息 "+new String(body));
}
});
}
}
package com.ak.routeTopic;
import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_2 {
public static void main(String[] args) throws IOException {
Connection connection= RabbitMQUtils.getConnection();
// 获取连接中通道
Channel channel=connection.createChannel();
String channelName="log_top";
channel.exchangeDeclare(channelName,"topic");
// 临时队列
String queneName=channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queneName,channelName,"admin.*");
// 消费信息
channel.basicConsume(queneName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者admin 的信息 "+new String(body));
}
});
}
}
检查
5. 整合SpringBoot
5.1 helloword 模型
引入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>demo1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
创建生产者
package com.ak.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = RabbitmqSpringApplication.class)
public class MyTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend("hello","hello world");
}
}
创建消费者
package com.ak.demo.hello;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 默认持久化队列
*/
@Component
@RabbitListener(queuesToDeclare =@Queue("hello"))
public class HelloCustomer {
@RabbitHandler
public void kk(String message){
System.out.println(message);
}
}
目录结构
5.2 workquene 模型
修改work 类
@Test
public void test2(){
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend("work","work 模型");
}
}
创建消费者
package com.ak.demo.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare =@Queue("work"))
public void receive(String messaage){
System.out.println("消费者1---"+messaage);
}
@RabbitListener(queuesToDeclare =@Queue("work"))
public void receive2(String messaage){
System.out.println("消费者2--"+messaage);
}
}
创建确认机制
5.3 广播模式
创建生产者
/**
* 广播形式的发布信息
*/
@Test
public void test3(){
for(int i=0;i<10;i++){
rabbitTemplate.convertAndSend("kk","","广播信息");
}
}
创建消费者
package com.ak.demo.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanOutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value =@Queue,
exchange = @Exchange(value = "kk",type = "fanout")
)
})
public void receive(String message){
System.out.println("----");
System.out.println("广播信息1"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value =@Queue,
exchange = @Exchange(value = "kk",type = "fanout")
)
})
public void receive2(String message){
System.out.println("----");
System.out.println("广播信息2"+message);
}
}
测试
5.4 直连模式
创建生产者
/**
* 测试路由模型
*/
@Test
public void testDirect(){
String []routeKey={"error","info","waring","debug"};
for(int i=0;i<10;i++){
int temp=new Random().nextInt(100)%4;
rabbitTemplate.convertAndSend("directs",routeKey[temp],routeKey[temp]+"的日志信息");
}
}
创建消费者
package com.ak.demo.router;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectConsumer {
String []routeKey={"error","info","waring","debug"};
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 临时队列
exchange = @Exchange(value = "directs",type = "direct"),
key={"error","info","waring","debug"}
)
})
public void receive(String message){
System.out.println("接受全部信息--"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,// 临时队列
exchange = @Exchange(value = "directs",type = "direct"),
key={"error"}
)
})
public void receive2(String message){
System.out.println("只接受error--"+message);
}
}
检查
5.5 Topic 模式
创建生产者
@Test
public void testFive(){
String []routeKey={"user.save","user.add","admin.add","admin.save"};
for(int i=0;i<10;i++){
int temp=new Random().nextInt(100)%4;
rabbitTemplate.convertAndSend("topics",routeKey[temp],routeKey[temp]+"信息");
}
}
消费者
package com.ak.demo.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key={"user.*"}
)
})
public void receive(String message){
System.out.println("user类---"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key={"admin.*"}
)
})
public void receive2(String message){
System.out.println("admin 类--"+message);
}
}
测试
6. 搭建集群
6.1 命令行搭建
默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问
架构图
核心解决问题: 当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份
集群搭建
# 0.集群规划 node1: 10.15.0.3 mq1 master 主节点 node2: 10.15.0.4 mq2 repl1 副本节点 node3: 10.15.0.5 mq3 repl2 副本节点 # 1.克隆三台机器主机名和ip映射 vim /etc/hosts加入: 10.15.0.3 mq1 10.15.0.4 mq2 10.15.0.5 mq3 node1: vim /etc/hostname 加入: mq1 node2: vim /etc/hostname 加入: mq2 node3: vim /etc/hostname 加入: mq3 # 2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行: scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/ scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/ # 3.查看cookie是否一致: node1: cat /var/lib/rabbitmq/.erlang.cookie node2: cat /var/lib/rabbitmq/.erlang.cookie node3: cat /var/lib/rabbitmq/.erlang.cookie # 4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面: rabbitmq-server -detached # 5.在node2和node3执行加入集群命令: 1.关闭 rabbitmqctl stop_app 2.加入集群 rabbitmqctl join_cluster rabbit@mq1 3.启动服务 rabbitmqctl start_app # 6.查看集群状态,任意节点执行: rabbitmqctl cluster_status # 7.如果出现如下显示,集群搭建成功: Cluster status of node rabbit@mq3 ... [{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]}, {running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}, {cluster_name,<<"rabbit@mq1">>}, {partitions,[]}, {alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}] # 8.登录管理界面,展示如下状态:
# 9.测试集群在node1上,创建队列
# 10.查看node2和node3节点:
# 11.关闭node1节点,执行如下命令,查看node2和node3: rabbitmqctl stop_app
6.2 docker 搭建
使用docker 进行搭建
删除所有镜像
docker stop myrabbit1 myrabbit2 myrabbit3
docker rm myrabbit1 myrabbit2 myrabbit3
自定义网络
docker network create --driver bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 myrediswork
创建集群
docker run -d --hostname rabbit1 --name myrabbit1 -p 5675:5672 -p 15673:15672 -v ~/mydata/rabbitmq/rabbitmq01:/var/lib/rabbitmq -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management
docker run -d --hostname rabbit2 --name myrabbit2 -p 5673:5672 -p 15674:15672 -v ~/mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq --link myrabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management
docker run -d --hostname rabbit3 --name myrabbit3 -p 5674:5672 -p 15675:15672 -v ~/mydata/rabbitmq/rabbitmq03:/var/lib/rabbitmq --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management
进入每个集群中,然后运行命令
docker exec -it myrabbit1 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
这里主义 myrabbit2 是不一样的
docker exec -it myrabbit2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit
docker exec -it myrabbit3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit
访问网址
http://www.jokerak.com:15673/#/
问题
Error response from daemon: Pool overlaps with other one on this address space
192.168.0.0 网段已经被使用了,换个网段即可