[フレーム]
BT

最新技術を追い求めるデベロッパのための情報コミュニティ

寄稿

InfoQのすべての体験をアンロックして、そのメリットを最大限に活用しましょう

ログインして、InfoQのすべての体験をアンロックしましょう!お気に入りの著者やトピックの最新情報を入手し、コンテンツと交流し、限定リソースをダウンロードできます。

ログイン
または

アカウントをお持ちでない方

登録
  • あなたにとって重要なトピックや同僚の最新情報を入手しましょう最新の洞察とトレンドに関する最新情報を即座に受け取りましょう。
  • 継続的な学習のために、無料のリソースに手軽にアクセスしましょうミニブック、トランスクリプト付き動画、およびトレーニング教材。
  • 記事を保存して、いつでも読むことができます記事をブックマークして、準備ができたらいつでも読めます。

Topics

地域を選ぶ

AIオールスターズ2025

"AI活用"をキーワードに「AIを使い倒し/使いこなす」企業の最前線をお届けします。

QCon San Francisco - image
QCon San Francisco 2025

Get production-proven patterns from the leaders who scaled a GenAI search platform to millions, migrated a core ML system without downtime, and architected a global streaming service from the ground up.

Early Bird ends Nov 11.

QCon AI New York - image
QCon AI New York 2025

Move beyond AI demos to real engineering impact. Discover how teams embed LLMs, govern models, and scale inference pipelines to accelerate development securely.

Early Bird ends Nov 11.

QCon London - image
QCon London 2026

Benchmark your systems against leading engineering teams. See what really works in FinOps, modern Java, and distributed data architectures to balance cost, scale, and reliability.

Early Bird ends Nov 11.

InfoQ ホームページ アーティクル KafkaFlowでKafkaイベント駆動型アプリケーションを構築する

KafkaFlowでKafkaイベント駆動型アプリケーションを構築する

2023年10月13日 読了時間 18 分

キーポイント

  • KafkaFlowは、Kafkaベースのイベント駆動型アプリケーションを合理化し、Kafkaコンシューマとプロデューサの開発を簡素化するオープンソースプロジェクトである。
  • .NETフレームワークは、ミドルウェア、メッセージハンドラ、型ベースのデシリアライゼーション、並行性制御、バッチ処理など、幅広い機能を提供している。
  • ミドルウェアを利用することで、開発者はメッセージを処理するためのロジックをカプセル化でき、懸念事項の分離がより向上し、保守性の高いコードにつながる。
  • プロジェクトは拡張し、カスタマイズの可能性が生まれ、アドオンのエコシステムが成長する。
  • 開発者は、重要なことに集中でき、低レベルの懸念に投資するよりもビジネスロジックを練ることに多くの時間を費やすという点において、KafkaFlowからの恩恵を受けることが可能だ。

なぜ気にする必要があるのか?

KafkaFlow は Confluent .NET Kafka クライアントの抽象化レイヤーを提供する。KafkaFlow は、Kafka コンシューマーとプロデューサーの使用、メンテナンス、テストを容易にする。

マーケティング施策のためにクライアントカタログを構築する必要があると想像してみてください。新しいクライアントを獲得するメッセージを消費するサービスが必要だ。必要なサービスを構築し始めると、既存のサービスがメッセージを消費する方法に一貫性がないことに気づく。

グレースフルシャットダウンのような単純な問題で、チームが苦労して解決しているのをよく見かける。課題の1つを挙げるだけでも、組織全体で4つの異なるJSONシリアライザーの実装があることがわかったであろう。

    関連スポンサーコンテンツ

KafkaFlowのようなフレームワークを採用することで、プロセスを簡素化し、開発サイクルをスピードアップできる。KafkaFlowには、開発者のエクスペリエンスを向上させるために設計された一連の機能がある。

1.ミドルウェア: KafkaFlowは、開発者がメッセージを処理するミドルウェアを作成することを可能にし、Kafkaコンシューマー/プロデューサーパイプラインの制御とカスタマイズを可能にする。

2.ハンドラ: メッセージハンドラの概念を導入し、開発者がトピックからのメッセージ処理をメッセージタイプ専用のハンドラに転送できるようにした。

3.デシリアライズアルゴリズム: シリアライゼーションとデシリアライゼーションのアルゴリズムのセットをすぐに利用できる。

4.マルチスレッドコンシューマ: メッセージの順序が保証されたマルチスレッドを提供し、システムリソースの最適な利用を支援する。

5.管理APIとダッシュボード: コンシューマとコンシューマグループを管理するためのAPIとダッシュボードを提供し、一時停止、再開、オフセットの巻き戻しなどの操作をすべて実行時に行う。

