Java 101: concorrenza Java senza problemi, parte 2

Indietro 1 2 3 4 Pagina 3 Avanti Pagina 3 di 4

Variabili atomiche

Le applicazioni multithread eseguite su processori multicore o sistemi multiprocessore possono ottenere un buon utilizzo dell'hardware ed essere altamente scalabili. Possono raggiungere questi scopi facendo in modo che i thread trascorrano la maggior parte del tempo a eseguire il lavoro anziché attendere che il lavoro venga completato o attendere che acquisiscano i blocchi per accedere alle strutture dati condivise.

Tuttavia, il meccanismo di sincronizzazione tradizionale di Java, che impone l'esclusione reciproca (il thread che detiene il blocco che protegge un insieme di variabili ha accesso esclusivo ad esse) e la visibilità (le modifiche alle variabili protette diventano visibili ad altri thread che successivamente acquisiscono il blocco), influisce utilizzo e scalabilità dell'hardware, come segue:

  • La sincronizzazione contesa (più thread in costante competizione per un blocco) è costosa e la velocità effettiva ne risente. Uno dei motivi principali della spesa è il frequente cambio di contesto che si verifica; un'operazione di cambio di contesto può richiedere molti cicli del processore per essere completata. Al contrario, la sincronizzazione incontrollata è poco costosa sulle moderne JVM.
  • Quando un thread che contiene un blocco viene ritardato (ad esempio, a causa di un ritardo nella pianificazione), nessun thread che richiede quel blocco avanza e l'hardware non viene utilizzato come potrebbe essere altrimenti.

Potresti pensare di poterlo utilizzare volatilecome alternativa di sincronizzazione. Tuttavia, le volatilevariabili risolvono solo il problema della visibilità. Non possono essere utilizzati per implementare in modo sicuro le sequenze atomiche di lettura-modifica-scrittura necessarie per l'implementazione sicura di contatori e altre entità che richiedono l'esclusione reciproca.

Java 5 ha introdotto un'alternativa di sincronizzazione che offre l'esclusione reciproca combinata con le prestazioni di volatile. Questa alternativa alla variabile atomica si basa sull'istruzione di confronto e scambio di un microprocessore e consiste in gran parte nei tipi nel java.util.concurrent.atomicpacchetto.

Capire confronta e scambia

L' istruzione compare-and-swap (CAS) è un'istruzione ininterrotta che legge una posizione di memoria, confronta il valore letto con un valore atteso e memorizza un nuovo valore nella posizione di memoria quando il valore letto corrisponde al valore atteso. Altrimenti non si fa nulla. L'istruzione effettiva del microprocessore può differire leggermente (ad esempio, restituisce true se CAS ha avuto esito positivo o false altrimenti invece del valore letto).

Istruzioni CAS del microprocessore

I microprocessori moderni offrono una sorta di istruzione CAS. Ad esempio, i microprocessori Intel offrono la cmpxchgfamiglia di istruzioni, mentre i microprocessori PowerPC offrono istruzioni load-link (ad esempio lwarx) e store-conditional (ad esempio stwcx) per lo stesso scopo.

CAS rende possibile supportare sequenze atomiche di lettura-modifica-scrittura. In genere si utilizza CAS come segue:

  1. Leggere il valore v dall'indirizzo X.
  2. Eseguire un calcolo multistep per derivare un nuovo valore v2.
  3. Utilizzare CAS per modificare il valore di X da v a v2. CAS riesce quando il valore di X non è cambiato durante l'esecuzione di questi passaggi.

Per vedere come CAS offre prestazioni (e scalabilità) migliori rispetto alla sincronizzazione, si consideri un esempio di contatore che consente di leggere il suo valore corrente e incrementare il contatore. La classe seguente implementa un contatore basato su synchronized:

Listato 4. Counter.java (versione 1)

public class Counter { private int value; public synchronized int getValue() { return value; } public synchronized int increment() { return ++value; } }

Un'elevata contesa per il blocco del monitor comporterà un cambio di contesto eccessivo che può ritardare tutti i thread e provocare un'applicazione che non si ridimensiona correttamente.

L'alternativa CAS richiede un'implementazione dell'istruzione compare-and-swap. La classe seguente emula CAS. Utilizza synchronizedinvece delle istruzioni hardware effettive per semplificare il codice:

Listato 5. EmulatedCAS.java

