Articles

Queryable Kafka Topics with Kafka Streams

w dzisiejszych architekturach przetwarzania danych, Apache Kafka jest często używany na etapie ingress. Zazwyczaj ten krok służy do wzbogacania i filtrowania danych przychodzących wiadomości; jednak nie jest możliwe wykonywanie interaktywnych zapytań aż do późniejszego etapu w potoku przetwarzania danych. Dzieje się tak dlatego, że chociaż każda wiadomość w temacie Kafki jest domyślnie utrzymywana, nie jest jeszcze dostępny żaden mechanizm, który pozwala na szybkie wyszukiwanie konkretnej wiadomości w temacie.

niemniej jednak możliwość zapytania nowych danych na tym wczesnym etapie rurociągu pozwoliłaby uniknąć opóźnień w tradycyjnych rurociągach przetwarzania, które zwykle obejmują długotrwałe etapy wstępnego przetwarzania wsadowego i dałyby użytkownikom końcowym niemal natychmiastowy dostęp do przychodzących danych.

do tworzenia aplikacji do przetwarzania danych z Kafka, Biblioteka Kafka Streams, która jest utrzymywana w ramach projektu Kafka, jest powszechnie używana do definiowania transformacji i analiz danych. Jedną z ważnych cech strumieni Kafka są state stores, oferujące abstrakcję szybkiego lokalnego magazynu klucza i wartości, do którego można odczytywać i zapisywać podczas przetwarzania wiadomości za pomocą strumieni Kafka. Te kluczowe magazyny wartości mogą być stale wypełniane nowymi wiadomościami z tematu Kafki poprzez zdefiniowanie odpowiedniego procesora strumieniowego, dzięki czemu możliwe jest teraz Szybkie Pobieranie wiadomości z tematu bazowego.

opierając się na tej funkcjonalności Kafka Streams, tworzymy zunifikowane API REST, które zapewnia pojedynczy punkt końcowy zapytania dla danego tematu Kafki.

podsumowując, połączenie procesorów strumieni Kafki ze sklepami stanowymi i serwerem HTTP może skutecznie zmienić każdy temat Kafki w szybki magazyn wartości klucza tylko do odczytu.

Kafka Streams jest zbudowana jako biblioteka, która może być wbudowana w samodzielną aplikację Java lub Scala. Pozwala programistom definiować procesory strumieniowe, które wykonują transformacje lub agregacje danych na wiadomościach Kafki, zapewniając, że każda wiadomość wejściowa jest przetwarzana dokładnie raz. Korzystanie z Kafka Streams DSL, który jest inspirowany przez API Java Stream, procesory strumieniowe i sklepy państwowe mogą być elastycznie łańcuchowe.

Ponadto, podczas uruchamiania wielu instancji aplikacji opartej na strumieniach Kafki, procesy automatycznie tworzą wysoce dostępny klaster przetwarzania równoważący obciążenie, bez zależności od systemów zewnętrznych innych niż Kafka.

aby zilustrować architekturę aplikacji Kafka Streams, która zatrudnia sklepy państwowe, wyobraź sobie następujący scenariusz: jako operator kolejowy, za każdym razem, gdy klient rezerwuje wycieczkę na naszej stronie internetowej, nowa wiadomość składająca się z identyfikatora klienta i znacznika czasu jest wstawiana do tematu Kafki. Konkretnie, jeden lub więcej producentów Kafki wstawia te wiadomości do tematu z dziewięcioma partycjami. Ponieważ identyfikator klienta jest wybierany jako klucz dla każdej wiadomości, dane należące do danego klienta będą zawsze wstawiane do tej samej partycji tematu. Domyślnie producenci Kafki używają DefaultPartitioner do przypisywania Wiadomości do partycji.

teraz Załóżmy, że mamy aplikację Kafka Streams, która odczytuje wiadomości z tego tematu i kontynuuje je w sklepie państwowym. Ponieważ klucz każdej wiadomości składa się tylko z identyfikatora klienta, odpowiednia wartość w sklepie stanowym zawsze będzie znacznikiem czasu ostatniej rezerwacji klienta. Aby osiągnąć maksymalny stopień przetwarzania równoległego, możemy uruchomić do dziewięciu instancji aplikacji Kafka Streams. W takim przypadku każdej instancji aplikacji zostanie przypisana dokładnie jedna z partycji tematycznych. Dla każdej partycji wejściowej Kafka Streams tworzy oddzielny magazyn stanu, który z kolei przechowuje tylko dane klientów należących do tej partycji.

wynikowa Architektura aplikacji jest zilustrowana na poniższym diagramie.

The procesory strumieniowe Kafki odpowiedzialne za partycje od 4 do 9 zostały pominięte na tej ilustracji. Przerywane strzałki wskazują, że nowe wiadomości na partycji są również propagowane do dodatkowych procesorów strumieniowych i ich państwowych magazynów, co pozwala na szybkie przełączanie awaryjne, jeśli główny przypisany procesor zawiedzie.

