RabbitMQ 实战教程


文章链接
https://gitee.com/fakerlove/rabbitmq

RabbitMQ 实战教程

1.MQ引言

1.1 什么是MQ

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的ActiveMQRabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.3 不同MQ特点

# 1.ActiveMQ
		ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!

# 2.Kafka
		Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,
		追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,
		适合产生大量数据的互联网服务的数据收集业务。

# 3.RocketMQ
		RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起
		源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消
		息推送、日志流式处理、binglog分发等场景。

# 4.RabbitMQ
		RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和
		发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在
		其次。
		

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。


2.RabbitMQ 的安装

2.1 RabbitMQ

简介

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

image-20190925215603036

官网: https://www.rabbitmq.com/

官方教程: https://www.rabbitmq.com/#getstarted

 # AMQP 协议
 		AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

image-20200311182438041

应用场景

RabbitMQ除了像兔子一样跑的很快以外,还有这些特点:

  • 开源、性能优秀,稳定性保障
  • 提供可靠性消息投递模式、返回模式
  • 与Spring AMQP完美整合,API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

MQ典型应用场景:

  • 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
  • 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  • 日志处理
  • 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。

AMQP协议

提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

先了解一下AMQP协议中间的几个重要概念:

  • Server:接收客户端的连接,实现AMQP实体服务。
  • Connection:连接,应用程序与Server的网络连接,TCP连接。
  • Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
  • Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
  • Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
  • Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
  • Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
  • RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
  • Queue:消息队列,用来保存消息,供消费者消费。

我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景, 一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 TCP 连接复用的方式,不仅可以减少性能开销,同时也便于管理 。

下图是AMQP的协议模型:

img

正如图中所看到的,AMQP协议模型有三部分组成:生产者、消费者和服务端。

生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。

接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。

最后还要关闭信道和连接。

RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样。

img

常用交换器

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。

Direct Exchange

该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中。

img

Topic Exchange

该类型的交换器将所有发送到Topic Exchange的消息被转发到所有RoutingKey中指定的Topic的队列上面。

Exchange将RoutingKey和某Topic进行模糊匹配,其中“”用来匹配一个词,“#”用于匹配一个或者多个词。例如“com.#”能匹配到“com.rabbitmq.oa”和“com.rabbitmq”;而"login."只能匹配到“com.rabbitmq”。

img

Fanout Exchange

该类型不处理路由键,会把所有发送到交换器的消息路由到所有绑定的队列中。优点是转发消息最快,性能最好。

img

Headers Exchange

该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用。

2.2 RabbitMQ 的安装

2.2.1 下载

rabbitmq 是基于 erlang 编程语言的,所以需要环境的

首先注意系统版本,自己的服务器是linux 还是windows

下面的网址是下载erlang 的

https://www.erlang-solutions.com/resources/download.html

下面的比较慢

https://www.erlang.org/downloads

下面是下载rabbitmq 的

https://www.rabbitmq.com/download.html

image-20190925220115235

最新版本: 3.7.18

2.2.2 下载的安装包

rabbit 如果能够运行,需要两个东西 erlang 和socket 的包

image-20190925220343521

注意:这里的安装包是centos7安装的包

2.2.3 安装步骤

1.将rabbitmq安装包上传到linux系统中

erlang-22.0.7-1.el7.x86_64.rpm
rabbitmq-server-3.7.18-1.el7.noarch.rpm

2.安装依赖包

rpm安装方式

就需要三个rpm 格式的东西

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
yum 安装方式
  • 安装erlang需要的依赖环境
# 添加仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
Detected operating system as centos/8.
# 安装erlang
dnf install erlang
  • 安装socat
wget http://www.dest-unreach.org/socat/download/socat-1.7.0.1.tar.gz
tar -zxvf socat-1.7.0.1.tar.gz
cd socat-1.7.0.1 
./configure --disable-fips 
make && make install
# 如果是centos 8 
http://www.dest-unreach.org/socat/download/socat-1.7.4.0.tar.gz
  • 安装 logrotate
