Using etcd v3 in Go: KV operations, Leases, Watches, and Transactions
Install and import
Install the official v3 client module:
go get go.etcd.io/etcd/client/v3
References:
- API docs: https://pkg.go.dev/go.etcd.io/etcd/client/v3
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)
Create a client
Configure endpoints and initial dial timeout. After a client is created, the library transparently handles reconnects.
cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"localhost:2379"},
    DialTimeout: 5 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()
The client exposes logical subsystems:
- Cluster: membership and cluster-admin APIs
- KV: key-value operations
- Lease: TTL-backed leases
- Watcher: change subscriptions
- Auth: users/roles
- Maintenance: maintenance endpoints (e.g., move leader)
Obtain a KV handle:
kv := clientv3.NewKV(cli)
Put
Store a key/value pair. The context passed to each call controls its cancellation and deadline.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
put1, err := kv.Put(ctx, "/demo/k1", "hello etcd")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("rev=%d\n", put1.Header.Revision)

// Overwrite and request the previous value back
put2, err := kv.Put(ctx, "/demo/k1", "new value", clientv3.WithPrevKV())
if err != nil {
    log.Fatal(err)
}
if put2.PrevKv != nil {
    fmt.Printf("overwrote: %q -> %q\n", put2.PrevKv.Key, put2.PrevKv.Value)
}
Seed a few keys under a prefix for later examples:
_, _ = kv.Put(context.Background(), "/demo/dir/a", "A")
_, _ = kv.Put(context.Background(), "/demo/dir/b", "B")
// Not under /demo/dir/
_, _ = kv.Put(context.Background(), "/demo_dir_noise", "noise")
Get
Read a single key:
get1, err := kv.Get(context.Background(), "/demo/k1")
if err != nil {
    log.Fatal(err)
}
if len(get1.Kvs) == 0 {
    fmt.Println("/demo/k1 not found")
} else {
    fmt.Printf("/demo/k1=%s (ver=%d)\n", get1.Kvs[0].Value, get1.Kvs[0].Version)
}
List by prefix using server-side range selection:
list, err := kv.Get(context.Background(), "/demo/dir/", clientv3.WithPrefix())
if err != nil {
    log.Fatal(err)
}
for _, kvp := range list.Kvs {
    fmt.Printf("%s => %s\n", kvp.Key, kvp.Value)
}
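A prefix read is a range read over [prefix, end), where end is the prefix with its last byte incremented. The sketch below models that idea with the standard library only, to show why /demo_dir_noise is not returned for the prefix /demo/dir/. The helper names prefixRangeEnd and inPrefixRange are hypothetical and written for this illustration; the client computes its range end internally.

```go
package main

import "fmt"

// prefixRangeEnd returns the smallest key that sorts after every key
// starting with prefix: the prefix truncated after its last byte below
// 0xff, with that byte incremented. An empty result means "no upper
// bound" in this sketch.
func prefixRangeEnd(prefix string) string {
	end := []byte(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xff {
			end[i]++
			return string(end[:i+1])
		}
	}
	return ""
}

// inPrefixRange reports whether key lies in [prefix, prefixRangeEnd(prefix)).
func inPrefixRange(key, prefix string) bool {
	end := prefixRangeEnd(prefix)
	return key >= prefix && (end == "" || key < end)
}

func main() {
	prefix := "/demo/dir/"
	fmt.Printf("range: [%q, %q)\n", prefix, prefixRangeEnd(prefix))
	for _, k := range []string{"/demo/dir/a", "/demo/dir/b", "/demo_dir_noise", "/demo/k1"} {
		fmt.Printf("%-16s in range: %v\n", k, inPrefixRange(k, prefix))
	}
}
```

Ending the prefix with "/" matters: the prefix /demo/dir without the trailing slash would also match a key such as /demo/dirty.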
Notes on options:
- WithPrefix() returns all keys starting with the provided prefix in sorted key order.
- WithFromKey() scans from a key (inclusive) to the logical end of key space.
- WithRev(n) reads a historical snapshot at revision n.
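- WithLimit(n) caps the number of returned keys; to paginate, check the response's More field (true when further keys remain in the range) and Count (the number of keys in the requested range).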
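Pagination with a limit can be sketched as a loop that restarts each read just past the last key returned. The model below uses an in-memory sorted slice in place of the server, so page, listAll, and store are hypothetical names for illustration only. With the real client, the same loop would combine a limited range read with the More field of each response.

```go
package main

import (
	"fmt"
	"sort"
)

// page models one limited range read: up to limit keys in [start, end),
// plus a flag telling whether more keys remain in the range.
// store must be sorted; a limit of 0 or less means "no limit".
func page(store []string, start, end string, limit int) (keys []string, more bool) {
	i := sort.SearchStrings(store, start) // index of first key >= start
	for ; i < len(store) && store[i] < end; i++ {
		if limit > 0 && len(keys) == limit {
			return keys, true
		}
		keys = append(keys, store[i])
	}
	return keys, false
}

// listAll walks the range page by page. Each following page starts at the
// last returned key with a zero byte appended, which is the smallest key
// that sorts strictly after it.
func listAll(store []string, start, end string, limit int) [][]string {
	var pages [][]string
	for {
		keys, more := page(store, start, end, limit)
		if len(keys) > 0 {
			pages = append(pages, keys)
		}
		if !more {
			return pages
		}
		start = keys[len(keys)-1] + "\x00"
	}
}

func main() {
	store := []string{
		"/demo/dir/a", "/demo/dir/b", "/demo/dir/c",
		"/demo/dir/d", "/demo/dir/e", "/demo_dir_noise",
	}
	sort.Strings(store)
	for i, p := range listAll(store, "/demo/dir/", "/demo/dir0", 2) {
		fmt.Printf("page %d: %v\n", i+1, p)
	}
}
```

Restarting from the last key rather than from a numeric offset keeps pages stable when keys are inserted or deleted between reads. For a consistent view across pages, every read can also be pinned to the same revision with WithRev.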
Lease (TTL-backed keys)
Create expiring keys by attaching a lease. If a lease expires, the server deletes associated keys automatically.
lease := clientv3.NewLease(cli)
// Create a 10-second lease
lg, err := lease.Grant(context.Background(), 10)
if err != nil {
    log.Fatal(err)
}
// Put an ephemeral key using that lease
_, err = kv.Put(context.Background(), "/demo/ephemeral", "gone soon", clientv3.WithLease(lg.ID))
if err != nil {
    // Lease may have already expired; create a new lease and retry if needed
    log.Fatal(err)
}
Renew once (useful for your own scheduling):
ka1, err := lease.KeepAliveOnce(context.Background(), lg.ID)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("renewed TTL=%d\n", ka1.TTL)
Continuous renewal stream:
kaCh, err := lease.KeepAlive(context.Background(), lg.ID)
if err != nil {
    log.Fatal(err)
}
go func() {
    for ka := range kaCh {
        fmt.Printf("keepalive TTL=%d\n", ka.TTL)
    }
}()
If KeepAlive or Put reports an expired lease, request a new one and reattach it.
Op and Do
You can prebuild operations and execute them one at a time via Do, without an explicit transaction:
ops := []clientv3.Op{
    clientv3.OpPut("/op/key", "123"),
    clientv3.OpGet("/op/key"),
    clientv3.OpPut("/op/key", "456"),
    clientv3.OpDelete("/op/key"),
}
for _, op := range ops {
    or, err := cli.Do(context.Background(), op)
    if err != nil {
        log.Fatal(err)
    }
    // Exactly one accessor is non-nil, matching the kind of op executed.
    switch {
    case or.Get() != nil:
        fmt.Println("GET", or.Get().Kvs)
    case or.Put() != nil:
        fmt.Println("PUT rev", or.Put().Header.Revision)
    case or.Del() != nil:
        fmt.Println("DEL deleted", or.Del().Deleted)
    case or.Txn() != nil:
        fmt.Println("TXN success:", or.Txn().Succeeded)
    }
}
Transactions (Txn)
etcd transactions are conditional: If(compare...) Then(ops...) Else(ops...). All comparisons must pass for Then to run; otherwise Else runs.
// Prepare a guard key
_, _ = cli.Put(context.Background(), "/txn/guard", "v1")
res, err := cli.Txn(context.Background()).
    If(
        clientv3.Compare(clientv3.Value("/txn/guard"), ">", "u0"),
        clientv3.Compare(clientv3.Version("/txn/guard"), "=", 1),
    ).
    Then(
        clientv3.OpPut("/txn/result", "then-branch"),
        clientv3.OpPut("/txn/extra", "ok"),
    ).
    Else(
        clientv3.OpPut("/txn/result", "else-branch"),
        clientv3.OpDelete("/txn/extra"),
    ).
    Commit()
if err != nil {
    log.Fatal(err)
}
fmt.Println("Succeeded:", res.Succeeded)
Common comparators:
- Value(key)
- Version(key)
- CreateRevision(key)
- ModRevision(key)
- LeaseValue(key)
Watch
Subscribe to changes on keys or prefixes. A Watch returns a channel of WatchResponse values; each contains one or more Events.
type RuntimeConfig struct {
    Feature string `json:"feature"`
    Rate    int    `json:"rate"`
}
var cfg RuntimeConfig
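func watchJSON(cli *clientv3.Client, key string, target any) {
    ch := cli.Watch(context.Background(), key)
    go func() {
        for wr := range ch {
            for _, ev := range wr.Events {
                // Delete events carry no value to decode, so skip them.
                if ev.Kv == nil || ev.Type == clientv3.EventTypeDelete {
                    continue
                }
                if err := json.Unmarshal(ev.Kv.Value, target); err != nil {
                    fmt.Println("watch decode error:", err)
                    continue
                }
                // target is written from this goroutine; synchronize access
                // if other goroutines read it.
                fmt.Println("updated config:", target)
            }
        }
    }()
}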
watchJSON(cli, "/cfg/runtime", &cfg)
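Decoding straight into the live config has one pitfall: json.Unmarshal can leave the target partially updated when a payload has a field of the wrong type. A minimal stdlib-only sketch of a safer update step follows. The event type is a simplified, hypothetical stand-in for a watch event, and keeping the last known config on delete is a choice made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RuntimeConfig matches the struct used in the watch example.
type RuntimeConfig struct {
	Feature string `json:"feature"`
	Rate    int    `json:"rate"`
}

// event is a simplified stand-in for a watch event: either a delete,
// or a put carrying the new raw value.
type event struct {
	deleted bool
	value   []byte
}

// apply decodes into a fresh value and replaces *cfg only on success, so a
// malformed payload never leaves cfg half-updated. Deletes keep the last
// known config in place.
func apply(cfg *RuntimeConfig, ev event) error {
	if ev.deleted {
		return nil
	}
	var next RuntimeConfig
	if err := json.Unmarshal(ev.value, &next); err != nil {
		return fmt.Errorf("decode config: %w", err)
	}
	*cfg = next
	return nil
}

func main() {
	var cfg RuntimeConfig
	events := []event{
		{value: []byte(`{"feature":"dark-mode","rate":5}`)},
		{value: []byte(`{"feature":"x","rate":"fast"}`)}, // wrong type for rate
		{deleted: true},
	}
	for _, ev := range events {
		if err := apply(&cfg, ev); err != nil {
			fmt.Println("skipped:", err)
		}
		fmt.Printf("config now: %+v\n", cfg)
	}
}
```

If other goroutines read the config while the watcher updates it, the assignment to *cfg would also need a mutex or an atomic pointer swap.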
Watch a whole subtree by prefix:
wch := cli.Watch(context.Background(), "/demo/dir/", clientv3.WithPrefix())
go func() {
    for wr := range wch {
        for _, ev := range wr.Events {
            fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}()