w aplikacji można używać zarówno pamięci wewnętrznej, jak i trwałych zapisów stanu. Operacje na magazynach stanu w pamięci są jeszcze szybsze w porównaniu z wariantem persistent, który wewnętrznie używa magazynu RocksDB. Z drugiej strony, trwałe magazyny stanowe mogą być przywracane szybciej w przypadku awarii aplikacji Kafka Streams i konieczności ponownego uruchomienia. Co więcej, ilość danych na magazyn nie jest ograniczona ilością pamięci głównej w przypadku używania trwałych magazynów stanu.

w naszym scenariuszu nie jest konieczne posiadanie tematu changelog, który rejestruje operacje zapisu do magazynów państwowych: wszystkie dane niezbędne do odzyskania magazynu stanowego można uzyskać z oryginalnego tematu wejściowego.

dodanie punktu końcowego REST do procesorów strumieniowych

dzięki zaprezentowanej dotychczas architekturze mamy dziewięć sklepów stanowych, które można wykorzystać do pobrania ostatniej daty rezerwacji klientów należących do odpowiedniej partycji tematu wejściowego.

teraz, aby te informacje były dostępne z zewnątrz procesorów strumieni Kafka, musimy ujawnić punkt końcowy usługi na każdym z instancji aplikacji procesora strumieniowego i odpowiedzieć na przychodzące żądania z wewnętrznego magazynu stanu, który jest zarządzany przez strumienie Kafka.

jako dodatkowe Wymaganie nie możemy oczekiwać, że żądająca aplikacja będzie wiedziała, która instancja Kafka Streams jest obecnie odpowiedzialna za przetwarzanie danych danego klienta. W związku z tym każdy punkt końcowy usługi jest odpowiedzialny za przekierowanie zapytania do właściwej instancji aplikacji, Jeśli dane Klienta nie są lokalnie dostępne.

zdecydowaliśmy się zaimplementować punkt końcowy usługi jako REST API, który ma tę zaletę, że jest dostępny z dowolnego klienta obsługującego HTTP i pozwala bardzo łatwo dodać przezroczysty load balancer.

obiektKafkaStreams, który jest dostępny w każdej aplikacji Kafka Streams, zapewnia dostęp tylko do odczytu do wszystkich lokalnych sklepów państwowych i może również określić instancję aplikacji odpowiedzialną za dany identyfikator klienta. Przy użyciu tego obiektu do zbudowania naszej usługi REST Architektura wygląda następująco:

klient HTTP może wysyłać żądania wyszukiwania do dowolnego z pozostałych punktów końcowych procesorów strumieniowych. Przerywana strzałka wskazuje, w jaki sposób żądanie jest wewnętrznie przekazywane między procesorami strumieniowymi, jeśli nie można na nie odpowiedzieć z lokalnego sklepu stanowego.

Podsumowując, nasza proponowana architektura wykorzystuje tematy Kafki do niezawodnego przechowywania danych wiadomości w spoczynku i utrzymuje drugą reprezentację danych w magazynach państwowych w celu obsługi szybkich zapytań.

zapewnienie skalowalności aplikacji

aby uzyskać skalowalną aplikację, musimy upewnić się, że obciążenie przetwarzania jest równomiernie zrównoważone we wszystkich instancjach aplikacji Kafka Streams. Obciążenie pojedynczego procesora strumieniowego zależy od ilości danych i zapytań, które musi obsłużyć.

mówiąc dokładniej, ważne jest, aby wybrać schemat partycjonowania dla tematu Kafka tak, aby Ładowanie przychodzących wiadomości i zapytań było dobrze zrównoważone na wszystkich partycjach, a co za tym idzie również na wszystkich procesorach strumieniowych.

jeśli ma być używany zapis stanu w pamięci, liczba partycji w temacie musi być wystarczająco duża, aby każdy procesor strumieniowy był w stanie utrzymać objętość danych jednej partycji w pamięci głównej. Zauważ, że w przypadku awarii procesor strumieniowy może nawet potrzebować dwóch partycji w pamięci głównej.

aby uwzględnić awarie procesora strumieniowego, liczbę replik w trybie gotowości można skonfigurować za pomocą ustawienia num.standby.replicas w strumieniach Kafki, co zapewnia, że dodatkowe procesory strumieniowe również subskrybują wiadomości z danej partycji procesora. W przypadku awarii procesory te mogą szybko przejąć odpowiedzi na zapytania, zamiast zacząć odbudowywać sklep Państwowy dopiero po wystąpieniu awarii.

wreszcie, żądana liczba procesorów strumieniowych musi pasować do dostępnego sprzętu. Dla każdej instancji aplikacji procesora strumieniowego należy zarezerwować co najmniej jeden rdzeń PROCESORA. W systemach wielordzeniowych możliwe jest zwiększenie liczby wątków strumienia na wystąpienie aplikacji, co zmniejsza koszty związane z uruchomieniem osobnej aplikacji Java na rdzeń PROCESORA.