Rails上で利用できるKarafkaを使ってKafkaを利用する

はじめに

エンジニアの松原です。前回の記事では、簡単にKafkaをローカルでサーバーを建ててHello Worldをする内容を紹介しました。

synamon.hatenablog.com

前回はConsumer側にKafkaJSを利用していましたが、今回は、Karafka を利用して、Rails上からConsumerとして振舞わせることを記事にしました。今回の記事で登場したコードはGitHubにリンクを置いていますので、参考になれば幸いです。
※今回使用しているKarafkaはLGPL v3.0のライセンスのものを利用しているため、利用の際はご注意ください。

github.com

Karafkaをインストールする

Karafka自体はRails上で動いていることを想定しているため、Railsのアプリケーションを作成していない場合はbundlerを通してrailsをインストールしておく必要があります。

Railsが入っていない場合はインストールする

既にRailsアプリケーションがある場合はこの作業は不要です。以下はRubyが入っていることを前提に書いています。

# Gemfileを追加
bundle init
# Railsをインストール
bundle add rails
# Railsアプリケーションを追加(kafka-consumer-railsディレクトリ以下にソースが追加される)
bundle exec rails new -G -M --skip-active-storage -C -A -J --api kafka-consumer-rails
# カレントディレクトリをkafka-consumer-railsに移動
cd kafka-consumer-rails

Karafkaをインストールする

railsが追加されたら、次にKarafkaをインストールします。

bundle add karafka
bundle exec karafka install

karafkaをインストールすると、以下の3つファイルが追加されます。

karafka.rb
app/consumers/application_consumer.rb
app/consumers/example_consumer.rb

設定を書き換える(karafka.rb)

karafka.rb の中身は以下のようになっています。(途中省略しています)

# frozen_string_literal: true

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } # <-- ブートストラップサーバーの設定
    config.client_id = 'example_app' # <-- KafkaのクライアントアプリケーションのID設定
    # ...
    config.consumer_persistence = !Rails.env.development?
  end
  
  (--中略--)

  routes.draw do
    # ...
    topic :example do  # <-- トピック名
      consumer ExampleConsumer
    end
  end
end

GitHubのサンプルではDocker内で動かしているため、設定を以下のように変更していますが、他の環境ではその都度変わると思いますので、環境に合わせて設定してください。

    config.kafka = { 'bootstrap.servers': 'host.docker.internal:29092' }
    config.client_id = 'my-app'

GitHubのサンプルではトピック名は sample のままにしています。

コードを一部書き換える(example_consumer.rb)

トピックを受け取って処理する内容を記述しているのは example_consumer.rb になります。

# frozen_string_literal: true

# Example consumer that prints messages payloads
class ExampleConsumer < ApplicationConsumer
  def consume
    messages.each { |message| puts message.payload }
  end

  # ...
end

このままだとうまくメッセージが受け取れなかったので、GitHubのサンプルでは以下のように変更しました。ここのmessageはKarafkaで定義されているオブジェクト型なので、このページからKarafkaの設計を詳しく調べておく必要がありそうです。

  def consume
    messages.each { |message| puts message.raw_payload }
  end

Karafkaを直接実行する

Karafkaはbundlerに依存関係の解決を任せているため、必ず bundle exec のコマンドを使って実行する必要があります。

bundle exec karafka server

仮想環境から実行する

GitHubのサンプルを実行する場合は、プロジェクト直下のディレクトリから、docker-composeを利用して以下のコマンドでビルド、実行します。

# ビルド
docker-compose -f zk-single-kafka-single.yml -f docker-compose-rails.yml build
# 起動(grep使って標準入出力の表示をフィルタする)
docker-compose -f zk-single-kafka-single.yml -f docker-compose-rails.yml up | grep kafka-consumer-rails

実行結果

前回の記事のProducerを別に動作させています。(トピック名は example に変更しています)

とりあえずは動いたというイメージです。
仮想環境上だと詰まったような動きをするのですが、どこかで設定間違っている可能性があるので、Karafkaの設定を見直す必要ありそうです。

おわりに

前回に引き続きKafkaを取り上げました。今回はRails上で利用できるKarafkaを利用してConsumerとして動かしてみました。
ProducerやConsumerに関する記事はJavaを利用したものが多いので、今後Javaを触ってみるのもよさそうです。