Articles

Queryable Kafka Topics met Kafka Streams

in de huidige dataverwerkingsarchitecturen wordt Apache Kafka vaak gebruikt in de inkomende fase. Meestal wordt deze stap gebruikt om de inkomende berichtgegevens te verrijken en te filteren; het is echter niet mogelijk om interactieve query ‘ s te maken tot een later stadium in de dataverwerkingspijplijn. Dit komt omdat, hoewel elk bericht in een Kafka-onderwerp standaard wordt voortgezet, er nog geen mechanisme beschikbaar is dat snelle lookups van een specifiek bericht in een onderwerp mogelijk maakt.

niettemin zou de mogelijkheid om in dit vroege stadium in de pijplijn nieuwe gegevens te bevragen de vertragingen van de traditionele verwerkingspijpleidingen, die gewoonlijk langdurige voorbewerkingsstappen voor batchverwerking omvatten, voorkomen en de eindgebruikers bijna onmiddellijke toegang geven tot inkomende gegevens.

voor het bouwen van dataverwerkingstoepassingen met Kafka wordt de Kafka Streams-bibliotheek, die wordt onderhouden als onderdeel van het Kafka-project, vaak gebruikt om datatransformaties en analyses te definiëren. Een belangrijk kenmerk van Kafka Streams zijn state stores, het aanbieden van een abstractie van een snelle lokale sleutel-waarde winkel die kan worden gelezen en geschreven om bij het verwerken van berichten met Kafka Streams. Deze Key-Value stores kunnen continu worden gevuld met nieuwe berichten uit een Kafka topic door het definiëren van een geschikte stream processor, zodat het nu mogelijk is om snel berichten uit het onderliggende onderwerp op te halen.

voortbouwend op deze Kafka Streams-functionaliteit, maken we een unified REST API die een enkel querying-eindpunt biedt voor een bepaald Kafka-onderwerp.

samengevat kan het combineren van Kafka Streams processors met Statusopslag en een HTTP-server elk Kafka-onderwerp effectief veranderen in een snel alleen-lezen sleutelwaardeopslag.

Kafka Streams is gebouwd als een bibliotheek die kan worden ingebed in een zelfstandige Java-of Scala-toepassing. Het stelt ontwikkelaars in staat om stream processors die gegevens transformaties of aggregaties uit te voeren op Kafka berichten te definiëren, ervoor te zorgen dat elk Invoerbericht precies één keer wordt verwerkt. Met behulp van de Kafka Streams DSL, die is geà nspireerd door de Java Stream API, stream processors, en state stores kunnen flexibel worden geketend.

bovendien, wanneer meerdere instanties van een Kafka Streams-gebaseerde applicatie worden gestart, vormen de processen automatisch een load-balancing, zeer beschikbare verwerkingscluster zonder afhankelijk te zijn van andere externe systemen dan Kafka.

om de architectuur van een Kafka Streams-applicatie te illustreren die gebruik maakt van state stores, stelt u zich het volgende scenario voor: als spoorwegexploitant wordt elke keer dat een klant een reis boekt op onze website een nieuw bericht ingevoegd dat bestaat uit de klant-id en een tijdstempel in een Kafka-onderwerp. Specifiek, een of meer Kafka producenten voegen deze berichten in een onderwerp met negen partities. Omdat de klant-id wordt gekozen als de sleutel voor elk bericht, gegevens die behoren tot een bepaalde klant zal altijd worden ingevoegd in dezelfde partitie van het onderwerp. Impliciet gebruiken Kafka-producenten de DefaultPartitioner om berichten toe te wijzen aan partities.

neem nu aan dat we een Kafka Streams applicatie hebben die berichten van dit onderwerp leest en ze in een Statusopslag voorthoudt. Omdat de sleutel van elk bericht alleen bestaat uit de klant-id, zal de overeenkomstige waarde in de state store altijd de tijdstempel van de laatste boeking van de klant zijn. Om de maximale mate van parallelle verwerking te bereiken, kunnen we tot negen instanties van de Kafka Streams applicatie opstarten. In dit geval zal elke toepassing instantie precies een van de topic partities toegewezen krijgen. Voor elke invoerpartitie maakt Kafka Streams een aparte Statusopslag, die op zijn beurt alleen de gegevens bevat van de klanten die tot die partitie behoren.

de resulterende toepassingsarchitectuur wordt in het onderstaande diagram geïllustreerd.

de Kafka Stream Processors verantwoordelijk voor de partities 4 tot 9 zijn weggelaten in deze afbeelding. De gestippelde pijlen geven aan dat nieuwe berichten in een partitie ook worden doorgegeven aan extra stream processors en hun state stores, waardoor een snelle fail-over als de primair toegewezen processor moet mislukken.