yum -y install logrotate
  • 问题

    centos 7 的socat.rpm

    wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
    

二、导入密钥

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc

三、在/etc/yum.repos.d目录下添加rabbitmq.repo文件,内容如下:

[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/8/
gpgcheck=0
repo_gpgcheck=0
enabled=1

centos7 的配置

[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1

3.安装RabbitMQ安装包(需要联网)

yum install -y rabbitmq-server-3.7.18-1.el7.noarch.rpm

4.复制配置文件

cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

注意:默认安装完成后配置文件模板在:/usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example目录中,需要 将配置文件复制到/etc/rabbitmq/目录中,并修改名称为rabbitmq.config

5.查看配置文件位置

ls /etc/rabbitmq/rabbitmq.config

6.修改配置文件(参见下图:)

vim /etc/rabbitmq/rabbitmq.config 

image-20190925222230260

将上图中配置文件中红色部分去掉%%,以及最后的,逗号 修改为下图:

image-20190925222329200

7.执行如下命令,启动rabbitmq中的插件管理

rabbitmq-plugins enable rabbitmq_management

出现如下说明: Enabling plugins on node rabbit@localhost: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@localhost... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch

set 3 plugins. Offline change; changes will take effect at broker restart.

8.启动RabbitMQ的服务

systemctl start rabbitmq-server
systemctl restart rabbitmq-server
systemctl stop rabbitmq-server

9.查看服务状态(见下图:)

systemctl status rabbitmq-server
  ● rabbitmq-server.service - RabbitMQ broker
     Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
     Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
   Main PID: 2904 (beam.smp)
     Status: "Initialized"
     CGroup: /system.slice/rabbitmq-server.service
             ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
             MBlmbcs...
             ├─3220 erl_child_setup 32768
             ├─3243 inet_gethost 4
             └─3244 inet_gethost 4
      .........

image-20190925222743776

10.关闭防火墙服务

systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld  

11.访问web管理界面

http://10.15.0.8:15672/

image-20190926194738708

12.登录管理界面

username:  guest
password:  guest

image-20190926194954822

2.3 docker 安装 rabbitmq

下载镜像,并且运行

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

最新版本的话

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

访问网址

http://你的主机名(比如localhost):15672/#/

3. 配置

3.1 rabbitmq 所有命令

rabbitmq-defaults     rabbitmq-env          rabbitmq-queues       rabbitmq-upgrade      rabbitmqctl           
rabbitmq-diagnostics  rabbitmq-plugins      rabbitmq-server       rabbitmqadmin 

有这么多命令

3.2 命令介绍

启动

rabbitmq-server

后台启动

rabbitmq-server  -detached

查看状态

rabbitmqctl status

关闭

rabbitmqctl stop

所有ctl 指令

Help:

   autocomplete                  Provides command name autocomplete variants
   help                          Displays usage information for a command
   version                       Displays CLI tools version

Nodes:

   await_startup                 Waits for the RabbitMQ application to start on the target node
   reset                         Instructs a RabbitMQ node to leave the cluster and return to its virgin state
   rotate_logs                   Instructs the RabbitMQ node to perform internal log rotation
   shutdown                      Stops RabbitMQ and its runtime (Erlang VM). Monitors progress for local nodes. Does not require a PID file path.
   start_app                     Starts the RabbitMQ application but leaves the runtime (Erlang VM) running
   stop                          Stops RabbitMQ and its runtime (Erlang VM). Requires a local node pid file path to monitor progress.
   stop_app                      Stops the RabbitMQ application, leaving the runtime (Erlang VM) running
   wait                          Waits for RabbitMQ node startup by monitoring a local PID file. See also 'rabbitmqctl await_online_nodes'

Cluster:

   await_online_nodes            Waits for <count> nodes to join the cluster
   change_cluster_node_type      Changes the type of the cluster node
   cluster_status                Displays all the nodes in the cluster grouped by node type, together with the currently running nodes
   force_boot                    Forces node to start even if it cannot contact or rejoin any of its previously known peers
   force_reset                   Forcefully returns a RabbitMQ node to its virgin state
   forget_cluster_node           Removes a node from the cluster
   join_cluster                  Instructs the node to become a member of the cluster that the specified node is in
   rename_cluster_node           Renames cluster nodes in the local database
   update_cluster_nodes          Instructs a cluster member node to sync the list of known cluster members from <seed_node>

Replication:

   cancel_sync_queue             Instructs a synchronising mirrored queue to stop synchronising itself
   sync_queue                    Instructs a mirrored queue with unsynchronised mirrors (follower replicas) to synchronise them

Users:

   add_user                      Creates a new user in the internal database
   authenticate_user             Attempts to authenticate a user. Exits with a non-zero code if authentication fails.
   change_password               Changes the user password
   clear_password                Clears (resets) password and disables password login for a user
   delete_user                   Removes a user from the internal database. Has no effect on users provided by external backends such as LDAP
   list_users                    List user names and tags
   set_user_tags                 Sets user tags

Access Control:

   clear_permissions             Revokes user permissions for a vhost
   clear_topic_permissions       Clears user topic permissions for a vhost or exchange
   list_permissions              Lists user permissions in a virtual host
   list_topic_permissions        Lists topic permissions in a virtual host
   list_user_permissions         Lists permissions of a user across all virtual hosts
   list_user_topic_permissions   Lists user topic permissions
   list_vhosts                   Lists virtual hosts
   set_permissions               Sets user permissions for a vhost
   set_topic_permissions         Sets user topic permissions for an exchange

Monitoring, observability and health checks:

   list_bindings                 Lists all bindings on a vhost
   list_channels                 Lists all channels in the node
   list_ciphers                  Lists cipher suites supported by encoding commands
   list_connections              Lists AMQP 0.9.1 connections for the node
   list_consumers                Lists all consumers for a vhost
   list_exchanges                Lists exchanges
   list_hashes                   Lists hash functions supported by encoding commands
   list_queues                   Lists queues and their properties
   list_unresponsive_queues      Tests queues to respond within timeout. Lists those which did not respond
   ping                          Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
   report                        Generate a server status report containing a concatenation of all server status information for support purposes
   schema_info                   Lists schema database tables and their properties
   status                        Displays status of a node

Parameters:

   clear_global_parameter        Clears a global runtime parameter
   clear_parameter               Clears a runtime parameter.
   list_global_parameters        Lists global runtime parameters
   list_parameters               Lists runtime parameters for a virtual host
   set_global_parameter          Sets a runtime parameter.
   set_parameter                 Sets a runtime parameter.

Policies:

   clear_operator_policy         Clears an operator policy
   clear_policy                  Clears (removes) a policy
   list_operator_policies        Lists operator policy overrides for a virtual host
   list_policies                 Lists all policies in a virtual host
   set_operator_policy           Sets an operator policy that overrides a subset of arguments in user policies
   set_policy                    Sets or updates a policy

Virtual hosts:

   add_vhost                     Creates a virtual host
   clear_vhost_limits            Clears virtual host limits
   delete_vhost                  Deletes a virtual host
   list_vhost_limits             Displays configured virtual host limits
   restart_vhost                 Restarts a failed vhost data stores and queues
   set_vhost_limits              Sets virtual host limits
   trace_off                     
   trace_on                      

Configuration and Environment:

   decode                        Decrypts an encrypted configuration value
   encode                        Encrypts a sensitive configuration value
   environment                   Displays the name and value of each variable in the application environment for each running application
   set_cluster_name              Sets the cluster name
   set_disk_free_limit           Sets the disk_free_limit setting
   set_log_level                 Sets log level in the running node
   set_vm_memory_high_watermark  Sets the vm_memory_high_watermark setting

Definitions:

   export_definitions            Exports definitions in JSON or compressed Erlang Term Format.
   import_definitions            Imports definitions in JSON or compressed Erlang Term Format.

Feature flags:

   enable_feature_flag           Enables a feature flag on target node
   list_feature_flags            Lists feature flags

Operations:

   close_all_connections         Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node
   close_connection              Instructs the broker to close the connection associated with the Erlang process id
   eval                          Evaluates a snippet of Erlang code on the target node
   eval_file                     Evaluates a file that contains a snippet of Erlang code on the target node
   exec                          Evaluates a snippet of Elixir code on the CLI node
   force_gc                      Makes all Erlang processes on the target node perform/schedule a full sweep garbage collection
   resume_listeners              Resumes client connection listeners making them accept client connections again
   suspend_listeners             Suspends client connection listeners so that no new client connections are accepted

Queues:

   delete_queue                  Deletes a queue
   purge_queue                   Purges a queue (removes all messages in it)

Deprecated:

   hipe_compile                  DEPRECATED. This command is a no-op. HiPE is no longer supported by modern Erlang versions
   node_health_check             DEPRECATED. Performs intrusive, opinionated health checks on a fully booted node. See https://www.rabbitmq.com/monitoring.html#health-checks instead

Use 'rabbitmqctl help <command>' to learn more about a specific command

所有插件

rabbitmq-plugins list

3.3 Web 页面

3.3.1 页面介绍

image-20191126162026720

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由
  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

3.3.2 Admin用户和虚拟主机管理

添加用户

  • 超级管理员(administrator)

    可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 监控者(monitoring)

    可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  • 策略制定者(policymaker)

    可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  • 普通管理者(management)

    仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

  • 其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

创建虚拟主机

为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

绑定虚拟主机和用户

4. Java 使用rabbitmq

4.1 直连模型--Helloword

image-20191126165840602

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>helloword</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

    </dependencies>


</project>

创建开发生产者

package com.ak.test;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {

    @Test
    public  void testSendMessage(){
        ConnectionFactory connectionFactory=new ConnectionFactory();
        // 设置主机名
        connectionFactory.setHost("47.100.104.187");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接的虚拟主机的名字
        connectionFactory.setVirtualHost("/joker");
        // 设置虚拟机的用户名和密码
        connectionFactory.setUsername("joker");
        connectionFactory.setPassword("123456");

        // 获取连接对象 生产者----> 队列

        try {
            // 获取连接对象
            Connection connection=connectionFactory.newConnection();
            // 获取连接中通道
            Channel channel=connection.createChannel();
            // 通道绑定对应消息队列
            /**
             * 参数一:队列名字,队列不存在自动创建
             * 参数二,是否持久化
             * 参数三:是否独占队列 true 是独占队列 ,false 不独占
             * 参数四: 是否在消费完成后删除队列
             * 参数五:额外附加参数
             */
            channel.queueDeclare("hello",false,false,false,null);
            // 发布消息
            /**
             * 参数一:交换机名称
             * 参数二:队列名称
             * 参数三: 传递消息额外设置
             * 参数四:消息的内容
             *
             */
            channel.basicPublish("","hello",null,"hello rabbit".getBytes());
            channel.close();
            connection.close();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {

        }


    }
}

发布成功

建立消费者

package com.ak.test;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MyConsumer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ConnectionFactory();
        // 设置主机名
        connectionFactory.setHost("47.100.104.187");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接的虚拟主机的名字
        connectionFactory.setVirtualHost("/joker");
        // 设置虚拟机的用户名和密码
        connectionFactory.setUsername("joker");
        connectionFactory.setPassword("123456");

        // 获取连接对象 生产者----> 队列

        try {
            // 获取连接对象
            Connection connection=connectionFactory.newConnection();
            // 获取连接中通道
            Channel channel=connection.createChannel();
            // 通道绑定对应消息队列
            /**
             * 参数一:队列名字,队列不存在自动创建
             * 参数二,是否持久化
             * 参数三:是否独占队列 true 是独占队列 ,false 不独占
             * 参数四: 是否在消费完成后删除队列
             * 参数五:额外附加参数
             */
            channel.queueDeclare("hello",false,false,false,null);

            // 发布消息
            /**
             * 参数一:队列名称
             * 参数二:开始消费的自动确认机制
             * 参数三: 消费时的回调接口
             *
             */
            channel.basicConsume("",true, new DefaultConsumer(channel){


                /**
                 * 参数回调
                 * @param consumerTag
                 * @param envelope
                 * @param properties
                 * @param body 消息队列中取出的消息
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });

            channel.close();
            connection.close();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {

        }
    }
}

查看是否被消费

工具类

package com.ak.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtils {
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("47.100.104.187");
        // 设置端口号
        connectionFactory.setPort(5672);
        // 设置连接的虚拟主机的名字
        connectionFactory.setVirtualHost("/joker");
        // 设置虚拟机的用户名和密码
        connectionFactory.setUsername("joker");
        connectionFactory.setPassword("123456");
    }
    public static Connection  getConnection(){
       // ConnectionFactory connectionFactory=new ConnectionFactory();
        // 设置主机名
        try {
            // 获取连接对象
            Connection connection=connectionFactory.newConnection();
            return connection;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {

        }

        return  null;
    }

    public static void closeConnectionAndChanel(Channel  channel,Connection connection){
        try {
            if(channel!=null){
                channel.close();
            }
            if(connection!=null){
                connection.close();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

使用测试

package com.ak;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) {

        // 获取连接对象 生产者----> 队列
        try {
            Connection connection= RabbitMQUtils.getConnection();
            Channel channel=connection.createChannel();
            channel.queueDeclare("hello",false,false,false,null);
            channel.basicPublish("","hello",null,"hello rabbit".getBytes());
            RabbitMQUtils.closeConnectionAndChanel(channel,connection);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {

        }

    }
}


项目结构

4.2 work quene 任务模型

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

image-20200314221002008

角色:

  • P:生产者:任务的发布者
  • C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者-2:领取任务并完成任务,假设完成速度快

创建生产者

package com.ak.demo_2;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) {
        Connection connection= RabbitMQUtils.getConnection();
        try {
            Channel channel=connection.createChannel();
            channel.queueDeclare("work",true,false,false,null);
            for(int i=0;i<1000;i++){
                channel.basicPublish("","work",null,("hello"+i).getBytes());
            }
            RabbitMQUtils.closeConnectionAndChanel(channel,connection);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

创建消费者

package com.ak.demo_2;


import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_1 {
    public static void main(String[] args) {
        try {
            // 获取连接对象
            Connection connection=RabbitMQUtils.getConnection();
            // 获取连接中通道
            Channel channel=connection.createChannel();
            channel.queueDeclare("work",true,false,false,null);
            channel.basicConsume("",true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
           // RabbitMQUtils.closeConnectionAndChanel(channel,connection);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
        }

    }
}

package com.ak.demo_2;


import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_2 {
    public static void main(String[] args) {
        try {
            // 获取连接对象
            Connection connection=RabbitMQUtils.getConnection();
            // 获取连接中通道
            Channel channel=connection.createChannel();
            channel.queueDeclare("work",true,false,false,null);
            channel.basicConsume("",true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });
           // RabbitMQUtils.closeConnectionAndChanel(channel,connection);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
        }

    }
}

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

消息自动确认机制

如何实现能者多劳的任务模型。需要手动确认信息

package com.ak.demo_2;


import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_1_Information {
    public static void main(String[] args) {
        try {
            // 获取连接对象
            Connection connection=RabbitMQUtils.getConnection();
            // 获取连接中通道
            Channel channel=connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare("workquene",true,false,false,null);
            channel.basicConsume("",false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                    // 手动确认 ,参数1:手动确认信息标识,参数2:false 每次确认一个
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
           // RabbitMQUtils.closeConnectionAndChanel(channel,connection);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
        }

    }
}

4.3 fanout 模型

image-20191126213115873

在广播模式下,消息发送流程是这样的:

  • 可以有多个消费者
  • 每个消费者有自己的queue(队列)
  • 每个队列都要绑定到Exchange(交换机)
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

开发生产者

package com.ak.fanout;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        Channel channel=connection.createChannel();
        // 参数一:为交换机名称,参数二:fanout 为交换机
        channel.exchangeDeclare("joker","fanout");
        // 发送信息
        channel.basicPublish("joker","",null,"fanout type message".getBytes());

        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }
}

开发消费者

package com.ak.fanout;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class MyConsumer_3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel=connection.createChannel();
        // 通道绑定交换机
        channel.exchangeDeclare("joker","fanout");
        // 临时队列
        String queneName=channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queneName,"joker","");
        // 消费信息
        channel.basicConsume(queneName,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }
}

4.4 Routing

4.4.1 直连

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

流程:

image-20191126220145375

图解:

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

我希望只有错误日志的时候,才能存储到磁盘

其他日志在控制台打印

开发生产者

package com.ak.routedirect;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Random;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        Channel channel=connection.createChannel();
        // 参数一:为交换机名称,参数二:fanout 为交换机
        channel.exchangeDeclare("log_router","direct");
        String []routeKey={"error","info","waring","debug"};

        // 发送信息
        for(int i=0;i<10;i++){
            int temp=new Random().nextInt(100)%4;
            channel.basicPublish("log_router",routeKey[temp],null,("发送的信息为 "+routeKey[temp]).getBytes());
        }
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);

    }
}

开发消费者

package com.ak.routedirect;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_1 {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        // 获取连接中通道
        Channel channel=connection.createChannel();
        channel.exchangeDeclare("log_router","direct");
        // 临时队列
        String queneName=channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queneName,"log_router","error");
        // 消费信息
        channel.basicConsume(queneName,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2 "+new String(body));
            }
        });
    }
}

package com.ak.routedirect;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_2 {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        // 获取连接中通道
        Channel channel=connection.createChannel();
        channel.exchangeDeclare("log_router","direct");
        // 临时队列
        String queneName=channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queneName,"log_router","error");
        channel.queueBind(queneName,"log_router","info");
        channel.queueBind(queneName,"log_router","waring");
        channel.queueBind(queneName,"log_router","debug");

        // 消费信息
        channel.basicConsume(queneName,true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 "+new String(body));
            }
        });
    }
}

检验

4.4.2 Routing 之订阅模型-Topic

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

image-20191127121900255

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
		audit.#    匹配audit.irs.corporate或者 audit.irs 等
    audit.*   只能匹配 audit.irs

创建生产者

package com.ak.routeTopic;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Random;

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        Channel channel=connection.createChannel();
        // 参数一:为交换机名称,参数二:fanout 为交换机
        String channelName="log_top";
        channel.exchangeDeclare(channelName,"topic");
        String []routeKey={"user.save","user.add","admin.add","admin.save"};

        for (int i=0;i<10;i++){
            int temp=new Random().nextInt(100)%4;
            channel.basicPublish(channelName,routeKey[temp],null,("这个是topics 发布的信息"+routeKey[temp]).getBytes());
        }
        RabbitMQUtils.closeConnectionAndChanel(channel,connection);
    }
}

创建消费者

package com.ak.routeTopic;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_1 {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        // 获取连接中通道
        Channel channel=connection.createChannel();
        String channelName="log_top";
        channel.exchangeDeclare(channelName,"topic");
        // 临时队列
        String queneName=channel.queueDeclare().getQueue();
        // 绑定交换机和队列

        channel.queueBind(queneName,channelName,"user.*");
        // 消费信息
        channel.basicConsume(queneName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者user 的信息 "+new String(body));
            }
        });
    }
}

package com.ak.routeTopic;

import com.ak.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_2 {
    public static void main(String[] args) throws IOException {
        Connection connection= RabbitMQUtils.getConnection();
        // 获取连接中通道
        Channel channel=connection.createChannel();
        String channelName="log_top";
        channel.exchangeDeclare(channelName,"topic");
        // 临时队列
        String queneName=channel.queueDeclare().getQueue();
        // 绑定交换机和队列

        channel.queueBind(queneName,channelName,"admin.*");
        // 消费信息
        channel.basicConsume(queneName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者admin 的信息 "+new String(body));
            }
        });
    }
}

检查

5. 整合SpringBoot

5.1 helloword 模型

引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>demo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>



        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

创建生产者

package com.ak.demo;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringApplication.class)
public class MyTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public  void test(){
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}

创建消费者

package com.ak.demo.hello;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 默认持久化队列
 */
@Component
@RabbitListener(queuesToDeclare =@Queue("hello"))
public class HelloCustomer {

    @RabbitHandler
    public void kk(String message){
        System.out.println(message);
    }
}

目录结构

5.2 workquene 模型

修改work 类

 @Test
    public  void test2(){
        for(int i=0;i<10;i++){
            rabbitTemplate.convertAndSend("work","work 模型");
        }
    }

创建消费者

package com.ak.demo.work;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkConsumer {

    @RabbitListener(queuesToDeclare =@Queue("work"))
    public void receive(String messaage){
        System.out.println("消费者1---"+messaage);
    }

    @RabbitListener(queuesToDeclare =@Queue("work"))
    public void receive2(String messaage){
        System.out.println("消费者2--"+messaage);
    }
}

创建确认机制

5.3 广播模式

创建生产者

 /**
     * 广播形式的发布信息
     */
    @Test
    public  void test3(){

        for(int i=0;i<10;i++){
            rabbitTemplate.convertAndSend("kk","","广播信息");
        }
    }

创建消费者

package com.ak.demo.fanout;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanOutConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value =@Queue,
                    exchange = @Exchange(value = "kk",type = "fanout")
            )
    })
    public void receive(String message){
        System.out.println("----");
        System.out.println("广播信息1"+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value =@Queue,
                    exchange = @Exchange(value = "kk",type = "fanout")
            )
    })
    public void receive2(String message){
        System.out.println("----");
        System.out.println("广播信息2"+message);
    }
}

测试

5.4 直连模式

创建生产者

/**
     * 测试路由模型
     */
    @Test
    public void testDirect(){
        String []routeKey={"error","info","waring","debug"};
        for(int i=0;i<10;i++){
            int temp=new Random().nextInt(100)%4;
            rabbitTemplate.convertAndSend("directs",routeKey[temp],routeKey[temp]+"的日志信息");
        }

    }

创建消费者

package com.ak.demo.router;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer {

    String []routeKey={"error","info","waring","debug"};
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,// 临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),
                    key={"error","info","waring","debug"}
            )
    })
    public void receive(String message){
        System.out.println("接受全部信息--"+message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,// 临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),
                    key={"error"}
            )
    })
    public void receive2(String message){
        System.out.println("只接受error--"+message);
    }
}

