Articles

Argomenti Kafka interrogabili con Kafka Streams

Nelle architetture di elaborazione dati di oggi, Apache Kafka è spesso usato nella fase di ingresso. Di solito, questo passaggio viene utilizzato per arricchire e filtrare i dati dei messaggi in arrivo; tuttavia, non è possibile effettuare query interattive fino a una fase successiva nella pipeline di elaborazione dei dati. Questo perché, sebbene ogni messaggio in un argomento Kafka sia persistito per impostazione predefinita, non è ancora disponibile alcun meccanismo che consenta ricerche rapide di un messaggio specifico in un argomento.

Tuttavia, essere in grado di interrogare nuovi dati in questa fase iniziale della pipeline eviterebbe i ritardi delle pipeline di elaborazione tradizionali che di solito includono fasi di pre-elaborazione batch di lunga durata e darebbe agli utenti finali un accesso quasi immediato ai dati in entrata.

Per la creazione di applicazioni di elaborazione dati con Kafka, la libreria Kafka Streams, gestita come parte del progetto Kafka, viene comunemente utilizzata per definire trasformazioni e analisi dei dati. Una caratteristica importante di Kafka Streams sono gli archivi di stato, che offrono un’astrazione di un archivio chiave-valore locale veloce che può essere letto e scritto durante l’elaborazione dei messaggi con Kafka Streams. Questi archivi chiave-valore possono essere continuamente riempiti con nuovi messaggi da un argomento Kafka definendo un processore di flusso appropriato, in modo che sia ora possibile recuperare rapidamente i messaggi dall’argomento sottostante.

Basandosi su questa funzionalità di Kafka Streams, creiamo un’API REST unificata che fornisce un singolo endpoint di interrogazione per un determinato argomento Kafka.

In sintesi, la combinazione di processori Kafka Streams con archivi di stato e un server HTTP può effettivamente trasformare qualsiasi argomento Kafka in un archivio chiave-valore di sola lettura veloce.

Kafka Streams è costruito come una libreria che può essere incorporata in un’applicazione Java o Scala autonoma. Consente agli sviluppatori di definire stream processor che eseguono trasformazioni o aggregazioni di dati sui messaggi Kafka, assicurando che ogni messaggio di input venga elaborato esattamente una volta. Utilizzando il Kafka Streams DSL, che si ispira al Java Stream API, stream processor, e negozi di stato può essere flessibile concatenato.

Inoltre, quando si avviano più istanze di un’applicazione basata su Kafka Streams, i processi formano automaticamente un cluster di elaborazione altamente disponibile per il bilanciamento del carico senza dipendere da sistemi esterni diversi da Kafka.

Per illustrare l’architettura di un’applicazione Kafka Streams che utilizza archivi di stato, immagina il seguente scenario: Come operatore ferroviario, ogni volta che un cliente prenota un viaggio sul nostro sito Web, un nuovo messaggio composto dall’ID cliente e da un timestamp viene inserito in un argomento Kafka. In particolare, uno o più produttori di Kafka inseriscono questi messaggi in un argomento con nove partizioni. Poiché l’ID cliente viene scelto come chiave per ogni messaggio, i dati appartenenti a un determinato cliente verranno sempre inseriti nella stessa partizione dell’argomento. Implicitamente, i produttori di Kafka usano DefaultPartitioner per assegnare messaggi alle partizioni.

Ora, supponiamo di avere un’applicazione Kafka Streams che legge i messaggi da questo argomento e li persiste in un archivio di stato. Poiché la chiave di ogni messaggio consiste solo nell’ID cliente, il valore corrispondente nell’archivio di stato sarà sempre il timestamp dell’ultima prenotazione del cliente. Per ottenere il massimo grado di elaborazione parallela, possiamo avviare fino a nove istanze dell’applicazione Kafka Streams. In questo caso, ad ogni istanza dell’applicazione verrà assegnata esattamente una delle partizioni dell’argomento. Per ogni partizione di input, Kafka Streams crea un archivio di stato separato, che a sua volta contiene solo i dati dei clienti appartenenti a tale partizione.

L’architettura dell’applicazione risultante è illustrata nello schema seguente.

Kafka Stream processor, responsabile per le Partizioni da 4 a 9 sono lasciati fuori, in questa illustrazione. Le frecce tratteggiate indicano che i nuovi messaggi in una partizione vengono propagati anche a processori di flusso aggiuntivi e ai loro archivi di stato, consentendo un rapido failover se il processore assegnato principalmente dovesse fallire.

