Task-oriented guides for using the Katzenpost thin client API

Thin Client How-to Guide

This guide shows how to accomplish specific tasks with the Katzenpost thin client. Each section is self-contained: find the task you need and follow the steps.

If you are new to Pigeonhole, read Understanding Pigeonhole first for the concepts, then return here for the recipes, and consult the Thin Client API Reference for the precise signatures.

Throughout this guide and the API the words channel and stream are used interchangeably: they denote one and the same thing.

Authoritative working examples

A word of caution before you proceed. The code fragments in this guide are illustrative: they are written to teach one task at a time, and to keep the reader’s eye on the matter at hand they omit imports, error handling, and surrounding context. They are not compiled or run by our continuous integration, and so, as the API evolves, an individual snippet may fall out of step with it.

The integration tests below carry no such caveat. They are exercised on every change by CI, so they are guaranteed to compile and to pass against the code they accompany. When a fragment in this guide and a test disagree, the test is correct. Treat these files as the canonical, runnable companion to the prose:

LanguageTest fileRepository
Goclient/pigeonhole_docker_test.gokatzenpost
Pythontests/test_new_pigeonhole_api.py, tests/test_new_methods.pythin_client
Rusttests/channel_api_test.rsthin_client

These links track the main branch of each repository; should you be working against a pinned release, consult the corresponding files at that tag instead.

Table of Contents

SectionDescription
Connect to the daemon and handle eventsEstablish a connection and process events
Discover network servicesFind services in the PKI document
Verify the PKI document yourselfCheck directory authority signatures against your own trust store
Send a message to a mixnet serviceEcho ping and other non-Pigeonhole services
Create a Pigeonhole channelGenerate a stream with write/read capabilities
Write a messageEncrypt and send a write to a stream
Read a messageRetrieve and decrypt a message from a stream
Read back a multi-box payloadConcatenate consecutive boxes into one payload
Send and receive a large payload (stream API)One-call windowed SACK transfer of any size
Wait for a message not yet writtenPoll with bounded retry around BoxIDNotFound
Persist and restore channel stateSurvive a process restart without losing your place
Prepare operations offlineDo the local crypto now, transmit when connected
A complete end-to-end exampleOne runnable Alice-writes, Bob-reads program
Delete messages with tombstonesTombstone one or more boxes
Detect a tombstone when readingRecognise a deleted box on the read side
Send to one channel atomicallySingle-destination copy command
Send to multiple channels atomicallyMulti-destination copy command
Multi-call buffer passingIncremental copy streams with crash recovery
Tombstone a range via copy streamAtomic tombstoning through a courier
Cancel in-flight operationsCancel individual operations or stop all at once
Handle daemon disconnectsAutomatic reconnection and request replay

How to connect to the daemon and handle events

Connect to the kpclientd daemon and set up event handling:

cfg, err := thin.LoadFile("thinclient.toml")
if err != nil {
    log.Fatal(err)
}

logging := &config.Logging{Level: "INFO"}
client := thin.NewThinClient(cfg, logging)

err = client.Dial()
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// Listen for events
eventCh := client.EventSink()
defer client.StopEventSink(eventCh)

for ev := range eventCh {
    switch v := ev.(type) {
    case *thin.ConnectionStatusEvent:
        fmt.Printf("Connected: %v\n", v.IsConnected)
    case *thin.NewDocumentEvent:
        fmt.Println("New PKI document received")
    case *thin.MessageReplyEvent:
        fmt.Printf("Reply received for SURB %x\n", v.SURBID)
    }
}
let config = Config::new("thinclient.toml")?;
let client = ThinClient::new(config).await?;

// Listen for events
let mut event_rx = client.event_sink();
tokio::spawn(async move {
    while let Some(event) = event_rx.recv().await {
        // Process events
        println!("Event: {:?}", event);
    }
});

// ... do work ...

client.stop().await;
async def on_connection_status(event):
    print(f"Connected: {event.get('is_connected')}")

async def on_new_document(event):
    print("New PKI document received")

async def on_message_reply(event):
    print(f"Reply received")

config = Config("thinclient.toml")
config.on_connection_status = on_connection_status
config.on_new_pki_document = on_new_document
config.on_message_reply = on_message_reply

client = ThinClient(config)
loop = asyncio.get_running_loop()
await client.start(loop)

# ... do work ...

client.stop()

How to discover network services via the PKI document

NOTE that this isn’t necessary for using the Pigeonhole protocol because kpclientd does courier service discovery automatically.

The PKI document lists all available mixnet services. Use GetService to get a random instance of a named service:

doc := client.PKIDocument()
if doc == nil {
    log.Fatal("No PKI document available")
}

// Get a random echo service
desc, err := client.GetService("echo")
if err != nil {
    log.Fatal(err)
}

// Use desc.MixDescriptor.IdentityKey and desc.RecipientQueueID
// as the destination for SendMessage
destNode, destQueue := desc.ToDestination()
let doc = client.pki_document().await?;

// Get a random echo service
let desc = client.get_service("echo").await?;

// Use the destination for send_message
let (dest_node, dest_queue) = desc.to_destination();
doc = client.pki_document()
if doc is None:
    raise Exception("No PKI document available")

# Get a random echo service
desc = client.get_service("echo")

# Use the destination for send_message
dest_node, dest_queue = desc.to_destination()

How to verify the PKI document yourself

In ordinary use you do not need this section. kpclientd already verifies every PKI document against the directory authorities listed in client.toml, and only after a sufficient threshold of authority signatures has passed does it push the document on to the thin client. The pki_document() method described above hands you the post-verification document, and you inherit the daemon’s guarantee without further work. The signature map is stripped before that handoff precisely because the verification has already happened; carrying the signatures through would only invite confusion about whose trust root is in force.

get_pki_document_raw is the trapdoor for special applications and integrations that want the signed document. The cases that come up in practice include:

  • An application that wishes to anchor its own root of trust, independently of kpclientd’s configuration, for instance when shipping a hardened build with the authority keys compiled in.
  • A relay that forwards the signed document to a separate consumer (for archival, audit, or out-of-band verification) which does not itself speak the thin-client protocol.
  • A diagnostic or monitoring tool that wishes to display which authorities signed which consensus, across time.

The method returns the cert.Certificate-wrapped signed document together with the epoch the daemon resolved to. Pass 0 for the requested epoch to mean “whatever the daemon currently believes is the latest”.

The examples below verify the document against the post-quantum hybrid signature scheme Falcon-padded-512-Ed25519, the recommended production scheme published by hpqc in both its Python and Go forms. The authority public keys must come from the application’s own trust store, never from the daemon: if the daemon supplied them, the verification would establish only that the daemon was internally consistent, not that the document was signed by the real authorities.

import struct
from hashlib import blake2b

import cbor2
from hpqc.sign.hybrid import FalconPadded512Ed25519


# The directory authority public keys in the wire format expected by
# hpqc's hybrid scheme: the byte concatenation
# ``falcon_padded_512_pub || ed25519_pub`` (929 bytes per authority).
# These must be obtained out of band, typically baked into the
# application or carried in a separately signed bundle. The hex
# strings below are placeholders.
AUTHORITY_PUBLIC_KEYS = [
    bytes.fromhex("ab" * 929),  # auth1
    bytes.fromhex("cd" * 929),  # auth2
    bytes.fromhex("ef" * 929),  # auth3
]
THRESHOLD = len(AUTHORITY_PUBLIC_KEYS) // 2 + 1
SCHEME_NAME = "Falcon-padded-512-Ed25519"


