The Producer-Consumer pattern is a classic concurrency design pattern in which threads communicate safely through a shared resource such as a buffer or queue. Producer threads generate items and place them in the buffer, while consumer threads remove those items and process them. It is widely used in multithreaded programs to separate the code that generates data from the code that processes it.
In Java, you can implement this pattern using synchronization mechanisms such as wait() and notify(), or higher-level concurrency utilities like BlockingQueue in the java.util.concurrent package.
Let’s explore two approaches for implementing the Producer-Consumer pattern in Java: one using manual synchronization and another using the BlockingQueue interface.
1. Implementing the Producer-Consumer Pattern with Manual Synchronization
In this approach, we will use a shared buffer (a list or queue) and synchronize access to it using wait() and notify(). The producer will add items to the buffer, and the consumer will remove items from the buffer.
Example: Using wait() and notify()
import java.util.LinkedList;
import java.util.Queue;

class SharedBuffer {
    private final Queue<Integer> buffer = new LinkedList<>();
    private final int MAX_SIZE = 5; // Maximum size of the buffer

    // Method for producing data
    public synchronized void produce(int item) throws InterruptedException {
        while (buffer.size() == MAX_SIZE) {
            wait(); // Wait if the buffer is full
        }
        buffer.add(item);
        System.out.println("Produced: " + item);
        notifyAll(); // Notify consumers that new data is available
    }

    // Method for consuming data
    public synchronized int consume() throws InterruptedException {
        while (buffer.isEmpty()) {
            wait(); // Wait if the buffer is empty
        }
        int item = buffer.poll();
        System.out.println("Consumed: " + item);
        notifyAll(); // Notify producers that space is available
        return item;
    }
}

class Producer implements Runnable {
    private final SharedBuffer buffer;

    public Producer(SharedBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        int item = 0;
        try {
            while (true) {
                buffer.produce(item++);
                Thread.sleep(1000); // Simulate time taken to produce an item
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final SharedBuffer buffer;

    public Consumer(SharedBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        try {
            while (true) {
                buffer.consume();
                Thread.sleep(1500); // Simulate time taken to consume an item
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer();
        Thread producerThread = new Thread(new Producer(buffer));
        Thread consumerThread = new Thread(new Consumer(buffer));
        producerThread.start();
        consumerThread.start();
    }
}
Explanation:
- SharedBuffer: The resource shared between the producer and the consumer. It exposes two synchronized methods, produce() and consume().
- Producer: The producer continuously generates items and places them into the shared buffer.
- Consumer: The consumer continuously consumes items from the shared buffer.
- Synchronization: We use synchronized to ensure that only one thread (either producer or consumer) can access the shared buffer at a time. The wait() and notifyAll() methods handle communication between the threads.
- The producer waits if the buffer is full, and the consumer waits if the buffer is empty. Each wait() call sits inside a while loop so that the condition is re-checked after every wake-up, because a thread can wake spuriously or find that another thread has already changed the buffer.
- notifyAll() wakes every waiting thread whenever the buffer is modified, so a producer waiting for space or a consumer waiting for data gets a chance to proceed.
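To see why the while loop and notifyAll() matter, it can help to run the same scheme with several producers and consumers at once. The following is a minimal sketch, not part of the original example: the names WaitNotifyDemo, BoundedBuffer, and runOnce, and the thread and item counts, are assumptions made for illustration. Each producer emits the numbers 1 to n, and the consumers add up everything they receive, so a lost or duplicated item would show up as a wrong total.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

class WaitNotifyDemo {
    // Hypothetical buffer using the same wait()/notifyAll() scheme as SharedBuffer, without logging.
    static class BoundedBuffer {
        private final Queue<Integer> items = new ArrayDeque<>();
        private final int capacity;

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        synchronized void produce(int item) throws InterruptedException {
            while (items.size() == capacity) {
                wait(); // Re-check the condition after every wake-up
            }
            items.add(item);
            notifyAll(); // Wake waiting consumers (and any waiting producers)
        }

        synchronized int consume() throws InterruptedException {
            while (items.isEmpty()) {
                wait(); // Re-check the condition after every wake-up
            }
            int item = items.remove();
            notifyAll(); // Wake waiting producers (and any waiting consumers)
            return item;
        }
    }

    // Starts `workers` producers and `workers` consumers; returns the sum of all consumed items.
    static long runOnce(int workers, int itemsPerWorker) {
        BoundedBuffer buffer = new BoundedBuffer(5);
        AtomicLong consumedSum = new AtomicLong();
        Thread[] threads = new Thread[workers * 2];
        for (int w = 0; w < workers; w++) {
            threads[2 * w] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerWorker; i++) {
                        buffer.produce(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[2 * w + 1] = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerWorker; i++) {
                        consumedSum.addAndGet(buffer.consume());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return consumedSum.get();
    }

    public static void main(String[] args) {
        // 3 producers each emit 1..100, so the expected total is 3 * 5050 = 15150.
        System.out.println(runOnce(3, 100));
    }
}
```

Because every consumer takes exactly as many items as one producer adds, all threads finish and the program terminates, unlike the endless loops in the example above.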
2. Implementing the Producer-Consumer Pattern with BlockingQueue
The BlockingQueue interface from the java.util.concurrent package simplifies the implementation of the Producer-Consumer pattern by managing synchronization internally. You can use ArrayBlockingQueue, a thread-safe, bounded implementation that supports blocking operations like put() and take().
Example: Using BlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int item = 0;
        try {
            while (true) {
                queue.put(item); // Blocks while the queue is full
                System.out.println("Produced: " + item);
                item++; // Increment after logging so the printed value matches what was queued
                Thread.sleep(1000); // Simulate production time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int item = queue.take(); // Blocks while the queue is empty
                System.out.println("Consumed: " + item);
                Thread.sleep(1500); // Simulate consumption time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerWithBlockingQueue {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); // Buffer with capacity of 5
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));
        producerThread.start();
        consumerThread.start();
    }
}
Explanation:
- BlockingQueue: A thread-safe queue that automatically handles synchronization. We use ArrayBlockingQueue, which is a bounded queue with a fixed capacity.
- Producer: The producer uses queue.put(item) to add items to the queue. If the queue is full, it will block until space is available.
- Consumer: The consumer uses queue.take() to retrieve items from the queue. If the queue is empty, it will block until an item is available.
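The full and empty conditions described above can be observed on a single thread without hanging it. The sketch below is an illustration under stated assumptions: the class name BlockingBehaviorDemo and the method probe are hypothetical, and it uses the timed offer() and poll() methods of BlockingQueue, which the example above does not use, so that the calls give up after 100 ms where put() and take() would wait indefinitely.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingBehaviorDemo {
    // Fills a capacity-2 queue, then probes the full and empty states with timed calls.
    static String probe() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        StringBuilder log = new StringBuilder();
        try {
            queue.put(1);
            queue.put(2); // The queue is now full; another put() would block

            // Timed offer() waits up to 100 ms for space, then returns false
            boolean accepted = queue.offer(3, 100, TimeUnit.MILLISECONDS);
            log.append("offer on full queue: ").append(accepted).append('\n');

            // take() returns immediately because items are available (FIFO order)
            log.append("take: ").append(queue.take()).append('\n');
            log.append("take: ").append(queue.take()).append('\n');

            // The queue is now empty; timed poll() waits up to 100 ms, then returns null
            Integer polled = queue.poll(100, TimeUnit.MILLISECONDS);
            log.append("poll on empty queue: ").append(polled).append('\n');
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during probe", e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // Prints:
        // offer on full queue: false
        // take: 1
        // take: 2
        // poll on empty queue: null
        System.out.print(probe());
    }
}
```

In a real producer or consumer, put() and take() are usually the simpler choice; the timed variants are shown here only to make the blocking conditions visible.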
Key Differences Between the Two Approaches:
- Manual Synchronization (wait()/notify()):
  - Requires explicit synchronization and handling of waiting and notifying threads.
  - More control over thread interactions, but also more complexity in ensuring that conditions (e.g., full/empty buffer) are managed correctly.
- BlockingQueue:
  - Simplifies the implementation by providing built-in blocking behavior for both producers and consumers.
  - Easier to use and usually easier to scale to several producers and consumers, as it abstracts away the need for manual synchronization.
  - Automatically handles full/empty conditions with no explicit wait()/notify() calls.
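One way to make the difference concrete is to rewrite the shared buffer from the first approach on top of a BlockingQueue. The sketch below is illustrative only: QueueBackedBufferDemo, QueueBackedBuffer, and transfer are hypothetical names, and the capacity of 5 simply mirrors the earlier examples. The wait loops and notifyAll() calls disappear, and produce() and consume() each shrink to a single delegating call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueBackedBufferDemo {
    // Hypothetical counterpart to SharedBuffer: same two operations, no explicit synchronization.
    static class QueueBackedBuffer {
        private final BlockingQueue<Integer> queue;

        QueueBackedBuffer(int capacity) {
            this.queue = new ArrayBlockingQueue<>(capacity);
        }

        void produce(int item) throws InterruptedException {
            queue.put(item); // Blocks while the queue is full
        }

        int consume() throws InterruptedException {
            return queue.take(); // Blocks while the queue is empty
        }
    }

    // One producer thread sends 1..count; the calling thread consumes and records them.
    static List<Integer> transfer(int count) {
        QueueBackedBuffer buffer = new QueueBackedBuffer(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(buffer.consume());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during transfer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(8)); // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

With a single producer and a single consumer, the items arrive in the order they were produced, since ArrayBlockingQueue is a FIFO queue.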