6.消費者のスロットリング: トピックの消費に優先順位をつける簡単な方法を提供する。

このような問題に対処できる可能性を見ていただくために、その可能性を探ってみよう。

KafkaFlowプロデューサー:メッセージプロダクションの簡素化

まずはメッセージプロデューサから始めよう。

Kafkaにメッセージを生成するのはそんなに難しいことではない。それでも、KafkaFlowはConfluentの.NET Kafkaクライアントからのプロデューサーインターフェースをより高いレベルで抽象化し、コードをシンプルにして保守性を高めている。

以下は KafkaFlow プロデューサーでメッセージを送信する例である。

await _producers["my-topic-events"]
 .ProduceAsync("my-topic", message.Id.ToString(), message);

こうすることで、Kafkaクライアントのシリアライズやその他の複雑な処理を直接行うことなく、Kafkaにメッセージをプロデュースできる。

それだけでなく、Producersの定義と管理は、サービス構成上のFluent Interfaceを通して簡単に実行できる。

services.AddKafka(kafka => kafka
 .AddCluster(cluster => cluster
 .WithBrokers(new[] { "host:9092" })
 .AddProducer(
 "product-events",
 producer =>
 producer
 ...
 )
 )
);

プロデューサはシンプルになりがちだが、圧縮やシリアライズのような一般的な問題事項に対処する必要がある。それを探ってみよう。

KafkaFlowにおけるカスタムのシリアライズ/デシリアライズ

Apache Kafkaの魅力的な機能の一つは、データ形式にとらわれないことだ。しかし、それはプロデューサとコンシューマに責任を転換することになる。熟慮されたアプローチがなければ、システム全体で同じ結果を得るために多くの方法が必要になるかもしれない。そのため、シリアライゼーションはクライアントフレームワークによって処理されるべき明らかな利用事例となる。

KafkaFlowには、JSON、Protobuf、さらにはAvro用のシリアライザが用意されている。これらは、ミドルウェアのコンフィギュレーションに追加するだけで利用できる。

.AddProducer<ProductEventsProducer>(producer => producer
 ...
 .AddMiddlewares(middlewares => middleware
 ...
 .AddSerializer<JsonMessageSerializer>()
 )
)

メッセージにカスタムのシリアライザ/デシリアライザを使用できるため、リストはこれら3つに限定されない。Confluentの.NET Kafkaクライアントはすでにカスタムのシリアライゼーション/デシリアライゼーションをサポートしているが、KafkaFlowはよりエレガントな処理方法を提供することで、プロセスを簡素化している。

たとえば、カスタムシリアライザを使用するには、次のように書いてみる。

public class MySerializer : ISerializer
{
 public Task SerializeAsync(object message, Stream output, ISerializerContext context)
 {
 // Serialization logic here
 }
 public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
 {
 // Deserialization logic here
 }
}
// Register the custom serializer when setting up the Kafka consumer/producer
.AddProducer<MyProducer>(producer => producer
 ...
 .AddMiddlewares(middlewares => middleware
 	 ...
 	 .AddSerializer<MySerializer>()
 )
)

KafkaFlowでのメッセージ処理

コンシューマはたくさんの質問と可能性をもたらす。 1つ目は、「メッセージをどのように処理しますか?」である。

もっともシンプルな方法から始めよう。CQRSとMeditorパターンを普及させたMediatRのようなライブラリの出現により、.NET開発者はメッセージハンドラをリクエスト/メッセージレシーバーから切り離すことに慣れてきた。KafkaFlowは、これと同じ原理をKafkaコンシューマにも、もたらす。

KafkaFlowのメッセージハンドラを利用すると、開発者はKafkaトピックからのメッセージを処理する特定のロジックを定義できる。KafkaFlowのメッセージハンドラの構造は、懸念事項をより適切に分離し、コードをよりクリーンで保守しやすいよう設計されている。

メッセージハンドラの例を次に示す。

public class MyMessageHandler : IMessageHandler<MyMessageType>
{
 public Task Handle(IMessageContext context, MyMessageType message)
 {
 // Message handling logic here.
 }
}

このハンドラは、コンシューマコンフィギュレーションに登録できる。

.AddConsumer(consumer => consumer
...
 .AddMiddlewares(middlewares => middlewares
 ...
 .AddTypedHandlers(handlers => handlers
 .AddHandler<MyMessageHandler>()
 )
 )
)

このアプローチでは、コンシューマをハンドラから簡単に分離でき、保守性とテスト性を簡素化できる。

マイクロサービスが1つのトピックを1つのメッセージタイプだけで扱う場合、これは不要な複雑さに見えるかもしれない。そのような場合は、ミドルウェアを活用することができる。

KafkaFlowのミドルウェア