def _signed_message(cert: dict) -> bytes:
    """Reconstruct the byte string the authorities signed.

    A deterministic little-endian concatenation of the Certificate
    fields preceding Signatures; see katzenpost/core/cert/cert.go for
    the canonical encoding.
    """
    return b"".join([
        struct.pack("<I", cert["Version"]),
        struct.pack("<Q", cert["Expiration"]),
        cert["KeyType"].encode("utf-8"),
        cert["Certified"],
    ])


async def fetch_and_verify_pki(client, epoch: int = 0) -> bytes:
    """Fetch the signed PKI document and verify it against the trust root.

    Returns the inner Certified payload (the CBOR-encoded Document)
    once a sufficient threshold of authority signatures has verified;
    raises ValueError otherwise.
    """
    payload, returned_epoch = await client.get_pki_document_raw(epoch)
    cert = cbor2.loads(payload)

    if cert["Version"] != 0:
        raise ValueError(f"unknown certificate version: {cert['Version']}")
    if cert["KeyType"] != SCHEME_NAME:
        raise ValueError(
            f"unexpected key type {cert['KeyType']!r}, "
            f"expected {SCHEME_NAME!r}"
        )

    msg = _signed_message(cert)
    signatures = cert.get("Signatures") or {}

    verified = 0
    for pubkey in AUTHORITY_PUBLIC_KEYS:
        key_hash = blake2b(pubkey, digest_size=32).digest()
        sig = signatures.get(key_hash)
        if sig is None:
            continue
        if FalconPadded512Ed25519.verify(pubkey, msg, sig["Payload"]):
            verified += 1

    if verified < THRESHOLD:
        raise ValueError(
            f"only {verified} of {len(AUTHORITY_PUBLIC_KEYS)} authority "
            f"signatures verified for epoch {returned_epoch}; threshold "
            f"is {THRESHOLD}"
        )

    # cert["Certified"] is the CBOR-encoded Document. Decode it with
    # cbor2.loads(cert["Certified"]) if the application needs the
    # contents themselves.
    return cert["Certified"]
package main

import (
    "encoding/hex"
    "fmt"
    "log"

    "github.com/katzenpost/hpqc/sign"
    "github.com/katzenpost/hpqc/sign/hybrid"

    "github.com/katzenpost/katzenpost/client/thin"
    "github.com/katzenpost/katzenpost/core/cert"
)

// AuthorityPublicKeysHex is the application's root of trust for the
// network's directory: the wire-format hybrid public keys of each
// authority, hex-encoded. They must be obtained out of band and
// never from the daemon. Replace these placeholders with your own.
var AuthorityPublicKeysHex = []string{
    "abab...", // auth1
    "cdcd...", // auth2
    "efef...", // auth3
}

// FetchAndVerifyPKI fetches the signed PKI document for the given
// epoch (pass 0 for "current") and verifies it against the
// authority public keys above using core/cert.VerifyThreshold.
func FetchAndVerifyPKI(client *thin.ThinClient, epoch uint64) ([]byte, error) {
    scheme := hybrid.FalconPadded512Ed25519
    verifiers := make([]sign.PublicKey, 0, len(AuthorityPublicKeysHex))
    for _, hexKey := range AuthorityPublicKeysHex {
        raw, err := hex.DecodeString(hexKey)
        if err != nil {
            return nil, fmt.Errorf("decoding authority key: %w", err)
        }
        pub, err := scheme.UnmarshalBinaryPublicKey(raw)
        if err != nil {
            return nil, fmt.Errorf("parsing authority key: %w", err)
        }
        verifiers = append(verifiers, pub)
    }

    payload, returnedEpoch, err := client.GetPKIDocumentRaw(epoch)
    if err != nil {
        return nil, fmt.Errorf("fetching signed PKI doc: %w", err)
    }

    threshold := len(verifiers)/2 + 1
    certified, good, _, err := cert.VerifyThreshold(verifiers, threshold, payload)
    if err != nil {
        return nil, fmt.Errorf(
            "threshold verification failed for epoch %d: %w",
            returnedEpoch, err,
        )
    }
    log.Printf("verified %d of %d authority signatures for epoch %d",
        len(good), len(verifiers), returnedEpoch)
    return certified, nil
}

Considerations:

  • The authority public keys must come from a trust root external to the daemon. If the daemon supplied them, verification would prove only that the daemon was internally consistent.
  • The threshold above (a simple majority) matches the policy that the authorities themselves enforce when they admit a consensus. An application may apply a stricter policy, but should not relax it.
  • Should the network ever be reconfigured to use a different signature scheme, swap the hybrid for the corresponding hpqc verifier and adjust the expected KeyType (Python) or the hybrid.* selector (Go) accordingly. The KeyType field of the certificate is what the authorities signed under, and is the authoritative indicator of the scheme in force.
  • A Rust binding is not shown because hpqc does not yet publish a Rust port; a Rust application can compose the verification with the ed25519-dalek and falcon crates by the same wire layout (the public key and signature are simple concatenations of the two component halves).

How to send a message to a mixnet service

NOTE that this API call is NOT used with the Pigeonhole protocol. However it is still useful for writing other protocols and proving the echo service.

Use BlockingSendMessage for simple request-response interactions with non-Pigeonhole services (like the echo service):

desc, err := client.GetService("echo")
if err != nil {
    log.Fatal(err)
}

destNode, destQueue := desc.ToDestination()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

reply, err := client.BlockingSendMessage(ctx, []byte("hello mixnet"), destNode, destQueue)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Reply: %s\n", reply)
let desc = client.get_service("echo").await?;
let (dest_node, dest_queue) = desc.to_destination();

let reply = client.blocking_send_message(
    b"hello mixnet",
    dest_node,
    dest_queue,
    std::time::Duration::from_secs(30),
).await?;
println!("Reply: {:?}", reply);
desc = client.get_service("echo")
dest_node, dest_queue = desc.to_destination()

reply = await client.blocking_send_message(
    b"hello mixnet", dest_node, dest_queue, timeout_seconds=30.0
)
print(f"Reply: {reply}")

How to create a Pigeonhole channel

NOTE that this does NOT produce any network traffic. It’s a local cryptographic operation only.

A Pigeonhole channel (stream) is created from a 32-byte random seed. The writer keeps the write cap; the reader receives the read cap and first index out-of-band.

seed := make([]byte, 32)
_, err := rand.Reader.Read(seed)
if err != nil {
    log.Fatal(err)
}

writeCap, readCap, firstIndex, err := client.NewKeypair(seed)
if err != nil {
    log.Fatal(err)
}

// Writer keeps: writeCap, firstIndex
// Share with reader out-of-band: readCap, firstIndex
let seed: [u8; 32] = rand::random();
let result = client.new_keypair(&seed).await?;

// Writer keeps: result.write_cap, result.first_message_index
// Share with reader out-of-band: result.read_cap, result.first_message_index
import os
seed = os.urandom(32)
result = await client.new_keypair(seed)

# Writer keeps: result.write_cap, result.first_message_index
# Share with reader out-of-band: result.read_cap, result.first_message_index

How to write a message to a Pigeonhole channel

