Articles

Forespørgelige Kafka-emner med Kafka-Streams

i dagens databehandlingsarkitekturer bruges Apache Kafka ofte i ingress-fasen. Normalt bruges dette trin til at berige og filtrere de indgående meddelelsesdata; det er dog ikke muligt at foretage interaktive forespørgsler før et senere tidspunkt i databehandlingsrørledningen. Dette skyldes, at selvom hver meddelelse i et Kafka-emne vedvarer som standard, er der endnu ingen mekanisme tilgængelig, der giver mulighed for hurtige opslag af en bestemt meddelelse i et emne.

ikke desto mindre ville det at være i stand til at forespørge nye data på dette tidlige tidspunkt i rørledningen undgå forsinkelser i traditionelle behandlingsrørledninger, der normalt inkluderer langvarige batchforarbejdningstrin og ville give slutbrugerne næsten øjeblikkelig adgang til indgående data.

til opbygning af databehandlingsapplikationer med Kafka bruges Kafka Streams-biblioteket, der vedligeholdes som en del af Kafka-projektet, ofte til at definere datatransformationer og analyser. Et vigtigt træk ved Kafka Streams er statsbutikker, der tilbyder en abstraktion af en hurtig lokal Nøgleværdibutik, der kan læses og skrives til, når man behandler meddelelser med Kafka Streams. Disse Nøgleværdibutikker kan løbende udfyldes med nye meddelelser fra et Kafka-emne ved at definere en passende streamprocessor, så det nu er muligt hurtigt at hente meddelelser fra det underliggende emne.

Vi bygger oven på denne Kafka Streams-funktionalitet og opretter en samlet REST API, der giver et enkelt forespørgselsendepunkt for et givet Kafka-emne.

sammenfattende kan kombination af Kafka Streams-processorer med Statsforretninger og en HTTP-server effektivt gøre ethvert Kafka-emne til en hurtig skrivebeskyttet nøgleværdibutik.

Kafka Streams er bygget som et bibliotek, der kan integreres i en selvstændig Java eller Scala program. Det giver udviklere mulighed for at definere streamprocessorer, der udfører datatransformationer eller aggregeringer på Kafka-meddelelser, hvilket sikrer, at hver inputmeddelelse behandles nøjagtigt en gang. Ved hjælp af Kafka Streams DSL, som er inspireret af Java Stream API, kan stream-processorer og statsbutikker fleksibelt kædes.når du starter flere forekomster af en Kafka-Streams-baseret applikation, danner processerne automatisk en belastningsbalancering, meget tilgængelig behandlingsklynge uden at være afhængig af andre eksterne systemer end Kafka.

for at illustrere arkitekturen i en Kafka Streams-applikation, der anvender statsbutikker, kan du forestille dig følgende scenarie: som jernbaneoperatør, hver gang en kunde booker en tur på vores hjemmeside, indsættes en ny meddelelse bestående af kunde-id og et tidsstempel i et Kafka-emne. Specifikt indsætter en eller flere Kafka-producenter disse meddelelser i et emne med ni partitioner. Da kunde-id ‘ et vælges som nøgle for hver meddelelse, indsættes data, der tilhører en given kunde, altid i den samme partition af emnet. Implicit bruger Kafka-producenter DefaultPartitioner til at tildele meddelelser til partitioner.

Antag nu, at vi har et Kafka-Streams-program, der læser meddelelser fra dette emne og fortsætter dem i en statsbutik. Da nøglen til hver meddelelse kun består af kunde-id, vil den tilsvarende værdi i statsbutikken altid være tidsstemplet for kundens seneste reservation. For at opnå den maksimale grad af parallel behandling kan vi starte op til ni forekomster af Kafka Streams-applikationen. I dette tilfælde tildeles hver applikationsinstans nøjagtigt en af emnepartitionerne. For hver inputpartition opretter Kafka Streams en separat statsbutik, som igen kun indeholder dataene fra de kunder, der tilhører den partition.

den resulterende applikationsarkitektur er illustreret i nedenstående diagram.

div Kafka stream-processorer, der er ansvarlige for partitionerne 4 til 9, er udeladt i denne illustration. De stiplede pile indikerer, at nye meddelelser i en partition også udbredes til yderligere streamprocessorer og deres statsforretninger, hvilket giver mulighed for en hurtig fail-over, hvis den primært tildelte processor skulle mislykkes.

