查看: 1612|回复: 0

[PHP代码] 用Docker搭建RabbitMQ高可用集群

发表于 2018-3-12 08:00:03

作者:林柏格,原文地址

RabbitMQ是基于高级消息队列协议(AMQP)实现的开源消息代理软件,主要提供消息队列服务。这里介绍用Docker Compose搭建RabbitMQ高可用集群的过程。

RabbitMQ自身提供部署集群的功能,通过命令:

  1. $ rabbitmqctl -n rabbit@rmqha_node1 stop_app
  2. $ rabbitmqctl -n rabbit@rmqha_node1 join_cluster --ram rabbit@rmqha_node0
  3. $ rabbitmqctl -n rabbit@rmqha_node1 start_app
复制代码

就可以很容易的将节点rabbit@rmqha_node1加入到集群rabbit@rmqha_node0中。--ram选项表示节点以内存存储方式运行,读写速度快,重启后内容会丢失;不加--ram选项,节点则以磁盘存储方式运行,虽然读写速度慢,但是内容一般可以持久保持。

在同一个RabbitMQ集群中,节点之间并没有主从之分,所有节点会同步相同的队列结构,队列内容(消息)则各自不同,不过消息会在节点间传递。这样的集群只是提高了应对大量并发请求的能力,整体可用性还是很低,因为某个节点宕机后,寄存在该节点上的消息不可用,而在其他节点上也没有这些消息的备份,若是该节点无法恢复,那么这些消息就丢失了。

为了解决这个问题,RabbitMQ提供镜像队列功能,通过命令:

  1. $ rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码

可以设置镜像队列,"^"表示匹配所有队列,即所有队列在各个节点上都会有备份。在集群中,只需要在一个节点上设置镜像队列,设置操作会同步到其他节点。

Docker Compose编排

这个编排主要实现一个磁盘节点、两个内存节点的RabbitMQ集群和一个HAProxy代理。

目录结构
  1. L--rabbitmq-ha-docker //主目录
  2. L--scripts //本地(Docker宿主)使用的一些脚本
  3. L--rmqha_set_policy.sh //设置各个数据库账号和开启主从复制
  4. L--volumes //各个容器的挂载数据卷
  5. L--rmqha_proxy
  6. L--haproxy.cfg //HAProxy配置
  7. L--rmqha_slave
  8. L--cluster_entrypoint.sh //入口文件
  9. L--parameters.env //账号密码等环境参数
  10. L--docker-compose.yml //编排配置
复制代码
docker-compose.yml
  1. version: "2"
  2. services:
  3. master:
  4. image: rabbitmq:3.6-management
  5. container_name: rmqha_node0
  6. restart: always
  7. mem_limit: 256m
  8. networks:
  9. net1:
  10. ipv4_address: 10.9.0.10
  11. hostname: rmqha_node0
  12. ports:
  13. - "55672:15672"
  14. - "56720:5672"
  15. env_file:
  16. - ./parameters.env
  17. environment:
  18. - CONTAINER_NAME=rmqha_node0
  19. - RABBITMQ_HOSTNAME=rmqha_node0
  20. - RABBITMQ_NODENAME=rabbit
  21. slave1:
  22. image: rabbitmq:3.6-management
  23. container_name: rmqha_node1
  24. restart: always
  25. depends_on:
  26. - master
  27. mem_limit: 256m
  28. networks:
  29. net1:
  30. ipv4_address: 10.9.0.11
  31. hostname: rmqha_node1
  32. # ports:
  33. # - "56721:5672"
  34. volumes:
  35. - "./volumes/rmqha_slave/cluster_entrypoint.sh:/usr/local/bin/cluster_entrypoint.sh"
  36. entrypoint: "/usr/local/bin/cluster_entrypoint.sh"
  37. command: "rabbitmq-server"
  38. env_file:
  39. - ./parameters.env
  40. environment:
  41. - CONTAINER_NAME=rmqha_node1
  42. - RABBITMQ_HOSTNAME=rmqha_node1
  43. - RABBITMQ_NODENAME=rabbit
  44. - RMQHA_RAM_NODE=true
  45. slave2:
  46. image: rabbitmq:3.6-management
  47. container_name: rmqha_node2
  48. restart: always
  49. depends_on:
  50. - master
  51. mem_limit: 256m
  52. networks:
  53. net1:
  54. ipv4_address: 10.9.0.12
  55. hostname: rmqha_node2
  56. # ports:
  57. # - "56722:5672"
  58. volumes:
  59. - "./volumes/rmqha_slave/cluster_entrypoint.sh:/usr/local/bin/cluster_entrypoint.sh"
  60. entrypoint: "/usr/local/bin/cluster_entrypoint.sh"
  61. command: "rabbitmq-server"
  62. env_file:
  63. - ./parameters.env
  64. environment:
  65. - CONTAINER_NAME=rmqha_node2
  66. - RABBITMQ_HOSTNAME=rmqha_node2
  67. - RABBITMQ_NODENAME=rabbit
  68. - RMQHA_RAM_NODE=true
  69. haproxy:
  70. image: haproxy:1.8
  71. container_name: rmqha_proxy
  72. restart: always
  73. depends_on:
  74. - master
  75. - slave1
  76. - slave2
  77. mem_limit: 256m
  78. networks:
  79. net1:
  80. ipv4_address: 10.9.0.19
  81. hostname: rmqha_proxy
  82. ports:
  83. - "56729:5672"
  84. - "51080:1080"
  85. volumes:
  86. - "./volumes/rmqha_proxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro"
  87. - "./volumes/rmqha_proxy:/root/rmqha_proxy"
  88. environment:
  89. - CONTAINER_NAME=rmqha_proxy
  90. networks:
  91. net1:
  92. driver: bridge
  93. ipam:
  94. config:
  95. - subnet: 10.9.0.0/16
  96. gateway: 10.9.0.1