Writing is a two-step process: encrypt the message, then send it via ARQ. A write is idempotent by default: re-sending a box that was already filled returns success rather than an error, so a retransmit after an uncertain failure is safe. When you must instead distinguish a fresh write from a repeat, for instance to implement optimistic concurrency, use the box-exists-aware variant (StartResendingEncryptedMessageReturnBoxExists in Go, no_idempotent_box_already_exists=True in Rust and Python), which surfaces BoxAlreadyExists as an error at the cost of one extra mixnet round trip.

// Encrypt the message
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptWrite(
    []byte("hello"), writeCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}

// Send via ARQ (blocks until acknowledged)
_, err = client.StartResendingEncryptedMessage(
    nil,       // readCap (nil for writes)
    writeCap,
    nil,       // messageBoxIndex (nil for writes)
    nil,       // replyIndex
    envDesc,
    ciphertext,
    envHash,
)
if err != nil {
    log.Fatal(err)
}

// Advance the index for the next write
currentIndex = nextIndex
// Encrypt the message
let result = client.encrypt_write(
    b"hello", &write_cap, &current_index,
).await?;

// Send via ARQ (blocks until acknowledged)
client.start_resending_encrypted_message(
    None,                           // read_cap (None for writes)
    Some(&write_cap),
    None,                           // message_box_index
    None,                           // reply_index
    &result.envelope_descriptor,
    &result.message_ciphertext,
    &result.envelope_hash,
).await?;

// Advance the index for the next write
current_index = result.next_message_box_index;
# Encrypt the message
result = await client.encrypt_write(b"hello", write_cap, current_index)

# Send via ARQ (blocks until acknowledged)
await client.start_resending_encrypted_message(
    read_cap=None,
    write_cap=write_cap,
    message_box_index=None,
    reply_index=None,
    envelope_descriptor=result.envelope_descriptor,
    message_ciphertext=result.message_ciphertext,
    envelope_hash=result.envelope_hash,
)

# Advance the index for the next write
current_index = result.next_message_box_index

How to read a message from a Pigeonhole channel

Reading is also two steps: encrypt a read request, then send it via ARQ. The reply contains the plaintext. Unlike a write, a read must be given the index of the box being read (the messageBoxIndex argument, its marshalled bytes in Go); the daemon derives the box ID and decrypts the reply from it, so a read that omits the index comes back undecrypted.

// Encrypt a read request
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptRead(
    readCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}

// A read MUST supply the marshalled index of the box being read.
// The daemon needs it to derive the box ID and to decrypt the reply;
// pass nil and the reply comes back undecrypted.
idxBytes, err := currentIndex.MarshalBinary()
if err != nil {
    log.Fatal(err)
}

// Send via ARQ (blocks until the message is retrieved)
result, err := client.StartResendingEncryptedMessage(
    readCap,
    nil,       // writeCap (nil for reads)
    idxBytes,  // messageBoxIndex (required for reads)
    nil,       // replyIndex (nil uses the default)
    envDesc,
    ciphertext,
    envHash,
)
if err != nil {
    log.Fatal(err)
}

plaintext := result.Plaintext
// Advance the index for the next read
currentIndex = nextIndex
// Encrypt a read request
let read_result = client.encrypt_read(&read_cap, &current_index).await?;

// Send via ARQ (blocks until the message is retrieved)
let result = client.start_resending_encrypted_message(
    Some(&read_cap),
    None,                            // write_cap (None for reads)
    Some(&current_index),            // message_box_index (required for reads)
    None,                            // reply_index (None uses the default)
    &read_result.envelope_descriptor,
    &read_result.message_ciphertext,
    &read_result.envelope_hash,
).await?;

let plaintext = result.plaintext;
// Advance the index for the next read
current_index = read_result.next_message_box_index;
# Encrypt a read request
read_result = await client.encrypt_read(read_cap, current_index)

# Send via ARQ (blocks until the message is retrieved)
plaintext = await client.start_resending_encrypted_message(
    read_cap=read_cap,
    write_cap=None,
    message_box_index=current_index,  # required for reads
    reply_index=None,                 # None uses the default
    envelope_descriptor=read_result.envelope_descriptor,
    message_ciphertext=read_result.message_ciphertext,
    envelope_hash=read_result.envelope_hash,
)

# Advance the index for the next read
current_index = read_result.next_message_box_index

How to read back a multi-box payload

A single read retrieves one box. A payload that spanned several boxes, anything a copy command wrote, or any write larger than one box of plaintext, is read back by stepping through consecutive indices and concatenating the plaintexts in the order the writer laid them down.

The reader must know where the payload ends. The boxes themselves carry no total length, so the writer frames it. The convention the integration tests use, and the one shown here, is a fixed four-byte big-endian length prefix on the very first box; the reader accumulates boxes until it holds the prefix plus that many bytes.

// readCap and firstIndex were shared by the writer, who prefixed the
// payload with a 4-byte big-endian length.
readIndex := firstIndex
var payload []byte
var total uint32
for {
    ct, ed, eh, nextIndex, err := client.EncryptRead(readCap, readIndex)
    if err != nil {
        log.Fatal(err)
    }
    idxBytes, err := readIndex.MarshalBinary()
    if err != nil {
        log.Fatal(err)
    }
    result, err := client.StartResendingEncryptedMessage(
        readCap, nil, idxBytes, nil, ed, ct, eh)
    if err != nil {
        log.Fatal(err)
    }
    payload = append(payload, result.Plaintext...)

    // Once the 4-byte prefix is in hand, the total length is known.
    if total == 0 && len(payload) >= 4 {
        total = binary.BigEndian.Uint32(payload[:4])
    }
    if total > 0 && uint32(len(payload)) >= total+4 {
        break
    }
    readIndex = nextIndex
}
payload = payload[4:] // drop the length prefix
// read_cap and first_message_index were shared by the writer, who
// prefixed the payload with a 4-byte big-endian length.
let mut read_index = first_message_index.clone();
let mut payload: Vec<u8> = Vec::new();
let mut total: Option<u32> = None;
loop {
    let read = client.encrypt_read(&read_cap, &read_index).await?;
    let result = client.start_resending_encrypted_message(
        Some(&read_cap), None, Some(&read_index), None,
        &read.envelope_descriptor, &read.message_ciphertext,
        &read.envelope_hash).await?;
    payload.extend_from_slice(&result.plaintext);

    if total.is_none() && payload.len() >= 4 {
        total = Some(u32::from_be_bytes(payload[..4].try_into().unwrap()));
    }
    if let Some(t) = total {
        if payload.len() as u32 >= t + 4 {
            break;
        }
    }
    read_index = read.next_message_box_index;
}
let payload = &payload[4..]; // drop the length prefix
import struct

# read_cap and first_message_index were shared by the writer, who
# prefixed the payload with a 4-byte big-endian length.
read_index = first_message_index
payload = b""
total = None
while True:
    read = await client.encrypt_read(read_cap, read_index)
    result = await client.start_resending_encrypted_message(
        read_cap=read_cap, write_cap=None,
        message_box_index=read_index, reply_index=None,
        envelope_descriptor=read.envelope_descriptor,
        message_ciphertext=read.message_ciphertext,
        envelope_hash=read.envelope_hash,
    )
    payload += result.plaintext

    if total is None and len(payload) >= 4:
        total = struct.unpack(">I", payload[:4])[0]
    if total is not None and len(payload) >= total + 4:
        break
    read_index = read.next_message_box_index

