Articles

subiecte Kafka interogabile cu fluxuri Kafka

în arhitecturile de procesare a datelor de astăzi, Apache Kafka este adesea folosit în etapa de intrare. De obicei, acest pas este utilizat pentru a îmbogăți și filtra datele mesajelor primite; cu toate acestea, nu este posibil să se facă interogări interactive până la o etapă ulterioară în conducta de procesare a datelor. Acest lucru se datorează faptului că, deși fiecare mesaj dintr-un subiect Kafka este persistat în mod implicit, încă nu este disponibil niciun mecanism care să permită căutări rapide ale unui anumit mesaj dintr-un subiect.

cu toate acestea, posibilitatea de a interoga date noi în acest stadiu incipient al conductei ar evita întârzierile conductelor tradiționale de procesare care includ, de obicei, etape de preprocesare pe loturi de lungă durată și ar oferi utilizatorilor finali acces aproape instantaneu la datele primite.

pentru construirea aplicațiilor de procesare a datelor cu Kafka, Biblioteca Kafka Streams, care este menținută ca parte a proiectului Kafka, este utilizată în mod obișnuit pentru a defini transformările și analizele datelor. O caracteristică importantă a fluxurilor Kafka sunt magazinele de stat, oferind o abstractizare a unui magazin local rapid de valori cheie care poate fi citit și scris la procesarea mesajelor cu fluxuri Kafka. Aceste magazine cheie-valoare pot fi umplute continuu cu mesaje noi de la un subiect Kafka prin definirea unui procesor de flux adecvat, astfel încât acum este posibil pentru a prelua rapid mesaje de la subiectul de bază.

bazându-ne pe această funcționalitate Kafka Streams, creăm un API REST unificat care oferă un singur punct final de interogare pentru un anumit subiect Kafka.în rezumat, combinarea procesoarelor de fluxuri Kafka cu magazinele de stat și un server HTTP poate transforma în mod eficient orice subiect Kafka într-un magazin rapid de valori cheie numai în citire.

Kafka Streams este construit ca o bibliotecă care poate fi încorporată într-o aplicație Java sau Scala autonomă. Permite dezvoltatorilor să definească procesoare de flux care efectuează transformări sau agregări de date pe mesajele Kafka, asigurându-se că fiecare mesaj de intrare este procesat exact o singură dată. Folosind Kafka Streams DSL, care este inspirat de API-ul Java Stream, procesoarele de flux și magazinele de stat pot fi înlănțuite flexibil.

Mai mult, la pornirea mai multor instanțe ale unei aplicații bazate pe fluxuri Kafka, procesele formează automat un cluster de procesare de echilibrare a sarcinii, foarte disponibil, fără a depinde de alte sisteme externe decât Kafka.

pentru a ilustra arhitectura unei aplicații Kafka Streams care utilizează magazine de stat, imaginați-vă următorul scenariu: ca operator feroviar, de fiecare dată când un client rezervă o călătorie pe site-ul nostru, un nou mesaj format din id-ul clientului și o marcă de timp este introdus într-un subiect Kafka. Mai exact, unul sau mai mulți producători Kafka introduc aceste mesaje într-un subiect cu nouă partiții. Deoarece ID-ul clientului este ales ca cheie pentru fiecare mesaj, datele aparținând unui anumit client vor fi întotdeauna inserate în aceeași partiție a subiectului. Implicit, producătorii Kafka folosesc DefaultPartitioner pentru a atribui mesaje partițiilor.

acum, să presupunem că avem o aplicație Kafka Streams care citește mesaje din acest subiect și le persistă într-un magazin de stat. Deoarece cheia fiecărui mesaj constă doar din id-ul clientului, valoarea corespunzătoare din magazinul de stat va fi întotdeauna marca de timp a celei mai recente rezervări a clientului. Pentru a atinge gradul maxim de procesare paralelă, putem porni până la nouă instanțe ale aplicației Kafka Streams. În acest caz, fiecărei instanțe de aplicație i se va atribui exact una dintre partițiile de subiect. Pentru fiecare partiție de intrare, Kafka Streams creează un magazin de stat separat, care la rândul său deține doar datele clienților aparținând acelei partiții.

arhitectura de aplicație rezultată este ilustrată în diagrama de mai jos.

procesoarele de flux Kafka responsabile pentru partițiile 4 până la 9 sunt lăsate în afara acestei ilustrații. Săgețile punctate indică faptul că mesajele noi dintr-o partiție sunt, de asemenea, propagate către procesoare de flux suplimentare și magazinele lor de stat, permițând o defecțiune rapidă dacă procesorul atribuit în primul rând ar trebui să eșueze.

