Creato per il tempo reale: messaggistica di big data con Apache Kafka, parte 2

Nella prima metà di questa introduzione di JavaWorld ad Apache Kafka, hai sviluppato un paio di applicazioni producer / consumer su piccola scala utilizzando Kafka. Da questi esercizi dovresti acquisire familiarità con le basi del sistema di messaggistica Apache Kafka. In questa seconda metà imparerai come utilizzare le partizioni per distribuire il carico e ridimensionare la tua applicazione orizzontalmente, gestendo fino a milioni di messaggi al giorno. Imparerai anche come Kafka utilizza gli offset dei messaggi per tracciare e gestire elaborazioni complesse dei messaggi e come proteggere il tuo sistema di messaggistica Apache Kafka da guasti in caso di interruzione del servizio. Svilupperemo l'applicazione di esempio dalla parte 1 sia per i casi d'uso di pubblicazione-sottoscrizione che punto-punto.

Partizioni in Apache Kafka

Gli argomenti in Kafka possono essere suddivisi in partizioni. Ad esempio, durante la creazione di un argomento denominato Demo, è possibile configurarlo per avere tre partizioni. Il server creerebbe tre file di registro, uno per ciascuna delle partizioni demo. Quando un produttore pubblicava un messaggio sull'argomento, assegnava un ID partizione per quel messaggio. Il server aggiungerebbe quindi il messaggio al file di registro solo per quella partizione.

Se quindi sono stati avviati due consumatori, il server potrebbe assegnare le partizioni 1 e 2 al primo consumatore e la partizione 3 al secondo consumatore. Ogni consumatore legge solo dalle partizioni assegnate. È possibile vedere l'argomento Demo configurato per tre partizioni nella Figura 1.

Per espandere lo scenario, immagina un cluster Kafka con due broker, alloggiati in due macchine. Quando hai partizionato l'argomento demo, dovresti configurarlo per avere due partizioni e due repliche. Per questo tipo di configurazione, il server Kafka assegnerebbe le due partizioni ai due broker nel tuo cluster. Ogni broker sarebbe il leader per una delle partizioni.

Quando un produttore pubblicava un messaggio, veniva inviato al leader della partizione. Il leader prende il messaggio e lo aggiunge al file di registro sulla macchina locale. Il secondo broker replicherebbe passivamente il log di commit sulla propria macchina. Se il leader della partizione si interrompe, il secondo broker diventa il nuovo leader e inizia a servire le richieste dei client. Allo stesso modo, quando un consumatore inviava una richiesta a una partizione, quella richiesta andava prima al leader della partizione, che restituiva i messaggi richiesti.

Vantaggi del partizionamento

Considera i vantaggi del partizionamento di un sistema di messaggistica basato su Kafka:

  1. Scalabilità : in un sistema con una sola partizione, i messaggi pubblicati in un argomento vengono archiviati in un file di registro, che esiste su una singola macchina. Il numero di messaggi per un argomento deve rientrare in un singolo file di registro del commit e la dimensione dei messaggi archiviati non può mai essere superiore allo spazio su disco di quella macchina. Il partizionamento di un argomento consente di ridimensionare il sistema archiviando i messaggi su macchine diverse in un cluster. Se si desidera archiviare 30 gigabyte (GB) di messaggi per l'argomento Demo, ad esempio, è possibile creare un cluster Kafka di tre macchine, ciascuna con 10 GB di spazio su disco. Quindi configurerai l'argomento in modo che abbia tre partizioni.
  2. Bilanciamento del carico del server : avere più partizioni consente di diffondere le richieste di messaggi tra i broker. Ad esempio, se avessi un argomento che elabora 1 milione di messaggi al secondo, potresti dividerlo in 100 partizioni e aggiungere 100 broker al tuo cluster. Ogni broker sarebbe il leader per una singola partizione, responsabile della risposta a sole 10.000 richieste client al secondo.
  3. Bilanciamento del carico del consumatore : simile al bilanciamento del carico del server, l'hosting di più consumatori su macchine diverse consente di distribuire il carico del consumatore. Supponiamo che tu voglia consumare 1 milione di messaggi al secondo da un argomento con 100 partizioni. Potresti creare 100 consumatori e gestirli in parallelo. Il server Kafka assegnerebbe una partizione a ciascuno dei consumatori e ogni consumatore elaborerebbe 10.000 messaggi in parallelo. Poiché Kafka assegna ogni partizione a un solo consumatore, all'interno della partizione ogni messaggio verrà consumato in ordine.

Due modi per partizionare

Il produttore è responsabile della decisione su quale partizione andrà un messaggio. Il produttore ha due opzioni per controllare questa assegnazione:

  • Partizionatore personalizzato : è possibile creare una classe che implementa l' org.apache.kafka.clients.producer.Partitionerinterfaccia. Questa abitudine Partitionerimplementerà la logica aziendale per decidere dove vengono inviati i messaggi.
  • DefaultPartitioner : se non crei una classe di partitioner personalizzata, per impostazione predefinita org.apache.kafka.clients.producer.internals.DefaultPartitionerverrà utilizzata la classe. Il partitioner predefinito è abbastanza buono per la maggior parte dei casi, fornendo tre opzioni:
    1. Manuale : quando si crea un ProducerRecord, utilizzare il costruttore sovraccarico new ProducerRecord(topicName, partitionId,messageKey,message)per specificare un ID partizione.
    2. Hashing (sensibile alla località) : quando crei un ProducerRecord, specifica un messageKey, chiamando new ProducerRecord(topicName,messageKey,message). DefaultPartitionerutilizzerà l'hash della chiave per garantire che tutti i messaggi per la stessa chiave vadano allo stesso produttore. Questo è l'approccio più semplice e più comune.
    3. Spraying (bilanciamento del carico casuale) : se non si desidera controllare a quali messaggi di partizione vanno inviati, è sufficiente chiamare new ProducerRecord(topicName, message)per creare il proprio file ProducerRecord. In questo caso il partizionatore invierà messaggi a tutte le partizioni in modalità round robin, garantendo un carico equilibrato del server.

