Utilizing ZooKeeper Clients and Distributed Naming Services
ZooKeeper Java Clients
Development with ZooKeeper primarily involves using Java client APIs to connect to and operate a ZooKeeper cluster. The available Java client APIs are:
- The official ZooKeeper Java client API.
- Third-party Java client APIs, such as Curator.
The official API offers fundamental operations such as session creation, node creation, data reading, data updates, node deletion, and existence checks. However, for practical development, it has several limitations:
- ZooKeeper's Watcher notifications are one-time; they must be re-registered after each trigger.
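- It does not recover from session expiration. The client reconnects after a brief disconnect, but once a session expires the ZooKeeper handle is unusable, and the application must create a new instance and re-create its ephemeral nodes and watches.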
- Exception handling is cumbersome: nearly every call throws the checked KeeperException (which has many subclasses) as well as InterruptedException, and developers must handle each case themselves.
- It only provides simple byte[] array interfaces, with no support for serializing Java POJOs.
- When node creation fails with an exception, the developer must manually check whether the node already exists.
- It does not support recursive deletion.
Overall, the official API is relatively basic and can be unwieldy for real-world applications, making it generally less recommended.
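A small stdlib-only model can illustrate the one-time watch semantics from the first limitation. The classes OneShotWatchRegistry and ReRegisteringWatcher are hypothetical. They only mimic the behavior and are not part of the ZooKeeper API. A registered watcher fires at most once, so code that wants to keep observing a node must re-register on every notification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of one-time watch semantics; this is not the ZooKeeper API.
class OneShotWatchRegistry {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Register a watcher for the next change on the given path
    void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // A change notifies every registered watcher once, then forgets them
    void fireChange(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            fired.forEach(w -> w.accept(path));
        }
    }
}

// A watcher that re-arms itself on every notification, as native-client code has to
class ReRegisteringWatcher implements Consumer<String> {
    private final OneShotWatchRegistry registry;
    int notifications = 0;

    ReRegisteringWatcher(OneShotWatchRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void accept(String path) {
        notifications++;
        registry.watch(path, this); // re-register for the next change
    }
}
```

With the real client, re-registration usually means calling getData or exists again with a watch from inside the watcher, which also re-reads the current state. Changes that happen between the notification and the re-registration are not reported individually.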
Working with the Native ZooKeeper Java Client
Include the ZooKeeper client dependency.
<!-- zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
Note: It's advisable to keep the client version consistent with the server version to avoid compatibility issues.
The main class for using the native client is org.apache.zookeeper.ZooKeeper.
Common ZooKeeper Constructor
ZooKeeper(connectionString, sessionTimeout, watcher)
- connectionString: a comma-separated list of host:port pairs for the ZooKeeper servers (for example, server1:2181,server2:2181,server3:2181). The client connects to one of them and fails over to another if that connection is lost.
- sessionTimeout: the session timeout in milliseconds. The client keeps its session alive with heartbeats. If the server hears nothing from the client within this period, it expires the session.
- watcher: the default callback object that receives events from the ZooKeeper cluster, including connection-state changes.
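The following sketch breaks a connection string of this shape into its parts. ZooKeeper also accepts an optional chroot suffix (for example server1:2181,server2:2181/app), which makes every path relative to /app. ConnectString is a hypothetical helper, not the client's real parser. Two behaviors are simplifications of this sketch only: the fallback port of 2181 and the lack of IPv6 support.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a ZooKeeper-style connection string into servers and an optional chroot.
// Simplified sketch: the real client uses its own parser (which also handles IPv6 literals).
final class ConnectString {
    private static final int ASSUMED_DEFAULT_PORT = 2181; // assumption made by this sketch

    final List<String> hosts = new ArrayList<>();
    final List<Integer> ports = new ArrayList<>();
    final String chroot; // null when the string has no chroot suffix

    ConnectString(String connectionString) {
        // Everything from the first '/' onward is the chroot path
        int slash = connectionString.indexOf('/');
        String serverPart = slash >= 0 ? connectionString.substring(0, slash) : connectionString;
        chroot = slash >= 0 ? connectionString.substring(slash) : null;

        for (String server : serverPart.split(",")) {
            String s = server.trim();
            int colon = s.lastIndexOf(':');
            if (colon >= 0) {
                hosts.add(s.substring(0, colon));
                ports.add(Integer.parseInt(s.substring(colon + 1)));
            } else {
                hosts.add(s);
                ports.add(ASSUMED_DEFAULT_PORT);
            }
        }
    }
}
```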
Connecting to a ZooKeeper Cluster using the Native API
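import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperConnector {
    // Session timeout in milliseconds (the server may negotiate a different value)
    private static final int SESSION_TIMEOUT_MS = 5000;
    // How long to wait for the initial connection before giving up
    private static final long CONNECT_WAIT_SECONDS = 10;

    public static ZooKeeper createConnection(String servers) throws Exception {
        final CountDownLatch connected = new CountDownLatch(1);
        // The default watcher receives connection-state events (type None) for the whole session
        ZooKeeper zk = new ZooKeeper(servers, SESSION_TIMEOUT_MS, (WatchedEvent e) -> {
            if (e.getType() == Watcher.Event.EventType.None
                    && e.getState() == Watcher.Event.KeeperState.SyncConnected) {
                System.out.println("Connected to ZooKeeper.");
                connected.countDown();
            } else {
                // Later events (Disconnected, Expired, node events) are not connection failures
                System.out.println("ZooKeeper event: " + e);
            }
        });
        System.out.println("Establishing ZooKeeper connection...");
        if (!connected.await(CONNECT_WAIT_SECONDS, TimeUnit.SECONDS)) {
            zk.close();
            throw new IllegalStateException("Timed out connecting to " + servers);
        }
        return zk;
    }
}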
Synchronously Creating a Node
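@Test
public void createNodeSync() throws Exception {
    // getZooKeeperConnection() is assumed to be a test helper, e.g. wrapping ZooKeeperConnector.createConnection(...)
    try (ZooKeeper zk = getZooKeeperConnection()) {
        String nodePath = zk.create("/demo", "initialData".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + nodePath);
    }
}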
Updating Node Data
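@Test
public void updateNodeData() throws Exception {
    try (ZooKeeper zk = getZooKeeperConnection()) {
        Stat metadata = new Stat();
        zk.getData("/demo", false, metadata); // fills metadata with the node's current Stat
        int currentVersion = metadata.getVersion();
        // Conditional update: throws KeeperException.BadVersionException
        // if another client changed /demo after we read it
        zk.setData("/demo", "updatedData".getBytes(StandardCharsets.UTF_8), currentVersion);
        // Use -1 for an unconditional update:
        // zk.setData("/demo", "data".getBytes(StandardCharsets.UTF_8), -1);
    }
}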
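Conditional updates are optimistic concurrency control on the node's data version. The stdlib-only model below shows why the version read by getData matters. If another client writes in between, the conditional write is rejected, while -1 always succeeds. VersionedStore and its BadVersionException are hypothetical stand-ins for ZooKeeper's behavior and for KeeperException.BadVersionException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of ZooKeeper's versioned setData semantics; not the real API.
class VersionedStore {
    // Stand-in for KeeperException.BadVersionException (unchecked here for brevity; the real one is checked)
    static final class BadVersionException extends RuntimeException {
        BadVersionException(String message) { super(message); }
    }

    private static final class Node {
        byte[] data;
        int version; // 0 after create, incremented by every successful setData
        Node(byte[] data) { this.data = data; }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) {
        nodes.put(path, new Node(data));
    }

    int getVersion(String path) {
        return requireNode(path).version;
    }

    // expectedVersion -1 means unconditional; any other value must equal the current version
    int setData(String path, byte[] data, int expectedVersion) {
        Node node = requireNode(path);
        if (expectedVersion != -1 && expectedVersion != node.version) {
            throw new BadVersionException(path + ": expected version " + expectedVersion
                    + " but found " + node.version);
        }
        node.data = data;
        return ++node.version;
    }

    private Node requireNode(String path) {
        Node node = nodes.get(path);
        if (node == null) {
            throw new IllegalArgumentException("No node: " + path);
        }
        return node;
    }
}
```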
Asynchronously Creating a Node
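@Test
public void createNodeAsync() throws Exception {
    CountDownLatch done = new CountDownLatch(1);
    try (ZooKeeper zk = getZooKeeperConnection()) {
        zk.create("/asyncNode", "asyncData".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
                    // rc is a KeeperException.Code value; ctx is the "contextObject" passed below
                    System.out.println("rc=" + KeeperException.Code.get(rc) + ", created=" + name + ", ctx=" + ctx);
                    done.countDown();
                }, "contextObject");
        // Wait for the callback instead of sleeping for a fixed time
        done.await(10, TimeUnit.SECONDS);
    }
}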
Key ZooKeeper Methods
- create(path, data, acl, createMode): creates a znode at the given path with the specified data, ACL, and node type.
- delete(path, version): deletes the znode if its version matches the provided version.
- exists(path, watch): checks whether a znode exists and optionally sets a watch.
- getData(path, watch, stat): retrieves a znode's data (filling stat with its metadata) and optionally sets a watch.
- setData(path, data, version): updates a znode's data if the version matches.
- getChildren(path, watch): lists the children of a znode and optionally sets a watch.
- sync(path): makes the server the client is connected to catch up with the leader, so that subsequent reads reflect the latest committed updates.
Method Characteristics:
- All data retrieval APIs can set a watch to monitor changes.
- All data update APIs (setData and delete) take a version argument. Pass -1 for an unconditional update, or the node's current version for a conditional update that fails if another client changed the node in the meantime.
- Most methods come in both synchronous and asynchronous versions. An asynchronous call places the request in a queue and returns immediately; the server's response is later delivered to a callback.
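The asynchronous pattern can be modeled with a single-threaded executor that stands in for the client's request queue and event thread. The model only shows two things: calls return immediately, and callbacks run one at a time in submission order. AsyncRequestQueue and its Callback interface are hypothetical. The real client sends requests to the server and delivers results through interfaces such as AsyncCallback.StringCallback.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical model of the native client's async style; not the ZooKeeper API.
// A single-threaded executor stands in for the request queue and the event thread.
class AsyncRequestQueue implements AutoCloseable {
    // Shape loosely follows AsyncCallback: result code, path, caller context, result
    interface Callback<T> {
        void processResult(int rc, String path, Object ctx, T result);
    }

    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();

    // Returns immediately; the operation and its callback run later on the event thread
    <T> void submit(String path, Function<String, T> operation, Callback<T> cb, Object ctx) {
        eventThread.execute(() -> {
            int rc = 0;       // 0 models a successful result code
            T result = null;
            try {
                result = operation.apply(path);
            } catch (RuntimeException e) {
                rc = -1;      // a non-zero value models a failure code
            }
            cb.processResult(rc, path, ctx, result);
        });
    }

    // Stop accepting work and wait for queued callbacks to finish
    @Override
    public void close() {
        eventThread.shutdown();
        try {
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```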
Using the Curator Open-Source Client
Curator, originally developed at Netflix and now an Apache top-level project, is a ZooKeeper client framework that addresses the low-level complexities of the native client, such as connection management, watcher re-registration, and exception handling. It provides a fluent API and includes common distributed recipes like locks, leader election, and barriers.
Curator is to ZooKeeper as Guava is to Java.
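Adding Dependencies
Curator consists of several modules:
- curator-framework: wrappers around the low-level ZooKeeper APIs.
- curator-client: client utilities such as retry policies.
- curator-recipes: high-level features (listeners, locks, counters, etc.).
The POM below declares curator-recipes, which pulls in curator-framework and curator-client transitively. It also excludes Curator's own ZooKeeper dependency, so the explicitly declared ZooKeeper version is used.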
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
Creating a Client Instance
There are two primary ways to create a CuratorFramework instance:
- Using newClient:
String connectionStr = "server1:2181,server2:2181,server3:2181";
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, retry);
client.start();
- Using the builder pattern:
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retry)
.namespace("myApp") // All paths used by this client are relative to /myApp
.build();
client.start();
- Retry Policies: Curator provides several policies:
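| Policy (constructor arguments) | Description |
|---|---|
| ExponentialBackoffRetry(baseSleepTimeMs, maxRetries) | Retries up to maxRetries times, with an increasing delay between attempts. |
| RetryNTimes(n, sleepMsBetweenRetries) | Retries up to n times, with a fixed delay between attempts. |
| RetryOneTime(sleepMsBetweenRetry) | Retries only once. |
| RetryUntilElapsed(maxElapsedTimeMs, sleepMsBetweenRetries) | Retries with a fixed delay until the given total time has elapsed. |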
- Timeouts: sessionTimeoutMs sets how long the session stays valid on the server without heartbeats from the client, while connectionTimeoutMs limits how long the client waits to establish a connection.
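To make the differences between the retry policies concrete, here is a simplified, stdlib-only model of their retry decisions. SimpleRetryPolicy and Policies are hypothetical names, not Curator classes. The exponential variant only approximates the idea behind ExponentialBackoffRetry, where a base sleep is multiplied by a random factor that grows with the retry count. Curator's exact formula and limits may differ.

```java
import java.util.Random;

// Hypothetical, simplified model of retry decisions; these are not Curator's classes.
interface SimpleRetryPolicy {
    // retryCount is 0 for the first retry; returns the sleep in ms before retrying, or -1 to give up
    long sleepBeforeRetry(int retryCount, long elapsedMs);
}

final class Policies {
    private Policies() {}

    // Sleep = base * random factor in [1, 2^(retryCount+1) - 1], for at most maxRetries retries
    static SimpleRetryPolicy exponentialBackoff(long baseSleepMs, int maxRetries, Random random) {
        return (retryCount, elapsedMs) -> {
            if (retryCount >= maxRetries) {
                return -1;
            }
            int shift = Math.min(retryCount + 1, 30); // keeps 1 << shift positive in this sketch
            return baseSleepMs * Math.max(1, random.nextInt(1 << shift));
        };
    }

    // Fixed sleep, at most n retries
    static SimpleRetryPolicy nTimes(int n, long sleepMs) {
        return (retryCount, elapsedMs) -> retryCount < n ? sleepMs : -1;
    }

    // A single retry
    static SimpleRetryPolicy oneTime(long sleepMs) {
        return nTimes(1, sleepMs);
    }

    // Fixed sleep, retrying as long as the total elapsed time stays under the limit
    static SimpleRetryPolicy untilElapsed(long maxElapsedMs, long sleepMs) {
        return (retryCount, elapsedMs) -> elapsedMs < maxElapsedMs ? sleepMs : -1;
    }
}
```

The random factor (jitter) keeps many clients that lost their connection at the same moment from retrying in lockstep.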
Creating a Node
@Test
public void createNode() throws Exception {
CuratorFramework cf = getCuratorClient();
String createdPath = cf.create()
.withProtection() // Embeds a UUID in the node name so Curator can find the node if the create succeeded but the response was lost
.withMode(CreateMode.PERSISTENT)
.forPath("/exampleNode", "payload".getBytes());
}
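Protected mode addresses a subtle failure. The server may create the node, but the connection drops before the client receives the response, leaving the client unsure whether its node exists. Curator's remedy is to embed a client-generated UUID in the node name so the client can look for the node afterwards. The sketch below illustrates that idea. ProtectedNames is hypothetical, and the _c_<uuid>- prefix format is an assumption about Curator's naming rather than a documented contract.

```java
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the idea behind protected mode (not Curator's code):
// a client-generated UUID in the node name lets the client recognise its own node later.
final class ProtectedNames {
    static final String PREFIX = "_c_"; // assumed prefix format

    private ProtectedNames() {}

    // "/parent/name" -> "/parent/_c_<uuid>-name"
    static String protect(String path, UUID id) {
        int slash = path.lastIndexOf('/');
        return path.substring(0, slash + 1) + PREFIX + id + "-" + path.substring(slash + 1);
    }

    // After a connection loss, scan the parent's children for a name carrying our UUID
    static Optional<String> findOwnNode(List<String> children, UUID id) {
        String marker = PREFIX + id + "-";
        return children.stream().filter(c -> c.startsWith(marker)).findFirst();
    }
}
```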
Creating a Hierarchical Path
@Test
public void createPathRecursively() throws Exception {
CuratorFramework cf = getCuratorClient();
String fullPath = "/level1/level2/node";
cf.create().creatingParentsIfNeeded().forPath(fullPath);
}
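With the native client, creating /level1/level2/node fails with KeeperException.NoNodeException unless /level1 and /level1/level2 already exist. Code must therefore create each ancestor in turn, tolerating NodeExistsException. The hypothetical helper below computes that list of ancestors. It is a sketch of the bookkeeping that creatingParentsIfNeeded() handles, not Curator's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the ancestors that must exist before a znode path can be created.
final class ZPaths {
    private ZPaths() {}

    // "/level1/level2/node" -> ["/level1", "/level1/level2"]; "/node" -> []
    static List<String> ancestors(String path) {
        List<String> result = new ArrayList<>();
        int idx = path.indexOf('/', 1); // skip the leading '/'
        while (idx > 0) {
            result.add(path.substring(0, idx));
            idx = path.indexOf('/', idx + 1);
        }
        return result;
    }
}
```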
Retrieving Data
@Test
public void fetchNodeData() throws Exception {
CuratorFramework cf = getCuratorClient();
byte[] data = cf.getData().forPath("/exampleNode");
}
Updating Data
@Test
public void modifyNodeData() throws Exception {
CuratorFramework cf = getCuratorClient();
cf.setData().forPath("/exampleNode", "newPayload".getBytes());
}
Deleting a Node
@Test
public void removeNode() throws Exception {
CuratorFramework cf = getCuratorClient();
String pathToDelete = "/level1";
cf.delete()
.guaranteed() // Keeps trying until successful
.deletingChildrenIfNeeded() // Recursive delete
.forPath(pathToDelete);
}
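A native delete fails with KeeperException.NotEmptyException while the node still has children, so recursive deletion must work bottom-up. The hypothetical in-memory ZTree below models that rule. It also implements a depth-first, post-order delete similar in spirit to deletingChildrenIfNeeded(). It does not talk to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory znode tree (parents are not checked on create in this sketch).
class ZTree {
    private final TreeSet<String> nodes = new TreeSet<>();

    void create(String path) { nodes.add(path); }

    boolean exists(String path) { return nodes.contains(path); }

    // Direct children only; sorted order keeps all descendants of a path contiguous
    List<String> children(String path) {
        List<String> result = new ArrayList<>();
        String prefix = path + "/";
        for (String p : nodes.tailSet(prefix)) {
            if (!p.startsWith(prefix)) break;
            if (p.indexOf('/', prefix.length()) < 0) result.add(p);
        }
        return result;
    }

    // Mirrors the native rule: refuse to delete a node that still has children
    void delete(String path) {
        if (!children(path).isEmpty()) throw new IllegalStateException("NotEmpty: " + path);
        nodes.remove(path);
    }

    // Post-order: delete each child subtree first, then the node; returns the deletion order
    List<String> deleteRecursively(String path) {
        List<String> order = new ArrayList<>();
        for (String child : children(path)) {
            order.addAll(deleteRecursively(child));
        }
        delete(path);
        order.add(path);
        return order;
    }
}
```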
Asynchronous Operations
Curator uses the BackgroundCallback interface for handling async results, which are processed by default in an event thread.
public interface BackgroundCallback {
void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
Example Async Call
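@Test
public void performAsyncOperation() throws Exception {
    CuratorFramework cf = getCuratorClient();
    CountDownLatch done = new CountDownLatch(1);
    cf.getData().inBackground((client, event) -> {
        // event.getResultCode() is 0 on success; event.getData() holds the node's bytes
        System.out.println("Async result: " + event);
        done.countDown();
    }).forPath("/testNode");
    // Wait for the callback rather than sleeping for a fixed time
    done.await(5, TimeUnit.SECONDS);
}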
Using a Custom Thread Pool
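@Test
public void asyncWithCustomExecutor() throws Exception {
    CuratorFramework cf = getCuratorClient();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch done = new CountDownLatch(1);
    cf.getData().inBackground((client, event) -> {
        System.out.println("Processed in custom thread: " + event);
        done.countDown();
    }, executor).forPath("/myNode");
    done.await(5, TimeUnit.SECONDS); // without this, the test may finish before the callback runs
    executor.shutdown();
}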
Curator Listeners
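The CuratorListener interface (registered via client.getCuratorListenable()) is notified when a background operation without its own callback completes, or when a watch set with watched() fires. Unhandled errors are reported separately through UnhandledErrorListener.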
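Cache Event Listeners
Curator provides cache recipes that keep a local copy of ZooKeeper data and re-register watches automatically, so listeners keep receiving events, unlike one-time native watches. NodeCache, PathChildrenCache, and TreeCache are deprecated in Curator 5.x in favor of CuratorCache. The NodeCache example below still works with 5.1.0.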
- NodeCache: Listens for changes to a specific node.
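// Create a NodeCache for /watchedNode (zkClient is a started CuratorFramework)
NodeCache cache = new NodeCache(zkClient, "/watchedNode");
// Register the listener before starting, so no change is missed
cache.getListenable().addListener(() -> {
    ChildData currentData = cache.getCurrentData();
    if (currentData != null) {
        System.out.println("Node updated: " + new String(currentData.getData()));
    } else {
        System.out.println("Node deleted (or does not exist)");
    }
});
// true = build the initial cache contents before start() returns
cache.start(true);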
Applications of ZooKeeper in Distributed Naming Services
Naming services provide identification for system resources. ZooKeeper's hierarchical tree structure and sequential node capabilities make it suitable for resource naming in distributed systems.
Common application scenarios include:
- Distributed API Directories
- Distributed Node Naming
- Distributed ID Generators
Distributed API Directory
This provides a distributed, JNDI-like directory for API endpoints. Frameworks like Dubbo use ZooKeeper for this purpose:
- Service providers register their address as an ephemeral child node under a path like /dubbo/${serviceName}/providers, so the entry disappears when the provider's session ends.
- Service consumers watch this node's children to discover all available provider addresses.
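The following sketch shows the registry layout. Each provider creates an ephemeral child of the providers node whose name encodes its address, and consumers list (and watch) those children. To our understanding, Dubbo URL-encodes the full provider URL into the child name. The RegistryPaths helper, the service name, and the sample URL below are hypothetical approximations of that scheme, not Dubbo's code.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of a Dubbo-style registry layout (an approximation, not Dubbo's code).
final class RegistryPaths {
    private RegistryPaths() {}

    static String providersDir(String serviceName) {
        return "/dubbo/" + serviceName + "/providers";
    }

    // Path a provider would create as an ephemeral node at startup; encoding removes the '/' characters
    static String providerNode(String serviceName, String providerUrl) {
        return providersDir(serviceName) + "/" + URLEncoder.encode(providerUrl, StandardCharsets.UTF_8);
    }

    // A consumer lists the children of the providers node and decodes them back into URLs
    static List<String> decodeChildren(List<String> childNames) {
        return childNames.stream()
                .map(name -> URLDecoder.decode(name, StandardCharsets.UTF_8))
                .collect(Collectors.toList());
    }
}
```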
Distributed Node Naming
In dynamic clusters where nodes frequently join and leave, manual naming is impractical. ZooKeeper can be used to generate unique node identifiers:
- On startup, a node connects to ZooKeeper and creates a sequential ephemeral znode under a root directory.
- The sequence number of this znode is used as the node's unique ID.
- The ephemeral node is automatically removed when the node's session ends, whether it is closed or expires.
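The naming scheme can be modeled in memory. ZooKeeper appends a 10-digit, zero-padded counter to the name of each sequential node; the counter is maintained per parent node. It also removes ephemeral nodes when the owning session ends. SequentialNodeModel is a hypothetical stand-in for the server, and /cluster/nodes and node- are example names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of EPHEMERAL_SEQUENTIAL creation; not the ZooKeeper API.
class SequentialNodeModel {
    private final Map<String, Integer> counters = new HashMap<>();     // per-parent sequence counter
    private final Map<String, Long> ephemeralOwners = new HashMap<>(); // path -> owning session id

    // Appends a 10-digit, zero-padded sequence number to the prefix
    String createEphemeralSequential(String parent, String prefix, long sessionId) {
        int seq = counters.merge(parent, 1, Integer::sum) - 1; // 0 for the first child in this model
        String path = parent + "/" + prefix + String.format("%010d", seq);
        ephemeralOwners.put(path, sessionId);
        return path;
    }

    // Session close or expiry removes every ephemeral node owned by that session
    void endSession(long sessionId) {
        ephemeralOwners.values().removeIf(owner -> owner == sessionId);
    }

    boolean exists(String path) {
        return ephemeralOwners.containsKey(path);
    }
}
```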
Distributed ID Generation
Distributed systems often require globally unique IDs for records, messages, logs, etc. Common solutions include:
- Java UUID
- Redis atomic increments
- Twitter's Snowflake algorithm
- ZooKeeper sequential nodes
- MongoDB's ObjectId
Implementing an ID Generator with ZooKeeper
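ZooKeeper's sequential node types (PERSISTENT_SEQUENTIAL, EPHEMERAL_SEQUENTIAL) append a unique, monotonically increasing counter to the node name. The counter is maintained per parent node and formatted as a 10-digit, zero-padded number, so the sequence number of a newly created node can serve as a unique ID within that parent. ZooKeeper's linearizable writes ensure that no two clients receive the same number and that numbers increase in creation order. The counter is a signed 32-bit integer, so it overflows after 2147483647.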
The general process is:
- A client connects to ZooKeeper.
- It creates a sequential node with a path prefix such as /id-generator/seq-.
- The full path returned (e.g., /id-generator/seq-0000000005) contains the unique sequence number, which can be extracted and used as the ID.
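Extracting the ID from the returned path is plain string handling. The hypothetical helper below assumes the caller knows the path prefix that was passed to create. The comment shows how such a path might be obtained with the native client.

```java
// Hypothetical helper: turns the path returned by a sequential create into a numeric ID.
// The path might come from, e.g.:
//   zk.create("/id-generator/seq-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)
final class SequentialIds {
    private SequentialIds() {}

    // prefix "/id-generator/seq-" and path "/id-generator/seq-0000000005" -> 5
    static long extractId(String prefixPath, String createdPath) {
        if (!createdPath.startsWith(prefixPath)) {
            throw new IllegalArgumentException(createdPath + " does not start with " + prefixPath);
        }
        return Long.parseLong(createdPath.substring(prefixPath.length()));
    }
}
```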