Kafka 複数台でクラスターを組んでみる
前回シングルノードでの Kafka 構築を試してみたわけですが、今回は引き続き Kafka 公式ページの Quick Start に従って、Kafka(Broker)複数台でのクラスター構築を試してみたいと思います。
Kafka の Broker が持つデータは Topic 毎にパーティションという単位で分割することができ、そのパーティションを別々の Broker (のプロセスやサーバー)に割り当てたり、コピーを置いておいたりすることで、データ分散による性能改善や冗長化を実現することができます。パーティション内のデータを参照するためのインデックスもパーティション単位で作られるようなので、複数のパーティションにわけておくことで、参照性能をあげられる仕組みになっているようです。分割や複製されたパーティションの管理は Kafka が自動で行ってくれるとのことなので、今回は障害時のフェールオーバーまで試してみたいと思います。
Kafka クラスターを構築する
Broker 2 つで Kafka クラスターを構築してみます。今回は 1 つのマシン内に各 Broker を別プロセスとして用意しようと思います。実際の運用では、サーバーを複数台用意して各マシンで 1 プロセスずつ動かすことになると思いますが、どのマシンで作業するかが変わるだけでやることは同じです。前提として、Kafka のインストールまでは終わっているものとします。
Broker 毎の設定ファイルとして、server.properties をコピーして server-1.properties と server-2.properties を用意します。
# server-1.properties # Kafka サーバー(Broker)の ID 値 broker.id=1 # Kafka サーバー(Broker)への接続 URI listeners=PLAINTEXT://localhost:9093 # Kafka サーバー(Broker)の Zookeeper のホスト/ポート zookeeper.connect=localhost:2181 # Kafka サーバー(Broker)のデータ保存ディレクトリ log.dirs=/tmp/kafka-logs-1
# server-2.properties # Kafka サーバー(Broker)の ID 値 broker.id=2 # Kafka サーバー(Broker)への接続 URI listeners=PLAINTEXT://localhost:9094 # Kafka サーバー(Broker)の Zookeeper のホスト/ポート zookeeper.connect=localhost:2181 # Kafka サーバー(Broker)のデータ保存ディレクトリ log.dirs=/tmp/kafka-logs-2broker.id 値は Broker 毎にユニークである必要があるため、別々の値を設定しています。1 と 2 です。
listeners 値については、今回は同じマシン内の別プロセスとして動かすので、異なるポート番号を指定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、ポートは同じでもいいと思います。また、ホスト名は localhost からの接続しか試さないので localhost にしていますが、複数サーバーでクラスターを組む場合は localhost 以外から触ることになるでしょうから、各サーバーのホスト名を書くことになると思います。
zookeeper.connect 値も、今回は同じマシン内で動いている Zookeeper を使うので localhost:2181 としていますが、複数サーバー構成でクラスターを組む場合には、Zookeeper の動いているサーバーのホスト名/ポート番号を設定することになるでしょう。
log.dirs 値は、Broker のデータの保存先ディレクトリです。今回は同じマシンで複数プロセスの Broker が動くので、異なるディレクトリを設定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、同じパスでもいいと思います。
設定ファイルが用意できたところで、各 Broker を起動していきます。
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties& をつけてバックグラウンドで実行してもいいですが、のちのちフェールオーバーを試してみる時に止めやすいように、別ターミナルで動かしておきます。
ひとまずこれで Kafka クラスターの構築は完了です。
Kafka クラスター上に Topic を作成する
構築した Kafka クラスターを実際に使ってみるために、まずは Kafka クラスター上に Topic を作ってみます。Kafka クラスターを使う場合、Topic 作成時にはパーティション数やパーティションの複製となるレプリカの数を指定する必要があります。ここではパーティション数 3、レプリカ数 2 とします。各 Broker は Topic 毎に 1 つのレプリカしか置けないので、パーティション数は Broker の数以下でなければなりません。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-clustered-topic--partitions オプションの値が分割するパーティションの数、--replication-factor オプションの値が各パーティションのレプリカの数です。
それ以外の bin/kafka-topics.sh の使い方は、前回 Topic を作った時と同じです。
Topic 作成後には、その Topic のパーティションがどのように各 Broker に割り当てられているかを bin/kafka-topics.sh で確認することができます。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic Topic:my-clustered-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-clustered-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1 Topic: my-clustered-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-clustered-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1実行結果の各行が、Topic の分割された各パーティションに該当します。Partition 欄の数値がパーティション番号です。今回は 3 分割なので 0 〜 2 番になっています。
Leader 欄には、そのパーティションのリーダーとなる Broker の id が表示されています。パーティションへのデータの参照・更新は、Leader となっている Broker に対して行われます。実行結果を見てみると 2 つの Broker にだいたい均等に Leader が割り当てられているので、負荷分散されそうな雰囲気があります。
Replicas 欄は、そのパーティションのレプリカが配置されている Broker の id が表示されます。今回はレプリカ数を 2 としているので、各 Broker がすべてのパーティションのレプリカを持っている形になります。
Isr 欄は、そのパーティションのレプリカのうち、データの同期が完了しているレプリカをもっている Broker の id が表示されます。パーティションのデータ更新は、まず Leader となる Broker に対して行われ、それが各レプリカに反映されていきます。このレプリケーション処理が完了している Broker を、Isr 欄では確認できることになります。今回はレプリカ数が 2 なので、Broker id も 2 つ表示されていればすべての Broker でパーティションが同期されていることになります。
Kafka クラスター上の Topic で動作確認をする
Kafka クラスター上に作成した Topic で読み書きをして、動作確認をしてみます。まずは Producer を起動し、10 個のメッセージを送ってみます。$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-clustered-topic >message01 >message02 >message03 >message04 >message05 >message06 >message07 >message08 >message09 >message10
続いて送ったメッセージを読むための Consumer を、分割したパーティション 1 つにつき 1 つ起動します。Consumer を 1 つ起動しておけばすべてのパーティションを読めるわけではない、という点に注意してください。Java 等で Consumer を実装する場合には、並列で読めるようなコードを書く必要がある、ということになります。
kafka-console-consumer.sh の --partition オプションには、読みに行くパーティション番号を指定します。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0 message01 message04 message07 message10
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1 message03 message06 message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2 message02 message05 message0810 個のメッセージがそれぞれのパーティションに順番に格納されているのがわかります。少なくとも各パーティション内においては、格納されるメッセージの順序もきちんと保証されています。
Kafka クラスターのフェールオーバーを試してみる
続いて Kafka クラスターを構成する Broker を 1 台止めてしまい、フェールオーバー時の動作を見てみます。id: 2 の Broker のプロセスを止め、直後にさきほどの kafka-topics.sh で Topic の状況を確認してみます。$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic Topic:my-clustered-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-clustered-topic Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1 Topic: my-clustered-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1 Topic: my-clustered-topic Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1生きている Broker が id: 1 のもののみになったことで、Leader 欄はすべて 1 になっています。さきほどはパーティション番号 0、2 のパーティションでは Leader が落とした Broker になっていましたが、それらを引き継いだ形です。
また Isr 欄が 1 のみになっているのは、生きている Broker がこれのみになったためです。
この状態でまた Producer からメッセージを 6 つ送ってみます。
$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-clustered-topic >message11 >message12 >message13 >message14 >message15 >message16
# パーティション番号: 0 用の Consumer message11 message14
# パーティション番号: 1 用の Consumer message13 message16
# パーティション番号: 2 用の Consumer message12 message15Kafka クラスター上で動いている Broker は 1 つだけになっていますが、すべてのパーティションについて問題なくメッセージを読み書きできていますね。
Kafka クラスターに落とした Broker を戻してみる
さきほど落とした Broker を復旧させ、再び Kafka クラスターに戻してみます。復旧方法には特別な手順はなく、いつもどおり起動するだけです。$ bin/kafka-server-start.sh config/server-2.propertieskafka-topics.sh で Topic の状況を確認してみると、Isr 欄に id: 2 の Broker が戻っていることがわかります。落ちている間に更新されたメッセージが少なかったので、レプリケーションが追いつくまでにそれほど時間はかかりませんでした。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic Topic:my-clustered-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-clustered-topic Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: my-clustered-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-clustered-topic Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2Leader 欄がすべて 1 になったままですが、これもしばらく待ってから再び確認すると、Broker を落とす前のように均等になるよう調整されていました。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic Topic:my-clustered-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-clustered-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 1,2 Topic: my-clustered-topic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: my-clustered-topic Partition: 2 Leader: 2 Replicas: 2,1 Isr: 1,2これで Kafka クラスターもすっかり元通りになりました。特に面白みもないので具体例は書きませんが、Topic に対するメッセージの読み書きも問題なくできています。
あとがき
以上、複数の Broker プロセスを動かして Kafka クラスターを組んでみたというお話でした。設定ファイルの修正量などを考えれば、これだけの作業量でクラスターが組めるというのはかなり手軽だと思います。Broker 追加時の Leader 割当やレプリカの扱い、落ちた Broker が復旧した時の再レプリケーションなど、面倒なことまでしっかり自動化されているあたりがさすがです。Kafka では Zookeeper がクラスターを構成する Broker の情報を管理しているそうなのですが、Zookeeper 自体も冗長化する方法があるそうなので、その辺りもそのうちに試してみたいなと思います。Kafka コメント (0) 2018/08/27 15:12:18