public class EmulatedCAS { private int value; public synchronized int getValue() { return value; } public synchronized int compareAndSwap(int expectedValue, int newValue) { int readValue = value; if (readValue == expectedValue) value = newValue; return readValue; } }

Qui, valueidentifica una posizione di memoria, che può essere recuperata da getValue(). Inoltre, compareAndSwap()implementa l'algoritmo CAS.

La classe seguente utilizza EmulatedCASper implementare un non synchronizedcontatore (fingere che EmulatedCASnon lo richieda synchronized):

Listato 6. Counter.java (versione 2)

public class Counter { private EmulatedCAS value = new EmulatedCAS(); public int getValue() { return value.getValue(); } public int increment() { int readValue = value.getValue(); while (value.compareAndSwap(readValue, readValue+1) != readValue) readValue = value.getValue(); return readValue+1; } }

Counterincapsula EmulatedCASun'istanza e dichiara i metodi per recuperare e incrementare un valore del contatore con l'aiuto di questa istanza. getValue()recupera il "valore corrente del contatore" dell'istanza e increment()incrementa in modo sicuro il valore del contatore.

increment()invoca ripetutamente compareAndSwap()finché readValueil valore di non cambia. È quindi libero di modificare questo valore. Quando non è coinvolto alcun blocco, il conflitto viene evitato insieme a un cambio di contesto eccessivo. Le prestazioni migliorano e il codice è più scalabile.

ReentrantLock e CAS

Hai appreso in precedenza che ReentrantLockoffre prestazioni migliori rispetto a un synchronizedelevato conflitto di thread. Per aumentare le prestazioni, ReentrantLockla sincronizzazione di è gestita da una sottoclasse della java.util.concurrent.locks.AbstractQueuedSynchronizerclasse astratta . A sua volta, questa classe sfrutta la sun.misc.Unsafeclasse non documentata e il suo compareAndSwapInt()metodo CAS.

Esplorazione del pacchetto di variabili atomiche

Non è necessario eseguire l'implementazione compareAndSwap()tramite l'interfaccia nativa Java non portatile. Invece, Java 5 offre questo supporto tramite java.util.concurrent.atomic: un toolkit di classi utilizzate per la programmazione priva di blocchi e thread-safe su singole variabili.

Secondo java.util.concurrent.atomicJavadoc, queste classi

estendere la nozione di volatilevalori, campi ed elementi di matrice a quelli che forniscono anche un'operazione di aggiornamento condizionale atomico del modulo boolean compareAndSet(expectedValue, updateValue). Questo metodo (che varia nei tipi di argomento tra classi diverse) imposta atomicamente una variabile su updateValuese attualmente contiene il expectedValue, riportando vero in caso di successo.

Questo pacchetto offre classi per i tipi Boolean ( AtomicBoolean), integer ( AtomicInteger), long integer ( AtomicLong) e reference ( AtomicReference). Offre anche versioni array di classi di riferimento intero, intero lungo e riferimento ( AtomicIntegerArray, AtomicLongArraye AtomicReferenceArray), contrassegnabili e stampate per l'aggiornamento atomico di una coppia di valori ( AtomicMarkableReferencee AtomicStampedReference) e altro ancora.

Implementazione di compareAndSet ()

Java viene implementato compareAndSet()tramite il costrutto nativo più veloce disponibile (ad esempio cmpxchgo load-link / store-conditional) o (nel peggiore dei casi) spin lock .

Considera AtomicInteger, che ti consente di aggiornare un intvalore in modo atomico. Possiamo usare questa classe per implementare il contatore mostrato nel Listato 6. Il Listato 7 presenta il codice sorgente equivalente.

Listato 7. Counter.java (versione 3)

import java.util.concurrent.atomic.AtomicInteger; public class Counter { private AtomicInteger value = new AtomicInteger(); public int getValue() { return value.get(); } public int increment() { int readValue = value.get(); while (!value.compareAndSet(readValue, readValue+1)) readValue = value.get(); return readValue+1; } }

Listing 7 is very similar to Listing 6 except that it replaces EmulatedCAS with AtomicInteger. Incidentally, you can simplify increment() because AtomicInteger supplies its own int getAndIncrement() method (and similar methods).

Fork/Join framework

Computer hardware has evolved significantly since Java's debut in 1995. Back in the day, single-processor systems dominated the computing landscape and Java's synchronization primitives, such as synchronized and volatile, as well as its threading library (the Thread class, for example) were generally adequate.

