Java - Rilevamento e gestione dei fili sospesi

Di Alex. C. Punnen

Architetto - Nokia Siemens Networks

Bangalore

I thread sospesi sono una sfida comune nello sviluppo di software che deve interfacciarsi con dispositivi proprietari utilizzando interfacce proprietarie o standardizzate come SNMP, Q3 o Telnet. Questo problema non si limita alla gestione della rete, ma si verifica in un'ampia gamma di campi come server Web, processi che invocano chiamate di procedure remote e così via.

Un thread che avvia una richiesta a un dispositivo necessita di un meccanismo per rilevare nel caso in cui il dispositivo non risponde o risponde solo parzialmente. In alcuni casi in cui viene rilevato un blocco di questo tipo, è necessario intraprendere un'azione specifica. L'azione specifica potrebbe essere un nuovo tentativo o informare l'utente finale dell'errore dell'attività o di qualche altra opzione di ripristino. In alcuni casi in cui un numero elevato di attività deve essere attivato su un numero elevato di elementi di rete da un componente, il rilevamento del thread sospeso è importante in modo che non diventi un collo di bottiglia per l'elaborazione di altre attività. Quindi ci sono due aspetti nella gestione dei thread sospesi: prestazioni e notifica .

Per l' aspetto delle notifiche possiamo adattare il pattern Java Observer per adattarlo al mondo multithread.

Adattamento del pattern Java Observer a sistemi multithread

A causa di attività sospese, l'utilizzo della ThreadPoolclasse Java con una strategia di adattamento è la prima soluzione che viene in mente. Tuttavia, l'utilizzo di Java ThreadPoolnel contesto di alcuni thread sospesi casualmente per un periodo di tempo fornisce un comportamento indesiderato basato sulla particolare strategia utilizzata, come la fame di thread nel caso di una strategia di pool di thread fissi. Ciò è dovuto principalmente al fatto che Java ThreadPoolnon dispone di un meccanismo per rilevare un blocco del thread.

Potremmo provare un pool di thread memorizzati nella cache, ma presenta anche problemi. Se c'è un alto tasso di avvio delle attività e alcuni thread si bloccano, il numero di thread potrebbe aumentare, causando alla fine la fame di risorse e le eccezioni di memoria insufficiente. Oppure potremmo usare una ThreadPoolstrategia personalizzata che richiama un file CallerRunsPolicy. Anche in questo caso, un blocco del thread potrebbe causare il blocco di tutti i thread. (Il thread principale non dovrebbe mai essere il chiamante, poiché esiste la possibilità che qualsiasi attività passata al thread principale possa bloccarsi, causando l'arresto di tutto.)

Allora, qual è la soluzione? Mostrerò un modello ThreadPool non così semplice che regola la dimensione del pool in base alla frequenza delle attività e in base al numero di thread sospesi. Passiamo prima al problema del rilevamento dei fili sospesi.

Rilevamento di fili sospesi

La figura 1 mostra un'astrazione del modello:

Ci sono due classi importanti qui: ThreadManagere ManagedThread. Entrambi si estendono dalla Threadclasse Java . La ThreadManagertiene un contenitore che contiene il ManagedThreads. Quando ManagedThreadviene creato un nuovo, si aggiunge a questo contenitore.

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

I ThreadManagerscorre questa lista e chiama il ManagedThread's isHung()metodo. Questa è fondamentalmente una logica di controllo del timestamp.

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

Se rileva che un thread è entrato in un ciclo di attività e non ha mai aggiornato i suoi risultati, utilizza un meccanismo di ripristino come stabilito da ManageThread.

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

Affinché un nuovo ManagedThreadpossa essere creato e utilizzato al posto di quello sospeso, non dovrebbe contenere alcuno stato o contenitore. Per questo il contenitore su cui gli ManagedThreadatti dovrebbero essere separati. Qui stiamo usando il modello Singleton basato su ENUM per contenere l'elenco delle attività. Quindi il contenitore che contiene le attività è indipendente dal thread che elabora le attività. Fare clic sul collegamento seguente per scaricare l'origine per il pattern descritto: Java Thread Manager Source.

Thread sospesi e strategie Java ThreadPool

