Managing Configuration of a Distributed System with Apache ZooKeeper

One of the steps towards building a successful distributed software system is establishing effective configuration management. It is a complex engineering process that is responsible for planning, identifying, tracking, and verifying changes in the software and its configuration, as well as maintaining configuration integrity throughout the life cycle of the system.

Let’s consider how to store and manage configuration settings of the entire system and its components using Apache ZooKeeper, a high-performance coordination service for distributed applications.

Configuration of a Distributed System

In this article, we describe an approach to managing configuration settings for the following types of distributed systems, or a combination of them:

  • a server application in a cluster (several instances of the same application are deployed to a clustered environment for load-balancing and/or high-availability support);
  • a set of services with various functionality, which communicate with each other via a common protocol and form a custom software platform.

Generally, configuration items could be arranged by scope in the following groups:

  • global, which are the same for the entire system in any sub-configuration (system name, company website URL, etc.)
  • environment-specific, which may differ between environments such as development, test, and production (security settings, database server URLs, backup settings, etc.)
  • service-specific, which are related to the functionality of a particular service (database constants, timeouts, links to external resources, etc.)
  • instance-specific, which usually identify a specific instance in a cluster (host, role in the ensemble, recovery options, and so on)

However, these groups don’t have clear boundaries. Where a particular item belongs depends on the system architecture, size, and complexity.
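One way to picture these scopes is as layers, where a more specific layer overrides a broader one. The sketch below is only an illustration in plain Scala: the ConfigLayers object, its resolve method, and the idea of passing layers from most to least specific are assumptions made for this example, not part of the approach described later in the article.

```scala
object ConfigLayers {
  // Each layer maps a setting key to its raw string value.
  type Layer = Map[String, String]

  /**
   * Looks the key up in the given layers in order and returns the first match.
   * Pass the most specific layer first, for example:
   * instance, service, environment, global.
   */
  def resolve(key: String, layers: Seq[Layer]): Option[String] =
    layers.collectFirst { case layer if layer.contains(key) => layer(key) }
}
```

For instance, if a hypothetical "timeout" key is defined both globally and for one service, resolve returns the service-level value as long as the service layer is listed before the global one.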

The default way to manage settings is to use configuration files that typically include both common and individual sections. As the system grows in scale and complexity, the volume of unique configuration data increases. At the same time, common configuration entries are copied between different components, and the risk of inconsistency across the system grows. Moreover, the situation is often aggravated by the presence of several environments (development, test, production, etc.), each of which requires its own runtime configuration for both system-wide and service-specific settings.

Continuously increasing volume and variability of configuration data in the form of configuration files make the task of ensuring its integrity, scalability, and security quite complex and resource-consuming. In this article, I’m going to show how to use Apache ZooKeeper to design centralized configuration storage for distributed systems as an alternative to file-based solutions.

Apache ZooKeeper as a Centralized Configuration Storage

If you’re new to ZooKeeper, we strongly suggest taking a look at its design concepts and architecture in the Overview section of the official documentation. The Programmer’s Guide is the best starting point if you’re already familiar with ZooKeeper’s fundamentals and want to develop real-world applications using Apache ZooKeeper.

As its documentation and many articles point out, ZooKeeper works best as a coordination service for distributed applications. In our concept, ZooKeeper is used as centralized configuration data storage. Although ZooKeeper has a data model that looks like a UNIX file system (a ZNode can be interpreted as a “directory” that can have data associated with it), there are several constraints that prevent using ZooKeeper as an ordinary file system:

  • The default ZNode size limit is 1MB. It can be increased by changing the jute.maxbuffer Java system property on all communicating ZooKeeper servers and clients. However, doing so is strongly discouraged because it may degrade performance and seriously disrupt the operation of the whole system. Also remember that the one-megabyte limit covers not only the ZNode value, but also its key and the list of child node names.
  • All data is kept in memory and replicated on each server in the ZooKeeper ensemble. This must be taken into account when planning the system. Each server machine should have enough RAM and correct JVM heap settings (at most 3/4 of the total memory) to operate properly under the heaviest expected load. Disk swapping degrades ZooKeeper performance significantly.
  • Each write operation is flushed to disk. Therefore, ZooKeeper is a poor fit for operations that require extremely low latency or that write large amounts of data intensively. Also be aware that write throughput cannot be increased by adding new instances to the ensemble; on the contrary, you may observe a slight drop in write performance.

On the other hand, there are several advantages of using ZooKeeper for our use case:

  • All required configuration data is in centralized storage, which will help to avoid issues with data integrity. Meanwhile, ZooKeeper won’t be a “single point of failure”, because it allows running several instances in an ensemble. It can survive failures as long as a majority of servers are active.
  • Centralized configuration storage, which is always online, allows for maintaining the dynamic configuration of the distributed system. This results in an ability to adjust system settings without its restart or even an ability to perform system auto-tuning depending on the values of environment metrics. For example, re-adjust the settings of a specific system component when it runs under high load.
  • Flexible control of access to the specific ZNode and its child ZNodes. For instance, this allows for restricting access of employees or services to specific environments/services, protecting config entries from unapproved changes by setting read-only access to them, etc.
  • Out-of-the-box scalable and reliable tool for synchronizing various runtime system metrics.
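The “majority of servers” condition from the first point can be turned into simple arithmetic. The following is a minimal sketch; the Quorum object and its method names are made up for this illustration.

```scala
object Quorum {
  /** Smallest number of servers that forms a majority of the ensemble. */
  def majority(ensembleSize: Int): Int = {
    require(ensembleSize > 0, "ensemble must contain at least one server")
    ensembleSize / 2 + 1
  }

  /** Number of servers that may fail while a majority is still available. */
  def tolerableFailures(ensembleSize: Int): Int =
    ensembleSize - majority(ensembleSize)
}
```

An ensemble of three servers tolerates one failure and an ensemble of five tolerates two. Four servers still tolerate only one failure, which is why odd ensemble sizes are the usual choice.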

Building a complex all-purpose system that can do “almost everything” often leads to worse outcomes than a set of specialized tools. Even if you’re sure of the final result, the development process becomes very resource-consuming and results in a poor ROI. Considering all the pros and cons, we have to strictly define the boundaries of our system:

  • Individual configuration entries should be at most a few kilobytes in size: numeric and text constants, XML/JSON configurations, etc. Large binary resources have to be avoided.
  • System components should avoid time-critical operations with variables and constants at runtime.
  • ZooKeeper doesn’t keep a history of changes to a node (only a version counter that is incremented on each update), so you need a separate backup/version control system.
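The size boundary can be enforced before a value is ever sent to ZooKeeper. Here is a minimal sketch of such a guard; the EntrySize object and the 64 KB soft limit are assumptions chosen for the example and should be adjusted to your own system.

```scala
import java.nio.charset.StandardCharsets

object EntrySize {
  // Hypothetical soft limit for a single entry, far below ZooKeeper's default 1MB cap.
  val MaxEntryBytes: Int = 64 * 1024

  /**
   * Encodes the value as UTF-8 and rejects it if it exceeds the limit.
   * Returns the bytes on the right side or an error message on the left side.
   */
  def encode(value: String, maxBytes: Int = MaxEntryBytes): Either[String, Array[Byte]] = {
    val bytes = value.getBytes(StandardCharsets.UTF_8)
    if (bytes.length <= maxBytes) Right(bytes)
    else Left("Entry is %d bytes, limit is %d bytes".format(bytes.length, maxBytes))
  }
}
```

Returning Either keeps the check explicit at the call site: the caller decides whether an oversized entry is an error or should be stored elsewhere.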

Example

Let’s build an application that shows basic examples of communicating with a ZooKeeper ensemble to coordinate configuration information. The system will consist of one running instance of Apache ZooKeeper and a simple HTTP service that uses remote configuration to initialize itself on startup and serve several dummy requests. The structure of the system, as well as its functionality, can easily be extended to fit your needs thanks to the scalability of ZooKeeper-based solutions.

Structure of Configuration Data

The structure of the ZooKeeper storage of system configuration data is equivalent to the file system structure of any UNIX-like operating system. In this case, we have several root paths for different kinds of initial and runtime configuration data.

The root structure of the storage of configuration constants is the following:

  • /system/<environment>/ – child ZNodes in this path should store global configuration constants, which are common for all services and nodes of the system for a target environment.
  • /system/<environment>/<service>/ – child ZNodes in this path should store service-specific configuration constants for a target environment.

Below is a description of the placeholders used:

  • <environment> means environment identifier: dev, test, prod, etc.
  • <service> is the name of a service (system component).
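The layout above can be captured in a small path helper. This is a sketch in plain Scala: the ConfigPaths object and its methods are hypothetical names, and accepting both “.” and “/” as key separators is an assumption made for convenience.

```scala
object ConfigPaths {
  val Root = "/system"

  /** Splits a key on "." or "/" and drops empty tokens. */
  private def tokens(key: String): Seq[String] =
    key.trim.split("[./]").filter(_.nonEmpty).toSeq

  /** Absolute path of a global entry: /system/<environment>/<key tokens>. */
  def global(environment: String, key: String): String =
    (Seq(Root, environment) ++ tokens(key)).mkString("/")

  /** Absolute path of a service entry: /system/<environment>/<service>/<key tokens>. */
  def service(environment: String, service: String, key: String): String =
    (Seq(Root, environment, service) ++ tokens(key)).mkString("/")
}
```

With this helper, the global key db.host in the dev environment maps to /system/dev/db/host, and the key db/name of a service called example in the test environment maps to /system/test/example/db/name.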

Technology Stack

The following software is used to build and run the example:

  • Apache ZooKeeper (3.4.6) – A high-performance coordination service for distributed applications.
  • Scala (2.10.4) – A general-purpose programming language that smoothly integrates features of object-oriented and functional programming paradigms.
  • Apache Curator Framework (2.7.0) – A high-level API library that simplifies using Apache ZooKeeper.
  • Spray (1.3.2) – An open-source toolkit for building REST/HTTP-based integration layers on top of Scala and Akka.
  • Akka (2.3.8) – An asynchronous event-driven middleware framework for building high-performance and reliable distributed applications.
  • SBT (0.13.5) – An interactive build tool.

Initializing ZooKeeper Client

The first step to get remote configuration data from ZooKeeper in our HTTP Service is creating a client. We use the capabilities of the Apache Curator Framework for these purposes. It is a pretty intelligent high-level API that adds many features that are built on ZooKeeper and handles the complexity of managing connections to the ZooKeeper cluster and retrying operations.

Below is the code that initializes a client.

/**
   * Creates ZooKeeper remote configuration client.
   *
   * Initializes connection to the ZooKeeper ensemble.
   *
   * @param service service name
   * @param environment system environment
   * @param connectionString connection string; default value is taken from
   *                         local configuration
   * @param connectionTimeout connection timeout; default value is taken from
   *                          local configuration
   * @param sessionTimeout session timeout; default value is taken from local
   *                       configuration
   * @param retryPolicy connection retry policy; default policy retries specified
   *                    number of times with increasing sleep time between retries
   * @param authScheme authentication scheme; null by default
   * @param authData authentication data bytes; null by default
   * @return client instance
   */
  def initZooKeeperClient(service: String,
                          environment: String = "dev",
                          connectionString: String = DefaultConnectionString,
                          connectionTimeout: Int = DefaultConnectionTimeout,
                          sessionTimeout: Int = DefaultSessionTimeout,
                          retryPolicy: RetryPolicy = DefaultRetryPolicy,
                          authScheme: String = null,
                          authData: Array[Byte] = null): CuratorFramework = {
    // A short-lived client that only verifies that the service configuration exists.
    val lookupClient = CuratorFrameworkFactory.builder()
      .connectString(connectionString)
      .retryPolicy(new retry.RetryOneTime(RetryInterval))
      .buildTemp(LookupClientTimeout, TimeUnit.MILLISECONDS)
    // Configuration is stored under /system/<environment>/<service>.
    val serviceConfigPath = "/system/%s/%s".format(environment, service)
    try {
      lookupClient.inTransaction().check().forPath(serviceConfigPath).and().commit()
    } catch {
      case ke: KeeperException => {
        throw new MissingResourceException(
          "Remote configuration for %s service in %s environment is unavailable: %s - %s."
          .format(service, environment, ke.code(), ke.getMessage), "ZNode", serviceConfigPath)
      }
    } finally {
      // Release the temporary connection as soon as the check is done.
      lookupClient.close()
    }

    // The namespace makes all paths relative to /system/<environment>.
    val client = CuratorFrameworkFactory.builder()
      .connectString(connectionString)
      .connectionTimeoutMs(connectionTimeout)
      .sessionTimeoutMs(sessionTimeout)
      .retryPolicy(retryPolicy)
      .authorization(authScheme, authData)
      .namespace("system/%s".format(environment))
      .build()

    try {
      client.start()
      client
    } catch {
      case t: Throwable =>
        throw new RuntimeException("Unable to start ZooKeeper remote configuration client: %s".
          format(t.getLocalizedMessage), t)
    }
  }

Please note that we still need some minimal local configuration to be able to establish a connection to a ZooKeeper ensemble and set the target environment. There are two options:

1) Local configuration file (application.conf):

# target environment
environment = "dev"

# zookeeper settings
zookeeper {
      # instance(s) of Zookeeper in ensemble
      connectionString = "localhost:2181"

      # connection timeout, in millis
      connectionTimeout = 15000

      # session timeout, in millis
      sessionTimeout = 60000

      # number of connection retries
      retryAttempts = 5

      # interval between connection retries, in millis
      retryInterval = 2000
}

2) JVM system properties. In case of conflicts, they take priority over entries in the configuration file. For example:

java -Denvironment=test -Dzookeeper.connectionString=localhost:2182,localhost:2183 -jar <path-to-service-jar>
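The two options can be combined in code. The sketch below assumes that the Typesafe Config library (the HOCON implementation that Akka uses) is on the classpath; the LocalSettings object and the ZooKeeperSettings case class are hypothetical names introduced for this illustration. ConfigFactory.load() reads application.conf and lets JVM system properties override its entries, which matches the priority described above.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object LocalSettings {
  // Hypothetical holder for the minimal local configuration.
  final case class ZooKeeperSettings(environment: String,
                                     connectionString: String,
                                     connectionTimeout: Int,
                                     sessionTimeout: Int,
                                     retryAttempts: Int,
                                     retryInterval: Int)

  /**
   * Reads the settings from the given config.
   * By default, application.conf is loaded with system properties on top.
   */
  def load(config: Config = ConfigFactory.load()): ZooKeeperSettings = {
    val zk = config.getConfig("zookeeper")
    ZooKeeperSettings(
      environment = config.getString("environment"),
      connectionString = zk.getString("connectionString"),
      connectionTimeout = zk.getInt("connectionTimeout"),
      sessionTimeout = zk.getInt("sessionTimeout"),
      retryAttempts = zk.getInt("retryAttempts"),
      retryInterval = zk.getInt("retryInterval"))
  }
}
```

The resulting values could then serve as the defaults of the client initialization method shown earlier, for example as the connection string and timeouts.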

Reading settings

Settings can be read from ZooKeeper at any time after the client is initialized and a connection to the ensemble is established.

The code, together with its Scaladoc, is shown below.

/**
   * Retrieves raw value from remote configuration.
   *
   * @param path path to configuration entry; for example, <i>section.subsection.entry</i>
   * @param client ZooKeeper remote config client
   * @return configuration setting value
   */
  def getSetting(path: String)(implicit client: CuratorFramework): Array[Byte] = {
    client.getData.forPath("/%s".format(path.trim.replaceAll("\\.", "/")))
  }

  /**
   * Retrieves optional raw value from remote configuration.
   *
   * @param path path to configuration entry; for example, <i>section.subsection.entry</i>
   * @param client remote configuration client
   * @return optional configuration setting value
   */
  def getOptionalSetting(path: String)(implicit client: CuratorFramework): Option[Array[Byte]] = {
    Try {
      getSetting(path)
    }.toOption
  }

The environment is set in the namespace parameter of the client on the initialization step, so you shouldn’t specify it explicitly in the path to the target znode. You have to specify a relative path to the znode with the target configuration entry. You can use “/” (ZooKeeper default) or “.” (HOCON path separator, which Akka uses in configuration files) as the delimiter between path tokens.
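Because the client stays connected, the same path convention can also be used to follow changes of an entry at runtime, which is the basis for the dynamic configuration mentioned earlier. The sketch below assumes that the curator-recipes module, which is not part of the technology stack listed above, is on the classpath; SettingWatcher, toZNodePath, and watchSetting are hypothetical names, not code from the example project.

```scala
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener}

object SettingWatcher {
  /** Same conversion as in getSetting: "example.port" becomes "/example/port". */
  def toZNodePath(path: String): String =
    "/%s".format(path.trim.replaceAll("\\.", "/"))

  /**
   * Invokes onChange whenever the znode is created, updated, or deleted.
   * The callback receives None if the znode does not exist.
   * The caller is responsible for closing the returned cache.
   */
  def watchSetting(path: String)(onChange: Option[Array[Byte]] => Unit)
                  (implicit client: CuratorFramework): NodeCache = {
    val cache = new NodeCache(client, toZNodePath(path))
    cache.getListenable.addListener(new NodeCacheListener {
      override def nodeChanged(): Unit =
        onChange(Option(cache.getCurrentData).map(_.getData))
    })
    cache.start()
    cache
  }
}
```

A service could, for example, watch example.db.maxConnections and resize its connection pool in the callback instead of restarting.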

The default data representation of the ZooKeeper client is a byte array. However, it is not the most convenient format for working with configuration data in an application. Below are implicit converters from ZooKeeper data to the most common data types. In addition, the converters can wrap data in Option, so you get None instead of an exception if the specified configuration entry does not exist or cannot be converted to the target data type. See the Scaladoc for details.

/**
   * Helps to convert data from ZooKeeper into base data types.
   *
   * @param zData raw data from ZooKeeper
   * @param charset charset to use for data conversion
   */
  implicit class ZDataConverter(val zData: Array[Byte])
    (implicit val charset: Charset = Charsets.UTF_8) {

    /**
     * Converts data from ZooKeeper to string, if possible.
     *
     * @return string value
     */
    def asString: String = new String(zData, charset)

    /**
     * Converts data from ZooKeeper to boolean, if possible.
     *
     * @return boolean value
     * @throws IllegalArgumentException if can not cast value to boolean
     */
    def asBoolean: Boolean = new String(zData, charset).toBoolean

    /**
     * Converts data from ZooKeeper to byte, if possible.
     *
     * @return byte value
     * @throws NumberFormatException if value is not a valid byte
     */
    def asByte: Byte = new String(zData, charset).toByte

    /**
     * Converts data from ZooKeeper to int, if possible.
     *
     * @return int value
     * @throws NumberFormatException if value is not a valid integer
     */
    def asInt: Int = new String(zData, charset).toInt

    /**
     * Converts data from ZooKeeper to long, if possible.
     *
     * @return long value
     * @throws NumberFormatException if value is not a valid long
     */
    def asLong: Long = new String(zData, charset).toLong

    /**
     * Converts data from ZooKeeper to double, if possible.
     *
     * @return double value
     * @throws NumberFormatException if value is not a valid double
     */
    def asDouble: Double = new String(zData, charset).toDouble
  }

  /**
   * Helps to convert optional data from ZooKeeper into base data types and wrap in Option.
   *
   * @param zDataOption data from ZooKeeper, wrapped in Option
   * @param charset charset to use for data conversion
   */
  implicit class ZDataOptionConverter(val zDataOption: Option[Array[Byte]])
    (implicit val charset: Charset = Charsets.UTF_8) {

    /**
     * Converts data from ZooKeeper to optional string, if possible.
     *
     * @return optional string value
     */
    def asOptionalString: Option[String] = zDataOption.map(_.asString)

    /**
     * Converts data from ZooKeeper to optional boolean, if possible.
     *
     * @return optional boolean value
     */
    def asOptionalBoolean: Option[Boolean] = zDataOption.flatMap(v => Try {
      v.asBoolean
    }.toOption)

    /**
     * Converts data from ZooKeeper to optional byte, if possible.
     *
     * @return optional byte value
     */
    def asOptionalByte: Option[Byte] = zDataOption.flatMap(v => Try {
      v.asByte
    }.toOption)

    /**
     * Converts data from ZooKeeper to optional integer, if possible.
     *
     * @return optional int value
     */
    def asOptionalInt: Option[Int] = zDataOption.flatMap(v => Try {
      v.asInt
    }.toOption)

    /**
     * Converts data from ZooKeeper to optional long, if possible.
     *
     * @return optional long value
     */
    def asOptionalLong: Option[Long] = zDataOption.flatMap(v => Try {
      v.asLong
    }.toOption)

    /**
     * Converts data from ZooKeeper to optional double, if possible.
     *
     * @return optional double value
     */
    def asOptionalDouble: Option[Double] = zDataOption.flatMap(v => Try {
      v.asDouble
    }.toOption)
  }

Note 1. The converters and the data retrieval methods are located in the com.sysgears.example.config.ZooKeeperConfiguration trait.

Note 2. You can use another charset instead of UTF-8 if you define your own implicit value for the charset parameter in the code of your application.

Below is an example of retrieving configuration values:

    val Service = "example"
    ...
    val host = getSetting("%s.host".format(Service)).asString
    val port = getSetting("%s.port".format(Service)).asInt
    val maxConnections = getOptionalSetting("%s.db.maxConnections".format(Service))
      .asOptionalInt.getOrElse(10)

Running Example Service

Let’s start an example service to see how it works.

The service itself is pretty simple. On startup, it retrieves basic initialization data from the ZooKeeper remote configuration (the host and port to bind to). When you access the /example URI, it retrieves some other configuration data from ZooKeeper and displays it on the page.

It is based on the spray-can module of the Spray framework.

Note 3. The source code of the HTTP service actor is located in com.sysgears.example.service.ExampleService class.

Running ZooKeeper ensemble

Note 4. You must have Apache ZooKeeper installed on your machine to proceed. Please refer to the ZooKeeper Getting Started guide for download, installation, and configuration instructions.

You can run a configured ZooKeeper instance on a UNIX system by executing the following commands in a terminal:

$ cd ./<path-to-your-zookeeper-distribution>/bin
$ ./zkServer.sh start

The default distribution also includes a basic command-line interface to access the ZooKeeper instance. Execute the following in the terminal to run it:

$ cd ./<path-to-your-zookeeper-distribution>/bin
$ ./zkCli.sh -server 127.0.0.1:2181

If everything goes right, you’ll see the following prompt, where you can enter commands:

[zk: localhost:2181(CONNECTED) 0]

Enter help to see the complete list of supported commands:

ZooKeeper -server host:port cmd args
    connect host:port
    get path [watch]
    ls path [watch]
    set path data [version]
    rmr path
    delquota [-n|-b] path
    quit
    printwatches on|off
    create [-s] [-e] path data acl
    stat path [watch]
    close
    ls2 path [watch]
    history
    listquota path
    setAcl path acl
    getAcl path
    sync path
    redo cmdno
    addauth scheme auth
    delete path [version]
    setquota -n|-b val path

Import configuration settings

Using the ZooKeeper CLI, execute the following commands to create initial example service configuration data for two environments: dev and test.

create /system null
create /system/dev null
create /system/dev/db null
create /system/dev/db/host jdbc:mysql://10.10.10.1:3306/
create /system/dev/db/maxConnections 10
create /system/dev/example null
create /system/dev/example/host localhost
create /system/dev/example/port 8081
create /system/dev/example/db null
create /system/dev/example/db/name example
create /system/dev/example/db/user dev
create /system/dev/example/db/password password

create /system/test null
create /system/test/db null
create /system/test/db/host jdbc:mysql://10.10.10.2:3306/
create /system/test/db/maxConnections 50
create /system/test/example null
create /system/test/example/host localhost
create /system/test/example/port 8082
create /system/test/example/db null
create /system/test/example/db/name example
create /system/test/example/db/user test
create /system/test/example/db/password password
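Typing such commands by hand gets tedious as the number of entries grows. A list like the one above could be generated from a collection of path/value pairs. This is a minimal sketch in plain Scala: ZkCliCommands and createCommands are hypothetical names, and it follows the convention used above of storing the placeholder value null in intermediate znodes.

```scala
object ZkCliCommands {
  /** The path itself and all of its ancestors, shortest first. */
  private def withParents(path: String): Seq[String] = {
    val tokens = path.split("/").filter(_.nonEmpty)
    (1 to tokens.length).map(n => tokens.take(n).mkString("/", "/", ""))
  }

  /**
   * Builds zkCli "create" commands for the given (path, value) entries.
   * Parent znodes are created before their children, each path only once.
   */
  def createCommands(entries: Seq[(String, String)]): Seq[String] = {
    val values = entries.toMap
    entries
      .flatMap { case (path, _) => withParents(path) }
      .distinct
      .map(path => "create %s %s".format(path, values.getOrElse(path, "null")))
  }
}
```

Note that this sketch does not quote or escape values, so it only suits simple values without spaces, such as the ones used in this example.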

Start a service

You need SBT installed on your system to build this example. Refer to its installation instructions for details.

Then you can run the service from its root directory by executing the following command from the terminal:

$ sbt run

Hit the target endpoint

Open your browser and go to http://localhost:8081/example. You’ll see a page that shows the settings imported in the previous steps.

The source code of the example is available on GitHub.

Hope you find this helpful.

Update: The next part of the article, Managing configuration of a distributed system with Apache ZooKeeper: Loading initial configuration, is available now.

Enjoy reading!