RSS2.0

Kafka 複数台でクラスターを組んでみる

kafka_quickstart.png
前回シングルノードでの Kafka 構築を試してみたわけですが、今回は引き続き Kafka 公式ページの Quick Start に従って、Kafka(Broker)複数台でのクラスター構築を試してみたいと思います。

Kafka の Broker が持つデータは Topic 毎にパーティションという単位で分割することができ、そのパーティションを別々の Broker (のプロセスやサーバー)に割り当てたり、コピーを置いておいたりすることで、データ分散による性能改善や冗長化を実現することができます。パーティション内のデータを参照するためのインデックスもパーティション単位で作られるようなので、複数のパーティションにわけておくことで、参照性能をあげられる仕組みになっているようです。分割や複製されたパーティションの管理は Kafka が自動で行ってくれるとのことなので、今回は障害時のフェールオーバーまで試してみたいと思います。

Kafka クラスターを構築する

Broker 2 つで Kafka クラスターを構築してみます。今回は 1 つのマシン内に各 Broker を別プロセスとして用意しようと思います。実際の運用では、サーバーを複数台用意して各マシンで 1 プロセスずつ動かすことになると思いますが、どのマシンで作業するかが変わるだけでやることは同じです。

前提として、Kafka のインストールまでは終わっているものとします。
Broker 毎の設定ファイルとして、server.properties をコピーして server-1.properties と server-2.properties を用意します。
# server-1.properties

# Kafka サーバー(Broker)の ID 値
broker.id=1
# Kafka サーバー(Broker)への接続 URI
listeners=PLAINTEXT://localhost:9093
# Kafka サーバー(Broker)の Zookeeper のホスト/ポート
zookeeper.connect=localhost:2181
# Kafka サーバー(Broker)のデータ保存ディレクトリ
log.dirs=/tmp/kafka-logs-1
# server-2.properties

# Kafka サーバー(Broker)の ID 値
broker.id=2
# Kafka サーバー(Broker)への接続 URI
listeners=PLAINTEXT://localhost:9094
# Kafka サーバー(Broker)の Zookeeper のホスト/ポート
zookeeper.connect=localhost:2181
# Kafka サーバー(Broker)のデータ保存ディレクトリ
log.dirs=/tmp/kafka-logs-2
broker.id 値は Broker 毎にユニークである必要があるため、別々の値を設定しています。1 と 2 です。
listeners 値については、今回は同じマシン内の別プロセスとして動かすので、異なるポート番号を指定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、ポートは同じでもいいと思います。また、ホスト名は localhost からの接続しか試さないので localhost にしていますが、複数サーバーでクラスターを組む場合は localhost 以外から触ることになるでしょうから、各サーバーのホスト名を書くことになると思います。
zookeeper.connect 値も、今回は同じマシン内で動いている Zookeeper を使うので localhost:2181 としていますが、複数サーバー構成でクラスターを組む場合には、Zookeeper の動いているサーバーのホスト名/ポート番号を設定することになるでしょう。
log.dirs 値は、Broker のデータの保存先ディレクトリです。今回は同じマシンで複数プロセスの Broker が動くので、異なるディレクトリを設定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、同じパスでもいいと思います。

設定ファイルが用意できたところで、各 Broker を起動していきます。
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties
& をつけてバックグラウンドで実行してもいいですが、のちのちフェールオーバーを試してみる時に止めやすいように、別ターミナルで動かしておきます。
ひとまずこれで Kafka クラスターの構築は完了です。

Kafka クラスター上に Topic を作成する

構築した Kafka クラスターを実際に使ってみるために、まずは Kafka クラスター上に Topic を作ってみます。
Kafka クラスターを使う場合、Topic 作成時にはパーティション数やパーティションの複製となるレプリカの数を指定する必要があります。ここではパーティション数 3、レプリカ数 2 とします。各 Broker は Topic 毎に 1 つのレプリカしか置けないので、パーティション数は Broker の数以下でなければなりません。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic my-clustered-topic
--partitions オプションの値が分割するパーティションの数、--replication-factor オプションの値が各パーティションのレプリカの数です。
それ以外の bin/kafka-topics.sh の使い方は、前回 Topic を作った時と同じです。

Topic 作成後には、その Topic のパーティションがどのように各 Broker に割り当てられているかを bin/kafka-topics.sh で確認することができます。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic
Topic:my-clustered-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-clustered-topic	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 2,1
	Topic: my-clustered-topic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: my-clustered-topic	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1
実行結果の各行が、Topic の分割された各パーティションに該当します。Partition 欄の数値がパーティション番号です。今回は 3 分割なので 0 〜 2 番になっています。
Leader 欄には、そのパーティションのリーダーとなる Broker の id が表示されています。パーティションへのデータの参照・更新は、Leader となっている Broker に対して行われます。実行結果を見てみると 2 つの Broker にだいたい均等に Leader が割り当てられているので、負荷分散されそうな雰囲気があります。
Replicas 欄は、そのパーティションのレプリカが配置されている Broker の id が表示されます。今回はレプリカ数を 2 としているので、各 Broker がすべてのパーティションのレプリカを持っている形になります。
Isr 欄は、そのパーティションのレプリカのうち、データの同期が完了しているレプリカをもっている Broker の id が表示されます。パーティションのデータ更新は、まず Leader となる Broker に対して行われ、それが各レプリカに反映されていきます。このレプリケーション処理が完了している Broker を、Isr 欄では確認できることになります。今回はレプリカ数が 2 なので、Broker id も 2 つ表示されていればすべての Broker でパーティションが同期されていることになります。