复制代码

这里配置了四个容器服务,一个haproxy,负责代理各个RabbitMQ服务;三个rabbitmq,组成RabbitMQ集群。每个容器服务都指定了静态IP,即使服务重启也不会出现IP错乱问题,特殊的网络端口映射后面会介绍。

环境参数

parameters.env

  1. RMQHA_MASTER_NODE=rabbit
  2. RMQHA_MASTER_HOST=rmqha_node0
  3. RABBITMQ_DEFAULT_USER=guest
  4. RABBITMQ_DEFAULT_PASS=guest
  5. RABBITMQ_NODENAME=rabbit
  6. RABBITMQ_ERLANG_COOKIE=myerlangcookie
复制代码
RabbitMQ启动

这里RabbitMQ容器是使用Docker官方镜像生成的,节点rmqha_node0可以直接启动;而节点rmqha_node1和rmqha_node2需要加入到rmqha_node0集群里,所以需要需改入口文件。
volumes/rmqha_slave/cluster_entrypoint.sh

  1. #!/bin/bash
  2. set -e
  3. if [ -e "/root/is_not_first_time" ]; then
  4. exec "$@"
  5. else
  6. /usr/local/bin/docker-entrypoint.sh rabbitmq-server -detached # 先按官方入口文件启动且是后台运行
  7. rabbitmqctl -n "$RABBITMQ_NODENAME@$RABBITMQ_HOSTNAME" stop_app # 停止应用
  8. rabbitmqctl -n "$RABBITMQ_NODENAME@$RABBITMQ_HOSTNAME" join_cluster ${RMQHA_RAM_NODE:+--ram} "$RMQHA_MASTER_NODE@$RMQHA_MASTER_HOST" # 加入rmqha_node0集群
  9. rabbitmqctl -n "$RABBITMQ_NODENAME@$RABBITMQ_HOSTNAME" start_app # 启动应用
  10. rabbitmqctl stop # 停止所有服务
  11. touch /root/is_not_first_time
  12. sleep 2s
  13. exec "$@"
  14. fi
复制代码
HAProxy配置

这里HAProxy容器也是使用Docker官方镜像生成的,启动前需要先准备配置文件。
volumes/rmqha_proxy/haproxy.cfg

  1. global
  2. log 127.0.0.1 local0
  3. maxconn 4096
  4. defaults
  5. log global
  6. mode tcp
  7. option tcplog
  8. retries 3
  9. option redispatch
  10. maxconn 2000
  11. timeout connect 5000
  12. timeout client 50000
  13. timeout server 50000
  14. # ssl for rabbitmq
  15. # frontend ssl_rabbitmq
  16. # bind *:5673 ssl crt /root/rmqha_proxy/rmqha.pem
  17. # mode tcp
  18. # default_backend rabbitmq
  19. listen stats
  20. bind *:1080 # haproxy容器1080端口显示代理统计页面,映射到宿主51080端口
  21. mode http
  22. stats enable
  23. stats hide-version
  24. stats realm Haproxy\ Statistics
  25. stats uri /
  26. stats auth admin:admin
  27. listen rabbitmq
  28. bind *:5672 # haproxy容器5672端口代理多个rabbitmq服务,映射到宿主56729端口
  29. mode tcp
  30. balance roundrobin
  31. timeout client 1h
  32. timeout server 1h
  33. option clitcpka
  34. # server rmqha_node0 rmqha_node0:5672 check inter 5s rise 2 fall 3
  35. server rmqha_node1 rmqha_node1:5672 check inter 5s rise 2 fall 3
  36. server rmqha_node2 rmqha_node2:5672 check inter 5s rise 2 fall 3
