Thread in Python – il modello Producer – Consumer (parte 5)

In questo nuovo articolo estenderemo ancora il concetto di threading con un modello molto utilizzato nell’ingegneria del software: il modello Producer-Consumer che implementeremo tramite due thread. In particolare svilupperemo una Pipeline per la comunicazione interna tra i due thread.

Thread in Python - Producer and Consumer (part 5)

Producer-Consumer Threading

Il problema Producer-Consumer è un problema standard e molto comune nel mondo della programmazione. Questo problema si evidenzia quando si deve gestire la sincronizzazione dei processi in generale, tra cui anche i thread.

Immaginiamo il caso in cui un programma deve ricevere (non leggere) dei dati da una risorsa (come per esempio la rete). Il programma non riceverà i dati al momento della richiesta (infatti non è una lettura), ma attenderà la ricezione rimanendo in ascolto. I dati provenienti dalla risorsa non sono emessi in maniera regolare. Identifichiamo questa risorsa, o sorgente di dati come producer.

Dall’altra parte vi è il programma in ascolto, che una volta ottenuti i dati dalla risorsa, li deve inviare (non scrivere) ad un’altra risorsa, come per esempio, un database. L’accesso al database non sarà istantaneo, ma facciamo conto che sia veloce a sufficienza per poter ricevere tutti i dati provenienti dal programma in un determinato e ragionevole periodo di tempo. Il programma quindi dovrà, a volte, attendere che il database sia pronto per ricevere i dati. Questa parte del programma la identificheremo come consumer.

Abbiamo quindi due entità, Producer e Consumer, tre le quali si creerà una Pipeline, che avrà il compito di gestire i dati tra di esse.

Modello Producer-Consumer utilizzando i Lock

Dato che stiamo parlando di threading, vediamo di risolvere questo problema usando i Lock.

Come abbiamo detto in precedenza, abbiamo un thread producer che riceve i dati dalla rete e li pone in una Pipeline. Per simulare la rete creeremo una fake network.

Importiamo il modulo random e il modulo time per avere dei valori casuali e simulare l’indeterminatezza della ricezione dei dati da parte della rete, combinandoli avremo dei periodi di tempo casuali. Poi il modulo concurrent.futures per il ThreadPoolExecutor che utilizzeremo come context manager per la gestione dei thread (come abbiamo visto in precedenza). Infine importeremo la classe Pipeline da un file pipeline.py che implementeremo più tardi.

Definiamo un oggetto generico END che utilizzeremo per segnalare la fine della ricezione dei messaggi. Questo valore verrà generato dal Producer per indicare il termine dei messaggi ricevuti, ed inserito nella Pipeline. Quando il Consumer leggerà questo valore END dalla Pipeline, interromperà il programma.

Per prima cosa, quindi, implementiamo il thread producer che prende come argomento una pipeline (pl).

Il thread dovrà simulare la ricezione di 10 dati (numeri casuali da 1 a 100) che verrranno acquisiti in tempi non regolari (si usa time.sleep con un numero di tempo casuale tra 1 e 10 secondi). In questo modo il thread potrà ricevere una serie di messaggi in tempi variabili.

Il dato ricevuto viene inserito in message e poi inviato alla pipeline tramite la funzione .set_message(). Sarà poi dalla pipeline che il thread consumer riceverà il valore.

Alla fine dei 10 dati, il thread producer inserirà nella pipeline un valore particolare (END) che segnalerà al thread consumer la ricezione di tutti i messaggi, e quindi della chiusura del programma.

Dall’altra parte della pipeline vi è il thread consumer, che implementeremo in maniera seguente.

Il thread consumer legge un messaggio dalla pipeline e per simulare l’inserimento del dato in un database si usa anche qui un tempo casuale tra 1 e 10 secondi. Questo per fare in modo che i tempi di processamento dei dati da rete, alla pipeline per arrivare al database siano simili. Essendo tempi irregolari e diversi tra ricezione e inserimento nel database, ci aspetteremo così l’accumulo nella pipeline di qualche dato in più (message) prima che il thread consumer sia in grado di processarli tutti, inserendoli in un database.

Quando il thread consumer riceverà dalla pipeline il messaggio END, comprenderà che i messaggi da ricevere sono terminati ed uscirà dall’esecuzione.

Definiti i due thread (consumer e produre) definiamo il programma principale. Per prima cosa dovremo attivare la pipeline (pl). Poi con un ThreadPoolExecutor attivare i due thread, assegnando a ciascuno di essi la stessa pipeline.

Ultima cosa che ci è rimasta da definire è proprio la Pipeline. Implementiamola con una classe in un file esterno pipeline.py.

La classe Pipeline ha tre membri:

  • .message che immagazzina il valore da passare (messaggio)
  • .producer_lock è un threading.Lock che restringe l’accesso al messaggio solo al thread producer
  • .consumer_lock è un threading.Lock che restringe l’accesso al messaggio solo al thread consumer

La prima funzione che troviamo definita all’interno della classe è __init__() . La sua funzione è quella di inizializzare i tre membri della classe e poi chiamare acquire() sul consumer_lock. Facendo in questo modo, impostiamo lo stato di partenza da noi desiderato: il producer ha il permesso di aggiungere un nuovo messaggio alla pipeline, mentre il consumer deve attendere.

Nella classe Pipeline, poi sono definiti altri due metodi .get_message() e .set_messages().

.get_message() effettua la chiamata .acquire() sul consumer_lock. In questo modo il consumer attenderà finchè un messaggio non sarà pronto nella Pipeline. Quando poi il consumer avrà acquisito il .consumer_lock, copierà il valore in message. Successivamente chiamerà .release() sul .producer_lock. Il consumer rilascerà quindi il lock, permettendo al producer di inserire un altro messaggio nella pipeline.

Poniamo una particolare attenzione alla parte finale. Si potrebbe esser tentati di disfarsi di message, chiudendo la funzione con return self.message. Ma non dovresti farlo.

Ecco la risposta. Non appena il consumer chiama .producer_lock.release(), si ferma per far ripartire il producer. Ma il producer potrebbe ripartire prima che la funzione .release() restituisca il valore. Se questo dovesse accadere, il consumer restituirebbe un self.message che è in realtà il successivo messaggio generato, perdendo il primo messaggio per sempre. Questo è un esempio di race condition.

Spostandoci su .set_message(), si può notare il lato opposto della transazione. Il producer chiamando questa funzione acquisirà il .producer_lock. Imposta poi .message ed infine chiama .release() sul consumer_lock, permettendo al consumer di leggere quel valore.

Se eseguiamo otterremo un risultato simile al seguente.

Thread in Python - Producer Consumer

Conclusioni

In questa quinta parte sui Thread in Python abbiamo introdotto il modello Producer – Consumer utilizzando i thread e con una Pipeline. Nella prossima parte vedremo come implementare lo stesso modello sostituendo la Pipeline con una Queue.

Lascia un commento

Questo sito usa Akismet per ridurre lo spam. Scopri come i tuoi dati vengono elaborati.