If you have ever written a production multi-threaded server in Java, you know how difficult it is to implement load balancing between worker threads. You have to fight many issues to get a good load balancer:
- You need to limit the number of worker threads somehow, because an unbounded thread pool can exhaust memory.
- You need to implement a sophisticated procedure for a clean worker shutdown.
- If you are using Executors, you might know that they do not provide an out-of-the-box solution. You need to resort to some tricks to make Executors do the load balancing job right.
- And multi-threaded code forces you to use those painful synchronized blocks and locks, which can make your application lock up and/or degrade its performance. Debugging lock issues is a real pain.
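The Executors point can be made concrete. One widely used workaround, not taken from this article, is a ThreadPoolExecutor with a fixed number of threads, a bounded queue and ThreadPoolExecutor.CallerRunsPolicy. With that setup a saturated pool slows the producer down instead of queuing unlimited work. The sketch below is a minimal illustration under arbitrary assumptions: a pool of 4 threads, a queue of 8 tasks and a 10 ms dummy task. BoundedExecutorSketch and runAll are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorSketch {
    // Runs `tasks` dummy work pieces on a bounded pool and returns how many completed.
    static int runAll(int tasks) {
        AtomicInteger handled = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4,                        // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,   // keep-alive, unused for a fixed-size pool
                new ArrayBlockingQueue<>(8), // bounded queue caps memory use
                // When all workers are busy and the queue is full, the submitting
                // thread runs the task itself, which throttles the producer
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // simulate some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                handled.incrementAndGet();
            });
        }
        // Orderly shutdown: stop accepting tasks, then wait for queued ones to finish
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("Total handled " + runAll(100));
    }
}
```

Even this small example needs a rejection policy plus explicit shutdown and timeout handling. That is exactly the kind of boilerplate listed above.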
ZeroMQ can help tremendously with all these problems, and much more. ZeroMQ is a high-performance asynchronous messaging library. It is a native library, but it has bindings for many languages, and it is well worth the hassle of dealing with native code.
So, let me show you how to implement load balancing in Java using ZeroMQ.
If you are on Linux, ZeroMQ might already be available in your distribution's repository. Otherwise, download it from here, then build and install it:
http://www.zeromq.org
After that you will need the Java bindings from here. Again, grab the source, then build and install them:
https://github.com/zeromq/jzmq
After installation you will have zmq.jar in /usr/share/java.
If you are on Windows, the setup procedure is a bit different, but the idea is the same. You need two DLLs, ZeroMQ itself and the ZeroMQ JNI library, plus zmq.jar for the Java part of the bindings. Both DLLs must be reachable via the PATH.
Create a project in your favorite IDE and add zmq.jar to the CLASSPATH. The JVM must also be able to load the native JNI library, for example by setting -Djava.library.path to the directory where it was installed. Now you are ready to use ZeroMQ.
ZeroMQ has several messaging patterns, and we will use the PUSH/PULL pattern. In this pattern, a message “pushed” to an endpoint is “pulled” by one of the peers connected to that endpoint. Our main thread will bind to the endpoint, and our worker threads will be peers connected to it. The main thread will “push” work to the endpoint, and the workers will “pull” work from it as fast as they can.
Our main thread will have the following code:
public void run() {
    try {
        // Create ZeroMQ context with one I/O thread
        ZMQ.Context ctx = ZMQ.context(1);
        // Create PUSH socket
        ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
        // Bind socket to in-process endpoint
        socket.bind("inproc://workers");
        // Create worker threads pool
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new WorkerThread(i, ctx);
            threads[i].start();
        }
        // "Send" the work to workers
        for (int i = 0; i < 100; i++) {
            System.out.println("Sending work piece " + i);
            byte[] msg = new byte[1];
            msg[0] = (byte) i;
            socket.send(msg, 0);
        }
        socket.close();
        // Crude wait: give the workers time to process their queues
        Thread.sleep(10000);
        // Terminate ZeroMQ context; this makes the workers' blocked
        // recv() calls throw, so they close their sockets and exit
        ctx.term();
        // totalHandled is the shared counter field of the Balancer class
        System.out.println("Total handled " + totalHandled);
    } catch (Throwable t) {
        t.printStackTrace();
    }
}
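The main thread creates the worker pool and then simply sends all work pieces to the endpoint, where the worker threads pull them. After sending, it closes its socket, waits a fixed 10 seconds for the workers to finish, and terminates the context. Terminating the context makes the workers' blocked recv() calls throw, so the workers close their sockets and exit.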
The worker thread has the following code:
public class WorkerThread extends Thread {
    private ZMQ.Context ctx;
    private int handled = 0;
    private int threadNo = 0;
    public WorkerThread(int threadNo, ZMQ.Context ctx) {
        super("Worker-" + threadNo);
        this.threadNo = threadNo;
        this.ctx = ctx;
    }
    public void run() {
        try {
            // Create PULL socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
            // Set high water mark to 2, so that when this peer
            // has 2 messages queued, ZeroMQ skips to the next worker
            socket.setHWM(2);
            // Connect to in-process endpoint
            socket.connect("inproc://workers");
            while (true) {
                byte[] msg;
                try {
                    // Get work piece
                    msg = socket.recv(0);
                } catch (Exception e) {
                    // ZeroMQ throws an exception when
                    // the context is terminated
                    socket.close();
                    break;
                }
                handled++;
                // totalHandled is an AtomicInteger field of the enclosing
                // Balancer class, so concurrent increments are not lost
                totalHandled.incrementAndGet();
                System.out.println(getName()
                        + " handled work piece " + msg[0]);
                int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                // Handle work, by sleeping for some time
                Thread.sleep(sleepTime);
            }
            System.out.println(getName()
                    + " handled count " + handled);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
Our worker threads work at different speeds. Even-numbered threads spend 100 ms on each work piece and odd-numbered threads spend 200 ms, so some threads are faster and some are slower. Faster threads get more work done than slower ones. But how is this achieved?
Messages pushed into the PUSH socket are load balanced in round-robin fashion among all connected PULL peers. However, if a PULL peer's pipe is full because it has hit the high water mark, ZeroMQ temporarily skips that peer during the round robin. A slow worker's pipe is full more often than a fast worker's, so the slow worker is skipped more often. That is why faster workers get more work thrown at them.
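The effect can be reproduced with a deliberately simplified, single-threaded model of that policy. This is not ZeroMQ's implementation. The model assumes that time advances in discrete ticks, that each worker's pipe holds at most hwm messages, and that worker i needs serviceTicks[i] ticks per message. RoundRobinHwmModel and simulate are hypothetical names used only for illustration.

```java
import java.util.Arrays;

public class RoundRobinHwmModel {
    // Simplified model (not ZeroMQ's code) of PUSH round robin with a per-peer
    // high water mark. Assumes hwm >= 1 and at least one worker.
    // Returns the number of messages each worker handled.
    static int[] simulate(int[] serviceTicks, int hwm, int messages) {
        int n = serviceTicks.length;
        int[] queued = new int[n];    // messages waiting in each worker's pipe
        int[] busyUntil = new int[n]; // tick at which each worker is free again
        int[] handled = new int[n];
        int next = 0;                 // round-robin cursor of the pusher
        int sent = 0;
        int pulled = 0;
        for (int tick = 0; pulled < messages; tick++) {
            // Pusher: hand out messages round robin, skipping full pipes
            while (sent < messages) {
                int target = -1;
                for (int k = 0; k < n; k++) {
                    int candidate = (next + k) % n;
                    if (queued[candidate] < hwm) {
                        target = candidate;
                        break;
                    }
                }
                if (target < 0) {
                    break; // every pipe is at the high water mark, so the pusher waits
                }
                queued[target]++;
                sent++;
                next = (target + 1) % n;
            }
            // Workers: each idle worker pulls one message from its own pipe
            for (int i = 0; i < n; i++) {
                if (busyUntil[i] <= tick && queued[i] > 0) {
                    queued[i]--;
                    handled[i]++;
                    pulled++;
                    busyUntil[i] = tick + serviceTicks[i];
                }
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        // Mirror the article's setup: even workers take 1 tick (100 ms), odd ones 2 ticks (200 ms)
        int[] service = new int[10];
        for (int i = 0; i < service.length; i++) {
            service[i] = (i % 2 == 0) ? 1 : 2;
        }
        System.out.println(Arrays.toString(simulate(service, 2, 100)));
    }
}
```

With equal service times, the model splits the work evenly. With mixed service times, the one-tick workers end up with higher counts, because the two-tick workers' pipes are full more often and get skipped.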
An additional benefit of this code is that it essentially supports distributed computing. You can switch the endpoints to TCP (for example, bind to tcp://*:port on the server and connect to tcp://server:port from the workers). The code then works in a distributed environment where, for example, each worker runs on a different server of a LAN. Over TCP the workers would normally run in separate processes with their own contexts, since inproc:// only connects sockets that share a context. ZeroMQ handles the buffering, connection establishment and re-establishment, and work nodes appearing and disappearing for you automatically. Isn't this just great?
Full code of the Balancer.java application:
import java.util.concurrent.atomic.AtomicInteger;
import org.zeromq.ZMQ;
public class Balancer {
    // Shared by all worker threads; AtomicInteger makes the increments thread-safe
    private final AtomicInteger totalHandled = new AtomicInteger();
    public class WorkerThread extends Thread {
        private ZMQ.Context ctx;
        private int handled = 0;
        private int threadNo = 0;
        public WorkerThread(int threadNo, ZMQ.Context ctx) {
            super("Worker-" + threadNo);
            this.threadNo = threadNo;
            this.ctx = ctx;
        }
        public void run() {
            try {
                // Create PULL socket
                ZMQ.Socket socket = ctx.socket(ZMQ.PULL);
                // Set high water mark to 2, so that when this peer
                // has 2 messages queued, ZeroMQ skips to the next worker
                socket.setHWM(2);
                // Connect to in-process endpoint
                socket.connect("inproc://workers");
                while (true) {
                    byte[] msg;
                    try {
                        // Get work piece
                        msg = socket.recv(0);
                    } catch (Exception e) {
                        // ZeroMQ throws an exception
                        // when the context is terminated
                        socket.close();
                        break;
                    }
                    handled++;
                    totalHandled.incrementAndGet();
                    System.out.println(getName()
                            + " handled work piece " + msg[0]);
                    int sleepTime = (threadNo % 2 == 0) ? 100 : 200;
                    // Handle work, by sleeping for some time
                    Thread.sleep(sleepTime);
                }
                System.out.println(getName()
                        + " handled count " + handled);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }
    public void run() {
        try {
            // Create ZeroMQ context with one I/O thread
            ZMQ.Context ctx = ZMQ.context(1);
            // Create PUSH socket
            ZMQ.Socket socket = ctx.socket(ZMQ.PUSH);
            // Bind socket to in-process endpoint
            socket.bind("inproc://workers");
            // Create worker threads pool
            Thread[] threads = new Thread[10];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new WorkerThread(i, ctx);
                threads[i].start();
            }
            // "Send" the work to workers
            for (int i = 0; i < 100; i++) {
                System.out.println("Sending work piece " + i);
                byte[] msg = new byte[1];
                msg[0] = (byte) i;
                socket.send(msg, 0);
            }
            socket.close();
            // Crude wait: give the workers time to process their queues
            Thread.sleep(10000);
            // Terminate ZeroMQ context; this makes the workers' blocked
            // recv() calls throw, so they close their sockets and exit
            ctx.term();
            System.out.println("Total handled " + totalHandled.get());
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
    public static void main(String[] args) {
        Balancer balancer = new Balancer();
        balancer.run();
    }
}
One run of this application produced the following output (the interleaving differs from run to run):
Sending work piece 0
Sending work piece 1
Sending work piece 2
Sending work piece 3
Sending work piece 4
Sending work piece 5
Sending work piece 6
Sending work piece 7
Sending work piece 8
Sending work piece 9
Sending work piece 10
Sending work piece 11
Sending work piece 12
Sending work piece 13
Sending work piece 14
Sending work piece 15
Sending work piece 16
Sending work piece 17
Sending work piece 18
worker-0 handled work piece 0
worker-1 handled work piece 1
worker-2 handled work piece 2
worker-3 handled work piece 3
worker-4 handled work piece 4
worker-5 handled work piece 5
worker-6 handled work piece 6
worker-7 handled work piece 7
worker-8 handled work piece 8
Sending work piece 19
Sending work piece 20
Sending work piece 21
Sending work piece 22
Sending work piece 23
Sending work piece 24
Sending work piece 25
Sending work piece 26
Sending work piece 27
Sending work piece 28
Sending work piece 29
Sending work piece 30
worker-9 handled work piece 27
worker-0 handled work piece 9
worker-2 handled work piece 11
Sending work piece 31
worker-4 handled work piece 13
worker-6 handled work piece 15
Sending work piece 32
Sending work piece 33
Sending work piece 34
worker-8 handled work piece 17
Sending work piece 35
worker-1 handled work piece 10
Sending work piece 36
Sending work piece 37
worker-3 handled work piece 12
worker-6 handled work piece 24
worker-2 handled work piece 20
worker-4 handled work piece 22
worker-7 handled work piece 16
worker-5 handled work piece 14
worker-0 handled work piece 18
worker-8 handled work piece 26
Sending work piece 38
Sending work piece 39
Sending work piece 40
Sending work piece 41
Sending work piece 42
Sending work piece 43
Sending work piece 44
worker-9 handled work piece 28
Sending work piece 45
Sending work piece 46
Sending work piece 47
worker-4 handled work piece 31
worker-2 handled work piece 32
worker-6 handled work piece 33
Sending work piece 48
worker-0 handled work piece 30
Sending work piece 49
Sending work piece 50
worker-8 handled work piece 34
worker-1 handled work piece 19
Sending work piece 51
worker-3 handled work piece 21
Sending work piece 52
worker-7 handled work piece 25
worker-5 handled work piece 23
worker-4 handled work piece 41
worker-2 handled work piece 36
worker-6 handled work piece 38
worker-0 handled work piece 39
Sending work piece 53
Sending work piece 54
Sending work piece 55
Sending work piece 56
Sending work piece 57
Sending work piece 58
worker-9 handled work piece 29
Sending work piece 59
worker-8 handled work piece 43
Sending work piece 60
worker-4 handled work piece 47
worker-0 handled work piece 48
worker-6 handled work piece 45
worker-2 handled work piece 46
Sending work piece 61
Sending work piece 62
Sending work piece 63
Sending work piece 64
worker-8 handled work piece 49
Sending work piece 65
Sending work piece 66
worker-1 handled work piece 35
Sending work piece 67
worker-3 handled work piece 37
worker-7 handled work piece 42
worker-5 handled work piece 40
Sending work piece 68
Sending work piece 69
worker-4 handled work piece 54
worker-0 handled work piece 57
worker-6 handled work piece 56
Sending work piece 70
Sending work piece 71
Sending work piece 72
Sending work piece 73
worker-9 handled work piece 44
worker-2 handled work piece 55
Sending work piece 74
worker-8 handled work piece 59
Sending work piece 75
worker-4 handled work piece 60
worker-0 handled work piece 62
worker-6 handled work piece 63
Sending work piece 76
Sending work piece 77
Sending work piece 78
Sending work piece 79
worker-2 handled work piece 61
worker-8 handled work piece 64
Sending work piece 80
Sending work piece 81
worker-3 handled work piece 51
worker-1 handled work piece 50
Sending work piece 82
worker-7 handled work piece 52
worker-5 handled work piece 53
Sending work piece 83
Sending work piece 84
worker-4 handled work piece 69
worker-0 handled work piece 70
Sending work piece 85
worker-6 handled work piece 71
worker-9 handled work piece 58
Sending work piece 86
Sending work piece 87
Sending work piece 88
worker-2 handled work piece 72
Sending work piece 89
Sending work piece 90
worker-8 handled work piece 74
worker-4 handled work piece 75
worker-6 handled work piece 77
Sending work piece 91
Sending work piece 92
Sending work piece 93
worker-0 handled work piece 76
worker-2 handled work piece 78
Sending work piece 94
worker-8 handled work piece 79
Sending work piece 95
Sending work piece 96
worker-1 handled work piece 65
worker-7 handled work piece 67
Sending work piece 97
Sending work piece 98
Sending work piece 99
worker-3 handled work piece 66
worker-5 handled work piece 68
worker-4 handled work piece 84
worker-6 handled work piece 86
worker-9 handled work piece 73
worker-0 handled work piece 85
worker-2 handled work piece 88
worker-8 handled work piece 89
worker-4 handled work piece 90
worker-6 handled work piece 91
worker-0 handled work piece 92
worker-2 handled work piece 93
worker-8 handled work piece 94
worker-1 handled work piece 80
worker-7 handled work piece 82
worker-3 handled work piece 81
worker-5 handled work piece 83
worker-9 handled work piece 87
worker-4 handled work piece 99
worker-1 handled work piece 96
worker-7 handled work piece 97
worker-5 handled work piece 98
worker-3 handled work piece 95
Total handled 100
worker-0 handled count 12
worker-5 handled count 8
worker-1 handled count 8
worker-7 handled count 8
worker-8 handled count 12
worker-3 handled count 8
worker-6 handled count 12
worker-4 handled count 13
worker-2 handled count 12
worker-9 handled count 7
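Those counts can be compared against an idealized estimate. The estimate assumes every worker stays busy from the first piece to the last, and it ignores the initial round-robin burst and scheduling noise. Under that assumption, each worker's share is proportional to its rate. Five workers at 10 pieces/s plus five at 5 pieces/s give 75 pieces/s in total. A fast worker should therefore get about 100 * 10 / 75 ≈ 13.3 pieces and a slow one about 6.7, and the whole batch should take roughly 100 / 75 ≈ 1.3 s. ExpectedShare and expectedShares are hypothetical names for this arithmetic.

```java
public class ExpectedShare {
    // Idealized estimate: if every worker stays busy until the work runs out,
    // each worker's share is proportional to its rate (pieces per second).
    static double[] expectedShares(int[] millisPerPiece, int totalPieces) {
        double totalRate = 0;
        for (int ms : millisPerPiece) {
            totalRate += 1000.0 / ms;
        }
        double[] shares = new double[millisPerPiece.length];
        for (int i = 0; i < millisPerPiece.length; i++) {
            shares[i] = totalPieces * (1000.0 / millisPerPiece[i]) / totalRate;
        }
        return shares;
    }

    public static void main(String[] args) {
        // Same setup as Balancer: even workers sleep 100 ms, odd workers 200 ms
        int[] millis = new int[10];
        for (int i = 0; i < millis.length; i++) {
            millis[i] = (i % 2 == 0) ? 100 : 200;
        }
        double[] shares = expectedShares(millis, 100);
        System.out.printf("fast worker: %.2f, slow worker: %.2f%n", shares[0], shares[1]);
    }
}
```

The measured 12 or 13 versus 7 or 8 is close to this estimate. The remaining gap is presumably due to the even distribution of the first pieces before any worker has finished one. The estimate also suggests that the fixed 10-second Thread.sleep() in the main thread is far longer than this batch needs.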
Update: the code was fixed to close its sockets and to work with the latest ZeroMQ version, 2.1.0. Thanks to a comment from Helpful, who pointed out ZeroMQ API changes in that version that affect this example.