Fading Coder

Using etcd v3 in Go: KV operations, Leases, Watches, and Transactions

Install and import

Install the official v3 client module:

go get go.etcd.io/etcd/client/v3

References:

  • API docs: https://pkg.go.dev/go.etcd.io/etcd/client/v3

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

Create a client

Configure endpoints and initial dial timeout. After a client is created, the library transparently handles reconnects.

cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"localhost:2379"},
    DialTimeout: 5 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
defer cli.Close()

The client exposes logical subsystems:

  • Cluster: membership and cluster-admin APIs
  • KV: key-value operations
  • Lease: TTL-backed leases
  • Watcher: change subscriptions
  • Auth: users/roles
  • Maintenance: maintenance endpoints (e.g., move leader)

Obtain a KV handle:

kv := clientv3.NewKV(cli)

Put

Store a key-value pair. The context controls cancellation and the request deadline.

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

put1, err := kv.Put(ctx, "/demo/k1", "hello etcd")
if err != nil {
    log.Fatal(err)
}
fmt.Printf("rev=%d\n", put1.Header.Revision)

// Overwrite and request previous value back
put2, err := kv.Put(ctx, "/demo/k1", "new value", clientv3.WithPrevKV())
if err != nil {
    log.Fatal(err)
}
if put2.PrevKv != nil {
    fmt.Printf("overwrote: %q -> %q\n", put2.PrevKv.Key, put2.PrevKv.Value)
}

Seed a few keys under a prefix for later examples:

_, _ = kv.Put(context.Background(), "/demo/dir/a", "A")
_, _ = kv.Put(context.Background(), "/demo/dir/b", "B")
// Not under /demo/dir/
_, _ = kv.Put(context.Background(), "/demo_dir_noise", "noise")

Get

Read a single key:

get1, err := kv.Get(context.Background(), "/demo/k1")
if err != nil {
    log.Fatal(err)
}
if len(get1.Kvs) == 0 {
    fmt.Println("/demo/k1 not found")
} else {
    fmt.Printf("/demo/k1=%s (ver=%d)\n", get1.Kvs[0].Value, get1.Kvs[0].Version)
}

List by prefix using server-side range selection:

list, err := kv.Get(context.Background(), "/demo/dir/", clientv3.WithPrefix())
if err != nil {
    log.Fatal(err)
}
for _, kvp := range list.Kvs {
    fmt.Printf("%s => %s\n", kvp.Key, kvp.Value)
}
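
To see why /demo_dir_noise is excluded from the listing above, it helps to look at how a prefix becomes a key range. The sketch below is a standalone approximation of that computation (the client exposes it as clientv3.GetPrefixRangeEnd). The helper names prefixRangeEnd and inPrefixRange are made up for this illustration and are not part of the client API.

```go
package main

import "fmt"

// prefixRangeEnd returns the smallest key that sorts after every key
// starting with prefix. Hypothetical helper that approximates what
// clientv3.GetPrefixRangeEnd computes.
func prefixRangeEnd(prefix string) string {
    end := []byte(prefix) // the conversion copies, so prefix is untouched
    for i := len(end) - 1; i >= 0; i-- {
        if end[i] < 0xff {
            end[i]++
            return string(end[:i+1])
        }
    }
    // Empty prefix or only 0xff bytes: there is no finite end, so fall
    // back to the single byte 0x00, which etcd treats as "end of keyspace".
    return "\x00"
}

// inPrefixRange reports whether key lies in [prefix, prefixRangeEnd(prefix)).
func inPrefixRange(key, prefix string) bool {
    end := prefixRangeEnd(prefix)
    if end == "\x00" {
        return key >= prefix
    }
    return key >= prefix && key < end
}

func main() {
    fmt.Printf("range end: %q\n", prefixRangeEnd("/demo/dir/"))
    for _, k := range []string{"/demo/dir/a", "/demo/dir/b", "/demo_dir_noise"} {
        fmt.Println(k, inPrefixRange(k, "/demo/dir/"))
    }
}
```

The trailing slash in the prefix matters: "/demo/dir" without it would also match a sibling key such as "/demo/dir2".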

Notes on options:

  • WithPrefix() returns all keys starting with the provided prefix in sorted key order.
  • WithFromKey() scans from a key (inclusive) to the logical end of key space.
  • WithRev(n) reads a historical snapshot at revision n; the read fails if that revision has already been compacted.
  • WithLimit(n) caps returned keys; use the Count/More fields to paginate.
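
The pagination idea behind WithLimit can be sketched without a running cluster. The code below is an in-memory stand-in, not the etcd client: page, fakeRange, and listAll are hypothetical names, and fakeRange only imitates a limited range read over sorted keys. The part that carries over is the loop: read one page, and if more keys remain, start the next page at the last key plus a zero byte.

```go
package main

import (
    "fmt"
    "sort"
)

// page is a hypothetical stand-in for the response fields used when paginating.
type page struct {
    Keys []string
    More bool
}

// fakeRange imitates a limited range read over sorted keys in [start, end).
// A limit of 0 or less means "no limit".
func fakeRange(sorted []string, start, end string, limit int) page {
    var p page
    for i := sort.SearchStrings(sorted, start); i < len(sorted) && sorted[i] < end; i++ {
        if limit > 0 && len(p.Keys) == limit {
            p.More = true // at least one more key is left in the range
            break
        }
        p.Keys = append(p.Keys, sorted[i])
    }
    return p
}

// listAll walks [start, end) page by page and returns the pages it saw.
func listAll(sorted []string, start, end string, limit int) [][]string {
    var pages [][]string
    for {
        p := fakeRange(sorted, start, end, limit)
        if len(p.Keys) > 0 {
            pages = append(pages, p.Keys)
        }
        if !p.More {
            return pages
        }
        // The next page starts at the smallest key after the last one returned.
        start = p.Keys[len(p.Keys)-1] + "\x00"
    }
}

func main() {
    keys := []string{"/demo/dir/a", "/demo/dir/b", "/demo/dir/c"}
    for i, pg := range listAll(keys, "/demo/dir/", "/demo/dir0", 2) {
        fmt.Println("page", i+1, pg)
    }
}
```

Against a real cluster the same loop would presumably combine WithRange and WithLimit, and pin every page to the revision of the first response with WithRev so that all pages come from one snapshot.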

Lease (TTL-backed keys)

Create expiring keys by attaching a lease. If a lease expires, the server deletes associated keys automatically.

lease := clientv3.NewLease(cli)

// Create a 10-second lease
lg, err := lease.Grant(context.Background(), 10)
if err != nil {
    log.Fatal(err)
}

// Put an ephemeral key using that lease
_, err = kv.Put(context.Background(), "/demo/ephemeral", "gone soon", clientv3.WithLease(lg.ID))
if err != nil {
    // Lease may have already expired; create a new lease and retry if needed
    log.Fatal(err)
}

Renew once (useful when you schedule renewals yourself):

ka1, err := lease.KeepAliveOnce(context.Background(), lg.ID)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("renewed TTL=%d\n", ka1.TTL)

Continuous renewal stream:

kaCh, err := lease.KeepAlive(context.Background(), lg.ID)
if err != nil {
    log.Fatal(err)
}

go func() {
    for ka := range kaCh {
        fmt.Printf("keepalive TTL=%d\n", ka.TTL)
    }
}()

If KeepAlive or Put reports an expired lease, request a new one and reattach it.
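
One way to notice a lost lease is to watch for the keepalive channel being closed. As far as I know, the client closes it when the lease can no longer be kept alive or the context is canceled. The sketch below shows only that detection pattern, using a plain channel of TTL values as a stand-in. drainUntilClosed is a hypothetical helper, not a client API.

```go
package main

import "fmt"

// drainUntilClosed reads from ch until it is closed, passing each value to
// onResp (if non-nil), and returns how many values arrived. It blocks, so
// run it in its own goroutine in real code.
func drainUntilClosed[T any](ch <-chan T, onResp func(T)) int {
    n := 0
    for resp := range ch {
        n++
        if onResp != nil {
            onResp(resp)
        }
    }
    return n
}

func main() {
    // Stand-in for a keepalive response channel; the values play the role of TTLs.
    ch := make(chan int64, 2)
    ch <- 10
    ch <- 10
    close(ch)

    n := drainUntilClosed[int64](ch, func(ttl int64) {
        fmt.Println("keepalive TTL", ttl)
    })
    // Reaching this point means renewal stopped: grant a new lease and
    // re-put the keys that were attached to the old one.
    fmt.Println("channel closed after", n, "responses")
}
```

With the real client the type parameter would be the element type of the channel returned by KeepAlive, and the code after the call is where a new Grant and Put would go.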

Op and Do

You can prebuild operations and execute them via Do without an explicit transaction:

ops := []clientv3.Op{
    clientv3.OpPut("/op/key", "123"),
    clientv3.OpGet("/op/key"),
    clientv3.OpPut("/op/key", "456"),
    clientv3.OpDelete("/op/key"),
}

for _, op := range ops {
    or, err := cli.Do(context.Background(), op)
    if err != nil {
        log.Fatal(err)
    }
    switch {
    case or.Get() != nil:
        fmt.Println("GET", or.Get().Kvs)
    case or.Put() != nil:
        fmt.Println("PUT rev", or.Put().Header.Revision)
    case or.Del() != nil:
        fmt.Println("DEL deleted", or.Del().Deleted)
    case or.Txn() != nil:
        fmt.Println("TXN success:", or.Txn().Succeeded)
    }
}

Transactions (Txn)

etcd transactions are conditional: If(compare...) Then(ops...) Else(ops...). All comparisons must pass for Then to run; otherwise Else runs.

// Prepare a guard key
_, _ = cli.Put(context.Background(), "/txn/guard", "v1")

res, err := cli.Txn(context.Background()).
    If(
        clientv3.Compare(clientv3.Value("/txn/guard"), ">", "u0"),
        clientv3.Compare(clientv3.Version("/txn/guard"), "=", 1),
    ).
    Then(
        clientv3.OpPut("/txn/result", "then-branch"),
        clientv3.OpPut("/txn/extra", "ok"),
    ).
    Else(
        clientv3.OpPut("/txn/result", "else-branch"),
        clientv3.OpDelete("/txn/extra"),
    ).
    Commit()
if err != nil {
    log.Fatal(err)
}
fmt.Println("Succeeded:", res.Succeeded)

Common comparators:

  • Value(key)
  • Version(key)
  • CreateRevision(key)
  • ModRevision(key)
  • LeaseValue(key)
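
Value comparisons are byte-wise and lexicographic, which is why "v1" > "u0" holds in the guard above. It also means that numeric strings do not compare as numbers. The following standalone sketch models that behavior with bytes.Compare. compareValue is a hypothetical helper written for this illustration, and unlike the client it simply returns false for an unknown operator.

```go
package main

import (
    "bytes"
    "fmt"
)

// compareValue models how a Value(key) comparison behaves: the stored value
// and the target are compared as raw bytes, not as numbers.
func compareValue(stored, op, target string) bool {
    c := bytes.Compare([]byte(stored), []byte(target))
    switch op {
    case "=":
        return c == 0
    case "!=":
        return c != 0
    case ">":
        return c > 0
    case "<":
        return c < 0
    }
    return false // unknown operator (assumption made for this sketch)
}

func main() {
    fmt.Println(compareValue("v1", ">", "u0")) // 'v' sorts after 'u'
    fmt.Println(compareValue("10", ">", "9"))  // '1' sorts before '9'
}
```

If a guard needs numeric ordering, comparing on Version or ModRevision, or storing zero-padded numbers, avoids this surprise.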

Watch

Subscribe to changes on keys or prefixes. A Watch returns a channel of WatchResponse values; each contains one or more Events.

type RuntimeConfig struct {
    Feature string `json:"feature"`
    Rate    int    `json:"rate"`
}

var cfg RuntimeConfig

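// watchJSON decodes every PUT on key into target. target must be a pointer
// and is written from the watch goroutine, so guard any concurrent reads.
func watchJSON(cli *clientv3.Client, key string, target any) {
    ch := cli.Watch(context.Background(), key)
    go func() {
        for wr := range ch {
            if err := wr.Err(); err != nil {
                fmt.Println("watch error:", err)
                continue
            }
            for _, ev := range wr.Events {
                // DELETE events carry no value, so only decode PUTs.
                if ev.Type != clientv3.EventTypePut || ev.Kv == nil {
                    continue
                }
                if err := json.Unmarshal(ev.Kv.Value, target); err != nil {
                    fmt.Println("watch decode error:", err)
                    continue
                }
                fmt.Println("updated config:", target)
            }
        }
    }()
}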

watchJSON(cli, "/cfg/runtime", &cfg)
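
Because the watch goroutine writes the config while other goroutines may read it, some synchronization is needed. One possible shape is sketched below, assuming a hypothetical configStore type that is not part of the original code: each payload is decoded into a fresh value and swapped in under a lock only if decoding succeeds, so a malformed update never leaves a half-written config.

```go
package main

import (
    "encoding/json"
    "fmt"
    "sync"
)

type RuntimeConfig struct {
    Feature string `json:"feature"`
    Rate    int    `json:"rate"`
}

// configStore is a hypothetical holder for the most recent valid config.
type configStore struct {
    mu  sync.RWMutex
    cfg RuntimeConfig
}

// Update decodes raw into a fresh RuntimeConfig and replaces the stored one
// only when decoding succeeds.
func (s *configStore) Update(raw []byte) error {
    var next RuntimeConfig
    if err := json.Unmarshal(raw, &next); err != nil {
        return err
    }
    s.mu.Lock()
    s.cfg = next
    s.mu.Unlock()
    return nil
}

// Get returns a copy of the current config.
func (s *configStore) Get() RuntimeConfig {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.cfg
}

func main() {
    var store configStore
    if err := store.Update([]byte(`{"feature":"beta","rate":5}`)); err != nil {
        fmt.Println("rejected:", err)
    }
    if err := store.Update([]byte(`{"feature":`)); err != nil {
        fmt.Println("rejected:", err) // truncated JSON leaves the store untouched
    }
    fmt.Printf("%+v\n", store.Get())
}
```

In a watch loop, Update would be called with ev.Kv.Value for each PUT event. Note that decoding into a fresh value resets fields missing from the payload to their zero values, whereas unmarshaling into the existing struct would keep them.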

Watch a whole subtree by prefix:

wch := cli.Watch(context.Background(), "/demo/dir/", clientv3.WithPrefix())
go func() {
    for wr := range wch {
        for _, ev := range wr.Events {
            fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}()
