Distributed, zeromq technologies

Load balancing work between Java threads using ZeroMQ

If you ever wrote production multi-threaded server in Java you know how it is difficult to implement load balancing between worker threads. You need to fight many issues to have good load balancer:

  1. You need to limit somehow the number of worker threads, because with unlimited thread pool you can have memory exhausted.
  2. You need to implement sophisticated procedure for clean worker shutdown.
  3. If you are using Executors you might now that they are not provide out of the box solution. You need to run into some tricks to have Executors do the load balancing job right.
  4. And for multi-threaded code you have to use those painful synchronized and locks that make your application lock up and/or degrade performance. And the debugging lock issues is a real pain.

ZeroMQ might tremendously help you with solving all these problems and much more. ZeroMQ is a high-performance asynchronous messaging library. It is native, but it has bindings for many languages and to be fair it is worth the hassle of dealing with native code.

So, let me show you how to implement load balancing in Java using ZeroMQ.

If you are on linux zeromq might be already available for your linux distribution in its repository otherwise you have to download it from here, build and install:
http://www.zeromq.org

After that you will need Java bindings from here, again grab the source, build and install:
https://github.com/zeromq/jzmq
After installing you will have zmq.jar in /usr/share/java

If you are on a Windows the procedure of setting up ZeroMQ is a bit different, but the idea is the same - you need to have two DLLs built - ZeroMQ itself and ZeroMQ JNI and zmq.jar for Java-part of Java bindings, DLLs should be accessible from the PATH.

Create project in your favorite IDE and add zmq.jar into CLASSPATH. Now you are ready to use ZeroMQ.

ZeroMQ has several message sending patterns, we will use PUSH/PULL pattern. In this pattern message "pushed" to endpoint is "pulled" by one of the peers connected to this endpoint. Our main thread will bind to endpoint and our worker threads will be peers connected to the endpoint. Main thread will "push" work to endpoint and workers will "pull" the work from the endpoint at a rate they can.