este posibil să se utilizeze fie în memorie sau magazine de stat persistente în cerere. Operațiunile din magazinele de Stat din memorie sunt chiar mai rapide în comparație cu varianta persistentă, care utilizează intern un magazin RocksDB. Pe de altă parte, magazinele de stat persistente pot fi restaurate mai repede în cazul în care o aplicație Kafka Streams a eșuat și trebuie să repornească. Mai mult, volumul de date pe magazin nu este limitat de cantitatea de memorie principală atunci când se utilizează magazine de stat persistente.

în scenariul nostru, nu este necesar să avem un subiect changelog care să înregistreze operațiunile de scriere în magazinele de stat: toate datele necesare pentru recuperarea unui magazin de stat pot fi obținute din subiectul de intrare original.

adăugarea unui punct final REST la procesoarele de flux

cu arhitectura prezentată până acum, avem nouă magazine de stat care pot fi utilizate pentru a prelua cea mai recentă dată de rezervare a clienților aparținând partiției respective a subiectului de intrare.

acum, pentru a face aceste informații accesibile din afara procesoarelor de fluxuri Kafka, trebuie să expunem un punct final de serviciu pe fiecare instanță a aplicației procesorului de flux și să răspundem solicitărilor primite din magazinul de stat intern care este gestionat de fluxurile Kafka.

ca o cerință suplimentară, nu ne putem aștepta ca aplicația solicitantă să știe care instanță Kafka Streams este în prezent responsabilă pentru prelucrarea datelor unui anumit client. În consecință, fiecare punct final al serviciului este responsabil să redirecționeze interogarea către instanța corectă a aplicației dacă datele clientului nu sunt disponibile local.

am ales să implementăm punctul final al Serviciului ca API REST, care are avantajul de a fi accesibil de la orice client care acceptă HTTP și permite adăugarea foarte ușoară a unui echilibrator de încărcare transparent.

obiectulKafkaStreams, care este disponibil în fiecare aplicație Kafka Streams, oferă acces numai în citire la toate magazinele locale de stat și poate determina, de asemenea, instanța aplicației responsabilă pentru un anumit ID de client. Când utilizați acest obiect pentru a construi serviciul nostru REST, arhitectura arată după cum urmează:

un client HTTP poate trimite cereri de căutare la oricare dintre punctele finale rămase ale procesoarelor de flux. Săgeata punctată indică modul în care o solicitare este redirecționată intern între procesoarele de flux, dacă nu se poate răspunde dintr-un magazin local de stat.

rezumând, arhitectura noastră propusă folosește subiectele Kafka pentru a stoca în mod fiabil datele mesajelor în repaus și menține oa doua reprezentare a datelor în magazinele de stat pentru a sprijini interogările rapide.

asigurarea scalabilității aplicației

pentru a obține o aplicație scalabilă, trebuie să ne asigurăm că sarcina de procesare este echilibrată în mod egal în toate instanțele aplicației Kafka Streams. Încărcarea pe un procesor de flux individual depinde de cantitatea de date și de interogările pe care trebuie să le gestioneze.

mai precis, este important să alegeți o schemă de partiționare pentru subiectul Kafka, astfel încât încărcarea mesajelor și interogărilor primite să fie bine echilibrată pe toate partițiile și, în consecință, și pe toate procesoarele de flux.

dacă ar trebui utilizate magazine de stare în memorie, numărul de partiții din subiect trebuie să fie suficient de mare pentru ca fiecare procesor de flux să poată păstra volumul de date al unei partiții în memoria principală. Rețineți că, în caz de eșec, un procesor de flux ar putea avea nevoie chiar să dețină două partiții în memoria principală.

pentru a ține cont de eșecurile procesorului de flux, numărul de replici de așteptare poate fi configurat folosind setarea num.standby.replicas în fluxurile Kafka, ceea ce asigură că procesoarele de flux suplimentare se abonează și la mesajele dintr-o anumită partiție de procesor. În caz de eșec, acele procesoare pot prelua rapid răspunsul la întrebări, în loc să înceapă să reconstruiască magazinul de stat numai după ce a avut loc deja un eșec.

în cele din urmă, numărul dorit de procesoare de flux trebuie să se potrivească hardware-ului disponibil. Pentru fiecare instanță de aplicație a procesorului de flux, ar trebui rezervat cel puțin un nucleu CPU. Pe sistemele multicore, este posibil să se mărească numărul de fire de flux pe instanță de aplicație, ceea ce atenuează cheltuielile suportate prin pornirea unei aplicații Java separate pe nucleul procesorului.