A flawless Publisher-Subscriber using BlockingQueue
java.util.concurrent simply rocks. I cant believe how simple it has made every day programming tasks.
What is the first thing your learn when you do multi-threading - a producer consumer. It’s a great example to learn notify, wait and understanding the locking semantics of java threading. Its a pity I started earlier because those of us starting out with java 1.5/6 will have their lives too easy. The BlockingQueue is a fantastic addition to the language and using it one can implement a synchronized multi publisher-multi subscriber system using semantics and constructs no different from java collections.
Here is an example with 5 publishers and 2 subscribers:
package net.ahlawat;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Date;
/**
* @author Pranay Ahlawat
*/
public class PubSubTest {
static class Publisher implements Runnable {
BlockingQueue<String> queue;
String name;
public Publisher(BlockingQueue<String> queue, String name) {
this.queue = queue;
this.name = name;
}
public void run() {
while(true) {
queue.add(String.format("Msg from %s: %s [on %s]", name, Math.random(), new Date()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
static class Subscriber implements Runnable {
BlockingQueue<String> queue;
String name;
public Subscriber(BlockingQueue<String> queue, String name) {
this.queue = queue;
this.name = name;
}
public void run() {
while(true) {
try {
String in = queue.take();
System.out.println(String.format("[%s GOT MESSAGE] %s",name,in));
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
public static void main(String[] args) {
final int numberOfPublishers = 5;
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
for (int x=1; x<=numberOfPublishers; x++) {
Publisher publisher = new Publisher(blockingQueue, x+"");
new Thread(publisher).start();
}
Subscriber subscriber1 = new Subscriber(blockingQueue, "Subscriber 1");
new Thread(subscriber1).start();
Subscriber subscriber2 = new Subscriber(blockingQueue, "Subscriber 2");
new Thread(subscriber2).start();
}
}
It’s quite straight forward - there are a total of 7 threads interacting with the queue. 5 publishers are putting stuff on the queue and 2 subscribers are picking up stuff from it and creatively printing it out the standard out. What I want you to see is the number of times I have used ’synchronized’ in the code - 0.
The output is not surprising:
[Subscriber 1 GOT MESSAGE] Msg from 4: 0.6466875854315378 [on Fri Dec 11 02:11:27 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 1: 0.33362845358296433 [on Fri Dec 11 02:11:27 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 2: 0.11207796566244055 [on Fri Dec 11 02:11:27 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 3: 0.6810655758824113 [on Fri Dec 11 02:11:27 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 5: 0.5679631128460616 [on Fri Dec 11 02:11:27 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 1: 0.6304440131162121 [on Fri Dec 11 02:11:28 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 2: 0.021117766277559014 [on Fri Dec 11 02:11:28 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 3: 0.1955294791717468 [on Fri Dec 11 02:11:28 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 4: 0.884529348835637 [on Fri Dec 11 02:11:28 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 5: 0.034690283475101946 [on Fri Dec 11 02:11:28 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 1: 0.5764439934861816 [on Fri Dec 11 02:11:29 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 2: 0.3629499102212388 [on Fri Dec 11 02:11:29 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 4: 0.3770428828123388 [on Fri Dec 11 02:11:29 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 3: 0.9450938944637225 [on Fri Dec 11 02:11:29 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 5: 0.8910317407643176 [on Fri Dec 11 02:11:29 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 1: 0.5785955008786261 [on Fri Dec 11 02:11:30 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 4: 0.9442550853581151 [on Fri Dec 11 02:11:30 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 3: 0.3308239883343358 [on Fri Dec 11 02:11:30 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 5: 0.5450057593023042 [on Fri Dec 11 02:11:30 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 2: 0.13504231409694423 [on Fri Dec 11 02:11:30 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 1: 0.1018850869879191 [on Fri Dec 11 02:11:31 EST 2009] [Subscriber 1 GOT MESSAGE] Msg from 2: 0.7325884278324815 [on Fri Dec 11 02:11:31 EST 2009] [Subscriber 2 GOT MESSAGE] Msg from 4: 0.8804538983093999 [on Fri Dec 11 02:11:31 EST 2009] ...
Such an elegant solution to a classic problem - the wonderful BlockingQueue …