Partizionamento di un'applicazione Apache Kafka

Per il semplice esempio produttore / consumatore nella Parte 1, abbiamo utilizzato un file DefaultPartitioner. Ora proveremo invece a creare un partizionatore personalizzato. Per questo esempio, supponiamo di avere un sito di vendita al dettaglio che i consumatori possono utilizzare per ordinare prodotti in qualsiasi parte del mondo. In base all'utilizzo, sappiamo che la maggior parte dei consumatori si trova negli Stati Uniti o in India. Vogliamo suddividere la nostra applicazione per inviare ordini dagli Stati Uniti o dall'India ai rispettivi consumatori, mentre gli ordini da qualsiasi altro luogo andranno a un terzo consumatore.

Per iniziare, creeremo un CountryPartitionerche implementa l' org.apache.kafka.clients.producer.Partitionerinterfaccia. Dobbiamo implementare i seguenti metodi:

  1. Kafka chiamerà configure () quando inizializzeremo la Partitionerclasse, con una Mapdelle proprietà di configurazione. Questo metodo inizializza le funzioni specifiche della logica aziendale dell'applicazione, come la connessione a un database. In questo caso vogliamo un partizionatore abbastanza generico che prenda countryNamecome proprietà. Possiamo quindi utilizzare configProperties.put("partitions.0","USA")per mappare il flusso di messaggi alle partizioni. In futuro possiamo utilizzare questo formato per modificare i paesi che ottengono la propria partizione.
  2. L' ProducerAPI chiama partition () una volta per ogni messaggio. In questo caso lo useremo per leggere il messaggio e analizzare il nome del paese dal messaggio. Se il nome del paese è nel countryToPartitionMap, tornerà partitionIdmemorizzato nel Map. In caso contrario, eseguirà l'hash del valore del paese e lo utilizzerà per calcolare a quale partizione dovrebbe andare.
  3. Chiamiamo close () per chiudere il partitioner. L'utilizzo di questo metodo garantisce che tutte le risorse acquisite durante l'inizializzazione vengano eliminate durante l'arresto.

Nota che quando Kafka chiama configure(), il producer di Kafka passerà alla Partitionerclasse tutte le proprietà che abbiamo configurato per il producer . È essenziale leggere solo le proprietà che iniziano con partitions., analizzarle per ottenere partitionIde archiviare l'ID in countryToPartitionMap.

Di seguito è riportata la nostra implementazione personalizzata Partitionerdell'interfaccia.

Listato 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

La Producerclasse nel Listato 2 (sotto) è molto simile al nostro semplice producer della Parte 1, con due modifiche contrassegnate in grassetto:

  1. Impostiamo una proprietà di configurazione con una chiave uguale al valore di ProducerConfig.PARTITIONER_CLASS_CONFIG, che corrisponde al nome completo della nostra CountryPartitionerclasse. Impostiamo anche countryNamesu partitionId, mappando così le proprietà a cui vogliamo passare CountryPartitioner.
  2. Passiamo un'istanza di una classe che implementa l' org.apache.kafka.clients.producer.Callbackinterfaccia come secondo argomento del producer.send()metodo. Il client Kafka chiamerà il suo onCompletion()metodo una volta che un messaggio è stato pubblicato con successo, allegando un RecordMetadataoggetto. Potremo utilizzare questo oggetto per scoprire a quale partizione è stato inviato un messaggio, nonché l'offset assegnato al messaggio pubblicato.

Listato 2. Un produttore partizionato

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

Assegnazione di partizioni ai consumatori

Il server Kafka garantisce che una partizione sia assegnata a un solo consumatore, garantendo così l'ordine di consumo dei messaggi. È possibile assegnare manualmente una partizione o assegnarla automaticamente.

Se la logica aziendale richiede un maggiore controllo, sarà necessario assegnare manualmente le partizioni. In questo caso, dovresti KafkaConsumer.assign()passare un elenco di partizioni a cui ciascun consumatore era interessato al server Kakfa.

L'assegnazione automatica delle partizioni è la scelta predefinita e più comune. In questo caso, il server Kafka assegnerà una partizione a ciascun consumatore e riassegnerà le partizioni per scalare per i nuovi consumatori.

Supponi di creare un nuovo argomento con tre partizioni. Quando avvii il primo consumatore per il nuovo argomento, Kafka assegnerà tutte e tre le partizioni allo stesso consumatore. Se poi avvii un secondo consumatore, Kafka riassegnerà tutte le partizioni, assegnando una partizione al primo consumatore e le restanti due partizioni al secondo consumatore. Se aggiungi un terzo consumatore, Kafka riassegnerà nuovamente le partizioni, in modo che a ogni consumatore venga assegnata una singola partizione. Infine, se avvii il quarto e il quinto consumatore, a tre dei consumatori verrà assegnata una partizione, ma gli altri non riceveranno alcun messaggio. Se una delle tre partizioni iniziali si interrompe, Kafka utilizzerà la stessa logica di partizionamento per riassegnare la partizione del consumatore a uno dei consumatori aggiuntivi.