Rabbitmqを冗長化する方法として、キューミラー方式があり。その動作検証をする。
環境
サーバ台数: 3台(ホスト名はcnt1,2,3)
OS: Ubuntu16.04LTS
環境構築手順
①RabbitMQ Serverインストール&停止
$ sudo apt -y install rabbitmq-server
$ sudo systemctl stop rabbitmq-server
③rabbitmq-server起動
$ sudo systemctl start rabbitmq-server.service
④2,3号機で以下を実施
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl join_cluster --ram rabbit@cnt1
$ sudo rabbitmqctl start_app
※モードはramとdiscの2種類あり。ramモードはキューデータをramに格納(電源障害時データ保護なし)。discモードはramとディスク両方に格納。デフォルトはdiscモード。HAキューミラー構成時は、1台でもdiscがあれば保護レベルは担保されるため、2,3号機はramモードで起動。各モードはmanのrabbitmqctlを参照。
⑤以下コマンドを実行してクラスタ状態を確認
$ sudo rabbitmqctl cluster_status
Cluster status of node rabbit@cnt2 ...
[{nodes,[{disc,[rabbit@cnt1]},{ram,[rabbit@cnt3,rabbit@cnt2]}]},
{running_nodes,[rabbit@cnt3,rabbit@cnt1,rabbit@cnt2]},
{cluster_name,<<"rabbit@cnt1.pikesaku.net">>},
{partitions,[]}]
$
※running_nodesに3サーバあること
⑥キューミラーの設定。どのサーバで実行してもOK
$ sudo rabbitmqctl set_policy ha-all '^(?!amq\.).*' '{"ha-mode": "all"}'
※この(?!amq¥.)は以下URLの「先読み否定グループ」。"amq."で始まるもの以外全ての意味。
正規表現サンプル集
set-policyの引数は、ポリシー名・パターンマッチ・定義
ha-modeにallを指定すると全ノードにキューデータをミラーする。
検証
システム起動時の動作
システム起動時はクラスタの親である1号機が先に起動している状態で、2,3号機を起動する必要あり。
2,3号機はrabbitmqctl join_clusterで1号機のクラスタに属する形態。しかも1号機がdiscモードで永続的にデータを持っているため、上記動作は理解できる。
クラスタ構成の情報は、各ノードが/var/lib/rabbitmq/mnesia配下のファイルで保持している。
例)cnt2の/var/lib/rabbitmq/mnesia/rabbit@cnt2/cluster_nodes.config
{[rabbit@cnt1,rabbit@cnt2,rabbit@cnt3],[rabbit@cnt1]}.
キュー出し入れ動作
キュー送信ツール(キュー永続化無効)
#!/usr/bin/env python
import pika
import sys
args = sys.argv
if len(args) != 3:
print('Error: invalid argment')
exit()
h = args[1]
q = args[2]
#credentials = pika.PlainCredentials('hogehoge', 'hogehoge')
#connection = pika.BlockingConnection(pika.ConnectionParameters(host=h, credentials=credentials))
connection = pika.BlockingConnection(pika.ConnectionParameters(host=h))
channel = connection.channel()
channel.queue_declare(queue=q, durable=True)
prop = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(exchange='',
routing_key=q,
body='Hello World!',
properties=prop)
print(" [x] Sent 'Hello World!'")
connection.close()
キュー送信ツール(キュー永続化有効)
キュー取り出しツール
#!/usr/bin/env python
import pika
import sys
args = sys.argv
if len(args) != 3:
print('Error: invalid argment')
exit()
h = args[1]
q = args[2]
connection = pika.BlockingConnection(pika.ConnectionParameters(host=h))
channel = connection.channel()
channel.queue_declare(queue=q)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=q,
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()