はじめに
エンジニアの松原です。前回の記事( RabbitMQを使って開発言語の異なるアプリケーション間のメッセージ処理を試す ) に引き続き、ZeroMQについて取り上げたいと思います。
ZeroMQの設計パターンについて
前回の記事で紹介した通り、ZeroMQはブローカーレスのメッセージキューイングの仕組みを持っていますが、たくさんの設計パターンがあります。そのうちいくつかをざっくり紹介します。
- REQ-REPパターン
- Requester(クライアント)から特定のメッセージを送出、Responder(サーバー)がそれに応答してメッセージを返信する
- PUB-SUBパターン
- Publisher(サーバー)に対しSubscriber(クライアント)が購読することによって、Pubulisherからのメッセージを受信する
- ROUTER(P2P)パターン
- Router(クライアント、サーバー)とRouter(クライアント、サーバー)が互いに双方向でメッセージを交換する
REQ-REPパターンやPUB-SUBパターンはZeroMQの公式ガイドで紹介されているパターンで、ROUTERパターンに関してはZeroMQのパターンのRFCサイトで立案(Draft)されているパターンになります。
ただし、いずれかのノードがサーバーの機能を有している、かつあらかじめBind()を実行してクライアントが接続できる状態にしておく必要があります。このため、いずれかのパターンでもノードの起動の順序に依存します。
PUB-SUB+Proxyパターン
上記で紹介したパターンはいずれもブローカーレスの特徴であるパターンを紹介しましたが、実はZeroMQでもブローカードに近いパターンを構築することができます。それが Pub-Sub Network with a Proxy というパターンになります。
こちらはPublisherからサーバーの役割を切り離し、Proxy(Intermediator)にサーバーの機能を持たせるパターンです。他のパターンについてもZeroMQのパターンのRFCサイトでたくさんのパターンの概要を知ることができます。このようにアイデア次第でいろいろ役割や構造を持たせることができるところがZeroMQの面白いところかと思います。
NetMQを使ってPUB-SUB+Proxyパターンを構築してみる
今回はUnityでZeroMQベースのシステムを扱うため.NET向けに提供されているNetMQを利用しました。UnityでNuGetを扱うためにNuGetForUnityからNetMQをインストールできます。
Unityアプリからメッセージを送出したいので、Publisherとして役割を持たせたいのですが、Unityをサーバーとして動作させるには不安があるためPUB-SUB+Proxyのパターンを利用しました。
ただし、今回時間が足りずに調べられなかったのですが、別の開発言語のライブラリ(例えばpythonのpyzmq)と相互に通信するところまでは確認できず、NetMQを利用することがベストかどうかまではわかりませんでした。
今回はすべてNetMQを利用しProxy、Publisher、Subscriberそれぞれを簡易動作するところまで行いました。以下は今回のアプリケーションの構成になります。
Proxyサーバー用のコード
Proxyサーバーはdotnet consoleアプリケーションとして動作させました。以下がサンプルコードになります。
ややこしいのですが、PublisherはXSubscriberSocketに指定したポートに対して接続し、SubscriberはXPublisherSocketに指定したポートに対して接続するようになります。
using NetMQ; using NetMQ.Sockets; using (var xpubSocket = new XPublisherSocket("@tcp://127.0.0.1:1234")) using (var xsubSocket = new XSubscriberSocket("@tcp://127.0.0.1:5678")) { Console.WriteLine("Waiting for messages..."); var proxy = new Proxy(xsubSocket, xpubSocket); proxy.Start(); }
Subscriberクライアント用のコード
Subscriberクライアントもdotnet consoleアプリケーションとして動作させました。以下がサンプルコードになります。
Proxyサーバーが提供しているXPublisherSocketに対して接続し、特定のトピックを購読することにより、そのトピックに対してメッセージが送信されたときに受け取れるようになっています。
using NetMQ; using NetMQ.Sockets; using (var subscriberSocket = new SubscriberSocket(">tcp://127.0.0.1:1234")) { subscriberSocket.Options.ReceiveHighWatermark = 1000; subscriberSocket.Subscribe("testTopic"); while (true) { string messageTopicReceived = subscriberSocket.ReceiveFrameString(); Console.WriteLine(messageTopicReceived); } }
Publisherクライアント用のコード(Unity)
SubscriberクライアントはUnityのアプリケーションとして動作させました。以下がサンプルコードになります。以下のコードを適当なGameObjectにアタッチすることによって動作します。
注意点として、NetMQの現行バージョンでは実際の通信処理などは別スレッドで動作するように設計されているため、Unity(特にUnityEditor)からだとアプリ終了時などにリソースの開放(publisherSocket.Dispose()とNetMQConfig.Cleanup()の箇所)を明示的に行う必要があります。この内容についてはNetMQ公式サイトのCleanupの項目で書かれています。(記事書く前にこの内容を見落としていて、しばらくUnityEditorのフリーズに悩みました・・・)
dotnetのコンソールアプリではusingスコープがこの辺りを吸収してくれているようですが、Unityだとうまく動作せず、最悪アプリのフリーズやクラッシュを誘発する原因になります。
using UnityEngine; using NetMQ; using NetMQ.Sockets; public class MessagePublisher : MonoBehaviour { [SerializeField] private float messageIntervalSeconds = 1f; private PublisherSocket publisherSocket; private float delta; private long count = 0; private void Start() { publisherSocket = new PublisherSocket(">tcp://127.0.0.1:5678"); publisherSocket.Options.SendHighWatermark = 1000; } private void OnDestroy() { if (publisherSocket != null) { publisherSocket.Dispose(); publisherSocket = null; NetMQConfig.Cleanup(); } } private void Update() { delta += Time.deltaTime; if (delta > messageIntervalSeconds) { publisherSocket.SendMoreFrame("testTopic").SendFrame($"Hello Wolrd: {count++}"); delta -= messageIntervalSeconds; } } }
実行結果
Subscriberとして動作しているdotnet consoleアプリからUnityから送出されたメッセージを確認できます。
おわりに
今回はメッセージキューイングのZeroMQ(NetMQ)を取り上げました。メッセージキューイングライブラリのうちZeroMQベースのライブラリが一番パフォーマンスが出るそうですが、ライブラリの取り回しが難しい印象があり、今回UnityやC#以外の環境での動作確認ができなかったのが残念です。 もしNetMQがほかの開発言語のZeroMQで動くようになったら追って記事にしたいと思います。