KafkaとKafkaJSのHelloWorldを試してみた

はじめに

エンジニアの松原です。普段の開発ではHTTPサーバーを扱うことが多いのですが、負荷軽減のための様々なサーバー側のギミックについて調べていく中で、メッセージキューの仕組みについて目に留まりました。

今回はKafkaについて紹介します。元々メッセージキューに利用できることは知っていたのですが、バックグラウンドでどのような振る舞いをしているかに関しては、あまり詳しく調べられていませんでした。

社内でアドベントカレンダーを今年もやることになったので、この機会にしっかりと勉強しようと思い、この記事ではまずはHelloWorld的に、単純にメッセージキューとしてHelloWorldを試してみました。今回の記事で登場したコードはGitHubにリンクを置いていますので、参考になれば幸いです。

github.com

手っ取り早くKafkaを始める

Kafka自体はかなり複雑な仕組みを持っており、書籍を読んでもしっくりこなかったため、ちょうどセールス中だったUdemyの「Apache Kafka Series - Learn Apache Kafka for Beginners v3」を見て学んでようやくどういった仕組みを持っているのか理解できました。(ブログで説明しようとするとかなり記事が長くなってしまうので、今回はKafkaそのものの解説は割愛します。)

さて、ここから実際に触るとなった際に、ネット上の記事をかじりつつ試そうとしたら大いにハマりました。

私は開発用のPCにWindowsを使っているのですが、Unityの開発以外は仮想環境を扱っていることが多く、Kafkaも簡単に仮想環境で扱えると思っていましたが、環境変数などの設定が問題なのか、立ち上がっているはずのKafkaのサーバーにローカルから接続できないという問題に大いにハマり、ひたすらトライアル&エラーを繰り返して試すという状態に陥りました。

結論として、Udemyで取り上げられていた、 conduktor/kafka-stack-docker-composezk-single-kafka-single.yml を使ってdocker-composeから立ち上げて使う事が、Windows環境で素早く試せました。

github.com

Docker Desktop が Windows に入っていれば、以下の docker-compose コマンドから仮想環境上に最低限の環境であるKafkaのBrokerとZookeeperが立ち上がります。

 docker-compose -f zk-single-kafka-single.yml up

ProducerとConsumerをNode.jsから用意する

Kafkaそのものはメッセージキューのサービスですが、メッセージ自体を生成するにはProducer、メッセージを読み取るにはConsumerがそれぞれ必要になります。

これらは複数の言語でクライアントが提供されていますので、Java、Ruby、Node.js、Golangなど、様々な言語を使う事ができます。

今回は開発速度を最優先してNode.jsで動作するProducerとConsumerを用意しました。

クライアントにはKafkaJSを利用しました。

kafka.js.org

KafkaJSのサンプルコードを流用して、ProducerとConsumerを作ります。

Producerのサンプルコード

const { Kafka } = require('kafkajs')
const { setTimeout } = require('timers/promises')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
})

const producer = kafka.producer()

async function main() {
  await producer.connect()

  let count = 0
  while(true) {
    console.log('send message from producer.')
    await producer.send({
      topic: 'test-topic',
      messages: [
        { value: `Hello KafkaJS user!:${count}` },
      ],
    })

    count++
    if (count > 99) {
      break;
    }
    await setTimeout(5000)
  }

  await producer.disconnect()
}

main()

Consumerのサンプルコード

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
})

const consumer = kafka.consumer({ groupId: 'test-group' })

async function main() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
  
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`received from consumer: ${message.value.toString()}`)
    },
  })
}

main()

サンプルコードを実行する

docker-compose から KafkaのBrokerとZookeeperを立ち上げ後、それぞれProducerとConsumerをNode.jsのアプリをローカル上で実行したログが以下の画像になります。

おわりに

Kafkaそのものは大規模サービスで利用される分散処理の処理基盤であり、クラウドとも相性が良いので、しっかりと勉強して使いこなしていけるようになりたいと考えています。

今回はKafka Streamsまで紹介できませんでしたが、Kafka Streamsは非常に強力な仕組みを作れるため、実サービスにも応用ができそうです。

今後の記事で取り上げることができるかもしれません。