det er muligt at enten bruge In-memory eller vedvarende tilstandsbutikker i applikationen. Operationer på In-memory state-butikker er endnu hurtigere sammenlignet med den vedvarende variant, der internt bruger en RocksDB-butik. På den anden side kan vedvarende statsbutikker gendannes hurtigere, hvis en Kafka Streams-applikation mislykkedes og skal genstartes. Butik ikke begrænset af mængden af hovedhukommelse, når du bruger vedvarende statsbutikker.

i vores scenarie er det ikke nødvendigt at have et changelog-emne, der registrerer skriveoperationer til statsforretningerne: alle data, der er nødvendige for at gendanne en statsforretning, kan fås fra det originale inputemne.

tilføjelse af et REST-endepunkt til stream-processorer

med den arkitektur, der er præsenteret indtil videre, har vi ni statsbutikker, der kan bruges til at hente den seneste reservationsdato for de kunder, der tilhører den respektive inputemnepartition.

for at gøre disse oplysninger tilgængelige uden for Kafka Streams-processorer, er vi nødt til at udsætte et serviceendepunkt på hver af streamprocessorens applikationsinstanser og besvare indgående anmodninger fra den interne tilstandsbutik, der administreres af Kafka Streams.

som et yderligere krav kan vi ikke forvente, at den anmodende applikation ved, hvilken Kafka Streams-forekomst der i øjeblikket er ansvarlig for behandlingen af en given kundes data. Derfor er hvert serviceendepunkt ansvarligt for at omdirigere forespørgslen til den korrekte applikationsinstans, hvis kundedataene ikke er lokalt tilgængelige.

Vi valgte at implementere service endpoint som en REST API, som har fordelen ved at være tilgængelig fra enhver klient, der understøtter HTTP, og gør det muligt at tilføje en gennemsigtig belastningsbalancer meget let.

KafkaStreams objektet, som er tilgængeligt i alle Kafka Streams-applikationer, giver skrivebeskyttet adgang til alle lokale statsforretninger og kan også bestemme den applikationsinstans, der er ansvarlig for et givet kunde-id. Når du bruger dette objekt til at opbygge vores HVILETJENESTE, ser arkitekturen ud som følger:

en HTTP-klient kan sende opslag anmodninger til nogen af resten endepunkter i stream processorer. Den stiplede pil angiver, hvordan en anmodning internt videresendes blandt streamprocessorerne, hvis den ikke kan besvares fra en lokal statsbutik.

Sammenfattende bruger vores foreslåede arkitektur Kafka-emner til pålideligt at gemme meddelelsesdata i hvile og opretholder en anden repræsentation af dataene i statsforretninger for at understøtte hurtige forespørgsler.

sikring af applikationens skalerbarhed

for at opnå en skalerbar applikation skal vi sikre, at behandlingsbelastningen er lige afbalanceret over alle forekomster af Kafka Streams-applikationen. Belastningen på en individuel streamprocessor afhænger af mængden af data og forespørgsler, den skal håndtere.

mere specifikt er det vigtigt at vælge et partitioneringsskema for Kafka-emnet, så belastningen af indgående meddelelser og forespørgsler er velafbalanceret på tværs af alle partitioner og følgelig også alle streamprocessorer.

hvis der skal bruges butikker i hukommelsestilstand, skal antallet af partitioner i emnet være stort nok, så hver streamprocessor er i stand til at holde datavolumen for en partition i hovedhukommelsen. Bemærk, at i tilfælde af fejl overs, kan en stream processor endda nødt til at holde to partitioner i hovedhukommelsen.

for at tage højde for strømprocessorfejl kan antallet af standby-replikaer konfigureres ved hjælp afnum.standby.replicas indstilling i Kafka Streams, hvilket sikrer, at yderligere streamprocessorer også abonnerer på meddelelser fra en given processorpartition. I tilfælde af en fejl kan disse processorer hurtigt overtage besvarelsen af forespørgsler i stedet for først at begynde at rekonstruere statsbutikken, efter at der allerede er sket en fejl.

endelig skal det ønskede antal streamprocessorer passe til det tilgængelige udstyr. For hver streamprocessorapplikationsinstans skal mindst en CPU-kerne reserveres. På multicore-systemer er det muligt at øge antallet af stream-tråde pr.applikationsforekomst, hvilket mindsker overhead, der opstår ved at starte en separat Java-applikation pr.