KafkaFlowはミドルウェア指向だ。メッセージハンドラのスニペットに "Middlewares "という記述があることにお気づきだろうか。では、ミドルウェアとは何なのか?

ミドルウェアは型付きハンドラを実現するものである。メッセージは順番に呼び出されるミドルウェアパイプラインに送られます。MediatRのパイプラインを使ったことがある人なら、この概念に馴染みがあるかもしれない。また、ミドルウェアは一連の変換を適用するために使うことができる。つまり、あるミドルウェアは、受信メッセージを次のミドルウェアに変換することができる。

KafkaFlowのMiddlewareは、メッセージを処理するためのロジックをカプセル化する。パイプラインは拡張可能であり、開発者はメッセージ処理パイプラインに動作を追加できる。

ここにミドルウェアの例を示す。

public class MyMiddleware : IMessageMiddleware
{
 public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
 {
 // Pre-processing logic here. 
 await next(context); 
 // Post-processing logic here. 
 }
}

このミドルウェアを使用するには、コンシューマーコンフィギュレーションに登録できる。

.AddConsumer(consumer => consumer
 ...
 .AddMiddlewares(middlewares => middlewares
 ...
 .Add<MyMiddleware>()
 )
) 

このように、開発者はカスタムロジックをメッセージ処理パイプラインにプラグインして、柔軟性と制御を実現することが出来る。

型付きハンドラはミドルウェアの一種です。そのため、ミドルウェアを実装し、型付きハンドラなしでメッセージを処理ができ、ミドルウェアを利用して、メッセージを処理する前にバリデーションやエンリッチメントなどを行うメッセージパイプラインを構築することもできる。

KafkaFlowで並行処理を行う

インフラの効率化について考え始めると、多くのKafkaコンシューマーが十分に活用されていないことに気づくだろう。最も一般的な実装はシングルスレッドで、リソースの利用を制限している。そのため、スケーリングが必要な場合、希望のスループットを維持するために水平方向にスケーリングすることになる。

KafkaFlowは、インフラの効率化を実現する別の選択肢をもたらす。KafkaFlowでは、1つのコンシューマーが同時に処理できるメッセージ数を開発者がコントロールできる。KafkaFlowは、1つのトピックを消費して連携できるWorkerというコンセプトを採用している。

この機能により、Kafkaコンシューマーを最適化し、システムの機能によりマッチさせることができる。

以下は、コンシューマーの同時処理ワーカー数を設定する方法の例である。

.AddConsumer(consumer => consumer
.Topic("topic-name")
 .WithGroupId("sample-group")
 .WithBufferSize(100)
 .WithWorkersCount(10) // Set the number of workers.
 .AddMiddlewares(middlewares => middlewares
 	...
 	)
)

KafkaFlowは並行ワーカーが存在する場合でも、順序を保証する。

Batch処理

規模が大きくなると、レイテンシとスループットのトレードオフに直面する。そのトレードオフを処理するために、KafkaFlowは "Batch Consuming" という重要な機能を持っている。この機能は、KafkaからのメッセージをBatch単位で消費・処理する際の効率とパフォーマンスのニーズに応えるものである。メッセージのグループを個別に処理するのではなく、まとめて処理する必要がある使用事例で重要な役割を果たす。

Batch Consumingとは何か?

Batch Consumingとは、メッセージが入ってきたときにアトミックに処理するのではなく、複数のメッセージをグループ化して一度に処理するアプローチだ。この方法は、大量のデータを扱う場合、特にメッセージが互いに独立している場合に効率的である。Batchとして処理を実行することで、全体的なパフォーマンスの向上につながる。

KafkaFlowのBatch処理へのアプローチ

KafkaFlowは、Batch処理を提供するミドルウェアのシステムを活用している。Batch処理ミドルウェアでは、Batchサイズやタイムスパンによってメッセージをグループ化できる。これらの条件のいずれかに達すると、ミドルウェアはメッセージのグループを次のミドルウェアに転送する。

services.AddKafka(kafka => kafka
 .AddCluster(cluster => cluster
 .WithBrokers(new[] { "host:9092" })
 .AddConsumer(
 consumerBuilder => consumerBuilder
 ...
 .AddMiddlewares(
 middlewares => middlewares
 ...
 .BatchConsume(100, TimeSpan.FromSeconds(10))
 .Add<HandlingMiddleware>()
 )
 )
 )
);

Batch処理のパフォーマンスへの影響

Batch処理によって、開発者はKafkaベースのアプリケーションでより高いスループットを実現できる。各処理タスクの開始と終了に関連するオーバーヘッドが大幅に削減されるため、より高速な処理が可能になる。これは、システム・パフォーマンスの全体的な向上につながる。

