はじめに
エンジニアの松原です。これまでの記事( Kafkaその1 Kafkaその2 )でもメッセージキューイングに関するトピックを扱っていましたが、今回は他のメッセージキューイングのサービスのうち、RabbitMQについて取り上げたいと思います。
ブローカーレス、ブローカードについて
メッセージキューイングの仕組みでは、ブローカーレスとブローカードという呼ばれ方で分類されることがあります。 ブローカーレス、ブローカードの違いは「メッセージをやり取りする際に仲介するサービス(ブローカー)が存在する・しない」という違いがあります。
スループットを重視するか、それともシステムの冗長性が必須となる大規模なサービス設計が必要かでブローカーレス・ブローカードを採用する理由の分かれ目になると思いますが、 既存でブローカーレスのメッセージキューイングで有名なのはZeroMQぐらいで、ほとんどのメッセージキューイングのサービスはブローカードで占められている印象です。今回取り上げるRabbitMQもブローカードのメッセージキューイングのライブラリ、サービスに含まれます。
想定するシステム要求について(メッセージキューイングの用途範囲)
以下に今回システムに要求される構成を想定しました。Unityで何らかの処理を外部のアプリケーションで加工し、さらにそこから別のアプリケーションに渡して配信の仕組みを作りたい場合など、アプリケーション間でデータのやり取りをするためにメッセージキューイングを利用することを想定しています。
一旦システムの構成に関しては説明したので、今度は実際にRabbitMQを使ってHelloWorldを試してみます。
RabbitMQのBroker(Server)を用意する
RabbitMQはブローカードの仕組みのため仲介役のBrokerを用意する必要があります。RabbitMQは公式でDockerのBrokerのイメージを提供しているので、下記のようにdocker-compose.ymlを用意して docker-comopse up
のコマンドを呼び出すだけでBrokerを立ち上げることができます。
今回は管理画面を通してデータの送出や受信が行われているかを確認したいので、 rabbitmq:3-management
のイメージを利用しています。
version: '3' services: rabbitmq: image: rabbitmq:3-management ports: - 5672:5672 - 5000:15672 volumes: - rabbitmq_data:/var/lib/rabbitmq volumes: rabbitmq_data:
RabbitMQは内部でデータベースを持っていますが、そのデータベースはディスク上に保存されるので、 volumeを指定しています。詳しくは dockerhubの How to use this image
のところをお読みください。
docker-comopse up
でBroker立ち上げ後、Webブラウザで http://localhost:5000
にアクセスすることで管理画面を表示することができます。
デフォルトではユーザー名 guest
、パスワードは guest
でダッシュボードに入れます。
メッセージ送信側のコード(Unity)を用意する
メッセージ送信側はUnity(C#)を想定しています。UnityのRabbitMQのクライアントはRabbitMQ.Clientの.net standard 2.0のパッケージを利用しました。(インストール方法は省略)
以下のコードを適当なGameObjectに張り付けて実行することでメッセージを送出できます。
Brokerのホスト名は localhost
、ポートはデフォルトポートの 5672
(省略時はデフォルトポート)を指定しています。
using UnityEngine; using System; using System.Threading.Tasks; using System.Text; using RabbitMQ.Client; public class MessagePublisher : MonoBehaviour { [SerializeField] private string hostName = "localhost"; [SerializeField] private string exchangeName = "ex001"; [SerializeField] private float messageIntervalSeconds = 1f; private IConnection connection; private IModel channel; private float delta; private long count = 0; private void OnEnable() { var factory = new ConnectionFactory() { HostName = hostName }; connection = factory.CreateConnection(); channel = connection.CreateModel(); channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false); } private void OnDisable() { channel.Close(); connection.Close(); } private async Task Update() { delta += Time.deltaTime; if (delta > messageIntervalSeconds) { await PublishMessageAsync(channel, $"Hello Wolrd: {count++}"); delta -= messageIntervalSeconds; } } private async Task PublishMessageAsync(IModel model, string message) { await Task.Run(() => { try { if (!model.IsClosed) { model.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(message)); } } catch (Exception e) { Debug.LogError($"failer send. reason: {e.Message}"); } }); } }
実行するとダッシュボードからメッセージが送信されていることが確認できます。( Publish
の項目の数値が変化する)
メッセージ送信側のコード(Python)を用意する
メッセージ送信側はPythonを想定しています。PythonのRabbitMQのクライアントはpikaのパッケージを利用しました。(こちらは pip install pika
ですぐに利用できます)
以下のコードを適当なファイル名(例: main.py
)を付けて保存し、実行(python main.py
)します。
import pika queue_name = 'q001' exchange_name = 'ex001' host_name = 'localhost' def callback(ch, method, properties, body): message = body.decode() print(message) ch.basic_ack(delivery_tag = method.delivery_tag) params = pika.ConnectionParameters(host=host_name) connection = pika.BlockingConnection(params) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.queue_bind(queue=queue_name, exchange=exchange_name) channel.basic_consume(queue=queue_name, on_message_callback=callback) try: channel.start_consuming() except KeyboardInterrupt: channel.queue_unbind(queue=queue_name, exchange=exchange_name) channel.queue_delete(queue=queue_name) channel.stop_consuming() connection.close()
実行するとコンソール画面上に受信したメッセージが表示されます。
また、ダッシュボードからメッセージが送信されていることが確認できます。( Deliver
と Consumer ack
項目の数値が変化する)
簡単な解説
RabbitMQでは少し変わった特徴があり、メッセージの送信側(Publisher)がメッセージを送信する先に直接Queueに送信するのではなく、Exchangeというメッセージ交換局的な役割を持つ仕組みにメッセージを送信します。メッセージの受信側(Consumer/Subscriber)はこれまで通り、Queueからメッセージを取得します。
Exchangeではメッセージがキューイングされないため、単純にExchangeに対してメッセージを送るだけだと、送ったメッセージは自動的に破棄(drop)されます。この動きはダッシュボード上の Unroutable
の項目で確認できます。
メッセージをQueueに貯めるには、バインディングという仕組みを利用します。RabbitMQではExchangeからメッセージの送り先、蓄積先となるQueueを選ぶことができます。(複数指定も可能)
上記のpythonのコードではqueue_bind()というメソッドを利用して、Exchange経由でPublisherからのメッセージの送り先を特定のQueueに送るように設定しています。
#...(略) channel.queue_bind(queue=queue_name, exchange=exchange_name) #...(略)
受信側のアプリでこの設定を行っているため、受信アプリを起動後はメッセージがQueueに送信され、メッセージの自動破棄が無くなるため Unroutable
の数値が 0 に下がります。
ダッシュボードでこのような情報が可視化されているので、管理がしやすそうな印象があります。(Kafkaでも似たようなダッシュボードがありそうです)
おわりに
今回にはメッセージキューイングのサービスのうち、RabbitMQを取り上げました。以前に触ったKafkaと比べ、シンプルで使いやすい印象があります。パフォーマンスや用途によっても使い分けたいところです。
スループットを最優先するのであればブローカーレスのZeroMQが一番パフォーマンスが出るようなので、次回以降のテーマにしたいと思います。