RabbitMQを使って開発言語の異なるアプリケーション間のメッセージ処理を試す
はじめに
エンジニアの松原です。これまでの記事( Kafkaその1 Kafkaその2 )でもメッセージキューイングに関するトピックを扱っていましたが、今回は他のメッセージキューイングのサービスのうち、RabbitMQについて取り上げたいと思います。
ブローカーレス、ブローカードについて
メッセージキューイングの仕組みでは、ブローカーレスとブローカードという呼ばれ方で分類されることがあります。 ブローカーレス、ブローカードの違いは「メッセージをやり取りする際に仲介するサービス(ブローカー)が存在する・しない」という違いがあります。

スループットを重視するか、それともシステムの冗長性が必須となる大規模なサービス設計が必要かでブローカーレス・ブローカードを採用する理由の分かれ目になると思いますが、 既存でブローカーレスのメッセージキューイングで有名なのはZeroMQぐらいで、ほとんどのメッセージキューイングのサービスはブローカードで占められている印象です。今回取り上げるRabbitMQもブローカードのメッセージキューイングのライブラリ、サービスに含まれます。
想定するシステム要求について(メッセージキューイングの用途範囲)
以下に今回システムに要求される構成を想定しました。Unityで何らかの処理を外部のアプリケーションで加工し、さらにそこから別のアプリケーションに渡して配信の仕組みを作りたい場合など、アプリケーション間でデータのやり取りをするためにメッセージキューイングを利用することを想定しています。

一旦システムの構成に関しては説明したので、今度は実際にRabbitMQを使ってHelloWorldを試してみます。
RabbitMQのBroker(Server)を用意する
RabbitMQはブローカードの仕組みのため仲介役のBrokerを用意する必要があります。RabbitMQは公式でDockerのBrokerのイメージを提供しているので、下記のようにdocker-compose.ymlを用意して docker-comopse up のコマンドを呼び出すだけでBrokerを立ち上げることができます。
今回は管理画面を通してデータの送出や受信が行われているかを確認したいので、 rabbitmq:3-management のイメージを利用しています。
version: '3' services: rabbitmq: image: rabbitmq:3-management ports: - 5672:5672 - 5000:15672 volumes: - rabbitmq_data:/var/lib/rabbitmq volumes: rabbitmq_data:
RabbitMQは内部でデータベースを持っていますが、そのデータベースはディスク上に保存されるので、 volumeを指定しています。詳しくは dockerhubの How to use this image のところをお読みください。
docker-comopse up でBroker立ち上げ後、Webブラウザで http://localhost:5000 にアクセスすることで管理画面を表示することができます。

デフォルトではユーザー名 guest 、パスワードは guest でダッシュボードに入れます。

メッセージ送信側のコード(Unity)を用意する
メッセージ送信側はUnity(C#)を想定しています。UnityのRabbitMQのクライアントはRabbitMQ.Clientの.net standard 2.0のパッケージを利用しました。(インストール方法は省略)
以下のコードを適当なGameObjectに張り付けて実行することでメッセージを送出できます。
Brokerのホスト名は localhost 、ポートはデフォルトポートの 5672 (省略時はデフォルトポート)を指定しています。
using UnityEngine; using System; using System.Threading.Tasks; using System.Text; using RabbitMQ.Client; public class MessagePublisher : MonoBehaviour { [SerializeField] private string hostName = "localhost"; [SerializeField] private string exchangeName = "ex001"; [SerializeField] private float messageIntervalSeconds = 1f; private IConnection connection; private IModel channel; private float delta; private long count = 0; private void OnEnable() { var factory = new ConnectionFactory() { HostName = hostName }; connection = factory.CreateConnection(); channel = connection.CreateModel(); channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false); } private void OnDisable() { channel.Close(); connection.Close(); } private async Task Update() { delta += Time.deltaTime; if (delta > messageIntervalSeconds) { await PublishMessageAsync(channel, $"Hello Wolrd: {count++}"); delta -= messageIntervalSeconds; } } private async Task PublishMessageAsync(IModel model, string message) { await Task.Run(() => { try { if (!model.IsClosed) { model.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(message)); } } catch (Exception e) { Debug.LogError($"failer send. reason: {e.Message}"); } }); } }
実行するとダッシュボードからメッセージが送信されていることが確認できます。( Publish の項目の数値が変化する)

メッセージ送信側のコード(Python)を用意する
メッセージ送信側はPythonを想定しています。PythonのRabbitMQのクライアントはpikaのパッケージを利用しました。(こちらは pip install pika ですぐに利用できます)
以下のコードを適当なファイル名(例: main.py )を付けて保存し、実行(python main.py)します。
import pika queue_name = 'q001' exchange_name = 'ex001' host_name = 'localhost' def callback(ch, method, properties, body): message = body.decode() print(message) ch.basic_ack(delivery_tag = method.delivery_tag) params = pika.ConnectionParameters(host=host_name) connection = pika.BlockingConnection(params) channel = connection.channel() channel.queue_declare(queue=queue_name) channel.queue_bind(queue=queue_name, exchange=exchange_name) channel.basic_consume(queue=queue_name, on_message_callback=callback) try: channel.start_consuming() except KeyboardInterrupt: channel.queue_unbind(queue=queue_name, exchange=exchange_name) channel.queue_delete(queue=queue_name) channel.stop_consuming() connection.close()
実行するとコンソール画面上に受信したメッセージが表示されます。

また、ダッシュボードからメッセージが送信されていることが確認できます。( Deliver と Consumer ack 項目の数値が変化する)

簡単な解説
RabbitMQでは少し変わった特徴があり、メッセージの送信側(Publisher)がメッセージを送信する先に直接Queueに送信するのではなく、Exchangeというメッセージ交換局的な役割を持つ仕組みにメッセージを送信します。メッセージの受信側(Consumer/Subscriber)はこれまで通り、Queueからメッセージを取得します。

Exchangeではメッセージがキューイングされないため、単純にExchangeに対してメッセージを送るだけだと、送ったメッセージは自動的に破棄(drop)されます。この動きはダッシュボード上の Unroutable の項目で確認できます。

メッセージをQueueに貯めるには、バインディングという仕組みを利用します。RabbitMQではExchangeからメッセージの送り先、蓄積先となるQueueを選ぶことができます。(複数指定も可能)

上記のpythonのコードではqueue_bind()というメソッドを利用して、Exchange経由でPublisherからのメッセージの送り先を特定のQueueに送るように設定しています。
#...(略) channel.queue_bind(queue=queue_name, exchange=exchange_name) #...(略)
受信側のアプリでこの設定を行っているため、受信アプリを起動後はメッセージがQueueに送信され、メッセージの自動破棄が無くなるため Unroutable の数値が 0 に下がります。

ダッシュボードでこのような情報が可視化されているので、管理がしやすそうな印象があります。(Kafkaでも似たようなダッシュボードがありそうです)
おわりに
今回にはメッセージキューイングのサービスのうち、RabbitMQを取り上げました。以前に触ったKafkaと比べ、シンプルで使いやすい印象があります。パフォーマンスや用途によっても使い分けたいところです。
スループットを最優先するのであればブローカーレスのZeroMQが一番パフォーマンスが出るようなので、次回以降のテーマにしたいと思います。