Our main thread will have the following code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
    public void run() {
        try {
            // Create ZeroMQ context
            ZMQ.Context ctx = ZMQ.context(1);
            // Create PUSH socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
            // Bind socket to in-process endpoint
            socket.bind("inproc://workers");

            // Create worker threads pool
            Thread threads[] = new Thread[10];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new WorkerThread(i, ctx);
                threads[i].start();
            }

            // "Send" the work to workers
            for (int i = 0; i < 100; i++) {
                System.out.println("Sending work piece " + i);
                byte[] msg = new byte[1];
                msg[0] = (byte)i;
                socket.send(msg, 0);
            }
            socket.close();

            Thread.sleep(10000);

            // Terminate ZeroMQ context
            ctx.term();

            System.out.println("Total handled " + totalHandled);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

Main thread creates workers pool and just sends all work pieces to the endpoint, where they are fetched by one of the free worker thread.

The worker thread has the following code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class WorkerThread extends Thread {
    private ZMQ.Context ctx;
    private int handled = 0;
    private int threadNo = 0;

    public WorkerThread(int threadNo, ZMQ.Context ctx) {
        super("Worker-" + threadNo);
        this.threadNo = threadNo;
        this.ctx = ctx;
    }

    public void run() {
        try {
            // Create PULL socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
            // Set high water mark to 2,
            // so that when this peer had 2 messages
            // in its buffer, ZeroMQ skipped to next workers
            socket.setHWM(2);
            // Connect to in-process endpoint
            socket.connect("inproc://workers");

            while (true) {
                byte[] msg;
                try {
                    // Get work piece
                    msg = socket.recv(0);
                } catch (Exception e) {
                    // ZeroMQ throws exception when
                    // the context is terminated
                    socket.close();
                    break;
                }
                handled++;
                totalHandled++;
                System.out.println(getName()
                        + " handled work piece " + msg[0]);
                int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                // Handle work, by sleeping for some time
                Thread.sleep(sleepTime);
            }
            System.out.println(getName()
                        + " handled count " + handled);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}

Our worker threads have varying speed of doing the work, some threads are faster and some are slower. Faster threads get more work done then slower ones. But how it is achieved?
When messages are pushed into the PUSH socket they get load balanced in round robin fashion among all the connected PULL peers. But if some PULL peer has its pipe full because of hitting the high water mark, ZeroMQ skips this peer temporarily when it does round robin. Thats why faster workers get more work thrown into them.

The additional benefit of this code is that it is essentially supports distributed computing. You can simply change the protocol to tcp://server:port and have this code perfectly working in distributed environment where each worker will be launched at different server of a LAN for example. ZeroMQ will does all the buffering, connection establishment/reestablishment, work nodes appearing/disappearing for you automatically. Isn't this just great?

Full code of Balancer.java application:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import org.zeromq.ZMQ;

public class Balancer {
    private volatile int totalHandled = 0;

    public class WorkerThread extends Thread {
        private ZMQ.Context ctx;
        private int handled = 0;
        private int threadNo = 0;

        public WorkerThread(int threadNo, ZMQ.Context ctx) {
            super("Worker-" + threadNo);
            this.threadNo = threadNo;
            this.ctx = ctx;
        }

        public void run() {
            try {
                // Create PULL socket
                ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
                // Set high water mark to 2,
                // so that when this peer
                // had 2 messages in its buffer,
                // ZeroMQ skipped to next workers
                socket.setHWM(2);
                // Connect to in-process endpoint
                socket.connect("inproc://workers");

                while (true) {
                    byte[] msg;
                    try {
                        // Get work piece
                        msg = socket.recv(0);
                    } catch (Exception e) {
                        // ZeroMQ throws exception
                        //when context is terminated
                        socket.close();
                        break;
                    }
                    handled++;
                    totalHandled++;
                    System.out.println(getName()
                              + " handled work piece " + msg[0]);
                    int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                    // Handle work, by sleeping for some time
                    Thread.sleep(sleepTime);
                }
                System.out.println(getName()
                              + " handled count " + handled);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void run() {
        try {
            // Create ZeroMQ context
            ZMQ.Context ctx = ZMQ.context(1);
            // Create PUSH socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
            // Bind socket to in-process endpoint
            socket.bind("inproc://workers");

            // Create worker threads pool
            Thread threads[] = new Thread[10];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new WorkerThread(i, ctx);
                threads[i].start();
            }

            // "Send" the work to workers
            for (int i = 0; i < 100; i++) {
                System.out.println("Sending work piece " + i);
                byte[] msg = new byte[1];
                msg[0] = (byte)i;
                socket.send(msg, 0);
            }
            socket.close();

            Thread.sleep(10000);

            // Terminate ZeroMQ context
            ctx.term();

            System.out.println("Total handled " + totalHandled);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Balancer balancer = new Balancer();
        balancer.run();
    }
}

The output of this application is:

Sending work piece 0
Sending work piece 1
Sending work piece 2
Sending work piece 3
Sending work piece 4
Sending work piece 5
Sending work piece 6
Sending work piece 7
Sending work piece 8
Sending work piece 9
Sending work piece 10
Sending work piece 11
Sending work piece 12
Sending work piece 13
Sending work piece 14
Sending work piece 15
Sending work piece 16
Sending work piece 17
Sending work piece 18
worker-0 handled work piece 0
worker-1 handled work piece 1
worker-2 handled work piece 2
worker-3 handled work piece 3
worker-4 handled work piece 4
worker-5 handled work piece 5
worker-6 handled work piece 6
worker-7 handled work piece 7
worker-8 handled work piece 8
Sending work piece 19
Sending work piece 20
Sending work piece 21
Sending work piece 22
Sending work piece 23
Sending work piece 24
Sending work piece 25
Sending work piece 26
Sending work piece 27
Sending work piece 28
Sending work piece 29
Sending work piece 30
worker-9 handled work piece 27
worker-0 handled work piece 9
worker-2 handled work piece 11
Sending work piece 31
worker-4 handled work piece 13
worker-6 handled work piece 15
Sending work piece 32
Sending work piece 33
Sending work piece 34
worker-8 handled work piece 17
Sending work piece 35
worker-1 handled work piece 10
Sending work piece 36
Sending work piece 37
worker-3 handled work piece 12
worker-6 handled work piece 24
worker-2 handled work piece 20
worker-4 handled work piece 22
worker-7 handled work piece 16
worker-5 handled work piece 14
worker-0 handled work piece 18
worker-8 handled work piece 26
Sending work piece 38
Sending work piece 39
Sending work piece 40
Sending work piece 41
Sending work piece 42
Sending work piece 43
Sending work piece 44
worker-9 handled work piece 28
Sending work piece 45
Sending work piece 46
Sending work piece 47
worker-4 handled work piece 31
worker-2 handled work piece 32
worker-6 handled work piece 33
Sending work piece 48
worker-0 handled work piece 30
Sending work piece 49
Sending work piece 50
worker-8 handled work piece 34
worker-1 handled work piece 19
Sending work piece 51
worker-3 handled work piece 21
Sending work piece 52
worker-7 handled work piece 25
worker-5 handled work piece 23
worker-4 handled work piece 41
worker-2 handled work piece 36
worker-6 handled work piece 38
worker-0 handled work piece 39
Sending work piece 53
Sending work piece 54
Sending work piece 55
Sending work piece 56
Sending work piece 57
Sending work piece 58
worker-9 handled work piece 29
Sending work piece 59
worker-8 handled work piece 43
Sending work piece 60
worker-4 handled work piece 47
worker-0 handled work piece 48
worker-6 handled work piece 45
worker-2 handled work piece 46
Sending work piece 61
Sending work piece 62
Sending work piece 63
Sending work piece 64
worker-8 handled work piece 49
Sending work piece 65
Sending work piece 66
worker-1 handled work piece 35
Sending work piece 67
worker-3 handled work piece 37
worker-7 handled work piece 42
worker-5 handled work piece 40
Sending work piece 68
Sending work piece 69
worker-4 handled work piece 54
worker-0 handled work piece 57
worker-6 handled work piece 56
Sending work piece 70
Sending work piece 71
Sending work piece 72
Sending work piece 73
worker-9 handled work piece 44
worker-2 handled work piece 55
Sending work piece 74
worker-8 handled work piece 59
Sending work piece 75
worker-4 handled work piece 60
worker-0 handled work piece 62
worker-6 handled work piece 63
Sending work piece 76
Sending work piece 77
Sending work piece 78
Sending work piece 79
worker-2 handled work piece 61
worker-8 handled work piece 64
Sending work piece 80
Sending work piece 81
worker-3 handled work piece 51
worker-1 handled work piece 50
Sending work piece 82
worker-7 handled work piece 52
worker-5 handled work piece 53
Sending work piece 83
Sending work piece 84
worker-4 handled work piece 69
worker-0 handled work piece 70
Sending work piece 85
worker-6 handled work piece 71
worker-9 handled work piece 58
Sending work piece 86
Sending work piece 87
Sending work piece 88
worker-2 handled work piece 72
Sending work piece 89
Sending work piece 90
worker-8 handled work piece 74
worker-4 handled work piece 75
worker-6 handled work piece 77
Sending work piece 91
Sending work piece 92
Sending work piece 93
worker-0 handled work piece 76
worker-2 handled work piece 78
Sending work piece 94
worker-8 handled work piece 79
Sending work piece 95
Sending work piece 96
worker-1 handled work piece 65
worker-7 handled work piece 67
Sending work piece 97
Sending work piece 98
Sending work piece 99
worker-3 handled work piece 66
worker-5 handled work piece 68
worker-4 handled work piece 84
worker-6 handled work piece 86
worker-9 handled work piece 73
worker-0 handled work piece 85
worker-2 handled work piece 88
worker-8 handled work piece 89
worker-4 handled work piece 90
worker-6 handled work piece 91
worker-0 handled work piece 92
worker-2 handled work piece 93
worker-8 handled work piece 94
worker-1 handled work piece 80
worker-7 handled work piece 82
worker-3 handled work piece 81
worker-5 handled work piece 83
worker-9 handled work piece 87
worker-4 handled work piece 99
worker-1 handled work piece 96
worker-7 handled work piece 97
worker-5 handled work piece 98
worker-3 handled work piece 95
Total handled 100
worker-0 handled count 12
worker-5 handled count 8
worker-1 handled count 8
worker-7 handled count 8
worker-8 handled count 12
worker-3 handled count 8
worker-6 handled count 12
worker-4 handled count 13
worker-2 handled count 12
worker-9 handled count 7

Update: code was fixed to close sockets and work with latest ZeroMQ 2.1.0 version, thanks for comment from Helpful, who pointed out ZeroMQ API changes in latest version that affect code of this example.

Looking to hire a software developer?
Don't hesitate to contact us.

Comments