Come creare applicazioni di streaming stateful con Apache Flink

Fabian Hueske è un committer e membro PMC del progetto Apache Flink e co-fondatore di Data Artisans.

Apache Flink è un framework per l'implementazione di applicazioni di elaborazione di flussi con stato e per eseguirle su larga scala su un cluster di elaborazione. In un articolo precedente abbiamo esaminato cos'è l'elaborazione del flusso con stato, quali casi d'uso risolve e perché dovresti implementare ed eseguire le tue applicazioni di streaming con Apache Flink.

In questo articolo, presenterò esempi per due casi d'uso comuni di elaborazione di flussi con stato e discuterò come possono essere implementati con Flink. Il primo caso d'uso sono le applicazioni guidate dagli eventi, cioè le applicazioni che assorbono flussi continui di eventi e applicano una logica di business a questi eventi. Il secondo è il caso d'uso dell'analisi di streaming, in cui presenterò due query analitiche implementate con l'API SQL di Flink, che aggregano i dati di streaming in tempo reale. Noi di Data Artisans forniamo il codice sorgente di tutti i nostri esempi in un repository GitHub pubblico.

Prima di immergerci nei dettagli degli esempi, introdurrò il flusso di eventi che viene importato dalle applicazioni di esempio e spiegherò come è possibile eseguire il codice che forniamo.

Un flusso di eventi di corsa in taxi

Le nostre applicazioni di esempio si basano su un set di dati pubblico sulle corse in taxi avvenute a New York City nel 2013. Gli organizzatori della Grand Challenge 2015 DEBS (ACM International Conference on Distributed Event-Based Systems) hanno riorganizzato il set di dati originale e lo hanno convertito in un unico file CSV da cui stiamo leggendo i seguenti nove campi.

  • Medaglione: un ID somma MD5 del taxi
  • Hack_license: un ID somma MD5 della licenza di taxi
  • Pickup_datetime: l'ora in cui i passeggeri sono stati prelevati
  • Dropoff_datetime: l'ora in cui i passeggeri sono stati lasciati
  • Pickup_longitude: la longitudine del punto di prelievo
  • Pickup_latitude: la latitudine del luogo di prelievo
  • Dropoff_longitude: la longitudine del punto di riconsegna
  • Dropoff_latitude: la latitudine del luogo di riconsegna
  • Total_amount: totale pagato in dollari

Il file CSV memorizza i record in ordine crescente rispetto al loro attributo orario di riconsegna. Pertanto, il file può essere trattato come un registro ordinato degli eventi pubblicati al termine di un viaggio. Per eseguire gli esempi che forniamo su GitHub, è necessario scaricare il set di dati della sfida DEBS da Google Drive.

Tutte le applicazioni di esempio leggono in sequenza il file CSV e lo ingeriscono come flusso di eventi di corsa in taxi. Da lì in poi, le applicazioni elaborano gli eventi proprio come qualsiasi altro flusso, cioè come un flusso che viene importato da un sistema di pubblicazione-sottoscrizione basato su log, come Apache Kafka o Kinesis. In effetti, leggere un file (o qualsiasi altro tipo di dati persistenti) e trattarlo come un flusso è una pietra angolare dell'approccio di Flink all'unificazione di batch e elaborazione di flussi.

Esecuzione degli esempi Flink

Come accennato in precedenza, abbiamo pubblicato il codice sorgente delle nostre applicazioni di esempio in un repository GitHub. Ti invitiamo a eseguire il fork e clonare il repository. Gli esempi possono essere facilmente eseguiti dall'interno del tuo IDE preferito; non è necessario impostare e configurare un cluster Flink per eseguirli. Innanzitutto, importa il codice sorgente degli esempi come progetto Maven. Quindi, eseguire la classe principale di un'applicazione e fornire la posizione di archiviazione del file di dati (vedere sopra per il collegamento per scaricare i dati) come parametro del programma.

Una volta avviata un'applicazione, verrà avviata un'istanza Flink locale incorporata nel processo JVM dell'applicazione e inoltrerà l'applicazione per eseguirla. Vedrai una serie di istruzioni di registro mentre Flink si avvia e le attività del lavoro vengono pianificate. Una volta che l'applicazione è in esecuzione, il suo output verrà scritto sullo standard output.

Creazione di un'applicazione basata sugli eventi in Flink

