Apache Kafka for HDInsight (public preview) (2)

Microsoft Japan Data Platform Tech Sales Team

高木 英朗

 

前回は Kafka for HDInsight の概要についてご紹介いたしました。今回は実際に Kafka for HDInsight のデプロイからサンプルコードの実行する方法をご紹介いたします。
今回の手順は以下の Get started with Apache Kafka (preview) on HDInsight の記事をもとにしています。
/en-us/azure/hdinsight/hdinsight-apache-kafka-get-started

 

Kafka for HDInsight のデプロイ
HDInsight のデプロイ方法については過去の記事「Microsoft Azure の Hadoop ディストリビューション HDInsight を使ってみよう! (1)」をご参照ください。Kafka for HDInsight の異なる部分は「クラスターの構成」において、クラスターの種類を「Kafka (プレビュー)」を選択する部分です。
deploy

デプロイが完了したら、SSH でヘッドノードにアクセスして実際に動かしてみましょう。

 

準備

ZooKeeper と Broker のホスト名を取得
ストリームの読み書きの作業において、ZooKeeper と Broker のホスト名が必要になるため、環境変数にセットしておきます。

まずは、Ambari API からのレスポンスをきれいに取得するために jq を利用します。

 sudo apt -y install jq

jq が使えるようになったら以下の様にして各ホスト名を環境変数にセットします。{Ambari 管理ユーザー}, {Ambari パスワード}, {HDInsight クラスタ名} は実際にデプロイした環境のものに置き換えてください。

ZooKeeper ホスト名取得

 export KAFKAZKHOSTS=`curl --silent -u '{Ambari 管理ユーザー}:{Ambari パスワード}' -G https://{Ambari ホスト名}:8080/api/v1/clusters/{HDInsight クラスタ名}/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")'`

Broker ホスト名取得

 export KAFKABROKERS=`curl --silent -u '{Ambari 管理ユーザー}:{Ambari パスワード}' -G https://{Ambari ホスト名}:8080/api/v1/clusters/{HDInsight クラスタ名}/services/HDFS/components/DATANODE | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")'`

以下のコマンドを実行してホスト名が出力されれば OK です。

 echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
echo '$KAFKABROKERS='$KAFKABROKERS

※ Broker は HDInsight の各ワーカーノードに配置される形になります。

 

トピックの作成とレコードの読み書き

トピックの作成
Kafka は「トピック」というカテゴリにストリームデータを保存するため、まずはトピックを作成します。ここでは test という名前のトピックを作成します。

 /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS

以下を実行すると作成されたトピックを確認することができます。

 /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS

レコードの読み書き
トピックを作成したのでレコードを書き込んでみます。
以下を実行するとデータ入力の待機状態になります。

 /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test

待機状態でテキストを入力して、終わったら Ctrl + C を押すと終了します。以下は例です。

 Hello
Konnichiwa
Bye
Sayonara

今度は書き込んだレコードを読んでみます。
以下を実行するとトピックに書き込まれているレコードが画面に出力されて、すべて読み込むと待機状態に入ります。

 /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --topic test --from-beginning

以下は出力の例です。

 {metadata.broker.list=Broker のリストが表示されます, request.timeout.ms=30000, client.id=console-consumer-22338, security.protocol=PLAINTEXT}
Hello
Konnichiwa
Bye
Sayonara

Consumer は読み込んだ位置を保存して、以降の書き込みを読むためにポーリング状態になっています。Ctrl + C で終了させます。

 

サンプルコードの実行

ビルド
次はサンプルコードを試してみます。サンプルコードは GitHub にありますので、こちらから取得してビルドしてください。ビルド方法は README.md に書かれていますが、JDK7 以降と Maven が必要です。

この例ではすべてヘッドノード上で作業していますが、実際の環境に読み替えてお試しいただければと思います。

Maven のインストール (本記事投稿時点の最新版である 3.3.9 を使用します)

 wget https://www-eu.apache.org/dist/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
cd /usr/local/
sudo tar xzvf /path/to/apache-maven-3.3.9-bin.tar.gz
sudo ln -s apache-maven-3.3.9 maven

Maven の PATH を設定します。/home/ユーザー名/.profile の PATH に /usr/local/maven/bin を追記します。

 PATH="$HOME/bin:$HOME/.local/bin:$PATH:/usr/local/maven/bin"

読み込みます。

 source /home/username/.profile

準備ができたら、サンプルを取得してビルドします。
GitHub から取得します。

 git clone https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.git

