'Thread synchronization in Grails application using Hazelcast' post illustration

Thread synchronization in Grails application using Hazelcast

avatar

Recently, I have been looking for a way to implement thread synchronization in a Grails application which is hosted on several nodes on Rackspace. And while it's rather easy to synchronize threads within a single servlet container, it might be difficult to do it when the same data is simultaneously updated from separate server nodes. This post shows how this task can be solved with Hazelcast.

Hazelcast

Hazelcast (http://hazelcast.org/) is an open source clustering and data distribution platform for Java, in-memory data grid middleware. Generally speaking, it offers the following features (from the official documentation):

  • distributed implementations of java.util.{Queue, Set, List, Map}, java.util.concurrent.locks.Lock, java.util.concurrent.ExecutorService
  • distributed MultiMap for one-to-many relationships
  • distributed Topic for publish/subscribe messaging
  • distributed Query, MapReduce and Aggregators
  • specification compliant JCache implementation
  • socket level encryption support for secure clusters
  • second level cache provider for Hibernate
  • dynamic HTTP session clustering
  • and some others

But for our solution, we'll only need distributed implementation of java.util.concurrent.locks.Lock.

Grails Hazelcast plugin

As one might expect, there's a Grails plugin able to integrate Grails application with Hazelcast, and it’s called hazelgrails (https://grails.org/plugin/hazelgrails). Some introduction on using this plugin is given in this blog post: http://blog.hazelcast.com/distribute-grails-with-hazelcast/.

The only thing is that the plugin was last time updated in April 2012 and uses Hazelcast library of version 2.0.2 (and v2.5 in the current SNAPSHOT version of the plugin), whilst the latest release version of Hazelcast is 3.5.3. So, when I tried to use this plugin in the application, it didn't work: server nodes did not join into a cluster and hence didn't use distributed locks. Tuning Hazelcast configuration settings didn't help as well. Moreover, after a day of idle with this plugin in a test environment, Tomcat on one of the two nodes has crashed because of a memory leak.

So, let's upgrade the plugin to the newest Hazelcast version and adjust Hazelcast configuration to make it work.

First of all, we should load the hazelgrails as an in-place plugin by copying the plugin to a directory inside of the application and specifying the directory path in the grails-app/conf/BuildConfig.groovy:

BuildConfig.groovy
1
2
3
4
5
// ...
grails.project.dependency.resolution = {
    // ...
}
grails.plugin.location.hazelcast = 'plugins/hazelgrails-0.1'

And upgrade Hazelcast inside the plugin:

plugins/hazelgrails-0.1/dependencies.groovy
1
2
3
4
5
6
// ...
dependencies {
    compile 'com.hazelcast:hazelcast:3.5.3'
    compile 'com.hazelcast:hazelcast-hibernate:3.5.3'
}
// ...

It also turned out that the default Hazelcast configuration has changed since the 3.x version, so we need to update it as well for the plugin. The default configuration is located inside the hazelcast-3.5.3.jar, which can be found in the Ivy cache once all necessary dependencies are resolved and downloaded by Grails. The config should be placed into the plugins/hazelgrails-0.1/grails-app/conf/ directory and named hazelcast.xml.

At this point, the plugin still does not work correctly, because it was written for the 2.0.2 Hazelcast version, so we need to make a few more adjustments. See the comments below:

plugins/hazelgrails-0.1/grails-app/services/hazelgrails/HazelService.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Lock lock(Object lock) {
    instance.getLock(lock) // this method accepts String instead of Object
}

long generateId(String name) {
    Hazelcast.getIdGenerator(name).newId() // this method is not static anymore and
                                           // should be invoked on a Hazelcast instance
}

/*
 * The three methods below use the MultiTask and DistributedTask classes
 * which have been removed since Hazelcast version 3.0.
 * As we really don't need this functionality for our current task,
 * we can simply remove these methods from the service.
 * But if you still need it for some purpose, you can try to rewrite them
 * using new IExecutorService interface, which supports all the functionality
 * of these removed classes. You can find more info here:
 * http://docs.hazelcast.org/docs/3.5/manual/html/upgradingfrom2x.html
 * http://docs.hazelcast.org/docs/3.5/manual/html/distributedcomputing.html#executor-service
 */

Collection executeOnAllMembers(Callable callable) {
    Collection<Member> members = instance.cluster.members
    MultiTask multitask = new MultiTask(callable, members)
    Hazelcast.executorService.execute(multitask)
    return multitask.get()
}

Object executeOnSomewhere(Callable callable) {
    Hazelcast.executorService.submit(callable).get()
}

Object executeOnMemberOwningTheKey(Callable callable, Object key) {
    FutureTask task = new DistributedTask(callable, key)
    Hazelcast.executorService.execute(task)
    return task.get()
}

So, this part of the service should now look like this:

1
2
3
4
5
6
7
Lock lock(String lock) {
    instance.getLock(lock)
}

long generateId(String name) {
    instance.getIdGenerator(name).newId()
}

Also, do not forget to remove all the unused and deprecated classes from imports.

Another minor flaw of the hazelgrails plugin is that it only allows to have a single Hazelcast configuration, so we can't have different configurations per environment. However, this can be solved easily enough by modifying the initialization method of the HazelService in the following way:

HazelService.groovy
1
2
3
4
5
6
@PostConstruct
void init() {
    def config = grailsApplication.config.grails.plugin.hazelcast.conf
    instance = Hazelcast.newHazelcastInstance(
        config ? new ClasspathXmlConfig(config as String) : null)
}

And by specifying the Hazelcast configuration path in the Config.groovy as follows:

Config.groovy
1
2
grails.plugins.hazelcast.conf =
    "hazelcast/hazelcast-${grails.util.Environment.currentEnvironment.name}.xml"

So we can now have a separate Hazelcast config for each environment in the grails-app/conf/hazelcast directory, e.g.: hazelcast-production.xml, hazelcast-staging.xml, hazelcast-development.xml.

Configuring Hazelcast

Let's now configure Hazelcast for our cluster.

Interfaces

Hazelcast supports clustering in one of the two ways - multicast or TCP/IP. Multicast is enabled by default, however it may not be supported in the production environment, so let's configure it for TCP/IP:

hazelcast-production.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<hazelcast>
    ...
    <network>
        ...
        <join>
            <multicast enabled="false">
                <multicast-group>224.2.2.3</multicast-group>
                <multicast-port>54327</multicast-port>
            </multicast>
            <tcp-ip enabled="true">
                <member>10.181.227.23</member>
                <member>10.181.229.117</member>
            </tcp-ip>
        </join>
        ...
    </network>
    ...
</hazelcast>

Here we have two server nodes in the cluster, so their IPs are added to the cluster member list.

If your server nodes have more than one network interface, you may also need to specify which ones Hazelcast should use. Otherwise, this can be omitted. For our cluster, we can use the following wildcard configuration:

1
2
3
4
5
6
7
8
9
10
<hazelcast>
    ...
    <network>
        ...
        <interfaces enabled="true">
            <interface>10.181.*.*</interface>
        </interfaces>
    </network>
    ...
</hazelcast>

Ports

Let's now configure ports used by Hazelcast. First of all, this is a port used to communicate between cluster members. By default, Hazelcast tries to find free ports between 5701 and 5801. This could be useful if we'd have several Hazelcast instances on the same machine, but it’s not the case for us, so let's just set it to a single port:

1
2
3
<network>
    <port auto-increment="false">5701</port>
</network>

Secondly, we need to configure outbound ports used during socket bind operation. By default, an ephemeral port is picked up, but security policies/firewalls may require it to be restricted.

1
2
3
4
5
<network>
    <outbound-ports>
        <ports>33000-35000</ports>
    </outbound-ports>
</network>

Let's then open these ports on every node:

1
2
iptables -A INPUT -p tcp -d 0/0 -s 0/0 --dport 5701 -j ACCEPT
iptables -A OUTPUT -p TCP --dport 33000:35000 -m state --state NEW -j ACCEPT

In addition, it can be useful to set the reuse-address property, which allows to ignore the TIME_WAIT state of the server socket port when starting a cluster member right after shutting it down:

1
2
3
<network>
    <reuse-address>true</reuse-address>
</network>

Logging

The last thing we need to configure for Hazelcast is logging. Firstly, let’s add the following into the log4j configuration in the Config.groovy:

Config.groovy
1
2
3
4
5
log4j = {
    // ...
    info 'com.hazelcast'
    // ...
}

Secondly, let’s turn off Hazelcast’s health monitor logging. Health monitor periodically prints logs about any related internal metrics when Hazelcast is under load (i.e., memory usage is above threshold percentage or process/CPU load is above threshold). This info is not really useful for us and would only pollute the logs, so it can be disabled in the following way:

hazelcast.xml
1
2
3
4
5
6
<hazelcast>
    ...
    <properties>
        <property name="hazelcast.health.monitoring.level">OFF</property>
    </properties>
</hazelcast>

Possible values are:

  • SILENT (is set by default; logs are printed only when values exceed some predefined threshold);
  • NOISY (logs are always printed periodically);
  • OFF (logs are turned off).

Implementing thread synchronization

Now that we have installed and configured Hazelcast, we can start implementing thread synchronization in our application. Let's do it in three steps:

  1. Implement thread synchronization as if the application would be hosted on a single node, i.e. using standard Java/Groovy ways.
  2. Enable Hazelcast to get the synchronization work across multiple server nodes as well as on a single node.
  3. Add an ability to enable/disable Hazelcast using a configuration option and make sure the synchronization works in both states.

Java synchronization

On the first stage, we are going to implement thread synchronization on a currently logged-in user using regular Java tools (specifically, the synchronized keyword).

So, let’s assume that user authentication/authorization is already implemented in our application (the Spring Security or Shiro Grails plugins are usually used for this purpose). Let’s also assume that there’s some UserService.groovy that allows to get instance of the currently logged-in user:

1
userService.getUser()

In complex web applications, it’s a common scenario when the same user instance is simultaneously updated from different places. For example, we can have:

  • a method that takes a payment from user’s credit card and subscribes the user to a mail list;
  • a webhook that receives events from a payment platform (e.g., Stripe) and updates some payerStatus field of the user;
  • a webhook for an email platform (e.g., SendGrid), which receives events from it and updates another field of the same user (for instance, subscriberStatus).

When these webhooks are fired at the same time, it can lead to a situation when one of them fails with the following exception:

Could not synchronize database state with session org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect): [User#21]

To avoid this, we need to get these threads to be executed synchronously for the same user. Apparently, we need to use user id for this purpose for the system to know when the same user is updated. Of course it wouldn’t work if the threads were just synchronized directly on user id, because we’d have two different monitor objects when getting the id from two different threads. Fortunately, Java has a simple way to get monitor object based on id, using the String::intern() method. It returns a canonical representation for the invoking string object from a pool of strings maintained by the String class. So, for any two equal strings, the intern() method returns the same object from the pool. It allows us to synchronize threads in the following way:

1
2
3
synchronized (user.id.toString().intern()) {
    // ...
}

Let’s now add an ability to safely synchronize threads on instances of any domain object by adding the getMonitorObject() method to every domain from BootStrap.groovy:

BootStrap.groovy
1
2
3
4
5
grailsApplication.domainClasses.each { DefaultGrailsDomainClass domain ->
    domain.metaClass.getMonitorObject << {
        "${domain.name}_${delegate.id.toString()}".intern()
    }
}

Cluster-wide synchronization

At this point, we already have an ability to synchronize threads on the currently logged-in user within a single servlet container:

1
2
3
synchronized (userService.user.monitorObject) {
    // ...
}

However, it surely won’t work if several threads simultaneously update the same user from different server nodes, so let’s enable Hazelcast to get the synchronization work in a cluster.

To do this, we need to use the HazelService::lock(String) method which allows to get a cluster-wide lock for the specified string (which is used as a key for this lock):

1
2
3
4
5
6
7
def lock = hazelService.lock(userService.user.monitorObject)
lock.lock()
try {
    // ...
} finally {
    lock.unlock()
}

For better convenience, let’s put all this code into a separate method of the HazelService. We can write it right inside the HazelService.groovy and give it a name synchronize. Or, alternatively, we can add it in BootStrap.groovy using Groovy metaprogramming capabilities which allow to name the method synchronized, just like the standard Java keyword:

BootStrap.groovy
1
2
3
4
5
6
7
8
9
HazelService.metaClass.synchronized << { String monitorObject, Closure c ->
    def lock = delegate.lock(monitorObject)
    lock.lock()
    try {
        c.call()
    } finally {
        lock.unlock()
    }
}

The method accepts a monitor object and a closure which needs to be executed synchronously.

Configuration option to enable/disable Hazelcast

Let’s now add a configuration option that would allow us to easily disable and enable Hazelcast whenever we need. For instance, we may need to disable it for the development environment.

Config.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ...
grails {
    plugin {
        hazelcast {
            enabled = true
            conf = "hazelcast/hazelcast-${Environment.currentEnvironment.name}.xml"
        }
    }
}
// ...
environments {
    development {
        grails.plugin.hazelcast.enabled = false
    }
}
// ...

Now let’s tell the application not to create a Hazelcast instance if Hazelcast is disabled in config:

HazelService.groovy
1
2
3
4
5
6
7
8
@PostConstruct
void init() {
    def config = grailsApplication.config.grails.plugin.hazelcast
    if (config.enabled == true) {
        instance = Hazelcast.newHazelcastInstance(
            config.conf ? new ClasspathXmlConfig(config.conf as String) : null)
    }
}

...and to use the standard Java synchronization in this case:

BootStrap.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
HazelService.metaClass.synchronized << { String monitorObject, Closure c ->
    if (grailsApplication.config.grails.plugin.hazelcast.enabled == true) {
        def lock = delegate.lock(monitorObject)
        lock.lock()
        try {
            c.call()
        } finally {
            lock.unlock()
        }
    } else {
        synchronized (monitorObject) {
            c.call()
        }
    }
}

Final recommendations

Now our application is able to synchronize threads on an instance of the currently logged-in user (or any other domain instance stored in the database) in a cluster as well as on a single node. Nevertheless, in order to create bullet-proof thread synchronization, transaction isolation concepts must be applied properly, so there are some more recommendations:

  1. Transaction should be opened inside a synchronized block, not conversely.
  2. Data should be refreshed from the database inside a synchronized block before transaction is started.
  3. Hibernate session should be flushed inside a synchronized block.

By adhering these rules we ensure that our synchronized threads will "see" any changes made to a distributed data by other threads.

Merry Christmas and Happy New Year 2016!

If you're looking for a developer or considering starting a new project,
we are always ready to help!