Creato per il tempo reale: messaggistica di big data con Apache Kafka, parte 1

Quando è iniziato il movimento dei big data, si è concentrato principalmente sull'elaborazione in batch. Archiviazione dei dati distribuita e strumenti di query come MapReduce, Hive e Pig sono stati tutti progettati per elaborare i dati in batch anziché in modo continuo. Le aziende eseguivano più lavori ogni notte per estrarre i dati da un database, quindi analizzare, trasformare e infine archiviare i dati. Più di recente le aziende hanno scoperto il potere di analizzare ed elaborare dati ed eventi nel momento in cui si verificano , non solo una volta ogni poche ore. Tuttavia, la maggior parte dei sistemi di messaggistica tradizionali non si adatta per gestire i big data in tempo reale. Quindi gli ingegneri di LinkedIn hanno creato Apache Kafka open source: un framework di messaggistica distribuito che soddisfa le esigenze dei big data scalando su hardware di base.

Negli ultimi anni, Apache Kafka è emerso per risolvere una varietà di casi d'uso. Nel caso più semplice, potrebbe essere un semplice buffer per l'archiviazione dei log dell'applicazione. In combinazione con una tecnologia come Spark Streaming, può essere utilizzato per tenere traccia delle modifiche ai dati e agire su tali dati prima di salvarli in una destinazione finale. La modalità predittiva di Kafka lo rende un potente strumento per rilevare le frodi, come controllare la validità di una transazione con carta di credito quando si verifica e non attendere l'elaborazione in batch ore dopo.

Questo tutorial in due parti introduce Kafka, iniziando con come installarlo ed eseguirlo nel tuo ambiente di sviluppo. Avrai una panoramica dell'architettura di Kafka, seguita da un'introduzione allo sviluppo di un sistema di messaggistica Apache Kafka pronto all'uso. Infine, creerai un'applicazione produttore / consumatore personalizzata che invia e consuma messaggi tramite un server Kafka. Nella seconda metà del tutorial imparerai come partizionare e raggruppare i messaggi e come controllare quali messaggi verranno utilizzati da un consumatore Kafka.

Cos'è Apache Kafka?

Apache Kafka è un sistema di messaggistica creato per scalare per i big data. Simile ad Apache ActiveMQ o RabbitMq, Kafka consente alle applicazioni costruite su piattaforme diverse di comunicare tramite il passaggio di messaggi asincrono. Ma Kafka differisce da questi sistemi di messaggistica più tradizionali in modi chiave:

  • È progettato per scalare orizzontalmente, aggiungendo più server commodity.
  • Fornisce un throughput molto più elevato sia per i processi di produttore che di consumatore.
  • Può essere utilizzato per supportare casi d'uso sia batch che in tempo reale.
  • Non supporta JMS, l'API middleware orientata ai messaggi di Java.

L'architettura di Apache Kafka

Prima di esplorare l'architettura di Kafka, dovresti conoscere la sua terminologia di base:

  • Un produttore è un processo che può pubblicare un messaggio su un argomento.
  • un consumatore è un processo che può iscriversi a uno o più argomenti e consumare messaggi pubblicati su argomenti.
  • Una categoria di argomenti è il nome del feed in cui vengono pubblicati i messaggi.
  • Un broker è un processo in esecuzione su una singola macchina.
  • Un cluster è un gruppo di broker che lavorano insieme.

L'architettura di Apache Kafka è molto semplice, il che può portare a prestazioni e velocità di trasmissione migliori in alcuni sistemi. Ogni argomento in Kafka è come un semplice file di registro. Quando un produttore pubblica un messaggio, il server Kafka lo aggiunge alla fine del file di registro per l'argomento specificato. Il server assegna anche un offset , che è un numero utilizzato per identificare in modo permanente ogni messaggio. Man mano che il numero di messaggi cresce, il valore di ogni offset aumenta; ad esempio, se il produttore pubblica tre messaggi, il primo potrebbe ottenere un offset di 1, il secondo un offset di 2 e il terzo un offset di 3.

Quando il consumatore Kafka si avvia per la prima volta, invierà una richiesta pull al server, chiedendo di recuperare tutti i messaggi per un particolare argomento con un valore di offset maggiore di 0. Il server controllerà il file di registro per quell'argomento e restituirà i tre nuovi messaggi . Il consumatore elaborerà i messaggi, quindi invierà una richiesta per i messaggi con un offset maggiore di 3 e così via.

