Articles

Abfragbare Kafka-Themen mit Kafka-Streams

In den heutigen Datenverarbeitungsarchitekturen wird Apache Kafka häufig in der Ingress-Phase verwendet. Normalerweise wird dieser Schritt verwendet, um die eingehenden Nachrichtendaten anzureichern und zu filtern; es ist jedoch erst zu einem späteren Zeitpunkt in der Datenverarbeitungspipeline möglich, interaktive Abfragen durchzuführen. Dies liegt daran, dass, obwohl jede Nachricht in einem Kafka-Thema standardmäßig beibehalten wird, noch kein Mechanismus verfügbar ist, der eine schnelle Suche nach einer bestimmten Nachricht in einem Thema ermöglicht.Dennoch würde die Möglichkeit, neue Daten in diesem frühen Stadium der Pipeline abzufragen, die Verzögerungen herkömmlicher Verarbeitungspipelines vermeiden, die normalerweise langwierige Batch-Vorverarbeitungsschritte umfassen, und den Endbenutzern einen nahezu sofortigen Zugriff auf eingehende Daten ermöglichen.

Zum Erstellen von Datenverarbeitungsanwendungen mit Kafka wird häufig die Kafka Streams-Bibliothek verwendet, die als Teil des Kafka-Projekts verwaltet wird, um Datentransformationen und -analysen zu definieren. Ein wichtiges Merkmal von Kafka Streams sind State Stores, die eine Abstraktion eines schnellen lokalen Schlüssel-Wert-Speichers bieten, in den gelesen und geschrieben werden kann, wenn Nachrichten mit Kafka Streams verarbeitet werden. Diese Schlüssel-Wert-Speicher können kontinuierlich mit neuen Nachrichten aus einem Kafka-Thema gefüllt werden, indem ein geeigneter Stream-Prozessor definiert wird, so dass es jetzt möglich ist, Nachrichten aus dem zugrunde liegenden Thema schnell abzurufen.

Aufbauend auf dieser Kafka Streams-Funktionalität erstellen wir eine einheitliche REST-API, die einen einzigen Abfrageendpunkt für ein bestimmtes Kafka-Thema bereitstellt.Zusammenfassend lässt sich sagen, dass die Kombination von Kafka-Streams-Prozessoren mit Statusspeichern und einem HTTP-Server jedes Kafka-Thema effektiv in einen schnellen schreibgeschützten Schlüsselwertspeicher verwandeln kann.

Kafka Streams ist als Bibliothek aufgebaut, die in eine eigenständige Java- oder Scala-Anwendung eingebettet werden kann. Es ermöglicht Entwicklern, Stream-Prozessoren zu definieren, die Datentransformationen oder Aggregationen für Kafka-Nachrichten durchführen, um sicherzustellen, dass jede Eingabenachricht genau einmal verarbeitet wird. Mit dem Kafka Streams DSL, das von der Java Stream API inspiriert ist, können Stream-Prozessoren und Statusspeicher flexibel verkettet werden.Darüber hinaus bilden die Prozesse beim Starten mehrerer Instanzen einer Kafka Streams-basierten Anwendung automatisch einen lastausgleichenden, hochverfügbaren Verarbeitungscluster, ohne von anderen externen Systemen als Kafka abhängig zu sein.Stellen Sie sich folgendes Szenario vor, um die Architektur einer Kafka Streams-Anwendung zu veranschaulichen, die State Stores verwendet: Als Eisenbahnbetreiber wird jedes Mal, wenn ein Kunde eine Reise auf unserer Website bucht, eine neue Nachricht bestehend aus der Kunden-ID und einem Zeitstempel in ein Kafka-Thema eingefügt. Insbesondere fügen ein oder mehrere Kafka-Produzenten diese Nachrichten in ein Thema mit neun Partitionen ein. Da die Kunden-ID als Schlüssel für jede Nachricht ausgewählt wird, werden Daten, die zu einem bestimmten Kunden gehören, immer in dieselbe Partition des Themas eingefügt. Implizit verwenden Kafka-Produzenten die DefaultPartitioner , um Partitionen Nachrichten zuzuweisen.Angenommen, wir haben eine Kafka Streams-Anwendung, die Nachrichten aus diesem Thema liest und in einem Statusspeicher beibehält. Da der Schlüssel jeder Nachricht nur aus der Kunden-ID besteht, ist der entsprechende Wert im Statusspeicher immer der Zeitstempel der letzten Buchung des Kunden. Um den maximalen Grad der parallelen Verarbeitung zu erreichen, können wir bis zu neun Instanzen der Kafka Streams-Anwendung starten. In diesem Fall wird jeder Anwendungsinstanz genau eine der Themenpartitionen zugewiesen. Für jede Eingabepartition erstellt Kafka Streams einen separaten Statusspeicher, der wiederum nur die Daten der Kunden enthält, die zu dieser Partition gehören.

Die resultierende Anwendungsarchitektur ist im folgenden Diagramm dargestellt.

Die Kafka-Stream-Prozessoren, die für die Partitionen 4 bis 9 verantwortlich sind, werden in dieser Abbildung weggelassen. Die gestrichelten Pfeile zeigen an, dass neue Nachrichten in einer Partition auch an zusätzliche Stream-Prozessoren und deren Zustandsspeicher weitergegeben werden, was ein schnelles Failover ermöglicht, wenn der primär zugewiesene Prozessor ausfallen sollte.

