Kafka を試してみる
Kafka は分散メッセージングシステムです。メッセージを送る Producer、メッセージをためておく Broker、メッセージを受け取る Consumer という構成をとる RabbitMQ や ActiveMQ のようなメッセージングシステムですが、複数のサーバーインスタンスによる分散構成をとれる点も特徴で、特に大量のメッセージを扱うための速度性能、スケーラビリティ、耐障害性などに特化していると言われています。
今回は公式ページのクイックスタートを参考に、Kafka サーバーの構築とメッセージングについて試してみました。
Kafka をインストールする
まず前提として、Java が使えるようインストールしておきます。java コマンドを叩いて動いていれば大丈夫です。
$ java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Kafka 公式ページからリンクされているダウンロードページから、Kafka の最新版をダウンロードします。
$ wget http://ftp.riken.jp/net/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
ダウンロードしたら、アーカイブを展開して適当なディレクトリに置いておきます。
$ tar xvfz kafka_2.11-2.0.0.tgz # mv kafka_2.11-2.0.0 /usr/local/ # ln -s /usr/local/kafka_2.11-2.0.0 /usr/local/kafka
次に Zookeeper を起動します。
Kafka では複数台の Kafka サーバー(Broker)でクラスターを組むことができますが、クラスターに属する各 Kafka サーバー(Broker)が連携するための機能を提供しているのが Zookeepr です。
Zookeper は他のサーバーで稼働させているものをつなげることもできますが、ここでは Kafka にバンドルされている Zookerper を同じサーバー内で動かして使うことにします。
インストールした Kafka ディレクトリ内にて、以下のコマンドから Zookeeper を起動できます。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
上記コマンドで指定している config/zookeeper.properties が Zookeper の設定ファイルです。
内容をみてみると、デフォルトで clientPort 値に指定されている 2181 ポートで Zookeeper が起動することがわかります。
... clientPort=2181 ...
続いて Kafak サーバーを起動する前に、Kafka サーバーの設定ファイル config/server.properties を編集しておきます。
... # Kafka サーバー(Broker)の ID 値 broker.id=0 ... # Kafka サーバー(Broker)への接続 URI listeners=PLAINTEXT://localhost:9092 ... # Kafka サーバー(Broker)の Zookeeper のホスト/ポート zookeeper.connect=localhost:2181 ...broker.id は、この Kafka サーバー(Broker)の ID 値です。クラスターに複数の Borker を所属させる場合、それぞれの Broker には別々の ID 値を設定する必要があります。今回は Broker は 1 台だけなので、デフォルト値の 0 をそのままこの Broker の ID 値にします。
listeners は、この Kafka サーバー(Broker)への接続を待ち受ける URI です。プロトコル、ホスト名また IP、ポート番号を , 区切りで任意の数記載します。デフォルトではこの設定項目自体がコメントアウトされているので、コメントアウトを解除します。設定値としては、例えば localhost:9092 としてこの Kafka サーバー(Broker)への接続したい場合、PLAINTEXT://localhost:9092 を指定します。プロトコル部分は Kafka で独自に定義されているようで、PLAINTEXT は認証なし暗号化なしで接続することを表しています。
zookeeper.connect は、この Kafka サーバー(Broker)の面倒をみる Zookeeper のホスト/ポートを指定します。デフォルトでは localhost にむいているため、外部の Zookeeper インスタンスを利用する場合はそれを指定します。
今回は Kafka のクラスターを 1 台構成で動かしたいので、いったんこれらだけ注意しておきます。
設定ファイルが準備できたら、以下のコマンドで Kafka を起動します。
$ bin/kafka-server-start.sh config/server.properties
コマンドラインから Kafka を利用する
Kafka には利用するためのコマンドラインツールもバンドルされているので、それらからメッセージングを試してみます。Topic を作る
Topic は Kafka でやりとりされるメッセージが紐付けられるデータの単位です。やりとりするメッセージの種類毎に Topic を作るという用途が想定されているようです。なのでまずはテスト用の Topic mytopic を作り、この Topic に対してメッセージをやりとりしてみることにします。
Topic の作成には以下のコマンドを利用します。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic--topic オプションの値に指定されているのが作成する Topic の名前です。
接続先には --zookeeper オプションで Kafka クラスターの Zookeeper を指定しています。実際に接続する Kafka サーバー(Broker)はこの Zookeeper から取得する、という動作のようです。
Topic 作成後には、以下のコマンドで作成済み Topic の一覧を確認することができます。
$ bin/kafka-topics.sh --list --zookeeper localhost:2181 mytopic
Producer からメッセージを送る
Producer は Kafka サーバー(Broker)に対してメッセージを送るクライアントのことです。コマンドラインから文字列としてメッセージを入力し、Kafka クラスターに送信する Producer ツールが用意されているので、これを利用してみます。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic >ちはやぶる 神代も聞かず 竜田川 からくれなゐに 水くくるとは >ひさかたの 光のどけき 春の日に 静心なく 花の散るらむ >秋風に たなびく雲の 絶え間より もれ出づる月の 影のさやけさ--topic オプションではメッセージを送る Topic を指定します。
実行すると入力を待ち受けるプロンプトが返されるので、任意の文字列を入力してみます。
コマンドラインツールは Ctrl + C で終了できます。
Consumer からメッセージを受け取る
Consumer は Kafka サーバー(Broker)にからメッセージを受け取るクライアントのことです。これもコマンドラインツールが用意されているので、試してみます。
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopic --from-beginning ちはやぶる 神代も聞かず 竜田川 からくれなゐに 水くくるとは ひさかたの 光のどけき 春の日に 静心なく 花の散るらむ 秋風に たなびく雲の 絶え間より もれ出づる月の 影のさやけさ実行すると、Producer に渡したメッセージが表示されます。
--from-beginning オプションは過去のものも含めてすべてのメッセージを受け取るオプションです。指定しなかった場合、Consumer 実行後に送信されたメッセージのみを受け取ります。
Kafka サーバーの設定をもう少し
ひととおりメッセージングの確認ができて、Kafka の動きが見えてきたところで、もう少しだけサーバーの設定をいじってみます。メッセージの保存期間を設定する
bin/kafka-console-consumer.sh に --from-beginning オプションを指定すると Kafka の Broker 上にあるすべてのメッセージを受け取ることができますが、となると Broker は過去のすべてのメッセージを保持しているのか、という疑問が浮かぶと思います。Kafka は定期的にメッセージをログとしてディスクに書き出しているようで、このログに残っているメッセージは既に過去に送信されているものでも受け取れることになります。メッセージをログとして残しておく期間も、設定ファイル server.properties で設定することができます。
# メッセージをログとして 168 時間残す log.retention.hours=168 # メッセージをログとして 10 分間残す #log.retention.minutes=10 # メッセージをログとして 10000 ミリ秒間残す #log.retention.ms=10000時間単位で指定するなら log.retention.hours 値を、分単位で指定するなら log.retention.minutes 値を、ミリ秒単位で指定するなら log.retention.ms 値を利用します。各設定項目にはそれぞれ優先度があるようですが、どれかひとつだけを設定しておくのが可読性を考えると良さそうです。
また、メッセージをログとして保存しておくディレクトリも、設定ファイルの log.dirs 値で設定することができます。
log.dirs=/tmp/kafka-logs指定されているディレクトリをみてみると、作成したトピックの名前 mytopic に対応した mytopic-0 というディレクトリがありました。この中にインデックス含め、バイナリデータとしてメッセージが保存されています。
あとがき
ひとまず以上が Kafka 環境の構築と動作確認になります。Kafka サーバーの設定ファイルの準備については公式ドキュメントにも書かれていなくて、最初起動させることができませんでした。Kafka サーバーを起動しようとすると、ずっと Broker が見つからないというような WARN がで続けていました。私の環境依存なのかはよくわかりませんが、公式ドキュメントそのままで試してみてうまく起動できない、という方がいれば参考にしていただければと思います。
Kafka コメント (0) 2018/08/16 20:03:53