payload = payload[4:]  # drop the length prefix

The hand loop above sends one read per round trip. The same payload can be read far more efficiently with the windowed stream API (ReadStream): peek the first box to learn the framed length, then fetch the remaining boxes in a single call that keeps several in flight at once. Because ReadStream returns exactly the bytes that were written, the four-byte prefix it hands back is the same one the writer laid down, and the per-box capacity is MaxPlaintextPayloadLength - 4.

perBox := client.GetPigeonholeGeometry().MaxPlaintextPayloadLength - 4

// Peek the first box to learn the framed length.
first, idx1, err := client.ReadStream(readCap, firstIndex, 1, 0)
if err != nil {
    log.Fatal(err)
}
total := int(binary.BigEndian.Uint32(first[:4])) + 4 // prefix + data
totalBoxes := (total + perBox - 1) / perBox

// Fetch the remainder in one windowed call (window 0 = daemon default).
payload := first
if totalBoxes > 1 {
    rest, _, err := client.ReadStream(readCap, idx1, uint32(totalBoxes-1), 0)
    if err != nil {
        log.Fatal(err)
    }
    payload = append(payload, rest...)
}
payload = payload[4:] // drop the length prefix
let per_box = client.pigeonhole_geometry().max_plaintext_payload_length - 4;

// Peek the first box to learn the framed length.
let (first, idx1) = client.read_stream(
    &read_cap, &first_message_index, 1, 0).await?;
let total = u32::from_be_bytes(first[..4].try_into().unwrap()) as usize + 4;
let total_boxes = (total + per_box - 1) / per_box;

// Fetch the remainder in one windowed call (window 0 = daemon default).
let mut payload = first;
if total_boxes > 1 {
    let (rest, _) = client.read_stream(
        &read_cap, &idx1, (total_boxes - 1) as u32, 0).await?;
    payload.extend_from_slice(&rest);
}
let payload = &payload[4..]; // drop the length prefix
import struct

per_box = client.pigeonhole_geometry.max_plaintext_payload_length - 4

# Peek the first box to learn the framed length.
first, idx1 = await client.read_stream(read_cap, first_message_index, 1, 0)
total = struct.unpack(">I", first[:4])[0] + 4  # prefix + data
total_boxes = (total + per_box - 1) // per_box

# Fetch the remainder in one windowed call (window 0 = daemon default).
payload = first
if total_boxes > 1:
    rest, _ = await client.read_stream(read_cap, idx1, total_boxes - 1, 0)
    payload += rest
payload = payload[4:]  # drop the length prefix

When the reader already knows the payload’s size or box count, it can skip the peek and issue a single ReadStream; see How to send and receive a large payload with the stream API.


How to send and receive a large payload with the stream API

WriteStream and ReadStream move a payload of any size across as many boxes as it spans, using the daemon’s windowed selective-ack (SACK) ARQ. Where the per-box loop blocks on each box in turn, the stream API keeps several boxes in flight at once and retransmits only those whose acknowledgements time out, so a multi-box transfer is no longer serialised one round trip per box. The daemon does all the chunking, encryption, and reassembly; you hand it the whole payload.

Pass a window of 0 to let the daemon size the window itself from the network parameters (the number of routing layers and the Poisson send rate); there is no knob to tune by hand, and that is deliberate.

ReadStream must be told how many boxes to fetch. Derive that from the byte length the writer shares and the channel’s per-box capacity, which for the stream API is MaxPlaintextPayloadLength - 4: the daemon reserves four bytes per box for a length tag, so ReadStream returns exactly the bytes WriteStream was given, with no padding to trim.

// Writer: hand WriteStream the whole payload; window 0 = daemon default.
nextIndex, err := client.WriteStream(writeCap, firstIndex, payload, 0)
if err != nil {
    log.Fatal(err)
}
// Share readCap and firstIndex with the reader, and tell it len(payload).

// Reader: derive the box count from the byte length and the geometry.
perBox := client.GetPigeonholeGeometry().MaxPlaintextPayloadLength - 4
boxCount := (len(payload) + perBox - 1) / perBox
data, nextReadIndex, err := client.ReadStream(
    readCap, firstIndex, uint32(boxCount), 0)
if err != nil {
    log.Fatal(err)
}
// data == payload
// Writer: window 0 = daemon default.
let next_index = client.write_stream(
    &write_cap, &first_message_index, &payload, 0).await?;
// Share read_cap and first_message_index, and tell the reader payload.len().

// Reader: derive the box count from the byte length and the geometry.
let per_box = client.pigeonhole_geometry().max_plaintext_payload_length - 4;
let box_count = ((payload.len() + per_box - 1) / per_box) as u32;
let (data, next_read_index) = client.read_stream(
    &read_cap, &first_message_index, box_count, 0).await?;
// data == payload
# Writer: window 0 = daemon default.
next_index = await client.write_stream(
    write_cap, first_message_index, payload, 0)
# Share read_cap and first_message_index, and tell the reader len(payload).

# Reader: derive the box count from the byte length and the geometry.
per_box = client.pigeonhole_geometry.max_plaintext_payload_length - 4
box_count = (len(payload) + per_box - 1) // per_box
data, next_read_index = await client.read_stream(
    read_cap, first_message_index, box_count, 0)
# data == payload

How to wait for a message that has not been written yet

Reads and writes are not coordinated: a reader routinely asks for an index before the writer has filled it, and replication lag can briefly hide a box that was in fact written. In both cases the daemon reports BoxIDNotFound. This is the expected answer to “anything here yet?”, not a failure. The correct pattern is a bounded poll: retry on the expected outcome, with a short delay between attempts, until the data appears or an application deadline elapses. Use IsExpectedOutcome to tell a benign “not yet” apart from a real error, so that genuine failures are not silently retried forever.

Two refinements are worth knowing. First, the ordinary read already retries through brief replication lag on its own (the daemon retries a BoxIDNotFound read several times before returning it), so a single read often serves as a deterministic propagation gate: for a sequentially written stream, reading the last box written gates on all the earlier ones, which were written sooner and so have had at least as long to settle. This is the readiness signal to prefer over a fixed sleep. Second, when you would rather learn at once that a box is absent, for instance to peek at a peer’s next message before it has been produced, use the no-retry variant (StartResendingEncryptedMessageNoRetry in Go, no_retry_on_box_id_not_found=True in Rust and Python), which returns BoxIDNotFound immediately instead of waiting out the retries.

deadline := time.Now().Add(2 * time.Minute)
var plaintext []byte
for {
    ciphertext, envDesc, envHash, nextIndex, err := client.EncryptRead(
        readCap, currentIndex,
    )
    if err != nil {
        log.Fatal(err)
    }

    idxBytes, err := currentIndex.MarshalBinary()
    if err != nil {
        log.Fatal(err)
    }
    result, err := client.StartResendingEncryptedMessage(
        readCap, nil, idxBytes, nil, envDesc, ciphertext, envHash,
    )
    if err == nil {
        plaintext = result.Plaintext
        currentIndex = nextIndex
        break
    }

    // BoxIDNotFound here just means "not written yet". Anything that
    // is not an expected outcome is a real failure worth surfacing.
    if !thin.IsExpectedOutcome(err) {
        log.Fatal(err)
    }
    if time.Now().After(deadline) {
        log.Fatal("gave up waiting for the message")
    }
    time.Sleep(3 * time.Second)
}
let deadline = std::time::Instant::now()
    + std::time::Duration::from_secs(120);