Kafka クラスター上の Topic で動作確認をする

Kafka クラスター上に作成した Topic で読み書きをして、動作確認をしてみます。まずは Producer を起動し、10 個のメッセージを送ってみます。
$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-clustered-topic
>message01
>message02
>message03
>message04
>message05
>message06
>message07
>message08
>message09
>message10

続いて送ったメッセージを読むための Consumer を、分割したパーティション 1 つにつき 1 つ起動します。Consumer を 1 つ起動しておけばすべてのパーティションを読めるわけではない、という点に注意してください。Java 等で Consumer を実装する場合には、並列で読めるようなコードを書く必要がある、ということになります。
kafka-console-consumer.sh の --partition オプションには、読みに行くパーティション番号を指定します。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 0
message01
message04
message07
message10
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 1
message03
message06
message09
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-clustered-topic --from-beginning --partition 2
message02
message05
message08
10 個のメッセージがそれぞれのパーティションに順番に格納されているのがわかります。少なくとも各パーティション内においては、格納されるメッセージの順序もきちんと保証されています。

Kafka クラスターのフェールオーバーを試してみる

続いて Kafka クラスターを構成する Broker を 1 台止めてしまい、フェールオーバー時の動作を見てみます。id: 2 の Broker のプロセスを止め、直後にさきほどの kafka-topics.sh で Topic の状況を確認してみます。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic
Topic:my-clustered-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-clustered-topic	Partition: 0	Leader: 1	Replicas: 2,1	Isr: 1
	Topic: my-clustered-topic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1
	Topic: my-clustered-topic	Partition: 2	Leader: 1	Replicas: 2,1	Isr: 1
生きている Broker が id: 1 のもののみになったことで、Leader 欄はすべて 1 になっています。さきほどはパーティション番号 0、2 のパーティションでは Leader が落とした Broker になっていましたが、それらを引き継いだ形です。
また Isr 欄が 1 のみになっているのは、生きている Broker がこれのみになったためです。

この状態でまた Producer からメッセージを 6 つ送ってみます。
$ bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-clustered-topic
>message11
>message12
>message13
>message14
>message15
>message16
# パーティション番号: 0 用の Consumer
message11
message14
# パーティション番号: 1 用の Consumer
message13
message16
# パーティション番号: 2 用の Consumer
message12
message15
Kafka クラスター上で動いている Broker は 1 つだけになっていますが、すべてのパーティションについて問題なくメッセージを読み書きできていますね。

Kafka クラスターに落とした Broker を戻してみる

さきほど落とした Broker を復旧させ、再び Kafka クラスターに戻してみます。復旧方法には特別な手順はなく、いつもどおり起動するだけです。
$ bin/kafka-server-start.sh config/server-2.properties
kafka-topics.sh で Topic の状況を確認してみると、Isr 欄に id: 2 の Broker が戻っていることがわかります。落ちている間に更新されたメッセージが少なかったので、レプリケーションが追いつくまでにそれほど時間はかかりませんでした。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic
Topic:my-clustered-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-clustered-topic	Partition: 0	Leader: 1	Replicas: 2,1	Isr: 1,2
	Topic: my-clustered-topic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: my-clustered-topic	Partition: 2	Leader: 1	Replicas: 2,1	Isr: 1,2
Leader 欄がすべて 1 になったままですが、これもしばらく待ってから再び確認すると、Broker を落とす前のように均等になるよう調整されていました。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-clustered-topic
Topic:my-clustered-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-clustered-topic	Partition: 0	Leader: 2	Replicas: 2,1	Isr: 1,2
	Topic: my-clustered-topic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: my-clustered-topic	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 1,2
これで Kafka クラスターもすっかり元通りになりました。特に面白みもないので具体例は書きませんが、Topic に対するメッセージの読み書きも問題なくできています。

あとがき

以上、複数の Broker プロセスを動かして Kafka クラスターを組んでみたというお話でした。設定ファイルの修正量などを考えれば、これだけの作業量でクラスターが組めるというのはかなり手軽だと思います。Broker 追加時の Leader 割当やレプリカの扱い、落ちた Broker が復旧した時の再レプリケーションなど、面倒なことまでしっかり自動化されているあたりがさすがです。Kafka では Zookeeper がクラスターを構成する Broker の情報を管理しているそうなのですが、Zookeeper 自体も冗長化する方法があるそうなので、その辺りもそのうちに試してみたいなと思います。

  Kafka  コメント (0)  2018/08/27 15:12:18


公開範囲:
プロフィール HN: ももかん
ゲーム作ったり雑談書いたり・・・していた時期が私にもありました。
カレンダー
<<2018, 12>>
2526272829301
2345678
9101112131415
16171819202122
23242526272829
303112345