Elixirのプロセス間の処理を試してみる

はじめに

エンジニアの松原です。学習のためにゲーム関連のリアルタイム通信に関していろいろ調べていたところ、大規模なシステムになるほど、複数のプロセスやサーバーに処理を分散させて運用することが多いということが伺えました。
これまで私が触ってきた手続き型言語やオブジェクト指向言語(例えばC++やC#、Javascriptなど)では一纏のロジックを複数のプロセスやサーバーに分割、または平行処理をさせようとすると、言語ごとのパフォーマンスに関する優劣はあれど、元々それらの言語の仕様レベルで設計されていないため、実現するために外部ライブラリの導入や独自のAPIの定義などでかなりの労力を費やす必要があるかと思います。
一方で、言語自体が分散処理や複数プロセスでの実装が前提となっている言語もあり、今回はそれらの言語の代表的なものとしてElixir(Erlang VM)について本記事で触れていこうと思います。

Elixirをベース言語として有名なPhoenixフレームワークがありますが、前提となる知識が多く必要で、まだ私自身も学習中で、Elixirの書き方自体にもあまり慣れていないため、今回はコア機能の一つであるプロセス管理や処理に焦点を絞って紹介したいと思います。

また、Elixir自体のインストールや実行方法などは公式ページのIntroductionに記載されていますので、割愛します。

プロセスについて

ElixirやErlangで呼ばれるプロセスとは、具体的にはVM上で動作するプロセスのことを指します。通常の言語では実行時OSが管理するプロセス(またはネイティブプロセス)として実行されますが、Elixirのアプリケーションの実行環境であるErlang VM(BEAM)では、VM上で生成・管理されます。 他の言語のVMといえば、例えばJavaやPythonが代表しやすいかと思いますが、それらの言語はそれぞれOS上のプロセスとして実行されます。これらの違いはプロセス間のやりとりの仕組みが大きく異なります。

通常のプログラミング言語でのプロセス

JavaやPythonを含め、通常のプログラミング言語で作成されたアプリケーションのプロセスはOSが管理するプロセスとして実行されるため、他のプロセス内のリソースに対して直接的な操作が行えません。
このため、一般的にはプロセス間通信を利用するのですが、アプリケーション側にSocketなどの通信プロトコルを用いるか、OSの機能に依存する名前付きパイプなど、間接的な手法を用いてプロセス間通信を行っています。 アプリケーションとしてのパフォーマンスを高めるために利用されているのは主にスレッドで、こちらは同一のプロセス内で実行されます。例外もありますが、一般的なアプリケーション開発で、プロセス間の処理を意識することは少ないかと思います。

Elixir(Erlang VM)のプロセス

ElixirではプロセスはErlang VM上で生成・管理されている、かつ言語仕様としてプロセスの操作方法が豊富に提供されているため、直接的にプロセスに対して操作を行うことができます。
また、このプロセスはかなり軽量で、PCのスペックが高ければ、数千、数万というプロセスを立ち上げることも可能だそうです。
Elixirではモジュール単位に並行処理を行えるように設計されており、厳密には異なりますが、使い勝手的にプロセスをインスタンスのように扱うこともできます。ただし、プロセスには変わりないので、いろんな箇所で同じプロセスを取りまわす、みたいなことはしないほうがよさそうです。

今回はElixirのプロセスに関する一部の機能に関して、コードを書きつつ自分なりに理解できた部分について紹介したいと思います。

Elixirでプロセスを作る

Elixirではいくつかプロセスを扱う方法がありますが、今回はコードが短く、比較的簡単に扱えるものを取り上げてみました。

モジュールを定義してプロセスを実行数する

Elixirでプロセスを扱う簡単な方法は、Elixirで用意されている spawn 関数を利用すること(らしい)です。この関数は引数に指定した関数をプロセスとして起動、実行する関数になります。まずはプロセスとして立ち上げるためのモジュールを定義します。

# example_math.ex
defmodule ExampleMath do
  def add(a, b) do
    Process.sleep(2000)
    IO.puts(a + b)
  end
end

このモジュールを定義したファイル(例: example_math.ex )をシェル上でのコマンド elixirc example_math.ex で.beamファイルにコンパイル後、インタラクティブシェル( iex )を起動して下記のコマンドを実行します。
spawnの関数の引数は左から、モジュール名、実行する関数名(アトムとして記述)、関数に渡す引数(配列として記述)になります。

iex(1)> spawn(ExampleMath, :add, [5,10]) 

返り値としてプロセスのIDが返却されます。プロセスIDは実行環境ごとに異なります。2秒後に標準入出力に a + b の評価結果が表示されます。この処理は非同期処理になります。

#PID<0.107.0>
iex(2)> 15

メッセージパッシングを行ってみる

さっきの例ではプロセス起動と同時に非同期処理を実行していましたが、今度は特定のタイミングでプロセスに対してメッセージを渡して、処理を受け取れるようにしてみます。例として以下のようなモジュールを定義し、コンパイルしておきます。

# example_greeting.ex
defmodule ExampleGreeting do
  def greet do
    receive do
      {_sender_pid, :hello} ->
          IO.puts("Hello, my friend.")
      {sender_pid, :howareyou} ->
          send(sender_pid, {:ok, "I'm fine."})
    end

    greet()
  end
end

インタラクティブシェル( iex )を起動して下記のコマンドを実行します。

iex(1)> pid = spawn(ExampleGreeting, :greet, []) 
#PID<0.107.0>

まずは単方向のメッセージとして、send 関数を使ってプロセスにメッセージを送ってみます。関数の引数は左から、プロセスID、プロセスにメッセージとして送るタプル(self() の評価結果 = 自分のプロセスID、メッセージ内容(アトム))となります。

iex(2)> send(pid, {self(), :hello})

すると、起動した標準入出力にプロセス側で定義したメッセージが表示されます。send 関数の返り値として、送信したメッセージを受け取ります。

Hello, my friend.
{#PID<0.105.0>, :hello}

今度は双方向のメッセージとして、メッセージを送ります。

iex(3)> send(pid, {self(), :howareyou})

今度は send 関数実行時の返り値のみ帰ってきます。

{#PID<0.105.0>, :howareyou}

先ほどの send 関数で送った処理は、パターンマッチを介して下記の例のようにプロセス側で再度送信元にメッセージを返すように実装されています。

    receive do
      ...
      {sender_pid, :howareyou} ->
          send(sender_pid, {:ok, "I'm fine."})
    end

コマンドを通して呼び出し元でメッセージを受け取ります。

iex(4)> receive do {:ok, message} -> IO.puts message end

結果は以下のようになります。

I'm fine.
:ok

このように、プロセス間でメッセージの送受信が簡単にできます。

GenServerを使ってみる

先ほどはモジュールを直接プロセス化しましたが、よりサーバーアプリケーションらしく、プロセス側で状態管理を持ち、起動時に初期化処理を行うなど汎用的にプロセスを扱うためにGenServerというモジュールを使ってみます。以下はスタックの機構をプロセスとして簡易実装したモジュールになります。

defmodule Stack do
  use GenServer

  @impl GenServer
  def init(state) do
    {:ok, state}
  end

  @impl GenServer
  def handle_call(:pop, _from, [head | tail]) do
    {:reply, head, tail}
  end

  @impl GenServer
  def handle_cast({:push, item}, state) do
    {:noreply, [item | state]}
  end
end

GenServerを用いる際、初期化の手続きに init 、同期的処理を行いたい(=呼び出し後プロセスから何かしら値を取得したい)場合は handle_call 、非同期処理を行いたい(=呼び出しのみでプロセスから値を取得しなくてよい)場合は handle_cast の関数を用います。

これらのメソッドの引数と返り値はあらかじめ決められており、それぞれ解説していきますと

  • init プロセス起動時に呼び出されるGenServerのコールバック関数
    • 引数
      • <引数> 初期化時に呼び出し元から渡されるパラメータ、型の指定は無くユーザー側で定義できる
    • 返り値(以下のいずれかを返却する)
      • { :ok, <状態> } プロセスが正常に起動した場合、タプルの2つ目にプロセスが保持する状態を指定する
      • :ignore 失敗ではないが、プロセスの起動を無視する
      • { :stop, <理由> } ロジック上理由があってプロセスを終了する場合、タプルの2つ目にプロセスの起動の理由を指定する
  @impl GenServer
  def init(<引数>) do
    ...
    {:ok, <状態>}
  end
  • handle_call プロセスに対し、同期的(=返り値を要求する場合)に呼び出されるGenServerのコールバック関数
    • 引数
      • <引数> 初期化時に呼び出し元から渡されるパラメータ、型の指定は無くユーザー側で定義できる
      • <呼び出し元プロセス> 呼び出し元のプロセスID
      • <状態> handle_call が呼び出された直後に保持しているプロセスの状態、型の指定は無くユーザー側で定義できる
    • 返り値(以下のいずれかを返却する)
      • { :reply, <返り値>, <状態> } 呼び出しに対して返り値を返す場合、タプルの2つ目に返り値、タプル3つめに更新された状態を指定する
      • { :noreply, <状態> } 呼び出しに対して返り値を返さない場合、タプル2つめに更新された状態を指定する
      • { :stop, <理由>, <状態> } 呼び出しに対してプロセスを終了する場合、タプルの2つ目にプロセスの起動の理由、タプル3つめに更新された状態を指定する
  @impl GenServer
  def handle_call(<引数>, <呼び出し元プロセス>, <状態>) do
    ...
    {:reply, <返り値>, <状態>}
  end
  • handle_cast プロセスに対し、非同期的に呼び出されるGenServerのコールバック関数
    • 引数
      • <引数> 初期化時に呼び出し元から渡されるパラメータ、型の指定は無くユーザー側で定義できる
      • <状態> handle_cast が呼び出された直後に保持しているプロセスの状態、型の指定は無くユーザー側で定義できる
    • 返り値(以下のいずれかを返却する)
      • { :noreply, <状態> } 処理を正常に完了する場合、タプル2つめに更新された状態を指定する
      • { :stop, <理由>, <状態> } 呼び出しに対してプロセスを終了する場合、タプルの2つ目にプロセスの起動の理由、タプル3つめに更新された状態を指定する
  @impl GenServer
  def handle_cast(<引数>, <状態>) do
    {:noreply, <状態>}
  end

・・・というコールバック関数が用意されています。気を付ける点として、この関数の返り値はそのまま呼び出し元への返り値として返却されるのではなく、いったんGenServerのモジュール内で別のロジックに渡され、最後に呼び出し元に処理を返すという実装になっているため、GenServer側で実装した関数の返り値によって呼び出し元の関数の返り値と振る舞いが変わってくるので、

このモジュールをコンパイル後、 インタラクティブシェル( iex )を起動して下記のコマンドを実行します。

iex(1)> {_, pid1} = GenServer.start_link(Stack, ["hello", "world"]) 

プロセスが起動し、起動したプロセスのIDが返り値に返ってきます。

{:ok, #PID<0.107.0>}

スタックから値を取り出してみます。プロセスに対する同期処理の呼び出しは以下のようなコマンドを実行します。

iex(2)> GenServer.call(pid1, :pop)

プロセスから値を一つ取り出しました。再度実行すると二つ目の値を取り出します。

"hello"
iex(3)> GenServer.call(pid1, :pop)
"world"

次は値をスタックに積んでみます。プロセスに対する非同期処理の呼び出しは以下のようなコマンドを実行します。

iex(4)> GenServer.cast(pid1, {:push, "hoge"})

呼び出し後の返り値は :ok のみになります。

:ok

ここで別プロセスを作成してみます。

iex(5)> {_, pid2} = GenServer.start_link(Stack, ["hello", "world"])

さっきとは異なるプロセスが作成されました。

{:ok, #PID<0.112.0>}

それぞれのプロセスから値を取り出します。

iex(6)> GenServer.call(pid1, :pop)
"hoge"
iex(7)> GenServer.call(pid2, :pop)
"hello"

このように、プロセスごとに管理している状態が独立しているため、この仕組みを使って実現できるロジックはたくさんありそうです。

おわりに

今回Elixirを使って、プロセスに関する処理を扱ってみました。
今回はじめてブログでElixirを取り上げてみましたが、これまで扱ったことがないタイプの言語だったため、まだプログラムの書き方が分からない部分が多くありますが、振る舞いは徐々にわかってきたので、これからもっと実践的なコードが書いていけそうです。
また、以前から異なるアプリケーション間の通信処理を書いたことがあるのですが、ここまで簡素に書けたことが無く、ElixirやErlangのマイクロプロセスによって実現できるロジックがもっと分かれば、面白いサービスを作れるようになるかもしれません。

また、InfoQの記事のDiscordの事例にあるように、Erlang VM(BEAM)にはNative Implemented Functions(NIF)という、Erlang向けにポーティングされたC言語のライブラリを使用することで、Erlang VM上でネイティブ動作している関数を呼び出すことができるため、処理の最適化のために実装をネイティブに寄せていく、ということも可能みたいです。
リアルタイムネットワーク用途や大規模なサービスにも応用できそうなので、個人でしばらく学習していきたいと思います。