Ora, esaminiamo il nostro primo caso d'uso, che è un'applicazione guidata dagli eventi. Le applicazioni guidate dagli eventi acquisiscono flussi di eventi, eseguono calcoli man mano che gli eventi vengono ricevuti e possono emettere nuovi eventi o attivare azioni esterne. È possibile comporre più applicazioni guidate dagli eventi collegandole tra loro tramite sistemi di registro eventi, in modo simile a come possono essere composti sistemi di grandi dimensioni dai microservizi. Le applicazioni guidate dagli eventi, i registri degli eventi e le istantanee dello stato dell'applicazione (note come punti di salvataggio in Flink) costituiscono un modello di progettazione molto potente perché puoi reimpostare il loro stato e riprodurre il loro input per ripristinare da un errore, correggere un bug o migrare un applicazione a un cluster diverso.

In questo articolo esamineremo un'applicazione basata sugli eventi che supporta un servizio che monitora l'orario di lavoro dei tassisti. Nel 2016, la NYC Taxi and Limousine Commission ha deciso di limitare l'orario di lavoro dei tassisti a turni di 12 ore e di richiedere una pausa di almeno otto ore prima dell'inizio del turno successivo. Un turno inizia con l'inizio della prima corsa. Da quel momento in poi, un conducente può iniziare nuove corse entro un periodo di 12 ore. La nostra applicazione tiene traccia delle corse dei conducenti, segna l'ora di fine della loro finestra di 12 ore (ovvero l'ora in cui possono iniziare l'ultima corsa) e contrassegna le corse che hanno violato il regolamento. Puoi trovare il codice sorgente completo di questo esempio nel nostro repository GitHub.

La nostra applicazione è implementata con l'API DataStream di Flink e un file KeyedProcessFunction. L'API DataStream è un'API funzionale e basata sul concetto di flussi di dati tipizzati. A DataStreamè la rappresentazione logica di un flusso di eventi di tipo T. Un flusso viene elaborato applicandovi una funzione che produce un altro flusso di dati, possibilmente di tipo diverso. Flink elabora i flussi in parallelo distribuendo eventi alle partizioni di flusso e applicando diverse istanze di funzioni a ciascuna partizione.

Il frammento di codice seguente mostra il flusso di alto livello della nostra applicazione di monitoraggio.

// assimilare il flusso di corse in taxi.

DataStream ride = TaxiRides.getRides (env, inputPath);

Flusso di dati notifiche = corse

   // flusso di partizione in base all'ID della patente di guida

   .keyBy (r -> r.licenseId)

   // monitora gli eventi di corsa e genera notifiche

   .process (new MonitorWorkTime ());

// stampa le notifiche

notifiche.print ();

L'applicazione inizia a importare un flusso di eventi di corsa in taxi. Nel nostro esempio, gli eventi vengono letti da un file di testo, analizzati e memorizzati in TaxiRideoggetti POJO. Un'applicazione del mondo reale in genere inserisce gli eventi da una coda di messaggi o da un registro eventi, come Apache Kafka o Pravega. Il passo successivo è la chiave degli TaxiRideeventi da parte licenseIddel conducente. L' keyByoperazione partiziona il flusso sul campo dichiarato, in modo che tutti gli eventi con la stessa chiave vengano elaborati dalla stessa istanza parallela della funzione seguente. Nel nostro caso partizioniamo sul licenseIdcampo perché vogliamo monitorare il tempo di lavoro di ogni singolo autista.

Successivamente, applichiamo la MonitorWorkTimefunzione agli TaxiRideeventi partizionati . La funzione tiene traccia delle corse per conducente e monitora i loro turni e tempi di pausa. Emette eventi di tipo Tuple2, dove ogni tupla rappresenta una notifica composta dall'ID della licenza del conducente e da un messaggio. Infine, la nostra applicazione emette i messaggi stampandoli sullo standard output. Un'applicazione del mondo reale scriverà le notifiche su un messaggio esterno o un sistema di archiviazione, come Apache Kafka, HDFS o un sistema di database, oppure attiverebbe una chiamata esterna per inviarle immediatamente.

Ora che abbiamo discusso il flusso complessivo dell'applicazione, diamo un'occhiata alla MonitorWorkTimefunzione, che contiene la maggior parte della logica di business effettiva dell'applicazione. La MonitorWorkTimefunzione è uno stato KeyedProcessFunctionche inserisce TaxiRideeventi ed emette Tuple2record. L' KeyedProcessFunctioninterfaccia presenta due metodi per elaborare i dati: processElement()e onTimer(). Il processElement()metodo viene chiamato per ogni evento in arrivo. Il onTimer()metodo viene chiamato quando viene attivato un timer registrato in precedenza. Il frammento di codice seguente mostra lo scheletro della MonitorWorkTimefunzione e tutto ciò che viene dichiarato al di fuori dei metodi di elaborazione.

