
zookeeper

Let's say you are building a distributed system that processes large amounts of data. Distributed systems are scalable, highly available, and resilient to failures, and they can provide higher throughput. (Distributed systems are awesome!)

These benefits come with challenges as well: concurrency control, configuration management, and coordination, to name a few.

The components of a distributed system often communicate or stay synchronized through a shared resource. ZooKeeper creates and manages such resources (called znodes) and lets us perform actions on them. Clients that subscribe to (watch) a given znode are notified of those actions.

A ZooKeeper instance always starts with the root znode, i.e. /. A znode can be created under it, another znode can be created under that child, and so on. Each znode can optionally store a small amount of data, up to 1 MB by default (configurable). The entire structure looks like a tree, and you traverse it using the names of the znodes, much like a file system path.

Example: the path of the znode child2 under A is /A/child2.

znodes
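
To make the tree structure concrete, here is a minimal sketch of it as a toy in-memory model. The class name ZNodeTree and its methods are hypothetical and exist only for illustration. This is not how ZooKeeper stores znodes internally; it only mimics the rules described above: absolute paths, a parent that must exist before its child, and optional data capped at about 1 MB.

```python
class ZNodeTree:
    """Toy in-memory stand-in for a znode tree (illustration only)."""

    MAX_DATA_BYTES = 1024 * 1024  # mirrors the default limit of about 1 MB

    def __init__(self):
        # Map each absolute path to its (optional) data; the root always exists.
        self.nodes = {"/": b""}

    @staticmethod
    def parent(path):
        # "/A/child2" -> "/A", and "/A" -> "/"
        head = path.rsplit("/", 1)[0]
        return head or "/"

    def create(self, path, data=b""):
        if not path.startswith("/") or path.endswith("/"):
            raise ValueError("paths are absolute and have no trailing slash")
        if path in self.nodes:
            raise KeyError(f"{path} already exists")
        if self.parent(path) not in self.nodes:
            raise KeyError(f"parent of {path} does not exist")
        if len(data) > self.MAX_DATA_BYTES:
            raise ValueError("data exceeds the size limit")
        self.nodes[path] = data

    def children(self, path):
        # Names (not full paths) of the direct children of `path`.
        return sorted(p.rsplit("/", 1)[1] for p in self.nodes
                      if p != "/" and self.parent(p) == path)


tree = ZNodeTree()
tree.create("/A")
tree.create("/A/child1")
tree.create("/A/child2", b"hello")  # data is optional
print(tree.children("/A"))  # ['child1', 'child2']
```

Creating /B/x in this model fails because /B does not exist yet, which is the same ordering constraint you hit when creating nested znodes by hand.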

The APIs provided by ZooKeeper are rather simple:

  1. create - Creates a znode at the specified path.
  2. delete - Deletes a znode.
  3. exists - Checks whether a znode exists at the path.
  4. getChildren - Returns the children of the given znode.
  5. setData - Sets the data in the given znode.
  6. getData - Gets the data from the given znode.
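
As a rough sketch, the six operations above map onto the kazoo Python client roughly as follows. The function name exercise_api and the /demo path are hypothetical. The function expects an already started client and uses only the kazoo methods create, exists, get_children, set, get, and delete, so nothing in the block itself contacts a server.

```python
def exercise_api(zk, path="/demo"):
    """Run each basic operation once against a started kazoo-style client."""
    if zk.exists(path) is None:        # exists: None when the znode is absent
        zk.create(path, b"initial")    # create: data must be bytes
    zk.create(path + "/child", b"")    # create a child znode with no data
    children = zk.get_children(path)   # getChildren: list of child names
    zk.set(path, b"updated")           # setData
    data, stat = zk.get(path)          # getData: returns (bytes, stat)
    zk.delete(path, recursive=True)    # delete the znode and its children
    return children, data


# Assumed usage against a local server (not executed here):
#   from kazoo.client import KazooClient
#   zk = KazooClient(hosts="localhost:2181")
#   zk.start()
#   print(exercise_api(zk))
#   zk.stop()
```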

Setup

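  1. Download the ZooKeeper tarball from the releases page https://zookeeper.apache.org/releases.html
  2. Extract the tarball.
  3. Go to zookeeper/bin and run ./zkServer.sh start (the server reads zookeeper/conf/zoo.cfg; the distribution ships zoo_sample.cfg, which you can copy as a starting point).
  4. By default, it uses port 2181 for client connections.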

ZooKeeper also ships with a CLI, which can be started with zookeeper/bin/zkCli.sh for experimentation.

[~/repos/jp/zookeeper/zookeeper/bin][00:28:05]$ ./zkCli.sh
Connecting to localhost:2181

[zk: localhost:2181(CONNECTED) 3] ls /
[zookeeper]
# Only the zookeeper znode exists under the root by default.
[zk: localhost:2181(CONNECTED) 4] create /A # Creates a znode A.
Created /A
[zk: localhost:2181(CONNECTED) 5] create /A/child1
Created /A/child1
[zk: localhost:2181(CONNECTED) 6]
[zk: localhost:2181(CONNECTED) 7] ls /
[A, zookeeper]
[zk: localhost:2181(CONNECTED) 8] ls /A
[child1]
[zk: localhost:2181(CONNECTED) 9] set /A 1000
[zk: localhost:2181(CONNECTED) 10]
[zk: localhost:2181(CONNECTED) 10] stat /A
cZxid = 0x7
ctime = Tue Sep 05 00:28:40 IST 2023
mZxid = 0x9
mtime = Tue Sep 05 00:28:49 IST 2023
pZxid = 0x8
cversion = 1
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4 # The length of data 1000 is 4.
numChildren = 1 # Number of children for the znode /A
[zk: localhost:2181(CONNECTED) 11] get /A
1000 # Data is returned because it was set previously using set command.
[zk: localhost:2181(CONNECTED) 12] get /

# No data is returned since no data was set.
[zk: localhost:2181(CONNECTED) 13]

There are 3 kinds of znodes.

  1. Persistent - The node persists until it is explicitly deleted.
  2. Ephemeral - The node is deleted when the session that created it ends.
  3. Sequential - ZooKeeper appends a monotonically increasing number to the end of the znode name. This can be combined with either of the above.
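
The difference between these kinds is easier to see in a small simulation. The sketch below is a toy model, not ZooKeeper's implementation: ToyServer, its methods, and the /workers paths are all hypothetical names. It assumes sequence numbers are tracked per parent and rendered as a 10-digit, zero-padded suffix. With kazoo, the corresponding real call would be zk.create(path, ephemeral=True, sequence=True).

```python
class ToyServer:
    """Toy model of znode kinds (illustration only)."""

    def __init__(self):
        self.nodes = {}     # path -> owning session id (None means persistent)
        self.counters = {}  # parent path -> next sequence number

    def create(self, path, session=None, ephemeral=False, sequence=False):
        if sequence:
            parent = path.rsplit("/", 1)[0] or "/"
            n = self.counters.get(parent, 0)
            self.counters[parent] = n + 1
            path = f"{path}{n:010d}"  # append a 10-digit, zero-padded suffix
        self.nodes[path] = session if ephemeral else None
        return path  # the caller learns the final name from the return value

    def expire(self, session):
        # Ephemeral znodes vanish with their session; persistent ones stay.
        self.nodes = {p: s for p, s in self.nodes.items() if s != session}


server = ToyServer()
server.create("/workers")  # persistent
a = server.create("/workers/w-", session="s1", ephemeral=True, sequence=True)
b = server.create("/workers/w-", session="s2", ephemeral=True, sequence=True)
server.expire("s1")  # session s1 ends, so its ephemeral znode disappears
print(a)                     # /workers/w-0000000000
print(b)                     # /workers/w-0000000001
print(sorted(server.nodes))  # ['/workers', '/workers/w-0000000001']
```

Ephemeral sequential znodes like these are a common building block for group membership and leader election, since the list of children reflects which sessions are still alive.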

That's the basics! Let's look at an example to make it clearer.

Configuration management

You can store configuration data in a znode and set watches (listeners) on it. Whenever the data changes, the registered callback is invoked. This lets the components of a distributed system watch ZooKeeper and update their configuration dynamically.

The code below uses the kazoo Python client to create a path and add a watch on it. Whenever the data changes, the callback gets called.

node_watch.py
#!/usr/bin/env python3

# Configuration update watcher

from kazoo.client import KazooClient, KazooState

import logging
import time
import json
import random
import signal

def sigint_handler(sig, frame):
    logging.info('Exiting')
    exit(0)

signal.signal(signal.SIGINT, sigint_handler)
logging.basicConfig()
logging.root.setLevel(logging.INFO)

HOST = 'localhost:2181'
PATH = '/config/rootconfig'  # znode paths are absolute and start with '/'

zk = KazooClient(hosts=HOST)
zk.start()

if zk.state != KazooState.CONNECTED:
    logging.error("Can't connect to Zookeeper: " + str(zk.state))
    exit(1)  # Non-zero exit code signals the failure to the caller.

# Create the path (and any missing parent znodes) if it doesn't exist.
zk.ensure_path(PATH)

def get_dynamic_data():
    # Random values stand in for real configuration.
    data = {'A': random.randint(0, 10),
            'B': random.randint(11, 20)
            }
    return data

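# Add a watch for the path. The callback runs once on registration and
# again whenever the node's data changes.
@zk.DataWatch(PATH)
def watch(data, stat):
    if data is None:
        # The znode does not exist (for example, it was deleted).
        logging.info('Node is missing')
        return
    logging.info('State changed: ' + data.decode('utf-8'))
    logging.info('Version: ' + str(stat.version))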

def generate_data():
    # Press CTRL-C to abort
    while True:
        data = get_dynamic_data()
        zk.set(PATH, json.dumps(data).encode('utf-8'))
        time.sleep(5)

if __name__ == '__main__':
    generate_data()
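
In the script above, the same process both writes and watches the znode. In a real deployment the watcher would usually live in a separate consumer process. A minimal sketch of such a consumer-side callback, assuming the configuration is JSON as above, might look like this. The names make_config_watcher and apply_config are hypothetical; only the (data, stat) callback shape comes from kazoo's DataWatch.

```python
import json
import logging


def make_config_watcher(apply_config):
    """Build a DataWatch-style callback that feeds parsed JSON to apply_config."""
    def on_change(data, stat):
        if not data:
            # None (znode missing) or b"" (no data set yet): nothing to apply.
            return
        try:
            config = json.loads(data.decode("utf-8"))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            logging.warning("Ignoring malformed config")
            return
        apply_config(config, stat.version if stat else None)
    return on_change


current = {}
watcher = make_config_watcher(lambda cfg, version: current.update(cfg))

# With a started KazooClient `zk`, register it as: zk.DataWatch(PATH)(watcher)
# Here the callback is invoked by hand so the sketch runs without a server.
watcher(b'{"A": 3, "B": 15}', None)
print(current)  # {'A': 3, 'B': 15}
```

Keeping the parsing and validation inside the callback means a bad write to the znode is logged and skipped instead of crashing every consumer that watches it.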

Other recipes

Other recipes are explained well in the official documentation: https://zookeeper.apache.org/doc/current/recipes.html

Misc

Apache Curator is a Java framework built on top of the ZooKeeper APIs. It offers robust implementations of locks, queues, and other recipes, abstracting away the low-level ZooKeeper APIs.