let plaintext = loop {
    let read = client.encrypt_read(&read_cap, &current_index).await?;
    match client.start_resending_encrypted_message(
        Some(&read_cap), None, Some(&current_index), None,
        &read.envelope_descriptor,
        &read.message_ciphertext,
        &read.envelope_hash,
    ).await {
        Ok(result) => {
            current_index = read.next_message_box_index;
            break result.plaintext;
        }
        Err(e) if e.is_expected_outcome() => {
            if std::time::Instant::now() > deadline {
                return Err(e);
            }
            tokio::time::sleep(
                std::time::Duration::from_secs(3)).await;
        }
        Err(e) => return Err(e),
    }
};
import asyncio, time
from katzenpost_thinclient import is_expected_outcome

deadline = time.monotonic() + 120
while True:
    read = await client.encrypt_read(read_cap, current_index)
    try:
        plaintext = await client.start_resending_encrypted_message(
            read_cap=read_cap, write_cap=None,
            message_box_index=current_index, reply_index=None,
            envelope_descriptor=read.envelope_descriptor,
            message_ciphertext=read.message_ciphertext,
            envelope_hash=read.envelope_hash,
        )
        current_index = read.next_message_box_index
        break
    except Exception as exc:
        # "not written yet" is expected; anything else is a real error.
        if not is_expected_outcome(exc):
            raise
        if time.monotonic() > deadline:
            raise
        await asyncio.sleep(3)

How to persist and restore channel state

The daemon keeps no per-application channel state. The write cap, the read cap, and above all the current index belong to your application, and if you lose the index across a restart you no longer know where to append next (re-using a filled index earns BoxAlreadyExists). Persist the index every time you advance it, durably, before you treat the write as done.

In Go the capabilities and the index are typed; serialise them with MarshalBinary and restore them with the bacap constructors. In Rust and Python new_keypair already hands you the caps and index as byte strings, so persistence is simply storing and reloading those bytes.

import "github.com/katzenpost/hpqc/bacap"

// Save: marshal each artefact to bytes and write atomically to disk.
wcBytes, _ := writeCap.MarshalBinary()
rcBytes, _ := readCap.MarshalBinary()
idxBytes, _ := currentIndex.MarshalBinary()
saveState(wcBytes, rcBytes, idxBytes) // your durable, atomic write

// Restore after a restart:
writeCap, err := bacap.NewWriteCapFromBytes(wcBytes)
if err != nil {
    log.Fatal(err)
}
readCap, err := bacap.ReadCapFromBytes(rcBytes)
if err != nil {
    log.Fatal(err)
}
currentIndex, err := bacap.NewEmptyMessageBoxIndexFromBytes(idxBytes)
if err != nil {
    log.Fatal(err)
}
// new_keypair already returns Vec<u8> for each artefact.
let kp = client.new_keypair(&seed).await?;
save_state(&kp.write_cap, &kp.read_cap, &kp.first_message_index);
let mut current_index = kp.first_message_index.clone();

// ... each time you advance, persist the new index bytes ...
save_index(&current_index);

// After a restart, the stored bytes are passed straight back into
// the API; no deserialisation step is required.
let (write_cap, read_cap, current_index) = load_state();
# new_keypair already returns bytes for each artefact.
kp = await client.new_keypair(seed)
save_state(kp.write_cap, kp.read_cap, kp.first_message_index)
current_index = kp.first_message_index

# ... each time you advance, persist the new index bytes ...
save_index(current_index)

# After a restart, the stored bytes are passed straight back into
# the API; no deserialisation step is required.
write_cap, read_cap, current_index = load_state()

The writer must persist currentIndex after every successful write, the reader after every successful read. Persist the index before acknowledging the message to the rest of your application, so that a crash cannot leave you having processed a message whose index you never recorded.


How to prepare operations offline

The daemon distinguishes two kinds of work. Key generation and envelope encryption (NewKeypair, EncryptWrite, EncryptRead, TombstoneRange, and the copy-stream constructors) are local cryptography and succeed even when the daemon is not connected to the mixnet. Only StartResendingEncryptedMessage and StartResendingCopyCommand require connectivity; called offline they fail rather than block.

You can therefore prepare envelopes while offline, persist them, and transmit once connectivity returns. Test IsConnected before transmitting, or watch the connection event and flush a queue when it turns true.

// Offline: this is pure local crypto and works regardless.
ciphertext, envDesc, envHash, nextIndex, err := client.EncryptWrite(
    []byte("written while offline"), writeCap, currentIndex,
)
if err != nil {
    log.Fatal(err)
}
enqueue(envDesc, ciphertext, envHash) // persist for later

// Later, only transmit once the daemon is connected.
if client.IsConnected() {
    for _, e := range drainQueue() {
        _, err = client.StartResendingEncryptedMessage(
            nil, writeCap, nil, nil, e.desc, e.ct, e.hash)
        if err != nil {
            log.Fatal(err)
        }
    }
}
// Offline: pure local crypto, works regardless.
let w = client.encrypt_write(
    b"written while offline", &write_cap, &current_index).await?;
enqueue(&w); // persist for later

// Later, only transmit once connected.
if client.is_connected() {
    for e in drain_queue() {
        client.start_resending_encrypted_message(
            None, Some(&write_cap), None, None,
            &e.envelope_descriptor, &e.message_ciphertext,
            &e.envelope_hash).await?;
    }
}
# Offline: pure local crypto, works regardless.
w = await client.encrypt_write(
    b"written while offline", write_cap, current_index)
enqueue(w)  # persist for later

# Later, only transmit once connected.
if client.is_connected():
    for e in drain_queue():
        await client.start_resending_encrypted_message(
            read_cap=None, write_cap=write_cap,
            message_box_index=None, reply_index=None,
            envelope_descriptor=e.envelope_descriptor,
            message_ciphertext=e.message_ciphertext,
            envelope_hash=e.envelope_hash)

A complete end-to-end example

The fragments above each show one task. Here they are assembled into a single runnable program: Alice creates a stream, writes one message, and Bob reads it back. This is the smallest complete program that exercises the Pigeonhole path. As with every example in this guide it omits production concerns (durable persistence, structured logging), but it compiles into the shape of a real application; the CI-verified tests are the authority on exact, current usage.

package main

import (
    "log"

    "github.com/katzenpost/hpqc/rand"

    "github.com/katzenpost/katzenpost/client/thin"
    "github.com/katzenpost/katzenpost/core/config"
)