また、このアプローチでは、データがより大きなチャンク(かたまり)として引き出されるため、ネットワークI/Oオペレーションが削減され、特にネットワーク遅延が懸念されるシステムでは、処理速度がさらに向上する。

KafkaFlowによるコンシューマー管理

KafkaFlowは、Kafkaコンシューマの管理に関連する管理作業も簡素化する。KafkaFlowの管理APIを使えば、コンシューマの起動、停止、一時停止、オフセットの巻き戻しなどをおこなうことができる。

管理APIは、プログラミングインターフェース、REST API、ダッシュボードUIを通して使用できる。

[画像をクリックするとフルサイズで表示されます。]

KafkaFlow管理ダッシュボード

コンシューマのスロットリング

多くの場合、基礎となるテクノロジは、Kafka コンシューマと同じように高負荷時に対処できないことがある。これは安定性に問題をもたらす可能性がある。そこで登場するのがスロットリングである。

コンシューマスロットリングは、メッセージの消費を管理するアプローチであり、アプリケーションがメトリクスに基づいてメッセージを消費するレートを動的に微調整できるようにする。

優先順位付け

アトミックアクションとバルクアクションを異なるコンシューマとトピックに分離したいアプリケーションを実行しているとしよう。バルクアクションよりもアトミックアクションの処理を優先したいかもしれない。従来、メッセージ生成速度に潜在的な差異があるため、この差別化を管理することは困難になる可能性があった。

このような場合、アトミックアクションを担当するコンシューマのコンシューマラグを監視できるコンシューマスロットリングが役に立つ。このメトリックに基づいて、バルクアクションを処理するコンシューマにスロットリングを適用し、アトミックアクションが優先的に処理されるようにする。その結果は?効率的で、柔軟で、最適化された消費プロセスである。

KafkaFlowの流暢なインターフェイスを使えば、コンシューマにスロットリングを追加するのは簡単だ。以下に簡単な例を示す。

.AddConsumer(
 consumer => consumer
 .Topic("bulk-topic")
 .WithName("bulkConsumer")
 .AddMiddlewares(
 middlewares => middlewares
 .ThrottleConsumer(
 t => t
 .ByOtherConsumersLag("singleConsumer")
 .WithInterval(TimeSpan.FromSeconds(5))
 .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))
 .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))
 .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))
 .AddSerializer<JsonCoreSerializer>()
 )
)

KafkaFlow: 将来に向けて

現在のところ、KafkaFlowは、.NETによるリアルタイムデータ処理アプリケーションの構築を簡素化する、Kafka上の堅牢で開発者に優しい抽象化を提供している。しかし、他のアクティブなオープンソースプロジェクトと同様に、KafkaFlowも進化と改善を続けている。

プロジェクトの現在の軌跡を考えると、いくつかの発展が予想される。例えば、KafkaFlowはミドルウェアシステムをさらに強化し、メッセージ処理の制御と柔軟性をさらに高める可能性がある。また、より広範な管理用APIが提供され、開発者がKafkaクラスタをさらにコントロールできるようになるかもしれない。

KafkaFlowは設計上拡張可能であるため、KafkaFlowコミュニティが成長し、より多くの貢献、革新的な機能、拡張、サポートが期待できる。より多くの開発者や組織がKafkaFlowを採用するにつれて、学習リソース、チュートリアル、ケーススタディ、その他のコミュニティが作成したコンテンツが増加し、新規ユーザがKafkaFlowを使い始めたり、既存ユーザがKafkaFlowライブラリからより多くを得るのに役立つだろう。

結論

KafkaFlowは、.NETでKafkaを使用する作業を簡素化する、便利で開発者に優しいツールである。それは開発者の体験と使いやすさの面で優れている。フレームワークのデザインは、クリーンで読みやすいコードに適している。KafkaFlow は、ミドルウェアとメッセージハンドラによる懸念事項の明確な分離と、Apache Kafka 上でアプリケーションを構築する際の複雑な問題の抽象化により、コードベースを管理しやすく理解しやすい状態に保つのに役立つ。

さらに、KafkaFlowを取り巻くコミュニティは成長し続けている。もしKafkaを利用していて、生産性と信頼性を向上させたいと考えているなら、KafkaFlowは十分に検討する価値がある。

作者について

Guilherme Ferreira

もっと見るより少なく

この記事に星をつける

おすすめ度
スタイル
  • 関連記事

トレンド

特集コンテンツ一覧

InfoQ ニュースレター

毎週火曜日に前週のまとめコンテンツをお送りいたします。(日本語版は不定期リリース)25万人のシニアな開発者コミュニティーにぜひご参加ください。 サンプルを見る

We protect your privacy.

BT

AltStyle によって変換されたページ (->オリジナル) /