Thread in Python – the Producer – Consumer model (part 5)

In this new article we extend the concept of threading with a model widely used in software engineering: the Producer-Consumer model, which we will implement using two threads. In particular, we will develop a Pipeline through which the two threads communicate.


Producer-Consumer Threading

The Producer-Consumer problem is a standard and very common problem in programming. It arises whenever you need to synchronize concurrent processes, threads included.

Imagine a program that must receive (not read) data from a resource, such as the network. The program does not get the data at the moment it asks for it (which is why this is not a read); instead it listens and waits for the data to arrive. The resource does not emit data at regular intervals. We identify this resource, or data source, as the producer.

On the other side is the program that listens. Once it has obtained the data from the resource, it must send it (not write it) to another resource, such as a database. Access to the database is not instantaneous, but we assume it is fast enough to take in all the data coming from the program within a reasonable period of time. The program will therefore sometimes have to wait for the database to be ready to receive data. We identify this part of the program as the consumer.

We therefore have two entities, Producer and Consumer, and between them we will create a Pipeline, which has the task of managing the data passed from one to the other.

Producer-Consumer model using Locks

Since we are talking about threading, let’s try to solve this problem using Locks.

As we said earlier, we have a producer thread that receives data from the network and places it in a pipeline. To simulate the network we will create a fake one.

We import the random module and the time module. Combining them gives us random time periods, which simulate the unpredictable arrival of data from the network. Then we import the concurrent.futures module for the ThreadPoolExecutor, which we will use as a context manager for thread management (as we saw earlier). Finally we import the Pipeline class from a file that we will implement later.

 import random
 import concurrent.futures
 import time
 from pipeline import Pipeline 

We define a generic END object that we will use to signal the end of message reception. This value will be generated by the Producer to indicate the end of the messages received, and inserted in the Pipeline. When the Consumer reads this END value from the Pipeline, it will interrupt the program.

 END = object() 
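The reason for using a bare object() instead of a value such as 0 or None is that no real message can ever be identical to it. A minimal sketch of the idea follows; the helper name is_end is hypothetical and is used only for illustration.

```python
# A unique sentinel: no other object in the program is identical to it.
END = object()

def is_end(message):
    # Hypothetical helper: the check uses identity (is), not equality (==).
    return message is END

print(is_end(END))       # True
print(is_end(0))         # False
print(is_end(object()))  # False
```

This is why the consumer compares with "is not END" rather than "!=".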

First, we implement the producer thread, which takes a pipeline (pl) as its argument.

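 def producer(pl):
   for index in range(10):
     # Simulate waiting an irregular time for data from the network
     time.sleep(random.randint(1, 10))
     message = random.randint(1, 100)
     print("Producer received data: ", message, " (", index+1, " of 10 )")
     pl.set_message(message, "Producer")
   # Signal that no more messages will arrive
   pl.set_message(END, "Producer")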

The thread simulates the reception of 10 data items (random numbers from 1 to 100) that arrive at irregular intervals (using time.sleep with a random whole number of seconds between 1 and 10). In this way the thread receives a series of messages at variable times.

The received value is stored in message and then sent to the pipeline using the .set_message() method. The consumer thread will then receive the value from the pipeline.

After the 10 data items, the producer thread inserts a special value (END) into the pipeline. It signals to the consumer thread that all the messages have been received and that the program can therefore end.

On the other side of the pipeline is the consumer thread, which we will implement in the following way.

 def consumer(pl):
     message = 0
     while message is not END:
         message = pl.get_message("Consumer")
         if message is not END:
             # Simulate the time needed to store the data in the database
             time.sleep(random.randint(1, 10))
             print("Consumer stored data: ", message)

The consumer thread reads a message from the pipeline and, to simulate inserting the data into a database, also waits a random time between 1 and 10 seconds. The time needed to receive data from the network and the time needed to store it in the database are therefore of the same order. Because these times are irregular and differ from each other, the producer will sometimes have a new message ready before the consumer has finished storing the previous one. Since this Pipeline holds a single message at a time, the producer then has to wait until the consumer has read it.

When the consumer thread receives the END message from the pipeline, it knows that there are no more messages to receive and stops.

Once the two threads (producer and consumer) are defined, we write the main program. First we create the pipeline (pl). Then, with a ThreadPoolExecutor, we start the two threads, passing the same pipeline to each of them.

 pl = Pipeline()
 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
   executor.submit(producer, pl)
   executor.submit(consumer, pl) 
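One detail worth keeping in mind: executor.submit() returns a Future, and an exception raised inside the submitted function is stored in that Future instead of being printed. A small sketch, using a hypothetical failing_worker function that is not part of the program above:

```python
import concurrent.futures

def failing_worker():
    # Hypothetical worker, used only to show how errors surface.
    raise ValueError("something went wrong in the thread")

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(failing_worker)

# Leaving the with block waits for the worker to finish.
# The exception is kept in the Future and re-raised by result().
try:
    future.result()
    outcome = "no error"
except ValueError as exc:
    outcome = "error: " + str(exc)

print(outcome)
```

If the producer or the consumer seems to stop silently, keeping the Future objects and calling .result() on them is one way to see what happened.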

The last thing left to define is the Pipeline. We implement it as a class in a separate file, pipeline.py, so that the import at the top of the program works.

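 import threading

 class Pipeline:
   def __init__(self):
     self.message = 0
     self.producer_lock = threading.Lock()
     self.consumer_lock = threading.Lock()
     # The consumer must wait until the first message is stored
     self.consumer_lock.acquire()
   def get_message(self, name):
     # Wait until the producer has stored a message
     self.consumer_lock.acquire()
     message = self.message
     # Allow the producer to store the next message
     self.producer_lock.release()
     return message
   def set_message(self, message, name):
     # Wait until the consumer has read the previous message
     self.producer_lock.acquire()
     self.message = message
     # Allow the consumer to read the new message
     self.consumer_lock.release()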

The Pipeline class has three members:

  • message that stores the value to be passed (message)
  • producer_lock is a threading.Lock that restricts access to the message only to the producer thread
  • consumer_lock is a threading.Lock that restricts access to the message only to the consumer thread

The first function we find defined within the class is __init__(). Its function is to initialize the three members of the class and then call acquire() on consumer_lock. Doing it this way, we set the starting state we want: the producer is allowed to add a new message to the pipeline, while the consumer has to wait.
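The starting state described above can be inspected directly on two plain locks. This is only a sketch of the state of the locks, outside of the Pipeline class:

```python
import threading

producer_lock = threading.Lock()
consumer_lock = threading.Lock()

# Same starting state as described for __init__():
# the consumer lock is taken, the producer lock is free.
consumer_lock.acquire()

print(producer_lock.locked())  # False
print(consumer_lock.locked())  # True

# A non-blocking attempt shows that a consumer would have to wait here.
got_it = consumer_lock.acquire(blocking=False)
print(got_it)  # False
```

A blocking acquire() on consumer_lock at this point would wait until some thread calls release() on it, which is exactly what the producer does after storing a message.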

Two other methods are then defined in the Pipeline class: .get_message() and .set_message().

.get_message() calls the .acquire() method on the consumer_lock. In this way the consumer waits until a message is ready in the Pipeline. Once the consumer has acquired the .consumer_lock, it copies the value of self.message into the local variable message. Next it calls .release() on the .producer_lock, which allows the producer to insert another message into the pipeline.

Pay particular attention to the final part. You might be tempted to get rid of the local variable message and end the function with return self.message. But you shouldn't do it.

Here is why. As soon as the consumer calls .producer_lock.release(), the consumer thread can be suspended and the producer can start running. This can happen before the call to .release() has even returned. If it does, the producer may store the next message before the consumer executes its return statement, so return self.message would return the second message and the first would be lost forever. This is an example of a race condition.
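The interleaving can be replayed by hand in a single thread. The sketch below is not real threading: the hypothetical Slot class stands in for the pipeline, and the lines marked as simulated represent the moment at which the producer gets to run.

```python
class Slot:
    # Hypothetical stand-in for the Pipeline, holding one value.
    def __init__(self):
        self.message = None

slot = Slot()

# Safe order: copy first, then let the producer run.
slot.message = "first"
local_copy = slot.message     # consumer copies the value
slot.message = "second"       # simulated: producer runs after release()
safe_result = local_copy      # consumer returns its copy

# Unsafe order: let the producer run, then read the shared value.
slot.message = "first"
slot.message = "second"       # simulated: producer runs after release()
unsafe_result = slot.message  # consumer reads too late

print(safe_result)    # first
print(unsafe_result)  # second
```

In the unsafe order the value "first" is never seen by the consumer, which is the lost message described above.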

Moving to .set_message(), you can see the opposite side of the transaction. The producer calling this function will acquire the .producer_lock. Then set .message and finally call .release() on the consumer_lock, allowing the consumer to read that value.
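To check the handshake end to end, here is a compact, self-contained sketch. It assumes the pipeline behaves as described in the text: consumer_lock is acquired in __init__(), and each method acquires its own lock and releases the other one. The class name TinyPipeline, the omission of the name argument and the absence of delays are simplifications made only for this illustration.

```python
import threading

END = object()

class TinyPipeline:
    # Hypothetical single-slot pipeline following the description above.
    def __init__(self):
        self.message = None
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()   # nothing to read yet

    def set_message(self, message):
        self.producer_lock.acquire()   # wait until the slot is free
        self.message = message
        self.consumer_lock.release()   # a message is ready

    def get_message(self):
        self.consumer_lock.acquire()   # wait for a message
        message = self.message         # copy before releasing
        self.producer_lock.release()   # the slot is free again
        return message

def producer(pl, items):
    for item in items:
        pl.set_message(item)
    pl.set_message(END)

def consumer(pl, received):
    while True:
        message = pl.get_message()
        if message is END:
            break
        received.append(message)

pl = TinyPipeline()
sent = list(range(10))
received = []
threads = [
    threading.Thread(target=producer, args=(pl, sent)),
    threading.Thread(target=consumer, args=(pl, received)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(received == sent)  # True
```

The design relies on the fact that a threading.Lock may be released by a thread other than the one that acquired it: each thread acquires its own lock and releases the other thread's lock.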

If we run the program, we will obtain a result similar to the following.

[Image: Thread in Python - Producer Consumer]


In this fifth part on Python Threads we introduced the Producer – Consumer model using threads and with a Pipeline. In the next part we will see how to implement the same model by replacing the Pipeline with a Queue.