func main() {
    cfg, err := thin.LoadFile("thinclient.toml")
    if err != nil {
        log.Fatal(err)
    }
    client := thin.NewThinClient(cfg, &config.Logging{Level: "INFO"})
    if err := client.Dial(); err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Alice creates a stream.
    seed := make([]byte, 32)
    if _, err := rand.Reader.Read(seed); err != nil {
        log.Fatal(err)
    }
    writeCap, readCap, idx, err := client.NewKeypair(seed)
    if err != nil {
        log.Fatal(err)
    }

    // Alice writes one message.
    ct, ed, eh, _, err := client.EncryptWrite(
        []byte("hello from Alice"), writeCap, idx)
    if err != nil {
        log.Fatal(err)
    }
    if _, err := client.StartResendingEncryptedMessage(
        nil, writeCap, nil, nil, ed, ct, eh); err != nil {
        log.Fatal(err)
    }

    // Bob reads it back (readCap would normally be shared out-of-band).
    rct, red, reh, _, err := client.EncryptRead(readCap, idx)
    if err != nil {
        log.Fatal(err)
    }
    idxBytes, err := idx.MarshalBinary()
    if err != nil {
        log.Fatal(err)
    }
    result, err := client.StartResendingEncryptedMessage(
        readCap, nil, idxBytes, nil, red, rct, reh)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Bob read: %s", result.Plaintext)
}
use katzenpost_thin_client::{Config, ThinClient};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::new("thinclient.toml")?;
    let client = ThinClient::new(config).await?;

    // Alice creates a stream.
    let seed: [u8; 32] = rand::random();
    let kp = client.new_keypair(&seed).await?;

    // Alice writes one message.
    let w = client.encrypt_write(
        b"hello from Alice", &kp.write_cap, &kp.first_message_index).await?;
    client.start_resending_encrypted_message(
        None, Some(&kp.write_cap), None, None,
        &w.envelope_descriptor, &w.message_ciphertext,
        &w.envelope_hash).await?;

    // Bob reads it back (read_cap would normally be shared out-of-band).
    let r = client.encrypt_read(&kp.read_cap, &kp.first_message_index).await?;
    let result = client.start_resending_encrypted_message(
        Some(&kp.read_cap), None, Some(&kp.first_message_index), None,
        &r.envelope_descriptor, &r.message_ciphertext,
        &r.envelope_hash).await?;
    println!("Bob read: {:?}", result.plaintext);

    client.stop().await;
    Ok(())
}
import asyncio, os
from katzenpost_thinclient import ThinClient, Config

async def main():
    config = Config("thinclient.toml")
    client = ThinClient(config)
    await client.start(asyncio.get_running_loop())

    # Alice creates a stream.
    seed = os.urandom(32)
    kp = await client.new_keypair(seed)

    # Alice writes one message.
    w = await client.encrypt_write(
        b"hello from Alice", kp.write_cap, kp.first_message_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=kp.write_cap,
        message_box_index=None, reply_index=None,
        envelope_descriptor=w.envelope_descriptor,
        message_ciphertext=w.message_ciphertext,
        envelope_hash=w.envelope_hash)

    # Bob reads it back (read_cap would normally be shared out-of-band).
    r = await client.encrypt_read(kp.read_cap, kp.first_message_index)
    plaintext = await client.start_resending_encrypted_message(
        read_cap=kp.read_cap, write_cap=None,
        message_box_index=kp.first_message_index, reply_index=None,
        envelope_descriptor=r.envelope_descriptor,
        message_ciphertext=r.message_ciphertext,
        envelope_hash=r.envelope_hash)
    print("Bob read:", plaintext)

    client.stop()

asyncio.run(main())

How to delete messages with tombstones

Use TombstoneRange to create tombstone envelopes, then send each one via StartResendingEncryptedMessage. To tombstone a single box, use maxCount=1.

// Create tombstone envelopes for 5 boxes
result, err := client.TombstoneRange(writeCap, startIndex, 5)
if err != nil {
    log.Fatal(err)
}

// Send each tombstone
for _, envelope := range result.Envelopes {
    _, err = client.StartResendingEncryptedMessage(
        nil, writeCap, nil, nil,
        envelope.EnvelopeDescriptor,
        envelope.MessageCiphertext,
        envelope.EnvelopeHash,
    )
    if err != nil {
        log.Fatal(err)
    }
}
// result.Next is the index after the last tombstoned box
// Create tombstone envelopes for 5 boxes
let result = client.tombstone_range(&write_cap, &start_index, 5).await;

// Send each tombstone
for envelope in &result.envelopes {
    client.start_resending_encrypted_message(
        None,
        Some(&write_cap),
        None,
        None,
        &envelope.envelope_descriptor,
        &envelope.message_ciphertext,
        &envelope.envelope_hash,
    ).await?;
}
// result.next is the index after the last tombstoned box
# Create tombstone envelopes for 5 boxes
result = await client.tombstone_range(write_cap, start_index, 5)

# Send each tombstone
for envelope in result.envelopes:
    await client.start_resending_encrypted_message(
        read_cap=None,
        write_cap=write_cap,
        message_box_index=None,
        reply_index=None,
        envelope_descriptor=envelope.envelope_descriptor,
        message_ciphertext=envelope.message_ciphertext,
        envelope_hash=envelope.envelope_hash,
    )
# result.next is the index after the last tombstoned box

How to detect a tombstone when reading

The counterpart to deleting a box is recognising a deleted box on the read side. A reader that reaches a tombstoned box does not get silence; it gets a definite tombstone error (ErrTombstone in Go, ThinClientError::Tombstone in Rust, TombstoneError in Python). Like BoxIDNotFound, this is an expected outcome rather than a transport failure, so IsExpectedOutcome treats it as benign; catch it to learn that the box was deliberately deleted and to stop reading the stream.

idxBytes, err := currentIndex.MarshalBinary()
if err != nil {
    log.Fatal(err)
}
result, err := client.StartResendingEncryptedMessage(
    readCap, nil, idxBytes, nil, envDesc, ciphertext, envHash)
switch {
case err == nil:
    plaintext := result.Plaintext
    _ = plaintext // use the message
case errors.Is(err, thin.ErrTombstone):
    // The box was deliberately deleted. Expected, not a failure.
    log.Printf("box at this index was tombstoned")
default:
    log.Fatal(err)
}
match client.start_resending_encrypted_message(
    Some(&read_cap), None, Some(&current_index), None,
    &read.envelope_descriptor, &read.message_ciphertext,
    &read.envelope_hash,
).await {
    Ok(result) => { /* use result.plaintext */ }
    Err(ThinClientError::Tombstone) => {
        // The box was deliberately deleted. Expected, not a failure.
        println!("box at this index was tombstoned");
    }
    Err(e) => return Err(e.into()),
}
from katzenpost_thinclient import TombstoneError

try:
    result = await client.start_resending_encrypted_message(
        read_cap=read_cap, write_cap=None,
        message_box_index=current_index, reply_index=None,
        envelope_descriptor=read.envelope_descriptor,
        message_ciphertext=read.message_ciphertext,
        envelope_hash=read.envelope_hash,
    )
    plaintext = result.plaintext  # use the message
except TombstoneError:
    # The box was deliberately deleted. Expected, not a failure.
    print("box at this index was tombstoned")

How to send to one channel atomically via copy command

A copy command writes data to a destination channel atomically via a courier. The steps are:

  1. Create a temporary channel and a destination channel.
  2. Pack the payload into copy stream elements.
  3. Write each element to the temporary channel.
  4. Send a copy command referencing the temporary channel.
// Create destination channel
destSeed := make([]byte, 32)
rand.Reader.Read(destSeed)
destWriteCap, destReadCap, destFirstIndex, err := client.NewKeypair(destSeed)
if err != nil {
    log.Fatal(err)
}

// Create temporary channel
tempSeed := make([]byte, 32)
rand.Reader.Read(tempSeed)
tempWriteCap, _, tempFirstIndex, err := client.NewKeypair(tempSeed)
if err != nil {
    log.Fatal(err)
}

// Pack payload into copy stream elements
envelopes, _, err := client.CreateCourierEnvelopesFromPayload(
    payload, destWriteCap, destFirstIndex,
    true,  // isStart
    true,  // isLast
)
if err != nil {
    log.Fatal(err)
}

