Java から Kafka の Consumer を利用してみる
分散メッセージングシステムである Kafka を Java から利用してみました。手始めに今回は Consumer を使い、メッセージの受け取りを試してみることにします。公式ドキュメントに記載せれている通り、Kafka には Java から接続するためのクライアントライブラリがあります。Maven Central Repository にも公開されているので、Maven や Gradle からすぐに利用することができます。
今回は実際にこの
kafka-clients
ライブラリを使って接続してみたのですが、単一ノード構成では問題ないものの、複数ノードでのクラスター構成への接続が一部意図通りに動作してくれませんでした。記事の内容としてはすっきり終わらないものの、現時点でのノウハウとして書いておこうと思います。kafka-clients をプロジェクトの依存性に追加する
プロジェクト管理には Gradle 4.8.1 を使ってみます。プロジェクトを作ったら、build.gradle
に以下の依存性を追加します。dependencies { ... compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0' ... }
kafka-clients
の現在の最新バージョンは 2.0.0 です。単一ノード構成で Consumer を利用する
今回 Java からアクセスするトピックとして、Kafka 上に単一ノード/単一パーティションで mytopic という名前のトピックを作っておきます。Kafka サーバーの構成等についてはこちらの記事と同様とします。Java のクライアントも Kafka サーバーが動いているマシン内で動かし、localhost に対して接続する形です。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
以下が、単一ノード構成の Kafka に Consumer を接続させる Java コードになります。
kafka-clients
を追加したプロジェクト内に置いてください。import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class NonClusterConsumer { public static void main(String[] args) { // 接続時の設定値を Properties インスタンスとして構築する Properties properties = new Properties(); // 接続先 Kafka ノード properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Consumer を識別するための group id properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer-group"); // 読み取り位置である offset が未指定だった場合に、Kafka 上に最も早く置かれたメッセージを読み取る properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 読み取り後に、読み取り位置である offset を自動で更新する properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Consumer を構築する KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); // Consumer をトピックに割り当てる consumer.subscribe(Arrays.asList("mytopic")); try { while (true) { // メッセージをとりだす ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60l)); // とりだしたメッセージを表示する for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("%s:%s", record.offset(), record.value())); } // メッセージの読み取り位置である offset を、最後に poll() した位置で(同期処理で)更新する consumer.commitSync(); } } finally { consumer.close(); } } }
動作確認をしてみる
サンプルプログラムの内容を見ていく前に、実際にこれを動かしてみます。そのための準備として、作成したトピックにはコマンドラインツールでメッセージを積んでおきます。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic > message01 > message02 > message03
NonClusterConsumer クラスを動かすと、トピックに置かれたこれらのメッセージを読み取ることができます。
1:message01 2:message02 3:message03各メッセージ先頭の : 以前の数値は、Kafka サーバー上でのメッセージの位置をあらわす offset 値です。
NonClusterConsumer クラスは実行中はループしながらメッセージを読み取り続けているので、起動中に新たにトピックに積まれたメッセージも随時読み取っていきます。また、読み取ったメッセージの位置が offset 値として記録されるので、再実行時には前回読み取ったメッセージ移行のメッセージが読み取られます。
Consumer を構築する
Consumer を構築する際には、接続先 Kafka サーバーのアドレスやメッセージの読み取りに関する設定値を指定する必要があります。設定値は公式ドキュメントの 3.4.1 New Consumer Configs に記載されていて、これをキーバリュー形式で Properties インスタンスとして用意します。キーとなる文字列はすべて ConsumerConfig クラスに定数として定義されています。
// 接続時の設定値を Properties インスタンスとして構築する Properties properties = new Properties(); // 接続先 Kafka ノード properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Consumer を識別するための group id properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer-group"); // 読み取り位置である offset が未指定だった場合に、Kafka 上に最も早く置かれたメッセージを読み取る properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 読み取り後に、読み取り位置である offset を自動で更新する properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");基本的にはソースコードのコメントの通りなのですが、いくつか補足をしていきます。
Kafka では Consumer がどこまでのメッセージを読み取ったかを offset 値として記録する仕組みがあります。メッセージ読み取り後にこの offset 値が自動で更新されるのですが、読み取ったメッセージを処理するまで offset 値を更新したいくないといような場合には、
enable.auto.commit
を false にすることで手動で offset 値を更新できます。デフォルトでは true に設定されているので、今回のサンプルでは手動で更新するようにしました。メッセージの読み取り位置である offset が記録されていない場合、
auto.offset.reset
値に応じて offset が決まります。auto.offset.reset
値のデフォルトは latest
となっていて、これは Consumer が接続後に送信されたメッセージから読み取ることを意味します。今回は auto.offset.reset
値に earliest
を指定していて、Kafka サーバー上に残っている最も古いメッセージから読み取るようにしています。Consumer は任意の数の Consumer インスタンスが属する Consumer Group という単位で管理されていて、この Consumer Group の識別子が
group.id
値になります。メッセージの読み取り位置である offset もこの group.id
に値毎に記録されます。kafka-clients
での Consumer の実装クラスが、KafkaConsumer です。コンストラクターには設定値を入れた Properties インスタンスの他に、メッセージのキーと値をデシリアライズするためのクラスを指定しています。今回はキーも値も String 型としてデシリアライズしたいので、ふたつとも StringDeserializer を指定しています。
ちなみにこれらのデシリアライザーは、
key.deserializer
/ value.deserializer
値として Properties に入れておくこともできます。// Consumer を構築する KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
Consumer をトピックに割り当てる
作成した KafkaConsumer インスタンスを Topic に割り当てるには、subscribe() メソッドを呼び出します。// Consumer をトピックに割り当てる consumer.subscribe(Arrays.asList("mytopic"));引数には割り当てる Topic の名前を指定します。
Consumer でメッセージを取り出す
KafkaConsumer の poll() メソッドを呼び出すことで、実際に Topic からメッセージを受信することができます。poll() の呼び出しはブロックされるので、引数には一度の読み取り処理でブロックされる際のタイムアウトの時間を指定します。// メッセージをとりだす ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60l)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("%s:%s", record.offset(), record.value())); }読み取ったメセージは ConsumerRecords インスタンスとして返されます。ConsumerRecords には Iterable<ConsumerRecord> が実装されていて、この ConsumerRecord クラスが各メッセージに該当します。各メッセージからはメッセージの位置や、メッセージのキー、値、メッセージが配置されたパーティション番号をとることができます。
Consumer の offset を手動で更新する
Consumer Group 毎に記録されるメッセージの読み取り位置ですが、enable.auto.commit
が false になっている場合には、commitSync() / commitAsync() メソッドを明示的に呼び出すことで、記録することができます。逆に言うと、これらのメソッドを呼び出さないと offset 値は更新されず、Consumer を再度動かすとすでに読み取った位置にあるメッセージを再度読み取ることになります。// メッセージの読み取り位置である offset を、最後に poll() した位置で(同期処理で)更新する consumer.commitSync();offset 値の更新は、commitSync() メソッドでは同期処理で行われ、commitAsync() メソッドでは非同期処理で行われます。
複数ノードの Kafka クラスターで Consumer を利用する
前提として、Kafka クラスター上に以下のように Topic を作っておきます。ノード数 2、パーティション数 3 の Topic です。$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-clustered-topic
Kafka クラスターに接続する Consumer の例が、以下のコードになります。
import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; public class ClusterConsumer { public static void main(String[] args) { // 接続時の設定値を Properties インスタンスとして構築する Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093,localhost:9094"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-clustered-consumer-group"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Consumer を構築する KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); // Consumer をトピックのパーティションに割り当てる TopicPartition partition0 = new TopicPartition("my-clustered-topic", 0); TopicPartition partition1 = new TopicPartition("my-clustered-topic", 1); TopicPartition partition2 = new TopicPartition("my-clustered-topic", 2); List<TopicPartition> topicPartitions = Arrays.asList( partition0, partition1, partition2); consumer.assign(topicPartitions); // Consumer の読み取り位置を明示的に指定する consumer.seekToBeginning(topicPartitions); try { while (true) { // メッセージをとりだす ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2l)); // とりだしたメッセージをパーティション毎に表示する for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); long offset = 0; for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(String.format("%s:%s:%s", record.partition(), record.offset(), record.value())); offset = record.offset(); } } // メッセージの読み取りを最後に poll() した位置で(同期処理で)コミットする // consumer.commitSync(); } } finally { consumer.close(); } } }Consumer の設定に関しては、単一ノード接続の例と基本的には同じです。
bootstrap.servers
値には , 区切りで Kafka クラスターを構成するノードを指定していますが、ここで指定したノードのいずれかからクラスターの全ノードの情報が自動で収集されるため、すべてのノードを記載しなければならないわけではありません。動作確認をしてみる
こちらもコマンドラインツールから Topic にメッセージを積んでおき、まずは動作確認をしてみます。$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-clustered-topic > message01 > message02 > message03 > message04 > message05 > message06
ClusterConsumer クラスを動かすと、トピックに置かれたこれらのメッセージを読み取ることができます。
0:1:message01 0:2:message04 1:1:message02 1:2:message05 2:1:message03 2:2:message06各メッセージの先頭には : 区切りで Topic のパーティション番号と、offset 値が記載されています。3 つのパーティション毎にメッセージの offset が採番されているのがわかります。
Consumer をトピックに割り当てる
Topic が複数のパーティションに分割されている場合、Consumer はそのパーティションを最小単位として割り当てる必要があります。各パーティションを TopicPartition インスタンスとして用意し、それをまとめて割り当てたい Consumer の assign() メソッドに渡します。// Consumer をトピックのパーティションに割り当てる TopicPartition partition0 = new TopicPartition("my-clustered-topic", 0); TopicPartition partition1 = new TopicPartition("my-clustered-topic", 1); TopicPartition partition2 = new TopicPartition("my-clustered-topic", 2); List<TopicPartition> topicPartitions = Arrays.asList( partition0, partition1, partition2); consumer.assign(topicPartitions);
Consumer でメッセージを取り出す
Consumer からのメッセージの読み取りは、基本的には単一ノード構成の場合と同じです。// メッセージをとりだす ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2l));
加えて、poll() の返り値である ConsumerRecords クラスには、指定したパーティション番号のメッセージのみを取り出せる records() メソッドが用意されているので、この例では records() メソッドを使っています。
取り出した ConsumerRecords に含まれるメッセージが配置されているパーティションは、その一覧を partitions() メソッドで取得することができます。
// とりだしたメッセージをパーティション毎に表示する for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(String.format("%s:%s:%s", record.partition(), record.offset(), record.value())); } }
Kafka クラスター外部で offset を記録する
Kafka クラスターから Consumer でメッセージを読み取る場合に、意図しない動作になったのが offset 値の更新です。offset 値の更新も単一ノード構成の場合と同様に、commitSync() / commitAsync() メソッドで記録できるはずなのですが、Kafka クラスターに対してこのメソッドを実行するとエラーになってしまい、きちんと動作してくれませんでした。consumer.commitSync();ソースコードを追ってみた感じでは、Consumer Group の offset を記録する Coordinator と呼ばれる Broker ノードの検知ができていないようで、そのためにパーティション毎のメッセージの読み取り位置が Kafka サーバー上に保持されない状態になっていました。Kafka の Coordinator 周辺の仕組みはこちらの記事が詳しいです。一応
kafka-clients
のバージョン 1.1.1 でも試してみたのですが状況は変わらなかったので、もしかしたら根本的に何かの考慮が足りていないのかもしれません。offset を Kafka 上に記録できないとなると、別の手段で永続化してやる必要があります。RDB やキーバリューストア等ですね。
ConsumerRecord クラスにはそのメッセージの位置を返す offset() メソッドが用意されているので、このメソッドの返り値を永続化することで、Kafka 外部に自分で offset を保存することができます。
long offset = record.offset();また、Consumer でメッセージを読み取る前に、seek() メソッドで読み取り位置を指定することもできます。この引数に、Kafka 外部から取得した offset を指定することになります。
TopicPartition partition0 = new TopicPartition("my-clustered-topic", 0); TopicPartition partition1 = new TopicPartition("my-clustered-topic", 1); TopicPartition partition2 = new TopicPartition("my-clustered-topic", 2); // RDB 等からパーティション毎の offset をとりだす long offset1; long offset2; long offset3; // Consumer にパーティション毎の offset を設定する consumer.seek(partition0, offset1); consumer.seek(partition1, offset2); consumer.seek(partition2, offset3);Kafka クラスターを利用する場合にも Kafka 上に offset 値を記録できればよかったのですが、時間切れで今回はここまでです。Kafka を使いこなすにはまだ時間が必要そうです。
Kafka コメント (0) 2018/10/02 19:47:35