Python client
Use `durable-streams` when you want direct read and write access to Durable Streams from Python.
It gives you:
- `stream()` and `astream()` for read-only stream consumption
- `DurableStream` and `AsyncDurableStream` for read/write operations
- `IdempotentProducer` for exactly-once writes with batching and retries
Install
```bash
pip install durable-streams
```

Or with uv:

```bash
uv add durable-streams
```

Read from a stream
```python
from durable_streams import stream

with stream("https://streams.example.com/my-stream") as res:
    for item in res.iter_json():
        print(item)
```

The synchronous API is a good fit for scripts, workers, and services that want generator-based consumption with minimal overhead.
Async reading
```python
from durable_streams import astream

async with astream("https://streams.example.com/my-stream") as res:
    async for item in res.iter_json():
        print(item)
```

Use `astream()` when you want the same streaming model in an async application.
Create and append
```python
from durable_streams import DurableStream

handle = DurableStream.create(
    "https://streams.example.com/my-stream",
    content_type="application/json",
    ttl_seconds=3600,
)

handle.append({"message": "hello"})
handle.append({"message": "world"})

with handle.stream() as res:
    for item in res.iter_json():
        print(item)
```

Exactly-once writes
For reliable, high-throughput writes with exactly-once semantics, use `IdempotentProducer`:
```python
import asyncio
import json

from durable_streams import AsyncDurableStream, IdempotentProducer

async def main():
    stream = await AsyncDurableStream.create(
        "https://streams.example.com/events",
        content_type="application/json",
    )

    producer = IdempotentProducer(
        stream,
        producer_id="event-processor-1",
        auto_claim=True,
        linger_ms=5,
        max_batch_bytes=65536,
        on_error=lambda err: print(f"Batch failed: {err}"),
    )

    events = [{"type": "click", "x": 100}, {"type": "scroll", "y": 200}]
    for event in events:
        producer.append_nowait(json.dumps(event))

    await producer.flush()
    await producer.close()

asyncio.run(main())
```

Key features
- Sync and async APIs with context-manager support
- Generator-based streaming for memory-efficient reads
- `iter_json()` and custom `decode=` hooks for structured payloads
- `IdempotentProducer` with `append_nowait()` for fire-and-forget batched writes
- Support for long-poll and SSE live modes
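Conceptually, a `decode=` hook is just a callable applied to each raw message before it is yielded, so you can turn JSON payloads into typed objects as you consume them. The sketch below illustrates that idea without the network layer; it is not the library's internals, and `Event`, `decode_event`, and `iter_decoded` are hypothetical names used only for illustration:

```python
import json
from dataclasses import dataclass
from typing import Callable, Iterator

@dataclass
class Event:
    type: str
    payload: dict

def decode_event(raw: str) -> Event:
    # A decode hook: turn one raw JSON message into a typed object.
    obj = json.loads(raw)
    return Event(type=obj.pop("type"), payload=obj)

def iter_decoded(raw_messages: Iterator[str],
                 decode: Callable[[str], Event]) -> Iterator[Event]:
    # Generator-based consumption: the hook runs once per message,
    # so nothing is buffered beyond the item currently in flight.
    for raw in raw_messages:
        yield decode(raw)

# Stand-in for messages read from a stream:
raw = ['{"type": "click", "x": 100}', '{"type": "scroll", "y": 200}']
events = list(iter_decoded(raw, decode_event))
print(events[0].type)   # click
print(events[1].payload)  # {'y': 200}
```

Passing a hook like this to `decode=` keeps parsing logic in one place while preserving the lazy, generator-based reading model shown above.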
More
- Python client README
- JSON mode for structured message streams
- Other clients for the rest of the official client libraries