Het is mogelijk om in-memory of permanente Statusopslag in de toepassing te gebruiken. Operaties op in-memory state stores zijn nog sneller in vergelijking met de persistent variant, die intern gebruik maakt van een RocksDB-winkel. Aan de andere kant, persistent state stores kunnen sneller worden hersteld in het geval een Kafka Streams applicatie is mislukt en moet opnieuw worden opgestart. Bovendien wordt het datavolume per winkel niet beperkt door de hoeveelheid hoofdgeheugen bij het gebruik van persistent state stores.

in ons scenario is het niet nodig om een changelog-onderwerp te hebben dat schrijfbewerkingen naar de Statusopslag registreert: alle gegevens die nodig zijn om een Statusopslag te herstellen, kunnen worden verkregen uit het oorspronkelijke invoeronderwerp.

het toevoegen van een REST-eindpunt aan stream processors

met de tot nu toe gepresenteerde architectuur, hebben we negen state stores die kunnen worden gebruikt om de laatste boekingsdatum op te halen van de klanten die behoren tot de respectievelijke input topic partitie.

om deze informatie toegankelijk te maken van buiten de Kafka Streams processors, moeten we een service-eindpunt tonen op elk van de stream processor applicaties en inkomende aanvragen beantwoorden van het interne statusarchief dat wordt beheerd door Kafka Streams.

als extra vereiste kunnen we niet verwachten dat de aanvragende applicatie weet welke Kafka Streams instantie momenteel verantwoordelijk is voor het verwerken van de gegevens van een bepaalde klant. Daarom is elk servicepunt verantwoordelijk voor het omleiden van de query naar het juiste toepassingsvoorbeeld als de klantgegevens niet lokaal beschikbaar zijn.

we hebben ervoor gekozen om de service endpoint te implementeren als een REST API, die het voordeel heeft dat het toegankelijk is vanaf elke client die HTTP ondersteunt, en het mogelijk maakt om heel gemakkelijk een transparante load balancer toe te voegen.

hetKafkaStreams object, dat beschikbaar is in elke Kafka Streams toepassing, biedt alleen-lezen toegang tot alle lokale Statusopslag en kan ook de toepassing instantie bepalen die verantwoordelijk is voor een bepaalde klant-id. Wanneer u dit object te bouwen van onze REST-service, de architectuur ziet er als volgt uit:

Een HTTP-client kunt sturen opzoeken verzoeken voor elk van de REST eindpunten van de stream processors. De gestippelde pijl geeft aan hoe een verzoek intern wordt doorgestuurd tussen de stream processors, als het niet kan worden beantwoord vanuit een lokale Statusopslag.

samenvattend maakt onze voorgestelde architectuur gebruik van Kafka-onderwerpen om berichtgegevens betrouwbaar op te slaan in rust en onderhoudt een tweede weergave van de gegevens in Statusopslag om snelle query ‘ s te ondersteunen.

zorgen voor schaalbaarheid van de toepassing

om een schaalbare toepassing te verkrijgen, moeten we ervoor zorgen dat de verwerkingsbelasting gelijk is verdeeld over alle instanties van de toepassing Kafka Streams. De belasting van een individuele stream processor hangt af van de hoeveelheid gegevens en queries die het moet verwerken.

meer specifiek is het belangrijk om een partitioneringsschema te kiezen voor het Kafka-onderwerp, zodat het laden van inkomende berichten en queries goed in balans is over alle partities en dus ook alle streamprocessors.

als in-memory state stores gebruikt moeten worden, moet het aantal partities in het onderwerp groot genoeg zijn zodat elke stream processor in staat is om het datavolume van een partitie in het hoofdgeheugen te behouden. Merk op dat in het geval van fail-overs, een stream processor kan zelfs nodig hebben om twee partities in het hoofdgeheugen.

om rekening te houden met storingen in de streamprocessor, kan het aantal standby-replica ‘ s worden geconfigureerd met behulp van de instelling num.standby.replicas In Kafka-Streams, die ervoor zorgt dat extra streamprocessors zich ook abonneren op berichten van een bepaalde processorpartitie. In het geval van een storing kunnen deze processors snel het beantwoorden van query ‘ s overnemen, in plaats van het statusarchief pas te reconstrueren nadat een storing zich al heeft voorgedaan.

ten slotte moet het gewenste aantal stream processors passen bij de beschikbare hardware. Voor elke stream processor toepassing instantie, ten minste één CPU core moet worden gereserveerd. Op multicore-systemen is het mogelijk om het aantal stream threads per toepassing te verhogen, wat de overhead vermindert door een aparte Java-toepassing per CPU-kern te starten.