Java ThreadPoolnon ha un meccanismo per rilevare i thread sospesi. L'uso di una strategia come fixed threadpool ( Executors.newFixedThreadPool()) non funzionerà perché se alcune attività si bloccano nel tempo, tutti i thread alla fine saranno in uno stato bloccato. Un'altra opzione sta usando un criterio ThreadPool memorizzato nella cache (Executors.newCachedThreadPool()). Ciò potrebbe garantire che ci saranno sempre thread disponibili per elaborare un'attività, limitati solo dalla memoria della VM, dalla CPU e dai limiti dei thread. Tuttavia, con questa policy non è possibile controllare il numero di thread che vengono creati. Indipendentemente dal fatto che un thread di elaborazione si blocchi o meno, l'utilizzo di questo criterio mentre la frequenza delle attività è elevata porta alla creazione di un numero enorme di thread. Se non hai abbastanza risorse per la JVM molto presto, raggiungerai la soglia di memoria massima o la CPU alta. È abbastanza comune vedere il numero di thread raggiunto centinaia o migliaia. Anche se vengono rilasciati una volta che l'attività è stata elaborata, a volte durante la gestione burst il numero elevato di thread sovraccarica le risorse di sistema.

Una terza opzione consiste nell'utilizzare strategie o politiche personalizzate. Una di queste opzioni è avere un pool di thread che scala da 0 a un numero massimo. Quindi, anche se un thread si bloccasse, verrebbe creato un nuovo thread purché venga raggiunto il numero massimo di thread:

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

Qui 3 è il numero massimo di thread e il tempo di mantenimento è impostato su 60 secondi poiché si tratta di un processo che richiede molte attività. Se forniamo un numero massimo di thread sufficientemente elevato, questa è più o meno una politica ragionevole da utilizzare nel contesto delle attività sospese. L'unico problema è che se i thread sospesi non vengono rilasciati alla fine c'è una piccola possibilità che tutti i thread possano a un certo punto bloccarsi. Se il numero massimo di thread è sufficientemente elevato e si presume che il blocco di un'attività sia un fenomeno raro, questa politica si adatterebbe al conto.

Sarebbe stato carino se ThreadPoolavesse anche un meccanismo collegabile per rilevare i fili sospesi. Discuterò più tardi uno di questi progetti. Ovviamente se tutti i thread vengono congelati, è possibile configurare e utilizzare la politica delle attività rifiutate del pool di thread. Se non vuoi scartare le attività dovresti usare CallerRunsPolicy:

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

In questo caso, se un blocco del thread causava il rifiuto di un'attività, tale attività sarebbe stata assegnata al thread chiamante per essere gestita. C'è sempre una possibilità che questo compito sia troppo sospeso. In questo caso l'intero processo si bloccherebbe. Quindi è meglio non aggiungere una tale politica in questo contesto.

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

Un ThreadPool personalizzato con rilevamento Hang

Sarebbe fantastico avere una libreria di pool di thread con la capacità di rilevamento e gestione del blocco delle attività. Ne ho sviluppato uno e lo dimostrerò di seguito. Questa è in realtà una porta da un pool di thread C ++ che ho progettato e utilizzato qualche tempo fa (vedi riferimenti). Fondamentalmente, questa soluzione utilizza il pattern Command e il pattern Chain of Responsibility. Tuttavia, implementare il pattern Command in Java senza l'ausilio del supporto dell'oggetto Function è un po 'difficile. Per questo ho dovuto modificare leggermente l'implementazione per utilizzare la riflessione Java. Si noti che il contesto in cui è stato progettato questo modello era quello in cui un pool di thread doveva essere adattato / collegato senza modificare nessuna delle classi esistenti.(Credo che l'unico grande vantaggio della programmazione orientata agli oggetti sia che ci dà un modo per progettare classi in modo da fare un uso efficace del principio aperto e chiuso. Ciò è particolarmente vero per il vecchio codice legacy complesso e potrebbe essere di minore rilevanza per sviluppo di nuovi prodotti.) Quindi ho usato la riflessione invece di usare un'interfaccia per implementare il modello di comando. Il resto del codice potrebbe essere portato senza modifiche importanti poiché quasi tutte le primitive di sincronizzazione e segnalazione dei thread sono disponibili in Java 1.5 in poi.Il resto del codice potrebbe essere portato senza modifiche importanti poiché quasi tutte le primitive di sincronizzazione e segnalazione dei thread sono disponibili in Java 1.5 in poi.Il resto del codice potrebbe essere portato senza modifiche importanti poiché quasi tutte le primitive di sincronizzazione e segnalazione dei thread sono disponibili in Java 1.5 in poi.

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

Esempio di utilizzo di questo modello:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

Ora abbiamo altre due classi importanti qui. Uno è la ThreadChainclasse, che implementa il modello Catena di responsabilità:

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

Questa classe ha due metodi principali. Uno è booleano CanHandle()che viene avviato dalla ThreadPoolclasse e quindi procede in modo ricorsivo. Questo controlla se il thread corrente ( ThreadChainistanza corrente ) è libero di gestire l'attività. Se sta già gestendo un'attività, chiama quella successiva nella catena.

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

Si noti che HandleRequestè un metodo ThreadChainche viene richiamato dal Thread run()metodo e attende il segnale dal canHandlemetodo. Notare anche come l'attività viene gestita tramite il modello di comando.