はじめに
エンジニアの松原です。前回の記事では、簡単にKafkaをローカルでサーバーを建ててHello Worldをする内容を紹介しました。
前回はConsumer側にKafkaJSを利用していましたが、今回は、Karafka を利用して、Rails上からConsumerとして振舞わせることを記事にしました。今回の記事で登場したコードはGitHubにリンクを置いていますので、参考になれば幸いです。
※今回使用しているKarafkaはLGPL v3.0のライセンスのものを利用しているため、利用の際はご注意ください。
Karafkaをインストールする
Karafka自体はRails上で動いていることを想定しているため、Railsのアプリケーションを作成していない場合はbundlerを通してrailsをインストールしておく必要があります。
Railsが入っていない場合はインストールする
既にRailsアプリケーションがある場合はこの作業は不要です。以下はRubyが入っていることを前提に書いています。
# Gemfileを追加 bundle init # Railsをインストール bundle add rails # Railsアプリケーションを追加(kafka-consumer-railsディレクトリ以下にソースが追加される) bundle exec rails new -G -M --skip-active-storage -C -A -J --api kafka-consumer-rails # カレントディレクトリをkafka-consumer-railsに移動 cd kafka-consumer-rails
Karafkaをインストールする
railsが追加されたら、次にKarafkaをインストールします。
bundle add karafka
bundle exec karafka install
karafkaをインストールすると、以下の3つファイルが追加されます。
karafka.rb app/consumers/application_consumer.rb app/consumers/example_consumer.rb
設定を書き換える(karafka.rb)
karafka.rb
の中身は以下のようになっています。(途中省略しています)
# frozen_string_literal: true class KarafkaApp < Karafka::App setup do |config| config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' } # <-- ブートストラップサーバーの設定 config.client_id = 'example_app' # <-- KafkaのクライアントアプリケーションのID設定 # ... config.consumer_persistence = !Rails.env.development? end (--中略--) routes.draw do # ... topic :example do # <-- トピック名 consumer ExampleConsumer end end end
GitHubのサンプルではDocker内で動かしているため、設定を以下のように変更していますが、他の環境ではその都度変わると思いますので、環境に合わせて設定してください。
config.kafka = { 'bootstrap.servers': 'host.docker.internal:29092' } config.client_id = 'my-app'
GitHubのサンプルではトピック名は sample
のままにしています。
コードを一部書き換える(example_consumer.rb)
トピックを受け取って処理する内容を記述しているのは example_consumer.rb
になります。
# frozen_string_literal: true # Example consumer that prints messages payloads class ExampleConsumer < ApplicationConsumer def consume messages.each { |message| puts message.payload } end # ... end
このままだとうまくメッセージが受け取れなかったので、GitHubのサンプルでは以下のように変更しました。ここのmessageはKarafkaで定義されているオブジェクト型なので、このページからKarafkaの設計を詳しく調べておく必要がありそうです。
def consume messages.each { |message| puts message.raw_payload } end
Karafkaを直接実行する
Karafkaはbundlerに依存関係の解決を任せているため、必ず bundle exec
のコマンドを使って実行する必要があります。
bundle exec karafka server
仮想環境から実行する
GitHubのサンプルを実行する場合は、プロジェクト直下のディレクトリから、docker-composeを利用して以下のコマンドでビルド、実行します。
# ビルド docker-compose -f zk-single-kafka-single.yml -f docker-compose-rails.yml build # 起動(grep使って標準入出力の表示をフィルタする) docker-compose -f zk-single-kafka-single.yml -f docker-compose-rails.yml up | grep kafka-consumer-rails
実行結果
前回の記事のProducerを別に動作させています。(トピック名は example
に変更しています)
とりあえずは動いたというイメージです。
仮想環境上だと詰まったような動きをするのですが、どこかで設定間違っている可能性があるので、Karafkaの設定を見直す必要ありそうです。
おわりに
前回に引き続きKafkaを取り上げました。今回はRails上で利用できるKarafkaを利用してConsumerとして動かしてみました。
ProducerやConsumerに関する記事はJavaを利用したものが多いので、今後Javaを触ってみるのもよさそうです。