È possibile utilizzare archivi di stato in memoria o persistenti nell’applicazione. Le operazioni sugli archivi di stato in memoria sono ancora più veloci rispetto alla variante persistente, che utilizza internamente un archivio RocksDB. D’altra parte, gli archivi di stato persistenti possono essere ripristinati più velocemente nel caso in cui un’applicazione Kafka Streams abbia fallito e debba essere riavviata. Inoltre, il volume di dati per archivio non è limitato dalla quantità di memoria principale quando si utilizzano archivi di stato persistenti.

Nel nostro scenario, non è necessario avere un argomento changelog che registra le operazioni di scrittura negli archivi di stato: tutti i dati necessari per recuperare un archivio di stato possono essere ottenuti dall’argomento di input originale.

Aggiunta di un endpoint REST ai processori stream

Con l’architettura presentata finora, abbiamo nove negozi statali che possono essere utilizzati per recuperare l’ultima data di prenotazione dei clienti appartenenti alla rispettiva partizione dell’argomento di input.

Ora, per rendere queste informazioni accessibili dall’esterno dei processori Kafka Streams, è necessario esporre un endpoint di servizio su ciascuna delle istanze dell’applicazione stream processor e rispondere alle richieste in arrivo dall’archivio di stato interno gestito da Kafka Streams.

Come requisito aggiuntivo, non possiamo aspettarci che l’applicazione richiedente sappia quale istanza di Kafka Streams è attualmente responsabile dell’elaborazione dei dati di un determinato cliente. Di conseguenza, ogni endpoint del servizio è responsabile del reindirizzamento della query all’istanza dell’applicazione corretta se i dati del cliente non sono disponibili localmente.

Abbiamo scelto di implementare l’endpoint del servizio come API REST, che ha il vantaggio di essere accessibile da qualsiasi client che supporti HTTP e consente di aggiungere un bilanciamento del carico trasparente molto facilmente.

L’oggetto KafkaStreams, disponibile in ogni applicazione Kafka Streams, fornisce accesso in sola lettura a tutti gli archivi statali locali e può anche determinare l’istanza dell’applicazione responsabile di un determinato id cliente. Quando si utilizza questo oggetto per costruire il nostro servizio REST, l’architettura si presenta come segue:

HTTP client può inviare richieste di ricerca per del RESTO gli endpoint dei processori stream. La freccia tratteggiata indica come una richiesta viene inoltrata internamente tra i processori di flusso, se non è possibile rispondere da un archivio di stato locale.

Riassumendo, la nostra architettura proposta fa uso di argomenti Kafka per archiviare in modo affidabile i dati dei messaggi a riposo e mantiene una seconda rappresentazione dei dati negli archivi di stato per supportare query veloci.

Garantire la scalabilità dell’applicazione

Per ottenere un’applicazione scalabile, dobbiamo assicurarci che il carico di elaborazione sia ugualmente bilanciato su tutte le istanze dell’applicazione Kafka Streams. Il carico su un singolo processore di flusso dipende dalla quantità di dati e query che deve gestire.

Più specificamente, è importante scegliere uno schema di partizionamento per l’argomento Kafka in modo tale che il carico dei messaggi e delle query in arrivo sia ben bilanciato su tutte le partizioni e di conseguenza anche su tutti i processori di flusso.

Se devono essere utilizzati archivi di stato in memoria, il numero di partizioni nell’argomento deve essere abbastanza grande in modo che ogni stream processor sia in grado di mantenere il volume di dati di una partizione nella memoria principale. Si noti che in caso di failover, un processore di flusso potrebbe anche aver bisogno di contenere due partizioni nella memoria principale.

Per tenere conto degli errori del processore di flusso, è possibile configurare il numero di repliche in standby utilizzando l’impostazione num.standby.replicas in Kafka Streams, che garantisce che anche i processori di flusso aggiuntivi sottoscrivano i messaggi da una determinata partizione del processore. In caso di errore, tali processori possono assumere rapidamente la risposta alle query, invece di iniziare a ricostruire l’archivio di stato solo dopo che si è già verificato un errore.

Infine, il numero desiderato di stream processor deve adattarsi all’hardware disponibile. Per ogni istanza dell’applicazione stream processor, deve essere riservato almeno un core della CPU. Nei sistemi multicore, è possibile aumentare il numero di thread di flusso per istanza dell’applicazione, riducendo l’overhead derivante dall’avvio di un’applicazione Java separata per core della CPU.