classe statica pubblica MonitorWorkTime

    estende KeyedProcessFunction {

  // costanti di tempo in millisecondi

  finale statico privato lungo ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 ore

  finale statico privato lungo REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 ore

  finale statico privato lungo CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 ore

 formattatore DateTimeFormatter transitorio privato;

  // handle di stato per memorizzare l'ora di inizio di un turno

  ValueState shiftStart;

  @Oltrepassare

  public void open (Configuration conf) {

    // registra l'handle di stato

    shiftStart = getRuntimeContext (). getState (

      nuovo ValueStateDescriptor ("shiftStart", Types.LONG));

    // inizializza il formattatore dell'ora

    this.formatter = DateTimeFormat.forPattern ("yyyy-MM-dd HH: mm: ss");

  }

  // processElement () e onTimer () sono discussi in dettaglio di seguito.

}

La funzione dichiara alcune costanti per gli intervalli di tempo in millisecondi, un formattatore di tempo e un handle di stato per lo stato con chiave gestito da Flink. Lo stato gestito viene periodicamente controllato e ripristinato automaticamente in caso di guasto. Lo stato con chiave è organizzato per chiave, il che significa che una funzione manterrà un valore per handle e chiave. Nel nostro caso, la MonitorWorkTimefunzione mantiene un Longvalore per ogni tasto, cioè per ciascuno licenseId. Lo shiftStartstato memorizza l'ora di inizio del turno di un conducente. L'handle di stato viene inizializzato nel open()metodo, che viene chiamato una volta prima che il primo evento venga elaborato.

Ora, diamo un'occhiata al processElement()metodo.

@Oltrepassare

public void processElement (

    TaxiRide ride,

    Ctx contesto,

    Collettore out) genera l'eccezione {

  // cerca l'ora di inizio dell'ultimo turno

  Long startTs = shiftStart.value ();

  if (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // questa è la prima corsa di un nuovo turno.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    long endTs = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      "Puoi accettare nuovi passeggeri fino a" + formatter.print (endTs)));

    // registra il timer per ripulire lo stato in 24 ore

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } altrimenti if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // questa corsa è iniziata al termine del tempo di lavoro consentito.

    // è una violazione dei regolamenti!

    out.collect (Tuple2.of (ride.licenseId,

      "Questa corsa ha violato le norme sull'orario di lavoro."));

  }

}

Il processElement()metodo viene chiamato per ogni TaxiRideevento. Innanzitutto, il metodo recupera l'ora di inizio del turno del conducente dall'handle di stato. Se lo stato non contiene un'ora di inizio ( startTs == null) o se l'ultimo turno è iniziato più di 20 ore ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) prima della corsa corrente, la corsa corrente è la prima corsa di un nuovo turno. In entrambi i casi, la funzione avvia un nuovo turno aggiornando l'ora di inizio del turno con l'ora di inizio della corsa corrente, invia un messaggio al guidatore con l'ora di fine del nuovo turno e registra un timer per pulire stato in 24 ore.

Se la corsa in corso non è la prima di un nuovo turno, la funzione controlla se viola la regolazione dell'orario di lavoro, ovvero se è iniziata più di 12 ore dopo l'inizio del turno attuale del conducente. In tal caso, la funzione emette un messaggio per informare il conducente della violazione.

Il processElement()metodo della MonitorWorkTimefunzione registra un timer per ripulire lo stato 24 ore dopo l'inizio di un turno. La rimozione dello stato che non è più necessario è importante per impedire l'aumento delle dimensioni dello stato a causa di perdite di stato. Un timer si attiva quando l'ora dell'applicazione supera il timestamp del timer. A quel punto, onTimer()viene chiamato il metodo. In modo simile allo stato, i timer vengono mantenuti per chiave e la funzione viene inserita nel contesto della chiave associata prima che il onTimer()metodo venga chiamato. Quindi, tutto l'accesso allo stato è diretto alla chiave che era attiva quando il timer è stato registrato.

Diamo un'occhiata al onTimer()metodo di MonitorWorkTime.

@Oltrepassare

public void onTimer (

    timer lunghi,

    OnTimerContext ctx,

    Collettore out) genera l'eccezione {

  // rimuove lo stato del turno se non è già stato avviato alcun nuovo turno.

  Long startTs = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

Il processElement()metodo registra i timer per 24 ore dopo l'inizio di un turno per ripulire uno stato che non è più necessario. La pulizia dello stato è l'unica logica onTimer()implementata dal metodo. Quando scatta un timer, controlliamo se il conducente ha iniziato un nuovo turno nel frattempo, ovvero se l'ora di inizio del turno è cambiata. Se non è così, cancelliamo lo stato del turno per il conducente.