In Kafka, il client è responsabile di ricordare il conteggio degli offset e di recuperare i messaggi. Il server Kafka non tiene traccia o gestisce il consumo dei messaggi. Per impostazione predefinita, un server Kafka conserverà un messaggio per sette giorni. Un thread in background nel server controlla ed elimina i messaggi di sette giorni o più vecchi. Un consumatore può accedere ai messaggi fintanto che si trovano sul server. Può leggere un messaggio più volte e persino leggere i messaggi in ordine inverso rispetto alla ricezione. Ma se il consumatore non riesce a recuperare il messaggio prima che siano trascorsi i sette giorni, perderà quel messaggio.

Benchmark di Kafka

L'utilizzo in produzione da LinkedIn e altre aziende ha dimostrato che con una configurazione corretta Apache Kafka è in grado di elaborare centinaia di gigabyte di dati al giorno. Nel 2011, tre ingegneri di LinkedIn hanno utilizzato test di benchmark per dimostrare che Kafka poteva raggiungere un throughput molto più elevato di ActiveMQ e RabbitMQ.

Configurazione rapida e demo di Apache Kafka

Creeremo un'applicazione personalizzata in questo tutorial, ma iniziamo installando e testando un'istanza Kafka con un produttore e un consumatore out-of-the-box.

  1. Visita la pagina di download di Kafka per installare la versione più recente (0.9 al momento della stesura di questo documento).
  2. Estrai i file binari in una software/kafkacartella. Per la versione attuale è software/kafka_2.11-0.9.0.0.
  3. Cambia la tua directory corrente in modo che punti alla nuova cartella.
  4. Avviare il server Zookeeper eseguendo il comando: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Avviare il server Kafka eseguendo: bin/kafka-server-start.sh config/server.properties.
  6. Creare un argomento di prova che è possibile utilizzare per il test: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Avviare un semplice consumatore console che può consumare i messaggi pubblicati per un determinato argomento, come ad esempio javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Avviare una semplice console produttore che può pubblicare messaggi al tema di prova: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Prova a digitare uno o due messaggi nella console del produttore. I tuoi messaggi dovrebbero essere visualizzati nella console consumer.

Applicazione di esempio con Apache Kafka

Hai visto come funziona Apache Kafka fuori dagli schemi. Successivamente, sviluppiamo un'applicazione produttore / consumatore personalizzata. Il produttore recupererà l'input dell'utente dalla console e invierà ogni nuova riga come messaggio a un server Kafka. Il consumatore recupererà i messaggi per un determinato argomento e li stamperà sulla console. I componenti produttore e consumatore in questo caso sono le tue implementazioni di kafka-console-producer.she kafka-console-consumer.sh.

Cominciamo creando una Producer.javaclasse. Questa classe client contiene la logica per leggere l'input dell'utente dalla console e inviare quell'input come messaggio al server Kafka.

Configuriamo il producer creando un oggetto dalla java.util.Propertiesclasse e impostandone le proprietà. La classe ProducerConfig definisce tutte le diverse proprietà disponibili, ma i valori predefiniti di Kafka sono sufficienti per la maggior parte degli usi. Per la configurazione predefinita dobbiamo solo impostare tre proprietà obbligatorie:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

Nel caso dell'applicazione di esempio, sappiamo che il produttore sta usando ByteArraySerializerper la chiave e StringSerializerper il valore. Dal lato client dobbiamo quindi utilizzare org.apache.kafka.common.serialization.ByteArrayDeserializerper la chiave e org.apache.kafka.common.serialization.StringDeserializerper il valore. L'impostazione di tali classi come valori per KEY_DESERIALIZER_CLASS_CONFIGe VALUE_DESERIALIZER_CLASS_CONFIGconsentirà al consumatore di deserializzare i byte[]tipi codificati inviati dal produttore.

Infine, dobbiamo impostare il valore di GROUP_ID_CONFIG. Dovrebbe essere un nome di gruppo in formato stringa. Spiegherò di più su questa configurazione tra un minuto. Per ora, guarda il consumatore Kafka con le quattro proprietà obbligatorie impostate: