Kafka Manager を構築してみたら Kafka 2.0.0 は対応していなかったお話
Kafka
でクラスター中のノード(Broker
)やパーティション、トピックを WEB ベースで管理するためのツールとして、Kafka Manager
というものがあるそうです。
公式ページである Github リポジトリには、README.md 中に実際の WEB の UI が載っていました。表示されている情報としては、Kafka
にバンドルされている kafka-topics.sh
や kafka-consumer-groups.sh
等から取れるもののようですが、統計的に可視化されていたり、更新処理までできるようです。
Kafka Manager を構築する
公式ページからはバイナリや rpm
パッケージ等は配布されていないようなので、自分でビルドしていきます。
まずは Github
からソースコードを clone
しておきましょう。$ git clone git@github.com:yahoo/kafka-manager.git
ブランチは特に何も考えずに master
を使うことにします。
ソースコードを取得したら、ビルドツールの sbt
からビルドすることができます。$ ./sbt clean dist
ビルドすると target/universal/
ディレクトリ配下に kafka-manager-1.3.3.21.zip
ができあがります。これが Kafka Manager
の実行バイナリを固めた zip
ファイルです。
これを展開すると、bin
ディレクトリ下に kafka-manager
という実行バイナリがあります。これを使うと Kafka Manager
が起動します。$ bin/kafka-manager -Dkafka-manager.zkhosts=localhost:2181 -Dhttp.port=8080
システムプロパティ kafka-manager.zkhosts
には Kafka
の Zookeeper
のホストを指定します。今回は localhsot
で Kafka
クラスターが動いているという前提で、同じマシンの Zookeeper
を指定しました。 http.port
値は Kafka Manager
自体の待受ポートです。8080 ポートを指定しているので、http://localhost:8080 にアクセスすれば Kafka Manager
を使うことができます。
続きを読む
Kafka
コメント (0)
2018/10/05 19:54:32
コマンドラインから Sudachi で形態素解析してみる
日本語を品詞に分割する形態素解析器については、以前試してみた Mecab の他に、最近では Sudachi の評判がよいようです。それならばひとまずは触ってみようということで、今日は Sudachi
をビルドして日本語の文章を形態素解析してみました。
Sudachi をビルドする
Github にてソースコードが公開されているので、 これを clone してきてビルドしてみます。$ git clone git@github.com:WorksApplications/Sudachi.git
$ cd Sudachi
リポジトリには辞書をビルドするためのファイルもコミットされているので、サイズはそれなりに大きいです。lex なので字句解析器ですね。辞書自体は UniDic を使っているようで、ビルド時にダウンロードしている様子がビルドログから伺えます。
Sudachi
は Maven
プロジェクトなので、package
ゴールまで実行すればビルドできます。$ mvn clean package
実際に Sudachi
を動かすにはビルドした jar
ファイルを叩くのですが、そのままビルドしただけではもろもろの設定が足りていません。必要な設定ファイルのサンプルは src/main/resources/sudachi.json、src/main/resources/sudachi_fulldict.json として置かれているので、これをベースに設定をしていきます。
systemDict
値には Sudachi
が利用する辞書のパスを指定します。リポジトリ上では辞書ファイルの名前だけが書かれているので、このパスの辞書にアクセスできるディレクトリで jar
ファイルを実行するか、systemDict
値自体を正しく指定し直す必要があります。Sudachi
の辞書自体は、ビルドすると target
ディレクトリに system_core.dic
、system_full.dic
として生成されます。また同時に、圧縮された zip
ファイルとしても、sudachi-0.1.1-SNAPSHOT-dictionary-full.zip
、sudachi-0.1.1-SNAPSHOT-dictionary-core.zip
等としてビルドされていました。
ここでは以下の内容の sudachi_fulldict.json
を my_sudachi_config.json
という名前でカレントティディレクトリにコピーし、以下の内容に修正しました。
続きを読む
Sudachi
コメント (0)
2018/10/04 20:41:21
Java から Kafka の Producer を利用してみる
前回 Java で Kafka の Consumer を試してみましたが、今回は Producer を実装してみました。
Consumer 側はパーティション分割された Topic からのメッセージの受け取りでだいぶ詰まっていたのですが、Producer 側はクラスター構成に対してもすんなりと動かせました。実装コードは接続先の Kafka ブローカーの設定値を書き換えるくらいで、同じコードが動いてくれました。
利用したのは前回と同じく Kafka の Java クライアント、kafka-clients
です。dependencies {
...
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
...
}
Gradle を使う場合、build.gradle
に追加する依存性は前回と同じです。Maven の場合はこの辺りを参考にしてください。
単一ノード構成で Producer を利用する
Producer のサンプルコードは以下の通りです。接続設定をして、Producer のインスタンスを作って、メッセージを送るだけというとてもシンプルな内容になりました。
続きを読む
Kafka
コメント (0)
2018/10/03 19:20:04
Java から Kafka の Consumer を利用してみる
分散メッセージングシステムである Kafka を Java から利用してみました。手始めに今回は Consumer を使い、メッセージの受け取りを試してみることにします。
公式ドキュメントに記載せれている通り、Kafka には Java から接続するためのクライアントライブラリがあります。Maven Central Repository にも公開されているので、Maven や Gradle からすぐに利用することができます。
今回は実際にこの kafka-clients
ライブラリを使って接続してみたのですが、単一ノード構成では問題ないものの、複数ノードでのクラスター構成への接続が一部意図通りに動作してくれませんでした。記事の内容としてはすっきり終わらないものの、現時点でのノウハウとして書いておこうと思います。
kafka-clients をプロジェクトの依存性に追加する
プロジェクト管理には Gradle 4.8.1 を使ってみます。プロジェクトを作ったら、build.gradle
に以下の依存性を追加します。dependencies {
...
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.0.0'
...
}
kafka-clients
の現在の最新バージョンは 2.0.0 です。
続きを読む
Kafka
コメント (0)
2018/10/02 19:47:35
chocolablog を ver 0.10 にアップデートしました
ひさしぶりのアップデートです。ver 0.9 のリリースが去年の 11 月なので、もうすぐ 1 年ぶりです。
自分で記事を書いていく中で欲しいなと思った機能を拡張していくことが多くなっていますが、今回も記事を書くための拡張機能をちょこちょこと追加した形になっています。
記事文中のキーワード表記がまさにそれです。こういうやつ
です。マークダウンでインラインコードと呼ばれてるものになります。このブログを始めた頃はただの日記サイトでしたが、最近は技術的な記事の比重が増えているので、記事を読みやすくするための改良が増えていっている気がします。
マークダウンといえば、chocolablog では見出しやスタイルを加えるために独自の chocola 記法というものを実装しているのですが、これも時代の移り変わりに伴って、一般的なマークダウンなんかに変えていったほうがいいのかなと感じています。chocolablog を作り始めた頃は、ドキュメントを記述するための一般的な記述方法みたいなものはありませんでしたが、Github や Qiita がメジャーになっていくにつれて、マークダウン記法がだいぶ浸透してきました。とはいえマークダウンにはマークダウンで方言が乱立しまくっているという現状もあるので、単純にマークダウンに寄せていけばいいのかと言われると微妙なところでもあります。私の中ではっきりとデファクトスタンダードにあわせていこうと決めているわけではないので、ちょっと時間をかけて考えていこうかなといった状況です。
ver 0.10 の追加機能
フロント側
・Scala ソースコード用シンタクスハイライトを追加した
・記事文中のキーワード用の chocola 記法を追加した
システム管理画面側
・記事の公開日時として秒も指定できるようにした
続きを読む
chocolablog 開発
コメント (0)
2018/10/01 19:32:05
Kafka 複数台でクラスターを組んでみる

前回シングルノードでの Kafka 構築を試してみたわけですが、今回は引き続き Kafka 公式ページの Quick Start に従って、Kafka(Broker)複数台でのクラスター構築を試してみたいと思います。
Kafka の Broker が持つデータは Topic 毎にパーティションという単位で分割することができ、そのパーティションを別々の Broker (のプロセスやサーバー)に割り当てたり、コピーを置いておいたりすることで、データ分散による性能改善や冗長化を実現することができます。パーティション内のデータを参照するためのインデックスもパーティション単位で作られるようなので、複数のパーティションにわけておくことで、参照性能をあげられる仕組みになっているようです。分割や複製されたパーティションの管理は Kafka が自動で行ってくれるとのことなので、今回は障害時のフェールオーバーまで試してみたいと思います。
Kafka クラスターを構築する
Broker 2 つで Kafka クラスターを構築してみます。今回は 1 つのマシン内に各 Broker を別プロセスとして用意しようと思います。実際の運用では、サーバーを複数台用意して各マシンで 1 プロセスずつ動かすことになると思いますが、どのマシンで作業するかが変わるだけでやることは同じです。
前提として、Kafka のインストールまでは終わっているものとします。
Broker 毎の設定ファイルとして、server.properties をコピーして server-1.properties と server-2.properties を用意します。# server-1.properties
# Kafka サーバー(Broker)の ID 値
broker.id=1
# Kafka サーバー(Broker)への接続 URI
listeners=PLAINTEXT://localhost:9093
# Kafka サーバー(Broker)の Zookeeper のホスト/ポート
zookeeper.connect=localhost:2181
# Kafka サーバー(Broker)のデータ保存ディレクトリ
log.dirs=/tmp/kafka-logs-1
# server-2.properties
# Kafka サーバー(Broker)の ID 値
broker.id=2
# Kafka サーバー(Broker)への接続 URI
listeners=PLAINTEXT://localhost:9094
# Kafka サーバー(Broker)の Zookeeper のホスト/ポート
zookeeper.connect=localhost:2181
# Kafka サーバー(Broker)のデータ保存ディレクトリ
log.dirs=/tmp/kafka-logs-2
broker.id 値は Broker 毎にユニークである必要があるため、別々の値を設定しています。1 と 2 です。
listeners 値については、今回は同じマシン内の別プロセスとして動かすので、異なるポート番号を指定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、ポートは同じでもいいと思います。また、ホスト名は localhost からの接続しか試さないので localhost にしていますが、複数サーバーでクラスターを組む場合は localhost 以外から触ることになるでしょうから、各サーバーのホスト名を書くことになると思います。
zookeeper.connect 値も、今回は同じマシン内で動いている Zookeeper を使うので localhost:2181 としていますが、複数サーバー構成でクラスターを組む場合には、Zookeeper の動いているサーバーのホスト名/ポート番号を設定することになるでしょう。
log.dirs 値は、Broker のデータの保存先ディレクトリです。今回は同じマシンで複数プロセスの Broker が動くので、異なるディレクトリを設定しました。複数サーバーを用意して 1 サーバー 1 プロセスで動かす構成なら、同じパスでもいいと思います。
続きを読む
Kafka
コメント (0)
2018/08/27 15:12:18
Kafka を試してみる