Multiprocessor systems became cheaper and developers found themselves needing to create Java applications that effectively exploited the hardware parallelism that these systems offered. However, they soon discovered that Java's low-level threading primitives and library were very difficult to use in this context, and the resulting solutions were often riddled with errors.

What is parallelism?

Parallelism is the simultaneous execution of multiple threads/tasks via some combination of multiple processors and processor cores.

The Java Concurrency Utilities framework simplifies the development of these applications; however, the utilities offered by this framework do not scale to thousands of processors or processor cores. In our many-core era, we need a solution for achieving a finer-grained parallelism, or we risk keeping processors idle even when there is lots of work for them to handle.

Professor Doug Lea presented a solution to this problem in his paper introducing the idea for a Java-based fork/join framework. Lea describes a framework that supports "a style of parallel programming in which problems are solved by (recursively) splitting them into subtasks that are solved in parallel." The Fork/Join framework was eventually included in Java 7.

Overview of the Fork/Join framework

The Fork/Join framework is based on a special executor service for running a special kind of task. It consists of the following types that are located in the java.util.concurrent package:

  • ForkJoinPool: an ExecutorService implementation that runs ForkJoinTasks. ForkJoinPool provides task-submission methods, such as void execute(ForkJoinTask task), along with management and monitoring methods, such as int getParallelism() and long getStealCount().
  • ForkJoinTask: an abstract base class for tasks that run within a ForkJoinPool context. ForkJoinTask describes thread-like entities that have a much lighter weight than normal threads. Many tasks and subtasks can be hosted by very few actual threads in a ForkJoinPool instance.
  • ForkJoinWorkerThread: a class that describes a thread managed by a ForkJoinPool instance. ForkJoinWorkerThread is responsible for executing ForkJoinTasks.
  • RecursiveAction: an abstract class that describes a recursive resultless ForkJoinTask.
  • RecursiveTask: an abstract class that describes a recursive result-bearing ForkJoinTask.

The ForkJoinPool executor service is the entry-point for submitting tasks that are typically described by subclasses of RecursiveAction or RecursiveTask. Behind the scenes, the task is divided into smaller tasks that are forked (distributed among different threads for execution) from the pool. A task waits until joined (its subtasks finish so that results can be combined).

ForkJoinPool manages a pool of worker threads, where each worker thread has its own double-ended work queue (deque). When a task forks a new subtask, the thread pushes the subtask onto the head of its deque. When a task tries to join with another task that hasn't finished, the thread pops another task off the head of its deque and executes the task. If the thread's deque is empty, it tries to steal another task from the tail of another thread's deque. This work stealing behavior maximizes throughput while minimizing contention.

Using the Fork/Join framework

Fork/Join was designed to efficiently execute divide-and-conquer algorithms, which recursively divide problems into sub-problems until they are simple enough to solve directly; for example, a merge sort. The solutions to these sub-problems are combined to provide a solution to the original problem. Each sub-problem can be executed independently on a different processor or core.

Lea's paper presents the following pseudocode to describe the divide-and-conquer behavior:

Result solve(Problem problem) { if (problem is small) directly solve problem else { split problem into independent parts fork new subtasks to solve each part join all subtasks compose result from subresults } }

The pseudocode presents a solve method that's called with some problem to solve and which returns a Result that contains the problem's solution. If the problem is too small to solve via parallelism, it's solved directly. (The overhead of using parallelism on a small problem exceeds any gained benefit.) Otherwise, the problem is divided into subtasks: each subtask independently focuses on part of the problem.

Operation fork launches a new fork/join subtask that will execute in parallel with other subtasks. Operation join delays the current task until the forked subtask finishes. At some point, the problem will be small enough to be executed sequentially, and its result will be combined along with other subresults to achieve an overall solution that's returned to the caller.

The Javadoc for the RecursiveAction and RecursiveTask classes presents several divide-and-conquer algorithm examples implemented as fork/join tasks. For RecursiveAction the examples sort an array of long integers, increment each element in an array, and sum the squares of each element in an array of doubles. RecursiveTask's solitary example computes a Fibonacci number.

Il Listato 8 presenta un'applicazione che mostra l'esempio di ordinamento in contesti non fork / join e fork / join. Presenta anche alcune informazioni sui tempi per contrastare le velocità di smistamento.