ビルドします。

 cd hdinsight-kafka-java-get-started/Producer-Consumer/
mvn clean package

ビルドが完了すると jar ができてますので、実行権限を与えておきます。

 cd target
chmod u+x kafka-producer-consumer-1.0-SNAPSHOT.jar

サンプルの実行

ビルドしたサンプルの Producer を実行します。以下の様に実行すると Kafka にレコードが書き込まれてレコード数のカウンターが表示されます(全部で 100 万レコード書き込まれます)

 ./kafka-producer-consumer-1.0-SNAPSHOT.jar producer $KAFKABROKERS

このサンプルでは Producer API を使ってトピック名とデータを非同期に送信しています。
*Producer.java の 39 行目あたり

              // Send the sentence to the test topic 
             producer.send(new ProducerRecord("test", sentence));

Consumer を実行して書き込まれたレコードを読み取ります。(Producer で書き込みしながら実行するとほぼリアルタイムに読み取れます)

 ./kafka-producer-consumer.jar consumer $KAFKABROKERS

このサンプルでは Consumer API を使ってトピックを subscribe してレコードをポーリングします。
*Consumer.java の 27 行目あたりから

          // Subscribe to the 'test' topic 
         consumer.subscribe(Arrays.asList("test")); 
 
 
         // Loop until ctrl + c 
         int count = 0; 
         while(true) { 
             // Poll for records 
             ConsumerRecords records = consumer.poll(200); 
             // Did we get any? 
             if (records.count() == 0) { 
                 // timeout/nothing to read 
             } else { 
                 // Yes, loop over records 
                 for(ConsumerRecord record: records) { 
                     // Display record and count 
                     count += 1; 
                     System.out.println( count + ": " + record.value()); 
                 } 
             } 
         }

以下のように出力されれば OK です。

 999591: i am at two with nature
999592: snow white and the seven dwarfs
999593: i am at two with nature
999594: i am at two with nature
999595: the cow jumped over the moon
999596: snow white and the seven dwarfs
999597: snow white and the seven dwarfs
999598: snow white and the seven dwarfs
999599: i am at two with nature

読み取りが終わると待機状態になりますので、Ctrl + C で終了します。

Consumer の負荷分散を試す場合
ビルドした Consumer のサンプルから Consumer Group を指定できますので、複数の SSH 端末から Consumer Group を指定して起動してみてください。複数の Consumer で分担してデータを読み取ることができるようになります。

 ./kafka-producer-consumer.jar consumer $KAFKABROKERS mygroup

 

Kafka Streams のサンプル実行
Kafka Streams を使うと、Storm, Spark Streaming, Samza 等に頼らずに Kafka でストリーム処理をすることができます。Streaming API は Kafka 0.10.0 で追加されました。HDInsight ではそのまま利用することができます。

ビルド
GitHub から落としてきたサンプルの中に Streaming のサンプルも含まれていますので、今度はこちらをビルドします。

 cd /path/to/hdinsight-kafka-java-get-started/Streaming
mvn clean package
cd target
chmod u+x kafka-streaming-1.0-SNAPSHOT.jar

サンプルの実行
ビルドできたらストリーミングプロセスをバックグラウンドで実行します。

 ./kafka-streaming-1.0-SNAPSHOT.jar $KAFKABROKERS $KAFKAZKHOSTS 2>/dev/null &

このサンプルコードでは test トピックのデータを取得し、単語をカウントして wordcounts トピックに送っています。
*Stream.java の 33 行目あたりから

         KStream sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue(word, word))
                .through("RekeyedIntermediateTopic")
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

Producer のサンプルを使ってメッセージを test トピックに送ります。test トピックに送られると、先ほど実行したバックグラウンド起動させたストリーミングプロセスが処理を実行して wordcounts トピックに送ります。

 /path/to/kafka-producer-consumer-1.0-SNAPSHOT.jar producer $KAFKABROKERS &>/dev/null &

Consumer スクリプトを使って、集計後のデータが入っている wordcounts トピックからデータを読み取ります。

 /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $KAFKAZKHOSTS --topic wordcount --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

単語がカウントされてこんな感じの出力になります。Ctrl + C で終了できます。

 and     10096
seven   10096
years   5062
ago     5062
four    5063
score   5063
and     10097
seven   10097
years   5063

このようにして、HDInsight 上ですぐに Kafka を使い始めることができますので、是非お試しいただければと思います。

関連記事