// Write each element to the temporary channel
tempIndex := tempFirstIndex
for _, chunk := range envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send the copy command (blocks until courier acknowledges)
err = client.StartResendingCopyCommand(tempWriteCap)
if err != nil {
    log.Fatal(err)
}

// Share destReadCap and destFirstIndex with the reader
// Create destination channel
let dest_seed: [u8; 32] = rand::random();
let dest = client.new_keypair(&dest_seed).await?;

// Create temporary channel
let temp_seed: [u8; 32] = rand::random();
let temp = client.new_keypair(&temp_seed).await?;

// Pack payload into copy stream elements
let envelopes_result = client.create_courier_envelopes_from_payload(
    &payload, &dest.write_cap, &dest.first_message_index,
    true,  // is_start
    true,  // is_last
).await?;

// Write each element to the temporary channel
let mut temp_index = temp.first_message_index.clone();
for chunk in &envelopes_result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send the copy command (blocks until courier acknowledges)
client.start_resending_copy_command(&temp.write_cap, None, None).await?;

// Share dest.read_cap and dest.first_message_index with the reader
import os

# Create destination channel
dest_seed = os.urandom(32)
dest = await client.new_keypair(dest_seed)

# Create temporary channel
temp_seed = os.urandom(32)
temp = await client.new_keypair(temp_seed)

# Pack payload into copy stream elements
envelopes_result = await client.create_courier_envelopes_from_payload(
    payload, dest.write_cap, dest.first_message_index,
    is_start=True,
    is_last=True,
)

# Write each element to the temporary channel
temp_index = temp.first_message_index
for chunk in envelopes_result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        message_box_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send the copy command (blocks until courier acknowledges)
await client.start_resending_copy_command(temp.write_cap)

# Share dest.read_cap and dest.first_message_index with the reader

How to send to multiple channels atomically

Use CreateCourierEnvelopesFromMultiPayload to pack payloads for different destinations into a single copy stream efficiently:

// Create destination channels
dest1WriteCap, dest1ReadCap, dest1Index, _ := client.NewKeypair(seed1)
dest2WriteCap, dest2ReadCap, dest2Index, _ := client.NewKeypair(seed2)

// Create temporary channel
tempWriteCap, _, tempFirstIndex, _ := client.NewKeypair(tempSeed)

// Pack multiple payloads
destinations := []thin.DestinationPayload{
    {Payload: payload1, WriteCap: dest1WriteCap, StartIndex: dest1Index},
    {Payload: payload2, WriteCap: dest2WriteCap, StartIndex: dest2Index},
}

result, err := client.CreateCourierEnvelopesFromMultiPayload(
    destinations,
    true,  // isStart
    true,  // isLast
    nil,   // buffer (nil for first call)
)
if err != nil {
    log.Fatal(err)
}

// Write envelopes to temporary channel
tempIndex := tempFirstIndex
for _, chunk := range result.Envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send copy command
err = client.StartResendingCopyCommand(tempWriteCap)
// Create destination channels
let dest1 = client.new_keypair(&seed1).await?;
let dest2 = client.new_keypair(&seed2).await?;

// Create temporary channel
let temp = client.new_keypair(&temp_seed).await?;

// Pack multiple payloads
let destinations = vec![
    (&payload1[..], &dest1.write_cap[..], &dest1.first_message_index[..]),
    (&payload2[..], &dest2.write_cap[..], &dest2.first_message_index[..]),
];

let result = client.create_courier_envelopes_from_multi_payload(
    destinations,
    true,  // is_start
    true,  // is_last
    None,  // buffer (None for first call)
).await?;

// Write envelopes to temporary channel
let mut temp_index = temp.first_message_index.clone();
for chunk in &result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send copy command
client.start_resending_copy_command(&temp.write_cap, None, None).await?;
# Create destination channels
dest1 = await client.new_keypair(seed1)
dest2 = await client.new_keypair(seed2)

# Create temporary channel
temp = await client.new_keypair(temp_seed)

# Pack multiple payloads
destinations = [
    {"payload": payload1, "write_cap": dest1.write_cap, "start_index": dest1.first_message_index},
    {"payload": payload2, "write_cap": dest2.write_cap, "start_index": dest2.first_message_index},
]

result = await client.create_courier_envelopes_from_multi_payload(
    destinations,
    is_start=True,
    is_last=True,
    buffer=None,
)

# Write envelopes to temporary channel
temp_index = temp.first_message_index
for chunk in result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        message_box_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send copy command
await client.start_resending_copy_command(temp.write_cap)

How to handle multi-call buffer passing for large copy streams

When building a copy stream across multiple calls (because you have more data than fits in a single call, or data arrives incrementally), pass the buffer from each result to the next call:

var buffer []byte // nil on first call

// First batch of destinations
result1, err := client.CreateCourierEnvelopesFromMultiPayload(
    batch1Destinations,
    true,   // isStart (first call)
    false,  // isLast (more calls coming)
    buffer,
)
if err != nil {
    log.Fatal(err)
}
// Write result1.Envelopes to temp channel...
buffer = result1.Buffer // save for next call

// Persist buffer to disk for crash recovery
saveState(buffer)

// Second batch (final)
result2, err := client.CreateCourierEnvelopesFromMultiPayload(
    batch2Destinations,
    false,  // isStart (not the first call)
    true,   // isLast (final call)
    buffer,
)
if err != nil {
    log.Fatal(err)
}
// Write result2.Envelopes to temp channel...

// On crash recovery, reload buffer from disk and continue
// with isStart=false
let mut buffer: Option<Vec<u8>> = None; // None on first call

// First batch
let result1 = client.create_courier_envelopes_from_multi_payload(
    batch1_destinations,
    true,   // is_start
    false,  // is_last
    buffer,
).await?;
// Write result1.envelopes to temp channel...
buffer = result1.buffer; // save for next call

// Persist buffer to disk for crash recovery
save_state(&buffer);

// Second batch (final)
let result2 = client.create_courier_envelopes_from_multi_payload(
    batch2_destinations,
    false,  // is_start
    true,   // is_last
    buffer,
).await?;
// Write result2.envelopes to temp channel...
buffer = None  # None on first call

# First batch
result1 = await client.create_courier_envelopes_from_multi_payload(
    batch1_destinations,
    is_start=True,   # first call
    is_last=False,   # more calls coming
    buffer=buffer,
)
# Write result1.envelopes to temp channel...
buffer = result1.buffer  # save for next call

# Persist buffer to disk for crash recovery
save_state(buffer)

# Second batch (final)
result2 = await client.create_courier_envelopes_from_multi_payload(
    batch2_destinations,
    is_start=False,  # not the first call
    is_last=True,    # final call
    buffer=buffer,
)
# Write result2.envelopes to temp channel...

How to tombstone a range via copy stream

Use CreateCourierEnvelopesFromTombstoneRange to atomically tombstone boxes as part of a copy command. The courier performs the tombstoning, so it either all succeeds or none of it is visible.

tempWriteCap, _, tempFirstIndex, _ := client.NewKeypair(tempSeed)

// Create tombstone copy stream elements
envelopes, nextBuffer, nextDestIndex, err := client.CreateCourierEnvelopesFromTombstoneRange(
    destWriteCap,
    destStartIndex,
    10,     // tombstone 10 boxes
    true,   // isStart
    true,   // isLast
    nil,    // buffer
)
if err != nil {
    log.Fatal(err)
}

