Utilizing ZooKeeper Clients and Distributed Naming Services

ZooKeeper Java Clients

Development with ZooKeeper primarily involves using Java client APIs to connect to and operate a ZooKeeper cluster. The available Java client APIs are:

  • The official ZooKeeper Java client API.
  • Third-party Java client APIs, such as Curator.

The official API offers fundamental operations such as session creation, node creation, data reads, data updates, node deletion, and existence checks. For practical development, however, it has several limitations:

  • ZooKeeper's Watcher notifications are one-time; they must be re-registered after each trigger.
  • It does not re-establish a session automatically after the session expires; the application must create a new client instance itself.
  • Exception handling is cumbersome, as ZooKeeper throws numerous exceptions that can be challenging for developers to manage.
  • It only provides simple byte[] array interfaces, lacking support for Java POJO-level serialization.
  • When node creation fails with an exception, the developer must manually check if the node already exists.
  • It does not support recursive deletion.

Overall, the official API is relatively basic and can be unwieldy for real-world applications, making it generally less recommended.
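
Because the official API only accepts and returns byte[], applications usually wrap it in a small codec of their own. The following is a minimal sketch of that idea, not part of ZooKeeper: the ServerInfo class and its "host:port" text encoding are assumptions made purely for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical value class used only for illustration.
final class ServerInfo {
    final String host;
    final int port;

    ServerInfo(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Encode as UTF-8 "host:port" bytes, suitable as the data argument of create or setData.
    byte[] toBytes() {
        return (host + ":" + port).getBytes(StandardCharsets.UTF_8);
    }

    // Decode bytes previously produced by toBytes(), for example the result of getData.
    static ServerInfo fromBytes(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        int sep = text.lastIndexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("Expected host:port but got: " + text);
        }
        return new ServerInfo(text.substring(0, sep), Integer.parseInt(text.substring(sep + 1)));
    }
}
```

Fixing the charset explicitly avoids depending on the platform default, which can differ between the machine that writes a node and the one that reads it.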

Working with the Native ZooKeeper Java Client

Include the ZooKeeper client dependency.

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

Note: It's advisable to keep the client version consistent with the server version to avoid compatibility issues.

The main class for using the native client is org.apache.zookeeper.ZooKeeper.

Common ZooKeeper Constructor

ZooKeeper(connectionString, sessionTimeout, watcher)
  • connectionString: A comma-separated list of host:port pairs for ZooKeeper servers. The client will attempt to connect to one of these.
  • sessionTimeout: The session timeout in milliseconds. The client keeps the session alive by sending heartbeats over its connection; if the server hears nothing from the client within this period, it expires the session.
  • watcher: A callback object to handle events from the ZooKeeper cluster.
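
The server does not have to grant the requested sessionTimeout as-is: it bounds the value by its minSessionTimeout and maxSessionTimeout settings, which default to 2 and 20 times tickTime. The sketch below models only that clamp. The class and method names are hypothetical, and a tickTime of 2000 ms in the usage note is an assumption taken from the sample configuration.

```java
final class SessionTimeouts {
    private SessionTimeouts() {}

    // Models the server-side clamp: with default settings the granted timeout
    // lies in the range [2 * tickTime, 20 * tickTime].
    static int negotiated(int requestedMs, int tickTimeMs) {
        int min = 2 * tickTimeMs;
        int max = 20 * tickTimeMs;
        return Math.max(min, Math.min(max, requestedMs));
    }
}
```

With a tickTime of 2000 ms, a request for 5000 ms is granted unchanged, while a request for 1000 ms is raised to 4000 ms. On a connected client, the value actually granted can be read with getSessionTimeout().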

Connecting to a ZooKeeper Cluster using the Native API

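import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperConnector {
    private static final int SESSION_TIMEOUT_MS = 5000;

    public static ZooKeeper createConnection(String servers) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(servers, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent e) {
                // Connection state changes arrive as events of type None.
                if (e.getType() == Event.EventType.None && e.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("Connected to ZooKeeper.");
                    latch.countDown();
                }
            }
        });
        System.out.println("Establishing ZooKeeper connection...");
        // Wait a bounded time instead of blocking forever when no server is reachable.
        if (!latch.await(SESSION_TIMEOUT_MS * 2L, TimeUnit.MILLISECONDS)) {
            zk.close();
            throw new IllegalStateException("Timed out connecting to " + servers);
        }
        return zk;
    }
}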

Synchronously Creating a Node

@Test
public void createNodeSync() throws Exception {
    ZooKeeper zk = getZooKeeperConnection();
    String nodePath = zk.create("/demo", "initialData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

Updating Node Data

@Test
public void updateNodeData() throws Exception {
    ZooKeeper zk = getZooKeeperConnection();
    Stat metadata = new Stat();
    zk.getData("/demo", false, metadata);
    int currentVersion = metadata.getVersion();
    zk.setData("/demo", "updatedData".getBytes(), currentVersion);
    // Use -1 for unconditional update:
    // zk.setData("/demo", "data".getBytes(), -1);
}

Asynchronously Creating a Node

@Test
public void createNodeAsync() throws Exception {
    getZooKeeperConnection().create("/asyncNode", "asyncData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
                // Async callback logic
            }, "contextObject");
    TimeUnit.SECONDS.sleep(10);
}

Key ZooKeeper Methods

  • create(path, data, acl, createMode): Creates a znode at the given path with the specified data and type.
  • delete(path, version): Deletes the znode if its version matches the provided version.
  • exists(path, watch): Checks for the existence of a znode and optionally sets a watch.
  • getData(path, watch, stat): Retrieves data from a znode and optionally sets a watch.
  • setData(path, data, version): Updates znode data if the version matches.
  • getChildren(path, watch): Lists the children of a znode and optionally sets a watch.
  • sync(path, callback, ctx): Asynchronous only. Flushes pending updates from the leader to the server this client is connected to, so that subsequent reads see them.

Method Characteristics:

  • All data retrieval APIs can set a watch to monitor changes.
  • All data update APIs have two variants: unconditional update (version = -1) and conditional update (requires version match).
  • Most methods have both synchronous and asynchronous versions (sync is asynchronous only). Async methods place the request in a queue and return immediately, delivering the server response through a callback.
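
The difference between conditional and unconditional updates can be illustrated without a running cluster. The class below is an in-memory stand-in for a single znode, written only to show the version check; it is not part of the ZooKeeper API, and it returns false where the real client would throw KeeperException.BadVersionException.

```java
// In-memory stand-in for a single znode; it is NOT part of the ZooKeeper API.
final class VersionedNode {
    private byte[] data;
    private int version = 0;

    VersionedNode(byte[] initial) {
        this.data = initial.clone();
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    // Mirrors setData(path, data, version): -1 skips the check, any other value must match.
    // Returns true on success and false on a version mismatch.
    synchronized boolean setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            return false;
        }
        data = newData.clone();
        version++;
        return true;
    }
}
```

A writer that read version 0 and then loses a race to another writer sees its update rejected, re-reads the node, and retries with the new version. Passing -1 bypasses that protection and overwrites whatever is there.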

Using the Curator Open-Source Client

Curator is a ZooKeeper client framework originally developed at Netflix and now an Apache top-level project. It addresses the low-level complexities of the native client, such as connection management, watcher re-registration, and exception handling. It provides a fluent API and includes common distributed recipes like locks, leader election, and barriers.

Curator is to ZooKeeper as Guava is to Java.

Adding Dependencies

Curator consists of several modules:

  • curator-framework: Wrappers around low-level ZooKeeper APIs.
  • curator-client: Client utilities like retry policies.
  • curator-recipes: High-level features (listeners, locks, counters, etc.).
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Creating a Client Instance

There are two primary ways to create a CuratorFramework instance:

  1. Using newClient:
String connectionStr = "server1:2181,server2:2181,server3:2181";
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, retry);
client.start();
  2. Using the builder pattern:
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(connectionStr)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retry)
        .namespace("myApp") // Adds a root namespace
        .build();
client.start();
  • Retry Policies: Curator provides several policies:

    Policy Name               Description
    ExponentialBackoffRetry   Retries with increasing delay between attempts.
    RetryNTimes               Retries up to a maximum number of times.
    RetryOneTime              Retries only once.
    RetryUntilElapsed         Retries continuously until a specified time elapses.

  • Timeouts: sessionTimeoutMs sets the session validity on the server, while connectionTimeoutMs limits the client-side connection establishment time.
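
To make "increasing delay" concrete, here is a simplified sketch of how a policy like ExponentialBackoffRetry(1000, 3) could schedule its attempts. It is not Curator's implementation: Curator also randomizes the delay, which this deterministic version leaves out, and the class name, method names, and cap parameter are assumptions.

```java
final class BackoffSketch {
    private BackoffSketch() {}

    // Delay before retry number retryCount (0-based): base * 2^retryCount, capped at maxSleepMs.
    static long delayMs(long baseSleepMs, int retryCount, long maxSleepMs) {
        long delay = baseSleepMs << Math.min(retryCount, 20); // bound the shift to avoid overflow
        return Math.min(delay, maxSleepMs);
    }

    // True while another attempt is allowed, mirroring a maxRetries argument.
    static boolean shouldRetry(int retryCount, int maxRetries) {
        return retryCount < maxRetries;
    }
}
```

With a base of 1000 ms and three retries, this sketch waits 1000, 2000, and 4000 ms before giving up.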

Creating a Node

@Test
public void createNode() throws Exception {
    CuratorFramework cf = getCuratorClient();
    String createdPath = cf.create()
            // withProtection() is omitted here: it prefixes the node name with a GUID and is intended for sequential nodes
            .withMode(CreateMode.PERSISTENT)
            .forPath("/exampleNode", "payload".getBytes());
}

Creating a Hierarchical Path

@Test
public void createPathRecursively() throws Exception {
    CuratorFramework cf = getCuratorClient();
    String fullPath = "/level1/level2/node";
    cf.create().creatingParentsIfNeeded().forPath(fullPath);
}

Retrieving Data

@Test
public void fetchNodeData() throws Exception {
    CuratorFramework cf = getCuratorClient();
    byte[] data = cf.getData().forPath("/exampleNode");
}

Updating Data

@Test
public void modifyNodeData() throws Exception {
    CuratorFramework cf = getCuratorClient();
    cf.setData().forPath("/exampleNode", "newPayload".getBytes());
}

Deleting a Node

@Test
public void removeNode() throws Exception {
    CuratorFramework cf = getCuratorClient();
    String pathToDelete = "/level1";
    cf.delete()
        .guaranteed() // Keeps trying until successful
        .deletingChildrenIfNeeded() // Recursive delete
        .forPath(pathToDelete);
}

Asynchronous Operations

Curator uses the BackgroundCallback interface for handling async results, which are processed by default in an event thread.

public interface BackgroundCallback {
    void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

Example Async Call

@Test
public void performAsyncOperation() throws Exception {
    CuratorFramework cf = getCuratorClient();
    cf.getData().inBackground((client, event) -> {
        System.out.println("Async result: " + event);
    }).forPath("/testNode");
    TimeUnit.SECONDS.sleep(5);
}

Using a Custom Thread Pool

@Test
public void asyncWithCustomExecutor() throws Exception {
    CuratorFramework cf = getCuratorClient();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    cf.getData().inBackground((client, event) -> {
        System.out.println("Processed in custom thread: " + event);
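    }, executor).forPath("/myNode");
    TimeUnit.SECONDS.sleep(5); // Give the callback time to run before the test exits
    executor.shutdown();
}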

Curator Listeners

The CuratorListener interface receives notifications for background events and errors.

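Cache Event Listeners

Curator provides cache implementations that keep a watch registered across events, so listeners do not have to re-register after each notification. Note that in Curator 5.x, NodeCache is deprecated in favor of CuratorCache.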

  • NodeCache: Listens for changes to a specific node.
// Create and start a NodeCache
NodeCache cache = new NodeCache(zkClient, "/watchedNode");
cache.start(true);

// Register a listener
cache.getListenable().addListener(() -> {
    ChildData currentData = cache.getCurrentData();
    if (currentData != null) {
        System.out.println("Node updated: " + new String(currentData.getData()));
    }
});

Applications of ZooKeeper in Distributed Naming Services

Naming services provide identification for system resources. ZooKeeper's hierarchical tree structure and sequential node capabilities make it suitable for resource naming in distributed systems.

Common application scenarios include:

  • Distributed API Directories
  • Distributed Node Naming
  • Distributed ID Generators

Distributed API Directory

This provides a distributed, JNDI-like directory for API endpoints. Frameworks like Dubbo use ZooKeeper for this purpose:

  • Service providers register their address under a node like /dubbo/${serviceName}/providers.
  • Service consumers subscribe to this node to discover all available provider addresses.
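
The directory layout above can be sketched as plain path construction. Since "/" is the path separator and cannot appear inside a znode name, a provider address is URL-encoded before it is used as a child name. The class and method names below are hypothetical, and a real Dubbo provider URL carries more metadata than this example address.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class ProviderPaths {
    private ProviderPaths() {}

    // Parent znode under which all providers of a service register.
    static String providersDir(String serviceName) {
        return "/dubbo/" + serviceName + "/providers";
    }

    // A provider address contains '/' and ':', so it is URL-encoded before being used as a znode name.
    static String providerNode(String serviceName, String providerUrl) {
        return providersDir(serviceName) + "/" + URLEncoder.encode(providerUrl, StandardCharsets.UTF_8);
    }
}
```

A consumer would list the children of providersDir(...) and URL-decode each name to recover the provider addresses.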

Distributed Node Naming

In dynamic clusters where nodes frequently join and leave, manual naming is impractical. ZooKeeper can be used to generate unique node identifiers:

  1. On startup, a node connects to ZooKeeper and creates a sequential ephemeral znode under a root directory.
  2. The sequence number of this znode is used as the node's unique ID.
  3. The ephemeral node is automatically removed when the node disconnects.
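
The lifecycle in these steps can be modeled in memory to show one property worth relying on: an ID is not handed out again after its owner leaves. The class below is an illustrative model with hypothetical names, not the ZooKeeper API, and it assumes a simple incrementing counter. With a real cluster, treat the numbers as increasing and never reused rather than as strictly consecutive.

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory model of one parent znode holding ephemeral sequential children; not the ZooKeeper API.
final class NodeNamingModel {
    private int nextSequence = 0;                               // per-parent counter, never reset
    private final Map<Integer, String> live = new TreeMap<>();  // sequence number -> owning session

    // A node joins: it receives the next sequence number as its ID.
    synchronized int join(String session) {
        int id = nextSequence++;
        live.put(id, session);
        return id;
    }

    // A session ends: its ephemeral child disappears, but the counter keeps its value.
    synchronized void leave(int id) {
        live.remove(id);
    }

    synchronized int liveCount() {
        return live.size();
    }
}
```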

Distributed ID Generation

Distributed systems often require globally unique IDs for records, messages, logs, etc. Common solutions include:

  1. Java UUID
  2. Redis atomic increments
  3. Twitter's Snowflake algorithm
  4. ZooKeeper sequential nodes
  5. MongoDB's ObjectId

Implementing an ID Generator with ZooKeeper

ZooKeeper's sequential node types (PERSISTENT_SEQUENTIAL, EPHEMERAL_SEQUENTIAL) append a monotonically increasing, zero-padded 10-digit counter to the node name. The counter is maintained per parent znode, so the number is unique among that parent's children, and a generator that creates all of its nodes under one parent can use it as a unique ID. Because ZooKeeper applies all writes in a single total order, no two create calls receive the same number.

The general process is:

  1. A client connects to ZooKeeper.
  2. It creates a sequential node (e.g., under /id-generator/seq-).
  3. The full path returned (e.g., /id-generator/seq-0000000005) contains the unique sequence number, which can be extracted and used as the ID.
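
The extraction in step 3 is plain string handling. A minimal sketch, assuming the path comes from a sequential create call and therefore ends in the 10-digit suffix; the class and method names are hypothetical.

```java
final class SequenceIds {
    private static final int SEQUENCE_DIGITS = 10; // ZooKeeper appends a zero-padded 10-digit counter

    private SequenceIds() {}

    // Parses the numeric suffix from a path returned by a sequential create call.
    static long fromPath(String createdPath) {
        if (createdPath.length() < SEQUENCE_DIGITS) {
            throw new IllegalArgumentException("No sequence suffix in: " + createdPath);
        }
        String suffix = createdPath.substring(createdPath.length() - SEQUENCE_DIGITS);
        return Long.parseLong(suffix);
    }
}
```

For the path in step 3, SequenceIds.fromPath("/id-generator/seq-0000000005") yields 5. The counter is a signed 32-bit integer on the server, so this sketch does not handle the overflow that occurs past 2147483647.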