Es ist möglich, entweder In-Memory- oder persistente Zustandsspeicher in der Anwendung zu verwenden. Operationen mit In-Memory-Zustandsspeichern sind im Vergleich zur persistenten Variante, die intern einen RocksDB-Speicher verwendet, noch schneller. Andererseits können persistente Statusspeicher schneller wiederhergestellt werden, falls eine Kafka Streams-Anwendung fehlgeschlagen ist und neu gestartet werden muss. Darüber hinaus ist das Datenvolumen pro Speicher nicht durch die Größe des Hauptspeichers begrenzt, wenn persistente Zustandsspeicher verwendet werden.

In unserem Szenario ist es nicht erforderlich, ein Changelog-Thema zu haben, das Schreiboperationen in die Statusspeicher aufzeichnet: Alle Daten, die zum Wiederherstellen eines Statusspeichers erforderlich sind, können aus dem ursprünglichen Eingabethema abgerufen werden.

Hinzufügen eines REST-Endpunkts zu Stream-Prozessoren

Mit der bisher vorgestellten Architektur verfügen wir über neun State Stores, mit denen das letzte Buchungsdatum der Kunden abgerufen werden kann, die zur jeweiligen Eingabethemenpartition gehören.Um diese Informationen von außerhalb der Kafka Streams-Prozessoren zugänglich zu machen, müssen wir nun einen Dienstendpunkt für jede der Stream Processor-Anwendungsinstanzen verfügbar machen und eingehende Anforderungen aus dem internen Statusspeicher beantworten, der von Kafka Streams verwaltet wird.

Als zusätzliche Anforderung können wir nicht erwarten, dass die anfordernde Anwendung weiß, welche Kafka Streams-Instanz derzeit für die Verarbeitung der Daten eines bestimmten Kunden verantwortlich ist. Folglich ist jeder Serviceendpunkt dafür verantwortlich, die Abfrage an die richtige Anwendungsinstanz umzuleiten, wenn die Kundendaten nicht lokal verfügbar sind.

Wir haben uns entschieden, den Service-Endpunkt als REST-API zu implementieren, die den Vorteil hat, von jedem Client aus zugänglich zu sein, der HTTP unterstützt, und die es ermöglicht, einen transparenten Load Balancer sehr einfach hinzuzufügen.

Das KafkaStreams -Objekt, das in jeder Kafka Streams-Anwendung verfügbar ist, bietet schreibgeschützten Zugriff auf alle lokalen Statusspeicher und kann auch die Anwendungsinstanz bestimmen, die für eine bestimmte Kunden-ID verantwortlich ist. Wenn Sie dieses Objekt zum Erstellen unseres REST-Dienstes verwenden, sieht die Architektur wie folgt aus:

Ein HTTP-Client lookup-Anforderungen an einen der REST-Endpunkte der Stream-Prozessoren. Der gestrichelte Pfeil zeigt an, wie eine Anforderung intern zwischen den Stream-Prozessoren weitergeleitet wird, wenn sie nicht von einem lokalen Statusspeicher beantwortet werden kann.

Zusammenfassend verwendet unsere vorgeschlagene Architektur Kafka-Themen, um Nachrichtendaten zuverlässig im Ruhezustand zu speichern, und verwaltet eine zweite Darstellung der Daten in Zustandsspeichern, um schnelle Abfragen zu unterstützen.

Sicherstellung der Skalierbarkeit der Anwendung

Um eine skalierbare Anwendung zu erhalten, müssen wir sicherstellen, dass die Verarbeitungslast gleichmäßig auf alle Instanzen der Kafka Streams-Anwendung verteilt ist. Die Belastung eines einzelnen Stream-Prozessors hängt von der Datenmenge und den Abfragen ab, die er verarbeiten muss.Genauer gesagt ist es wichtig, ein Partitionierungsschema für das Kafka-Thema so zu wählen, dass die Last eingehender Nachrichten und Abfragen über alle Partitionen und folglich auch über alle Stream-Prozessoren hinweg ausgeglichen ist.

Wenn In-Memory-Zustandsspeicher verwendet werden sollen, muss die Anzahl der Partitionen im Thema groß genug sein, damit jeder Stream-Prozessor das Datenvolumen einer Partition im Hauptspeicher halten kann. Beachten Sie, dass ein Stream-Prozessor im Falle von Failovers möglicherweise sogar zwei Partitionen im Hauptspeicher speichern muss.

Um Stream-Prozessorausfälle zu berücksichtigen, kann die Anzahl der Standby-Replikate mit der Einstellung num.standby.replicas in Kafka Streams konfiguriert werden, die sicherstellt, dass zusätzliche Stream-Prozessoren auch Nachrichten von einer bestimmten Prozessorpartition abonnieren. Im Falle eines Fehlers können diese Prozessoren die Beantwortung von Abfragen schnell übernehmen, anstatt erst nach einem bereits aufgetretenen Fehler mit der Rekonstruktion des Statusspeichers zu beginnen.

Schließlich muss die gewünschte Anzahl von Stream-Prozessoren zur verfügbaren Hardware passen. Für jede Stream Processor-Anwendungsinstanz sollte mindestens ein CPU-Kern reserviert werden. Auf Multicore-Systemen ist es möglich, die Anzahl der Stream-Threads pro Anwendungsinstanz zu erhöhen, wodurch der Overhead verringert wird, der durch das Starten einer separaten Java-Anwendung pro CPU-Kern entsteht.