Kafka は分散メッセージングシステムです。メッセージを送る Producer、メッセージをためておく Broker、メッセージを受け取る Consumer という構成をとる RabbitMQ や ActiveMQ のようなメッセージングシステムですが、複数のサーバーインスタンスによる分散構成をとれる点も特徴で、特に大量のメッセージを扱うための速度性能、スケーラビリティ、耐障害性などに特化していると言われています。
今回は公式ページのクイックスタートを参考に、Kafka サーバーの構築とメッセージングについて試してみました。
Kafka をインストールする
まず前提として、Java が使えるようインストールしておきます。
java コマンドを叩いて動いていれば大丈夫です。$ java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Kafka 公式ページからリンクされているダウンロードページから、Kafka の最新版をダウンロードします。$ wget http://ftp.riken.jp/net/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
ダウンロードしたら、アーカイブを展開して適当なディレクトリに置いておきます。$ tar xvfz kafka_2.11-2.0.0.tgz
# mv kafka_2.11-2.0.0 /usr/local/
# ln -s /usr/local/kafka_2.11-2.0.0 /usr/local/kafka
続きを読む
Kafka
コメント (0)
2018/08/16 20:03:53
Flutter ver 0.3.2 の、無限スクロールする GridView のサンプル
『The Infinite ListView』等で Google 検索すると無限スクロールする ListView については情報がえられるのですが、同じようなことを GridView でやっているサンプルは見つからなかったので書いておきます。言うならば、The Infinite GridView のサンプルです。Flutter は ver 0.3.2 を利用しています。
無限スクロールに関しては、やっていることは無限スクロール ListView と同じです。なので GridView のサンプルではありますが、同じ方法論で無限スクロール ListView も作ることができます。
機能としては、以下を実装しています。
・GridView を末尾までスクロールすると、次のデータを新たに作成して追加表示する
・GridView の要素は、表示されるタイミングで構築(遅延構築)される
・GridView の要素は 1 列で 3 個ずつ並べられる
以下がサンプルコードになります。import 'package:flutter/material.dart';
void main() => runApp(new InfiniteGridViewSampleWidget());
/// 無限にスクロールする GridView サンプルの StatefulWidget
class InfiniteGridViewSampleWidget extends StatefulWidget {
@override
InfiniteGridViewSampleState createState() =>
new InfiniteGridViewSampleState();
}
/// 無限にスクロールする GridView サンプルの State
class InfiniteGridViewSampleState extends State<InfiniteGridViewSampleWidget> {
// 表示するデータの List。
// 初期値としてデータを 20 件いれておく。
final List<int> items = new List.generate(20, (index) => index);
// GridView を構築して返す。
// この GridView を末尾までスクロールした場合、表示するデータ件数を増やして追加表示できるようにする。
//
// GridView が末尾までスクロールされたかは、GridView に設定した ScrollController によって検知できる。
//
// GridView に表示される要素の最大数は、GridView が構築された時点で固定される。
// そのため、表示するデータ件数を増やすには、再度 GridView を再構築する必要がある。
@override
Widget build(BuildContext context) {
// GridView のスクロールを検知するための ScrollController。
final ScrollController _scrollController = new ScrollController();
// ScrollController にイベントリスナーを設定する。
_scrollController.addListener(() {
// 最後までスクロールしたら、次のデータを読み込む。
if (_scrollController.position.maxScrollExtent <=
_scrollController.position.pixels) {
// 表示するデータを追加し、ウィジェットを再構築するよう通知する。
setState(() {
// 表示するデータにさらに 20 件データを追加する。
this.items.addAll(
new List.generate(20, (index) => this.items.length + index));
});
}
});
// GridView の要素を表示されるタイミングで構築できるように itemBuilder を指定する。
return new MaterialApp(
home: new Scaffold(
// itemBuilder を指定できる GridView.builder() で GridView を構築する。
body: GridView.builder(
itemBuilder: (BuildContext context, int index) {
// itemBuilder は、引数 index の位置にある GridView の要素が表示されるタイミングで
// 呼び出され、その要素を構築して返す。
print("make item: ${index}");
// 引数 index の位置にある GridView の要素を構築して返す。
return new Center(child: new Text("#${index}"));
},
// GridView で表示するデータの件数を設定する。
// これ以上の件数のデータを表示する場合、新たな itemCount 値を設定して GridView を再構築する必要がある。
itemCount: items.length,
// スクロールされたことを検知するため、ScrollController を設定する。
controller: _scrollController,
// GridView の要素は横に 3 個ずつ並べてレイアウトする。
// これは、new GridView.count(crossAxisCount: 3, children: <Widget>[]) に相当する。
// GridView の要素の配置は SliverGridDelegateWithFixedCrossAxisCount を設定することで指定できる。
gridDelegate: new SliverGridDelegateWithFixedCrossAxisCount(
crossAxisCount: 3,
),
),
),
);
}
}
続きを読む
Flutter,
Dart
コメント (0)
2018/05/17 19:11:40
Dart 非同期処理の async / await について
Flutter でのアプリ開発にあたって Dart 言語を順序立って体系的に学ぶ余裕がなかったこともあり、必要になるポイント毎に言語仕様を調べていました。しかし、とうとう非同期処理で利用される await キーワードの動作について納得できない事象に遭遇し、きちんと調べ直すことにしました。今日は調べ直した Dart 言語の非同期処理について書いてみたいと思います。
事の発端: await をつけているのに処理が進んでしまう
Dart には await というキーワードがあり、非同期処理を行うメソッドの呼び出し時に追記することで、メソッド内部の非同期処理の完了を待つことができます。
以下のように書いた場合、sumNumber() メソッドの内容が非同期処理であっても、返り値を i で受け取るまで、その先に処理は進みません。var i = await sumNumber();
これについては Dart の言語仕様を正しい理解したものですが、await をとりまく言語仕様について、もう少し考慮しなければならない点がありました。
例えば以下のようなコードを実行すると、どのように標準出力されるでしょうか。import 'dart:async';
main() {
ready();
countDown();
go();
}
void ready() {
print("Ready set...");
}
void countNumber(number) {
print(number);
}
void countDown() async {
// ここで await を使って待機したい
await countNumber(3);
countNumber(2);
countNumber(1);
}
void go() {
print("Go!!");
}
await で処理が止まるので、Ready set...
3
2
1
Go!!
となりそうですが、実際には以下のように出力されます。Ready set...
Go!!
3
2
1
上記ソースコードからのこの出力結果が納得できない方が今回の対象読者です。私もこの記事を書くまで納得できませんでした。
続きを読む
Flutter,
Dart
コメント (1)
2018/05/14 20:09:51
VMwarePlayer で動かした Fedora 26 で KVM を動かしてみる
諸事情により VMwarePlayer で利用している Fedora 26 の中で、さらに KVM を動かせたので方法を残しておきます。
KVM を使いたい理由はマイブームの Flutter 開発で、Android エミュレーターを高速に動かすためです。Linux の Android エミュレーターって、HXAM(Intel Hardware Accelerated Execution Manager) の代わりに KVM を使うと高速化できるそうなのです。
バーチャルマシンの中でバーチャルマシンを動かすという 2 重入れ子なんてとてもパフォーマンスが出ないだろうと思っていたのですが、KVM を有効化しない場合に比べれば結構早く動くようになったので、試してみてよかったです。
前提として、物理マシンや一層めの VMwarePlayer で、HXAM や KVM を有効にしておきましょう。
物理マシンについては BIOS の設定で、VMwarePlayer についてはゲストのバーチャルマシンの CPU の設定で有効にできます。
Fedora 26 に KVM をインストールする
KVM を使うとはいえ、qemu や libvirt も組み合わせて使う必要があるので以下のパッケージを入れておきます。# dnf install -y qemu-kvm virt-manager virt-install
更に、バーチャルマシンを入れ子にして使う場合のパフォーマンスを上げるため、カーネルモジュールの Nested KVM を有効にします。
以下のコマンドで現在の Nested KVM の設定を確認できます。# cat /sys/module/kvm_intel/parameters/nested
N
N(無効)になっているので、これを Y(有効)にしていきます。
設定ファイル /etc/modprobe.d/qemu-system-x86.conf に "options kvm_intel nested=1" を追記します。
設定ファイルを読み込むため、一度カーネルモジュールを外して再度読み込みます。カーネルモジュールは modprobe コマンドでロード/アンロードすることができます。# echo "options kvm_intel nested=1" >> /etc/modprobe.d/qemu-system-x86.conf
# modprobe -r kvm_intel
# modprobe kvm_intel
カーネルモジュール kvm_intel を再ロードしたら、Nested KVM の設定を確認してみます。# cat /sys/module/kvm_intel/parameters/nested
Y
Y(有効)になりました。
virt-manager でバーチャルマシンを管理する
これは Android エミュレーターを使う話ではなく、普通にバーチャルマシンとして他の OS を使う話になります。
KVM でのバーチャルマシンの管理には、libvirt で提供される virt-manager を使うと便利です。
virt-manager を実行するユーザーを libvirt グループに所属させておくと、su 権限なしで実行できるそうなので、グループに所属させておきます。# gpasswd -a momokan libvirt
あとは virt-manager コマンドを叩けば virt-manager を起動できます。$ virt-manager
Android エミュレーターを起動する
KVM を有効にして Android エミュレーターを動かすには、Android エミュレーターに KVM を有効化するオプションを渡す必要があります。
昔の Android Studio では Android エミュレーターを起動する際にこのオプションを指定できたようなのですが、現在の AVD Manager には設定項目が見当たりません。なので、一度 AVD Manager で作った Android エミュレーターを、Android SDK の emulator コマンドで起動します。$ emulator -avd Pixel_API_27 -qemu -m 1024 -enable-kvm &
Pixel_API_27 というのが私が作った Android エミュレーターの名前です。-enable-kvm オプションを指定することで、KVM を有効にして起動することができます。
ちなみに -m オプションは端末のメモリ量(MB)です。ついでに指定しておいたもので KVM の有効化とは関係ありません。
Linux
コメント (0)
2018/05/10 18:58:37