Articles

Kafka Streamsを使用したクエリ可能なKafkaトピック

今日のデータ処理アーキテクチャでは、Apache Kafkaは入力段階でよく使用されます。 通常、このステップは、受信メッセージデータのエンリッチとフィルタリングに使用されますが、データ処理パイプラインの後の段階まで対話式クエリを作 これは、Kafkaトピック内のすべてのメッセージはデフォルトで永続化されますが、トピック内の特定のメッセージを高速に検索できるメカニズムはまだ

それにもかかわらず、パイプラインのこの初期段階で新しいデータを照会できることは、通常、長時間実行されるバッチ前処理ステップを含む伝統的な処理パイプラインの遅延を回避し、エンドユーザーが受信データにほぼ瞬時にアクセスできるようにします。Kafkaを使用してデータ処理アプリケーションを構築する場合、Kafkaプロジェクトの一部として保守されているKafka Streamsライブラリは、データ変換と分析を定義す Kafka Streamsの重要な機能の1つは状態ストアであり、kafka Streamsでメッセージを処理するときに読み書きできる高速なローカルキー値ストアの抽象化を提供します。 これらのキー値ストアは、適切なストリームプロセッサを定義することにより、Kafkaトピックからの新しいメッセージで継続的に埋めることができるため、基になるトピックからメッセージをすばやく取得することができるようになりました。このKafka Streams機能の上に構築し、特定のkafkaトピックに対して単一のクエリエンドポイントを提供する統合REST APIを作成します。要約すると、Kafka StreamsプロセッサとステートストアとHTTPサーバーを組み合わせることで、Kafkaトピックを高速読み取り専用のキー値ストアに効果的に変換できます。Kafka Streamsは、自己完結型のJavaまたはScalaアプリケーションに埋め込むことができるライブラリとして構築されています。 これにより、開発者は、Kafkaメッセージに対してデータ変換または集計を実行するストリームプロセッサを定義し、各入力メッセージが正確に1回処理されるよ Java Stream APIに触発されたKafka Streams DSLを使用すると、ストリームプロセッサ、および状態ストアを柔軟にチェーンできます。

さらに、Kafka Streamsベースのアプリケーションの複数のインスタンスを起動すると、kafka以外の外部システムに依存することなく、プロセスは自動的に負荷バラン

州の店舗を使用するKafka Streamsアプリケーションのアーキテクチャを説明するために、次のシナリオを想像してください。 具体的には、1人以上のKafkaプロデューサが、9つのパーティションを持つトピックにこれらのメッセージを挿入します。 顧客idは各メッセージのキーとして選択されるため、特定の顧客に属するデータは常にトピックの同じパーティションに挿入されます。 暗黙的に、KafkaプロデューサーはDefaultPartitionerを使用してメッセージをパーティションに割り当てます。ここで、このトピックからメッセージを読み取り、それらを状態ストアに保持するKafka Streamsアプリケーションがあると仮定します。 各メッセージのキーは顧客idのみで構成されるため、ステートストア内の対応する値は常に顧客の最新の予約のタイムスタンプになります。 並列処理の最大度を達成するために、Kafka Streamsアプリケーションの最大9つのインスタンスを起動できます。 この場合、各アプリケーションインスタンスには、トピックパーティションの1つだけが割り当てられます。 入力パーティションごとに、Kafka Streamsは個別の状態ストアを作成し、そのパーティションに属する顧客のデータのみを保持します。

結果のアプリケーションアーキテクチャを下の図に示します。Div>

パーティション4から9を担当するKafkaストリームプロセッサは、この図では省略されています….. 破線の矢印は、パーティション内の新しいメッセージが追加のストリームプロセッサとその状態ストアにも伝播されることを示しており、主に割り当てられたプロセッサに障害が発生した場合に高速なフェールオーバーを可能にします。figcaption>

アプリケーションでメモリ内または永続的な状態ストアを使用することができます。 メモリ内の状態ストアに対する操作は、RocksDBストアを内部的に使用するpersistentバリアントと比較してさらに高速です。 一方、永続状態ストアは、Kafka Streamsアプリケーションが失敗して再起動する必要がある場合に、より迅速に復元できます。 さらに、永続的な状態ストアを使用する場合、ストアあたりのデータ量はメインメモリの量によって制限されません。

このシナリオでは、状態ストアへの書き込み操作を記録するchangelogトピックを持つ必要はありません。

ストリームプロセッサへのRESTエンドポイントの追加

これまでに提示されたアーキテクチャでは、それぞれの入力トピックパーティションに属す顧客の最新の予約日を取得するために使用できる9つの状態ストアがあります。

ここで、この情報をKafka Streamsプロセッサの外部からアクセスできるようにするには、各stream processorアプリケーションインスタンスのサービスエンドポイントを公開し、kafka Streamsによって管理されている内部状態ストアからの着信要求に応答する必要があります。追加の要件として、要求元のアプリケーションが、特定の顧客のデータの処理を現在担当しているKafka Streamsインスタンスを知ることは期待できません。 したがって、顧客データがローカルで使用できない場合は、各サービスエンドポイントがクエリを正しいアプリケーションインスタンスにリダイレ

私たちは、HTTPをサポートする任意のクライアントからアクセスできるという利点を持ち、透過的なロードバランサーを非常に簡単に追加できるREST APIとしすべてのKafka Streamsアプリケーションで使用可能なKafkaStreamsオブジェクトは、すべてのローカルステートストアへの読み取り専用アクセスを提 このオブジェクトを使用してRESTサービスを構築すると、アーキテクチャは次のようになります。

HTTPクライストリームプロセッサのrestエンドポイントのいずれかへの参照要求。 破線の矢印は、ローカルステートストアから応答できない場合に、ストリームプロセッサ間で要求が内部的にどのように転送されるかを示します。

要約すると、提案されたアーキテクチャは、保存時にメッセージデータを確実に格納するためにKafkaトピックを使用し、高速クエリをサポートするた

アプリケーションのスケーラビリティの確保

スケーラブルなアプリケーションを取得するには、Kafka Streamsアプリケーションのすべてのインスタンス 個々のストリームプロセッサの負荷は、処理する必要があるデータとクエリの量によって異なります。

より具体的には、受信メッセージとクエリの負荷がすべてのパーティションと結果的にすべてのストリームプロセッサでバランスよくなるように、Kafkaトピ

インメモリステートストアを使用する場合は、トピック内のパーティションの数が十分に大きくなり、各ストリームプロセッサがメインメモリ内のパーテ フェイルオーバーの場合、ストリームプロセッサはメインメモリに二つのパーティションを保持する必要があることに注意してください。

ストリームプロセッサの障害を考慮するために、kafkaストリームのnum.standby.replicas設定を使用してスタンバイレプリカの数を設定することができます。 障害が発生した場合、これらのプロセッサは、障害が既に発生した後にのみ状態ストアの再構築を開始するのではなく、クエリへの応答を迅速に引き継ぐことができます。

最後に、ストリームプロセッサの所望の数は、利用可能なハードウェアに適合する必要があります。 Stream processorアプリケーションインスタンスごとに、少なくとも1つのCPUコアを予約する必要があります。 マルチコアシステムでは、アプリケーションインスタンスあたりのストリーム-スレッド数を増やすことができ、CPUコアごとに個別のJavaアプリケーションを