检查

5.5 Topic 模式

创建生产者

@Test
    public void testFive(){
        String []routeKey={"user.save","user.add","admin.add","admin.save"};
        for(int i=0;i<10;i++){
            int temp=new Random().nextInt(100)%4;
            rabbitTemplate.convertAndSend("topics",routeKey[temp],routeKey[temp]+"信息");
        }
    }

消费者

package com.ak.demo.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(name = "topics",type = "topic"),
                    key={"user.*"}
            )
    })
    public void receive(String message){
        System.out.println("user类---"+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(name = "topics",type = "topic"),
                    key={"admin.*"}
            )
    })
    public void receive2(String message){
        System.out.println("admin 类--"+message);
    }
}

测试

6. 搭建集群

6.1 命令行搭建

默认情况下:RabbitMQ代理操作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是消息队列,默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问

  1. 架构图

image-20200320094147471

​ 核心解决问题: 当集群中某一时刻master节点宕机,可以对Quene中信息,进行备份

  1. 集群搭建
    # 0.集群规划
    	node1: 10.15.0.3  mq1  master 主节点
    	node2: 10.15.0.4  mq2  repl1  副本节点
    	node3: 10.15.0.5  mq3  repl2  副本节点
    
    # 1.克隆三台机器主机名和ip映射
    	vim /etc/hosts加入:
    		 10.15.0.3 mq1
        	10.15.0.4 mq2
        	10.15.0.5 mq3
    	node1: vim /etc/hostname 加入:  mq1
    	node2: vim /etc/hostname 加入:  mq2
    	node3: vim /etc/hostname 加入:  mq3
    
    # 2.三个机器安装rabbitmq,并同步cookie文件,在node1上执行:
    	scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
    	scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
    
    # 3.查看cookie是否一致:
    	node1: cat /var/lib/rabbitmq/.erlang.cookie 
    	node2: cat /var/lib/rabbitmq/.erlang.cookie 
    	node3: cat /var/lib/rabbitmq/.erlang.cookie 
    
    # 4.后台启动rabbitmq所有节点执行如下命令,启动成功访问管理界面:
    	rabbitmq-server -detached 
    
    # 5.在node2和node3执行加入集群命令:
    	1.关闭       rabbitmqctl stop_app
    	2.加入集群    rabbitmqctl join_cluster rabbit@mq1
    	3.启动服务    rabbitmqctl start_app
    
    # 6.查看集群状态,任意节点执行:
    	rabbitmqctl cluster_status
    
    # 7.如果出现如下显示,集群搭建成功:
    	Cluster status of node rabbit@mq3 ...
    	[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
    	{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
    	{cluster_name,<<"rabbit@mq1">>},
    	{partitions,[]},
    	{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]
    
    # 8.登录管理界面,展示如下状态:
    

    image-20200320095613586

    # 9.测试集群在node1上,创建队列
    

    image-20200320095743935

    # 10.查看node2和node3节点:
    

    image-20200320095827688

    image-20200320095843370

    # 11.关闭node1节点,执行如下命令,查看node2和node3:
    	rabbitmqctl stop_app
    

    image-20200320100000347

    image-20200320100010968

6.2 docker 搭建

image-20200320113423235

使用docker 进行搭建

删除所有镜像

docker stop myrabbit1 myrabbit2 myrabbit3
docker rm myrabbit1 myrabbit2 myrabbit3

自定义网络

docker network create --driver bridge --subnet 192.168.0.0/24  --gateway 192.168.0.1 myrediswork

创建集群

docker run -d --hostname rabbit1 --name myrabbit1 -p 5675:5672 -p 15673:15672 -v ~/mydata/rabbitmq/rabbitmq01:/var/lib/rabbitmq  -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management
docker run -d --hostname rabbit2 --name myrabbit2 -p 5673:5672 -p 15674:15672 -v ~/mydata/rabbitmq/rabbitmq02:/var/lib/rabbitmq --link myrabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management
docker run -d --hostname rabbit3 --name myrabbit3 -p 5674:5672 -p 15675:15672 -v ~/mydata/rabbitmq/rabbitmq03:/var/lib/rabbitmq --link myrabbit1:rabbit1 --link myrabbit2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3-management

进入每个集群中,然后运行命令

docker exec -it myrabbit1 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

这里主义 myrabbit2 是不一样的

docker exec -it myrabbit2 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit
docker exec -it myrabbit3 bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit

访问网址

http://www.jokerak.com:15673/#/

问题

Error response from daemon: Pool overlaps with other one on this address space

192.168.0.0 网段已经被使用了,换个网段即可

上次编辑于: 2021/10/12 下午4:55:55
贡献者: fakerlove1