Java から Kafka の Producer を利用してみる
前回 Java で Kafka の Consumer を試してみましたが、今回は Producer を実装してみました。Consumer 側はパーティション分割された Topic からのメッセージの受け取りでだいぶ詰まっていたのですが、Producer 側はクラスター構成に対してもすんなりと動かせました。実装コードは接続先の Kafka ブローカーの設定値を書き換えるくらいで、同じコードが動いてくれました。
利用したのは前回と同じく Kafka の Java クライアント、
kafka-clients
です。dependencies { ... compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0' ... }Gradle を使う場合、
build.gradle
に追加する依存性は前回と同じです。Maven の場合はこの辺りを参考にしてください。単一ノード構成で Producer を利用する
Producer のサンプルコードは以下の通りです。接続設定をして、Producer のインスタンスを作って、メッセージを送るだけというとてもシンプルな内容になりました。import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; public class MyProducer { public static void main(String[] args) { // 接続時の設定値を Properties インスタンスとして構築する Properties properties = new Properties(); // 接続先 Kafka ノード properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Producer を構築する KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer()); try { // トピックを指定してメッセージを送信する for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<String, String>("mytopic", String.format("message%02d", i))); } } finally { producer.close(); } } }まずは単一ノード構成で動かしてみようということで、アクセスする Kafka の Topic にはパーティション分割されていない Topic を指定しています。
動作確認をしてみる
送信されたメッセージを受け取るために、コマンドラインから Consumer を動かしておきます。kafka-console-consumer.sh
には --from-beginning
オプションを指定しているので、Producer 側の実行は Consumer 待受前でも後でも大丈夫です。今回の例では "message" + 数字という内容のメッセージを 10 個送っているので、以下のように受け取れれば OK です。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning message00 message01 message02 message03 message04 message05 message06 message07 message08 message09
Producer を構築する
kafka-clients
では KafkaProducer
クラスが Producer に相当します。KafkaProducer
の構築では KafkaConsumer
と同じく、接続情報等の設定値をいれた Properties
インスタンスと、メッセージのキー/値をシリアライズするための Serializer
を渡します。// 接続時の設定値を Properties インスタンスとして構築する Properties properties = new Properties(); // 接続先 Kafka ノード properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Producer を構築する KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(), new StringSerializer());
KafkaConsumer
の設定項目は、公式ドキュメントの 3.3 Producer Configs に記載されています。Producer 側はメッセージをひたすら送るだけなので、起動時の接続先 Kafka ブローカーである bootstrap.servers
値くらいを指定しておけば動かすことができます。また、
KafkaProducer
のコンストラクターにインスタンスとして渡している Serializer
は、代わりに設定項目 key.serializer
/ value.serializer
値にクラスのフルパスを渡してもよいです。Producer からメッセージを送る
メッセージの送信では、KafkaProducer
の sent()
メソッドに ProducerRecord
クラスとして送るメッセージを渡します。ProducerRecord
には第一引数に送信する Topic の名前、第二引数にメッセージの内容を指定しています。// トピックを指定してメッセージを送信する producer.send(new ProducerRecord<String, String>("mytopic", String.format("message%02d", i)));特定の Topic パーティションに割りあて、メッセージを待ち受け続ける Consumer と異なり、Producer はメッセージ単位で宛先 Topic を指定できるのが特徴的です。
複数ノードの Kafka クラスターで Producer を利用する
パーティション分割された Topic を宛先として Producer を利用する場合にも、上記サンプルコードはそのまま利用できます。なので、ここでは Kafka クラスターに Producer をむける場合の補足をしていきたいと思います。KafkaProducer
の設定値 bootstrap.servers
ですが、Kafka クラスターの複数のノードをすべて指定したい場合には、, 区切りで記述することもできます。// 接続先 Kafka ノード properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093,localhost:9094");ここで指定するのは起動時の接続先ノードの候補なので、いずれかのノードから全ノードの情報は自動で収集されます。なので、記述しなかったノードが使われないわけではありません。ただ、クラスター上に存在していても死んでいるノードには接続できないので、確実に接続させるという意味ではわかるノードはすべて書いておいたほうがいいかもしれません。
上記サンプルコードでは
ProducerRecord
にはトピック名とメッセージ本文しか指定していませんが、Javadoc にあるとおりトピックのパーティション番号を指定できるコンストラクターも用意されています。トピックのパーティション番号を指定しない場合、メッセージは Kafka クラスターによって、各パーティションに順番に割り当てられていきます。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0 message00 message03 message06 message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1 message02 message05 message08
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2 message01 message04 message07一方で
ProducerRecord
にパーティション番号を指定しておくと、そのパーティション上にメッセージが配置されることになります。例えば 10 個すべてのメッセージでパーティション番号に 1 を指定して今回のコードを動かすと、Consumer 側では以下のようにメッセージを受信します。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1 message00 message01 message02 message03 message04 message05 message06 message07 message08 message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2一方で
ProducerRecord
にパーティション番号を指定しておくと、そのパーティション上にメッセージが配置されることになります。パーティション内ではメッセージの受信順序は保証されるので、論理的に関連のある一連のメッセージを同じ Consumer に確実に受け取らせたい、というような場合では使うことがありそうです。
Kafka コメント (0) 2018/10/03 19:20:04