RabbitMQを使って開発言語の異なるアプリケーション間のメッセージ処理を試す

はじめに

エンジニアの松原です。これまでの記事( Kafkaその1 Kafkaその2 )でもメッセージキューイングに関するトピックを扱っていましたが、今回は他のメッセージキューイングのサービスのうち、RabbitMQについて取り上げたいと思います。

synamon.hatenablog.com

synamon.hatenablog.com

ブローカーレス、ブローカードについて

メッセージキューイングの仕組みでは、ブローカーレスとブローカードという呼ばれ方で分類されることがあります。 ブローカーレス、ブローカードの違いは「メッセージをやり取りする際に仲介するサービス(ブローカー)が存在する・しない」という違いがあります。

スループットを重視するか、それともシステムの冗長性が必須となる大規模なサービス設計が必要かでブローカーレス・ブローカードを採用する理由の分かれ目になると思いますが、 既存でブローカーレスのメッセージキューイングで有名なのはZeroMQぐらいで、ほとんどのメッセージキューイングのサービスはブローカードで占められている印象です。今回取り上げるRabbitMQもブローカードのメッセージキューイングのライブラリ、サービスに含まれます。

想定するシステム要求について(メッセージキューイングの用途範囲)

以下に今回システムに要求される構成を想定しました。Unityで何らかの処理を外部のアプリケーションで加工し、さらにそこから別のアプリケーションに渡して配信の仕組みを作りたい場合など、アプリケーション間でデータのやり取りをするためにメッセージキューイングを利用することを想定しています。

一旦システムの構成に関しては説明したので、今度は実際にRabbitMQを使ってHelloWorldを試してみます。

RabbitMQのBroker(Server)を用意する

RabbitMQはブローカードの仕組みのため仲介役のBrokerを用意する必要があります。RabbitMQは公式でDockerのBrokerのイメージを提供しているので、下記のようにdocker-compose.ymlを用意して docker-comopse up のコマンドを呼び出すだけでBrokerを立ち上げることができます。
今回は管理画面を通してデータの送出や受信が行われているかを確認したいので、 rabbitmq:3-management のイメージを利用しています。

version: '3'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - 5672:5672
      - 5000:15672
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:

RabbitMQは内部でデータベースを持っていますが、そのデータベースはディスク上に保存されるので、 volumeを指定しています。詳しくは dockerhubHow to use this image のところをお読みください。

docker-comopse up でBroker立ち上げ後、Webブラウザで http://localhost:5000 にアクセスすることで管理画面を表示することができます。

デフォルトではユーザー名 guest 、パスワードは guest でダッシュボードに入れます。

メッセージ送信側のコード(Unity)を用意する

メッセージ送信側はUnity(C#)を想定しています。UnityのRabbitMQのクライアントはRabbitMQ.Clientの.net standard 2.0のパッケージを利用しました。(インストール方法は省略)
以下のコードを適当なGameObjectに張り付けて実行することでメッセージを送出できます。
Brokerのホスト名は localhost 、ポートはデフォルトポートの 5672 (省略時はデフォルトポート)を指定しています。

using UnityEngine;
using System;
using System.Threading.Tasks;
using System.Text;
using RabbitMQ.Client;

public class MessagePublisher : MonoBehaviour
{
    [SerializeField]
    private string hostName = "localhost";
    [SerializeField]
    private string exchangeName = "ex001";
    [SerializeField]
    private float messageIntervalSeconds = 1f;

    private IConnection connection;
    private IModel channel;
    private float delta;
    private long count = 0;

    private void OnEnable()
    {
        var factory = new ConnectionFactory()
        {
            HostName = hostName
        };

        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false);
    }

    private void OnDisable()
    {
        channel.Close();
        connection.Close();
    }

    private async Task Update()
    {
        delta += Time.deltaTime;
        if (delta > messageIntervalSeconds)
        {
            await PublishMessageAsync(channel, $"Hello Wolrd: {count++}");
            delta -= messageIntervalSeconds;
        }
    }

    private async Task PublishMessageAsync(IModel model, string message)
    {
        await Task.Run(() => {
            try
            {
                if (!model.IsClosed) {
                    model.BasicPublish(exchangeName, "", null, Encoding.UTF8.GetBytes(message));
                }
            }
            catch (Exception e)
            {
                Debug.LogError($"failer send. reason: {e.Message}");
            }
        });
    }
}

実行するとダッシュボードからメッセージが送信されていることが確認できます。( Publish の項目の数値が変化する)

メッセージ送信側のコード(Python)を用意する

メッセージ送信側はPythonを想定しています。PythonのRabbitMQのクライアントはpikaのパッケージを利用しました。(こちらは pip install pika ですぐに利用できます) 以下のコードを適当なファイル名(例: main.py )を付けて保存し、実行(python main.py)します。

import pika

queue_name = 'q001'
exchange_name = 'ex001'
host_name = 'localhost'

def callback(ch, method, properties, body):
    message = body.decode()
    print(message)
    ch.basic_ack(delivery_tag = method.delivery_tag)

params = pika.ConnectionParameters(host=host_name)
connection = pika.BlockingConnection(params)

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name, exchange=exchange_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.queue_unbind(queue=queue_name, exchange=exchange_name)
    channel.queue_delete(queue=queue_name)
    channel.stop_consuming()

connection.close()

実行するとコンソール画面上に受信したメッセージが表示されます。

また、ダッシュボードからメッセージが送信されていることが確認できます。( DeliverConsumer ack 項目の数値が変化する)

簡単な解説

RabbitMQでは少し変わった特徴があり、メッセージの送信側(Publisher)がメッセージを送信する先に直接Queueに送信するのではなく、Exchangeというメッセージ交換局的な役割を持つ仕組みにメッセージを送信します。メッセージの受信側(Consumer/Subscriber)はこれまで通り、Queueからメッセージを取得します。

Exchangeではメッセージがキューイングされないため、単純にExchangeに対してメッセージを送るだけだと、送ったメッセージは自動的に破棄(drop)されます。この動きはダッシュボード上の Unroutable の項目で確認できます。

メッセージをQueueに貯めるには、バインディングという仕組みを利用します。RabbitMQではExchangeからメッセージの送り先、蓄積先となるQueueを選ぶことができます。(複数指定も可能)

上記のpythonのコードではqueue_bind()というメソッドを利用して、Exchange経由でPublisherからのメッセージの送り先を特定のQueueに送るように設定しています。

#...(略)
channel.queue_bind(queue=queue_name, exchange=exchange_name)
#...(略)

受信側のアプリでこの設定を行っているため、受信アプリを起動後はメッセージがQueueに送信され、メッセージの自動破棄が無くなるため Unroutable の数値が 0 に下がります。

ダッシュボードでこのような情報が可視化されているので、管理がしやすそうな印象があります。(Kafkaでも似たようなダッシュボードがありそうです)

おわりに

今回にはメッセージキューイングのサービスのうち、RabbitMQを取り上げました。以前に触ったKafkaと比べ、シンプルで使いやすい印象があります。パフォーマンスや用途によっても使い分けたいところです。
スループットを最優先するのであればブローカーレスのZeroMQが一番パフォーマンスが出るようなので、次回以降のテーマにしたいと思います。