// Write to temporary channel
tempIndex := tempFirstIndex
for _, chunk := range envelopes {
    ciphertext, envDesc, envHash, nextIdx, err := client.EncryptWrite(
        chunk, tempWriteCap, tempIndex,
    )
    if err != nil {
        log.Fatal(err)
    }
    _, err = client.StartResendingEncryptedMessage(
        nil, tempWriteCap, nil, nil, envDesc, ciphertext, envHash,
    )
    if err != nil {
        log.Fatal(err)
    }
    tempIndex = nextIdx
}

// Send copy command
err = client.StartResendingCopyCommand(tempWriteCap)
let temp = client.new_keypair(&temp_seed).await?;

// Create tombstone copy stream elements
let result = client.create_courier_envelopes_from_tombstone_range(
    &dest_write_cap,
    &dest_start_index,
    10,     // tombstone 10 boxes
    true,   // is_start
    true,   // is_last
    None,   // buffer
).await?;

// Write to temporary channel
let mut temp_index = temp.first_message_index.clone();
for chunk in &result.envelopes {
    let write_result = client.encrypt_write(
        chunk, &temp.write_cap, &temp_index,
    ).await?;
    client.start_resending_encrypted_message(
        None, Some(&temp.write_cap), None, None,
        &write_result.envelope_descriptor,
        &write_result.message_ciphertext,
        &write_result.envelope_hash,
    ).await?;
    temp_index = write_result.next_message_box_index;
}

// Send copy command
client.start_resending_copy_command(&temp.write_cap, None, None).await?;
temp = await client.new_keypair(temp_seed)

# Create tombstone copy stream elements
result = await client.create_courier_envelopes_from_tombstone_range(
    dest_write_cap,
    dest_start_index,
    10,              # tombstone 10 boxes
    is_start=True,
    is_last=True,
    buffer=None,
)

# Write to temporary channel
temp_index = temp.first_message_index
for chunk in result.envelopes:
    write_result = await client.encrypt_write(chunk, temp.write_cap, temp_index)
    await client.start_resending_encrypted_message(
        read_cap=None, write_cap=temp.write_cap,
        message_box_index=None, reply_index=None,
        envelope_descriptor=write_result.envelope_descriptor,
        message_ciphertext=write_result.message_ciphertext,
        envelope_hash=write_result.envelope_hash,
    )
    temp_index = write_result.next_message_box_index

# Send copy command
await client.start_resending_copy_command(temp.write_cap)

How to cancel in-flight operations

Both StartResendingEncryptedMessage and StartResendingCopyCommand block until completion. You can cancel them individually, or stop everything at once by closing the thin client.

To cancel a specific operation, call the corresponding cancel method from another thread/task:

// Cancel an encrypted message operation
err := client.CancelResendingEncryptedMessage(envelopeHash)

// Cancel a copy command (needs blake2b-256 hash of the write cap)
writeCapBytes, _ := tempWriteCap.MarshalBinary()
writeCapHash := blake2b.Sum256(writeCapBytes)
err = client.CancelResendingCopyCommand(&writeCapHash)
// Cancel an encrypted message operation
client.cancel_resending_encrypted_message(&envelope_hash).await?;

// Cancel a copy command
use blake2::{Blake2b, Digest};
use digest::consts::U32;
let write_cap_hash: [u8; 32] = Blake2b::<U32>::digest(&temp_write_cap).into();
client.cancel_resending_copy_command(&write_cap_hash).await?;
# Cancel an encrypted message operation
await client.cancel_resending_encrypted_message(envelope_hash)

# Cancel a copy command
from hashlib import blake2b
write_cap_hash = blake2b(temp_write_cap, digest_size=32).digest()
await client.cancel_resending_copy_command(write_cap_hash)

To stop all in-flight operations at once, call Close() (Go), stop() (Rust), or stop() (Python). This shuts down the thin client entirely – all blocked callers receive an error, and the daemon stops all ARQ retransmission loops for this client. This is useful when your application is shutting down or when you want to abandon all pending work without cancelling each operation individually.


How to handle daemon disconnects and restarts

The thin client automatically reconnects when the daemon connection is lost. It uses an instance token to detect whether it reconnected to the same daemon or a new one:

  • Same instance token: The daemon still has its state. No action needed.

  • Different instance token: The daemon is a new process. The thin client automatically replays all in-flight StartResendingEncryptedMessage and StartResendingCopyCommand operations. Callers blocked on these methods are unaware of the disconnect.

Applications do not need to manage reconnection or replay. You can observe disconnect events to log or update UI state:

eventCh := client.EventSink()
defer client.StopEventSink(eventCh)

for ev := range eventCh {
    switch v := ev.(type) {
    case *thin.DaemonDisconnectedEvent:
        if v.IsGraceful {
            fmt.Println("Daemon shut down gracefully")
        } else {
            fmt.Printf("Daemon connection lost: %v\n", v.Err)
        }
        // No action needed -- thin client reconnects automatically
        // and replays in-flight requests if the daemon instance changed.
    case *thin.ConnectionStatusEvent:
        fmt.Printf("Connected: %v\n", v.IsConnected)
        // v.InstanceToken identifies the daemon process.
        // The thin client compares this internally on reconnect.
    }
}
let config = Config::new("thinclient.toml")?;
// Set disconnect callback during config
config.on_daemon_disconnected = Some(Box::new(|graceful, err_msg| {
    if graceful {
        println!("Daemon shut down gracefully");
    } else {
        println!("Daemon connection lost: {:?}", err_msg);
    }
    // No action needed -- thin client reconnects automatically
    // and replays in-flight requests if the daemon instance changed.
}));
let client = ThinClient::new(config).await?;
async def on_daemon_disconnected(event):
    if event.get("is_graceful"):
        print("Daemon shut down gracefully")
    else:
        print(f"Daemon connection lost: {event.get('error')}")
    # No action needed -- thin client reconnects automatically
    # and replays in-flight requests if the daemon instance changed.

async def on_connection_status(event):
    print(f"Connected: {event['is_connected']}")
    # event contains 'instance_token' identifying the daemon process.
    # The thin client compares this internally on reconnect.

config = Config(
    "thinclient.toml",
    on_daemon_disconnected=on_daemon_disconnected,
    on_connection_status=on_connection_status,
)
client = ThinClient(config)

If the thin client is disconnected when you cancel an operation, the cancel just removes it from in-flight tracking – it will not be replayed on reconnect:

// Safe to call while disconnected -- removes from tracking,
// no message sent to daemon since there is no connection.
err := client.CancelResendingEncryptedMessage(envelopeHash)
err = client.CancelResendingCopyCommand(&writeCapHash)
// Safe to call while disconnected -- removes from tracking,
// no message sent to daemon since there is no connection.
client.cancel_resending_encrypted_message(&envelope_hash).await?;
client.cancel_resending_copy_command(&write_cap_hash).await?;
# Safe to call while disconnected -- removes from tracking,
# no message sent to daemon since there is no connection.
await client.cancel_resending_encrypted_message(envelope_hash)
await client.cancel_resending_copy_command(write_cap_hash)

To terminate the thin client entirely (all blocked callers receive an error, daemon disconnects never kill the thin client):

err := client.Close()
client.stop().await;
client.stop()