复制代码

实际运行
在主目录下执行docker-compose up -d构建并运行整个Docker服务。

镜像队列

,在主目录下执行:

  1. $ sh ./scripts/rmqha_set_policy.sh
复制代码

实际上是执行了:

  1. $ docker exec -it rmqha_node0 rabbitmqctl set_policy ha-all '^' '{"ha-mode":"all"}'
复制代码

即在rmqha_node0集群中将所有队列设置为镜像队列,这个命令只需执行一次,除非重新构建整个Docker服务。

测试

用两个PHP脚本可以对RabbitMQ进行简单的测试,不过需要用到php-amqplib库。

  1. $ composer require php-amqplib/php-amqplib
复制代码

发送消息脚本:

  1. <?php
  2. // send.php
  3. require_once __DIR__ . '/vendor/autoload.php';
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. $connection = new AMQPStreamConnection('127.0.0.1', 56729, 'guest', 'guest', '/'); // 连接rmqha_proxy
  7. $channel = $connection->channel();
  8. $channel->queue_declare('task_queue', false, true, false, false);
  9. $data = "Hello World!";
  10. $msg = new AMQPMessage($data,
  11. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  12. );
  13. $channel->basic_publish($msg, '', 'task_queue');
  14. echo " [x] Sent ", $data, "\n";
  15. $channel->close();
  16. $connection->close();
复制代码

接受消息脚本:

  1. <?php
  2. // receive.php
  3. require_once __DIR__ . '/vendor/autoload.php';
  4. use PhpAmqpLib\Connection\AMQPStreamConnection;
  5. $connection = new AMQPStreamConnection('127.0.0.1', 56720, 'guest', 'guest', '/'); // 连接rmqha_node0
  6. $channel = $connection->channel();
  7. $channel->queue_declare('task_queue', false, true, false, false);
  8. echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
  9. $callback = function ($msg) {
  10. echo " [x] Received ", $msg->body, "\n";
  11. sleep(1);
  12. echo " [x] Done", "\n";
  13. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  14. };
  15. $channel->basic_qos(null, 1, null);
  16. $channel->basic_consume('task_queue', '', false, false, false, false, $callback);
  17. while (count($channel->callbacks)) {
  18. $channel->wait();
  19. }
  20. $channel->close();
  21. $connection->close();
复制代码
SSL设置

为了RabbitMQ服务在网络传输中不泄漏信息,可以给HAProxy设置SSL传输(比对各个RabbitMQ服务设置SSL传输的性能消耗要小),这里简单的介绍一下自签证书:

  1. $ cd ./volumes/rmqha_proxy/
  2. $ openssl genrsa -out rmqha.key 1024 # 随机生成一个私钥
  3. $ openssl req -new -key rmqha.key -out rmqha.csr # 根据私钥生成证书签署请求
  4. $ openssl x509 -req -days 365 -in rmqha.csr -signkey rmqha.key -out rmqha.crt # 自己签署证书
  5. $ cat rmqha.crt rmqha.key|tee rmqha.pem # 将私钥和证书合并到一个文件中
复制代码

注意:生成证书签署请求时,需要填写一些信息,Common Name (eg, fully qualified host name)应该写127.0.0.1。
HAProxy配置中添加:

  1. # ssl for rabbitmq
  2. frontend ssl_rabbitmq
  3. bind *:5673 ssl crt /root/rmqha_proxy/rmqha.pem # haproxy容器5673端口代理rabbitmq服务,需要映射到宿主端口
  4. mode tcp
  5. default_backend rabbitmq
复制代码

PHP中通过SSL连接HAProxy代理服务示例:

  1. <?php
  2. require_once __DIR__ . '/vendor/autoload.php';
  3. use PhpAmqpLib\Connection\AMQPSSLConnection;
  4. $connection = new AMQPSSLConnection('127.0.0.1', 56730, 'guest', 'guest', '/', ['cafile'=>'./rmqha.crt']); // 假设SSL的监听端口是56730,rmqha.crt是上面生成的自签证书
  5. $channel = $connection->channel();
复制代码
最后 建议 消息生产者可以通过HAProxy代理连接RabbitMQ服务,实现负载均衡; 消息处理者应该直接连接RabbitMQ服务主机,并且跟随RabbitMQ服务主机启动服务、停止服务。 扩展阅读 Rabbitmq高可用-镜像队列模式 Rabbitmq集群高可用测试


回复

使用道具 举报