RSS2.0

Java から Kafka の Producer を利用してみる

前回 Java で Kafka の Consumer を試してみましたが、今回は Producer を実装してみました。
Consumer 側はパーティション分割された Topic からのメッセージの受け取りでだいぶ詰まっていたのですが、Producer 側はクラスター構成に対してもすんなりと動かせました。実装コードは接続先の Kafka ブローカーの設定値を書き換えるくらいで、同じコードが動いてくれました。

利用したのは前回と同じく Kafka の Java クライアント、kafka-clients です。
dependencies {
...
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
...
}
Gradle を使う場合、build.gradle に追加する依存性は前回と同じです。Maven の場合はこの辺りを参考にしてください。

単一ノード構成で Producer を利用する

Producer のサンプルコードは以下の通りです。接続設定をして、Producer のインスタンスを作って、メッセージを送るだけというとてもシンプルな内容になりました。

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {

  public static void main(String[] args) {
    // 接続時の設定値を Properties インスタンスとして構築する
    Properties properties = new Properties();
    // 接続先 Kafka ノード
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // Producer を構築する
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());

    try {
      // トピックを指定してメッセージを送信する
      for (int i = 0; i < 10; i++) {
        producer.send(new ProducerRecord<String, String>("mytopic", String.format("message%02d", i)));
      }
    } finally {
      producer.close();
    }
  }

}
まずは単一ノード構成で動かしてみようということで、アクセスする Kafka の Topic にはパーティション分割されていない Topic を指定しています。

動作確認をしてみる

送信されたメッセージを受け取るために、コマンドラインから Consumer を動かしておきます。kafka-console-consumer.sh には --from-beginning オプションを指定しているので、Producer 側の実行は Consumer 待受前でも後でも大丈夫です。
今回の例では "message" + 数字という内容のメッセージを 10 個送っているので、以下のように受け取れれば OK です。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning
message00
message01
message02
message03
message04
message05
message06
message07
message08
message09

Producer を構築する

kafka-clients では KafkaProducer クラスが Producer に相当します。KafkaProducer の構築では KafkaConsumer と同じく、接続情報等の設定値をいれた Properties インスタンスと、メッセージのキー/値をシリアライズするための Serializer を渡します。
// 接続時の設定値を Properties インスタンスとして構築する
Properties properties = new Properties();
// 接続先 Kafka ノード
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Producer を構築する
KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
KafkaConsumer の設定項目は、公式ドキュメントの 3.3 Producer Configs に記載されています。Producer 側はメッセージをひたすら送るだけなので、起動時の接続先 Kafka ブローカーである bootstrap.servers 値くらいを指定しておけば動かすことができます。
また、KafkaProducer のコンストラクターにインスタンスとして渡している Serializer は、代わりに設定項目 key.serializer / value.serializer 値にクラスのフルパスを渡してもよいです。

Producer からメッセージを送る

メッセージの送信では、KafkaProducer の sent() メソッドに ProducerRecord クラスとして送るメッセージを渡します。ProducerRecord には第一引数に送信する Topic の名前、第二引数にメッセージの内容を指定しています。
// トピックを指定してメッセージを送信する
producer.send(new ProducerRecord<String, String>("mytopic", String.format("message%02d", i)));
特定の Topic パーティションに割りあて、メッセージを待ち受け続ける Consumer と異なり、Producer はメッセージ単位で宛先 Topic を指定できるのが特徴的です。

複数ノードの Kafka クラスターで Producer を利用する

パーティション分割された Topic を宛先として Producer を利用する場合にも、上記サンプルコードはそのまま利用できます。なので、ここでは Kafka クラスターに Producer をむける場合の補足をしていきたいと思います。
KafkaProducer の設定値 bootstrap.servers ですが、Kafka クラスターの複数のノードをすべて指定したい場合には、, 区切りで記述することもできます。
// 接続先 Kafka ノード
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093,localhost:9094");
ここで指定するのは起動時の接続先ノードの候補なので、いずれかのノードから全ノードの情報は自動で収集されます。なので、記述しなかったノードが使われないわけではありません。ただ、クラスター上に存在していても死んでいるノードには接続できないので、確実に接続させるという意味ではわかるノードはすべて書いておいたほうがいいかもしれません。

上記サンプルコードでは ProducerRecord にはトピック名とメッセージ本文しか指定していませんが、Javadoc にあるとおりトピックのパーティション番号を指定できるコンストラクターも用意されています。
トピックのパーティション番号を指定しない場合、メッセージは Kafka クラスターによって、各パーティションに順番に割り当てられていきます。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0
message00
message03
message06
message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1
message02
message05
message08
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2
message01
message04
message07
一方で ProducerRecord にパーティション番号を指定しておくと、そのパーティション上にメッセージが配置されることになります。
例えば 10 個すべてのメッセージでパーティション番号に 1 を指定して今回のコードを動かすと、Consumer 側では以下のようにメッセージを受信します。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1
message00
message01
message02
message03
message04
message05
message06
message07
message08
message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2
一方で ProducerRecord にパーティション番号を指定しておくと、そのパーティション上にメッセージが配置されることになります。
パーティション内ではメッセージの受信順序は保証されるので、論理的に関連のある一連のメッセージを同じ Consumer に確実に受け取らせたい、というような場合では使うことがありそうです。
  Kafka  コメント (0)  2018/10/03 19:20:04


公開範囲:
プロフィール HN: ももかん
ゲーム作ったり雑談書いたり・・・していた時期が私にもありました。
カレンダー
<<2018, 10>>
30123456
78910111213
14151617181920
21222324252627
28293031123