A flawless Publisher-Subscriber using BlockingQueue

java.util.concurrent simply rocks. I can't believe how simple it has made everyday programming tasks.

What is the first thing you learn when you do multi-threading? A producer-consumer. It's a great example for learning notify, wait and the locking semantics of Java threading. It's a pity I started earlier, because those of us starting out with Java 1.5/6 will have it too easy. BlockingQueue is a fantastic addition to the standard library, and with it you can implement a thread-safe multi-publisher, multi-subscriber system using constructs that look no different from the ordinary Java collections.
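For contrast, this is roughly what the hand-written version looks like with wait and notify. It is only a minimal sketch: the class name HandRolledBuffer and its methods are made up for illustration and do not come from any library.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical hand-rolled bounded buffer, shown only for comparison.
class HandRolledBuffer<T> {
    private final Queue<T> items = new ArrayDeque<T>();
    private final int capacity;

    HandRolledBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full, then appends the item.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread calls notifyAll()
        }
        items.add(item);
        notifyAll(); // wake any consumers waiting for an item
    }

    // Blocks while the buffer is empty, then removes the oldest item.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake any producers waiting for free space
        return item;
    }

    public synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        final HandRolledBuffer<Integer> buffer = new HandRolledBuffer<Integer>(2);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        buffer.put(i); // blocks whenever two items are already waiting
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("took " + buffer.take());
        }
        producer.join();
    }
}
```

Every wait() has to sit in a loop that re-checks its condition, and every state change has to notify the other side. BlockingQueue gives you put() and take() with this blocking behaviour out of the box.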

Here is an example with 5 publishers and 2 subscribers:

package net.ahlawat;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Date;

/**
 * @author Pranay Ahlawat
 */
public class PubSubTest {
    static class Publisher implements Runnable {
        BlockingQueue<String> queue;
        String name;
        public Publisher(BlockingQueue<String> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        public void run() {
            while(true) {
                try {
                    // put() blocks while the queue is full; add() would throw IllegalStateException instead
                    queue.put(String.format("Msg from %s: %s [on %s]", name, Math.random(), new Date()));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }

    static class Subscriber implements Runnable {
        BlockingQueue<String> queue;
        String name;
        public Subscriber(BlockingQueue<String> queue, String name) {
            this.queue = queue;
            this.name = name;
        }

        public void run() {
            while(true) {
                try {
                    String in = queue.take();
                    System.out.println(String.format("[%s GOT MESSAGE] %s",name,in));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }

            }
        }
    }

    public static void main(String[] args) {
        final int numberOfPublishers = 5;
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
        for (int x=1; x<=numberOfPublishers; x++) {
            Publisher publisher = new Publisher(blockingQueue, x+"");
            new Thread(publisher).start();
        }

        Subscriber subscriber1 = new Subscriber(blockingQueue, "Subscriber 1");
        new Thread(subscriber1).start();
        Subscriber subscriber2 = new Subscriber(blockingQueue, "Subscriber 2");
        new Thread(subscriber2).start();
    }
}

It's quite straightforward - there are a total of 7 threads interacting with the queue. 5 publishers are putting messages on the queue and 2 subscribers are taking messages off it and creatively printing them to standard out. Each message is consumed by exactly one of the two subscribers. What I want you to see is the number of times I have used 'synchronized' in the code - 0.
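One detail worth knowing about a bounded queue like this one is that its insertion methods behave differently when the queue is full: put() blocks, add() throws, and offer() returns false. Here is a small sketch that shows the difference on a queue of capacity 1. The class FullQueueDemo and its describe() method are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: what happens when you insert into a full bounded queue.
class FullQueueDemo {
    static String describe() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.put("first"); // the queue is now full

        String addResult;
        try {
            queue.add("second"); // add() does not wait for space
            addResult = "accepted";
        } catch (IllegalStateException e) {
            addResult = "IllegalStateException";
        }

        boolean offered = queue.offer("second"); // returns false immediately
        boolean timedOffer = queue.offer("second", 100, TimeUnit.MILLISECONDS); // gives up after 100 ms

        return "add=" + addResult + ", offer=" + offered + ", timedOffer=" + timedOffer;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints: add=IllegalStateException, offer=false, timedOffer=false
        System.out.println(describe());
    }
}
```

A put("second") at the same point would simply block until a consumer takes "first", which is usually what you want from a publisher.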

The output is not surprising:

[Subscriber 1 GOT MESSAGE] Msg from 4: 0.6466875854315378 [on Fri Dec 11 02:11:27 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 1: 0.33362845358296433 [on Fri Dec 11 02:11:27 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 2: 0.11207796566244055 [on Fri Dec 11 02:11:27 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 3: 0.6810655758824113 [on Fri Dec 11 02:11:27 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 5: 0.5679631128460616 [on Fri Dec 11 02:11:27 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 1: 0.6304440131162121 [on Fri Dec 11 02:11:28 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 2: 0.021117766277559014 [on Fri Dec 11 02:11:28 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 3: 0.1955294791717468 [on Fri Dec 11 02:11:28 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 4: 0.884529348835637 [on Fri Dec 11 02:11:28 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 5: 0.034690283475101946 [on Fri Dec 11 02:11:28 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 1: 0.5764439934861816 [on Fri Dec 11 02:11:29 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 2: 0.3629499102212388 [on Fri Dec 11 02:11:29 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 4: 0.3770428828123388 [on Fri Dec 11 02:11:29 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 3: 0.9450938944637225 [on Fri Dec 11 02:11:29 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 5: 0.8910317407643176 [on Fri Dec 11 02:11:29 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 1: 0.5785955008786261 [on Fri Dec 11 02:11:30 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 4: 0.9442550853581151 [on Fri Dec 11 02:11:30 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 3: 0.3308239883343358 [on Fri Dec 11 02:11:30 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 5: 0.5450057593023042 [on Fri Dec 11 02:11:30 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 2: 0.13504231409694423 [on Fri Dec 11 02:11:30 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 1: 0.1018850869879191 [on Fri Dec 11 02:11:31 EST 2009]
[Subscriber 1 GOT MESSAGE] Msg from 2: 0.7325884278324815 [on Fri Dec 11 02:11:31 EST 2009]
[Subscriber 2 GOT MESSAGE] Msg from 4: 0.8804538983093999 [on Fri Dec 11 02:11:31 EST 2009]
...
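
The program above runs until you kill it. Because both run() loops exit when their thread is interrupted, stopping cleanly comes down to calling interrupt() on each thread. Here is a minimal sketch of that idea with a single consumer. The class ShutdownSketch and the method runAndStop are made up, and the CountDownLatch is only there so the example knows when every message has been handled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: stop a consumer that is blocked in take() by interrupting it.
class ShutdownSketch {
    // Feeds `count` messages to one consumer, then interrupts it.
    // Returns true if the consumer thread has terminated.
    static boolean runAndStop(int count) throws InterruptedException {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
        final CountDownLatch handled = new CountDownLatch(count);

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                while (true) {
                    try {
                        queue.take(); // throws InterruptedException when interrupted
                        handled.countDown();
                    } catch (InterruptedException e) {
                        break; // same exit path as the subscriber loop above
                    }
                }
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            queue.put("message " + i);
        }
        handled.await();      // wait until every message has been taken
        consumer.interrupt(); // wakes the consumer out of its blocked take()
        consumer.join();
        return !consumer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer stopped: " + runAndStop(25));
    }
}
```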

Such an elegant solution to a classic problem - the wonderful BlockingQueue …