StreamDB
StreamDB is a type-safe reactive database in a durable stream.
Pass in a StandardSchema and get typed collections, reactive queries, and optimistic actions on top of Durable State.
Installation
npm install @durable-streams/state @tanstack/db@tanstack/db is a peer dependency required for StreamDB collections and queries. The StreamDB API is published under the @durable-streams/state/db subpath; the db-free protocol primitives (see Durable State) live on the main @durable-streams/state entry and need no extra dependencies.
Define a StandardSchema
Define your state structure with createStateSchema. Each collection maps an entity type to a Standard Schema validator and a primary key field:
import { createStateSchema } from "@durable-streams/state"
import { createStreamDB } from "@durable-streams/state/db"
import { z } from "zod"
const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
})
const messageSchema = z.object({
id: z.string(),
userId: z.string(),
text: z.string(),
timestamp: z.string(),
})
const schema = createStateSchema({
users: {
schema: userSchema,
type: "user",
primaryKey: "id",
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})Any Standard Schema library works, including Zod, Valibot, ArkType, or a manual implementation.
The schema also provides typed event helpers:
schema.users.insert({
value: { id: "1", name: "Alice", email: "alice@example.com" },
})
schema.users.update({ value: updatedUser, oldValue: previousUser })
schema.users.delete({ key: "1" })Create a StreamDB
createStreamDB connects your schema to a Durable Stream and creates a reactive, stream-backed database:
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
},
live: "sse", // optional: true, "long-poll", "sse", or false
state: schema,
})
await db.preload()Calling preload() reads the stream from the beginning, materializes the current state, and then stays connected for live updates. By default StreamDB passes live: true; pass live: "sse" or live: "long-poll" to force a transport.
Reactive queries
StreamDB collections are TanStack DB collections. Use useLiveQuery for queries that update automatically when data changes:
import { useLiveQuery } from "@tanstack/react-db"
import { eq, count } from "@tanstack/db"
const allUsers = useLiveQuery((q) => q.from({ users: db.collections.users }))
const activeUsers = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.active, true))
)
const messagesWithAuthors = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.join({ users: db.collections.users }, ({ messages, users }) =>
eq(messages.userId, users.id)
)
.select(({ messages, users }) => ({
text: messages.text,
userName: users.name,
}))
)
const messageCount = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.select(({ messages }) => ({ total: count(messages.id) }))
)TanStack DB uses differential dataflow, so queries update incrementally instead of recomputing everything from scratch.
Framework adapters are available for React, Solid, and Vue.
Lifecycle
await db.preload()
db.close()
await db.utils.awaitTxId("txid-uuid", 5000)Optimistic actions
StreamDB supports optimistic mutations through TanStack DB's action system. Actions update local state immediately while persisting changes to the stream asynchronously:
const db = createStreamDB({
streamOptions: { url: streamUrl, contentType: "application/json" },
state: schema,
actions: ({ db, stream }) => ({
addUser: {
onMutate: (user) => {
db.collections.users.insert(user)
},
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
JSON.stringify(
schema.users.insert({ value: user, headers: { txid } })
)
)
await db.utils.awaitTxId(txid)
},
},
}),
})
await db.actions.addUser({ id: "1", name: "Alice", email: "alice@example.com" })If the server mutation fails, TanStack DB rolls back the optimistic update.
Common patterns
Key/value store
const schema = createStateSchema({
config: {
schema: configSchema,
type: "config",
primaryKey: "key",
},
})
await stream.append(
JSON.stringify(
schema.config.insert({ value: { key: "theme", value: "dark" } })
)
)Presence tracking
const schema = createStateSchema({
presence: {
schema: presenceSchema,
type: "presence",
primaryKey: "userId",
},
})
await stream.append(
JSON.stringify(
schema.presence.update({
value: { userId: "alice", status: "online", lastSeen: Date.now() },
})
)
)Multi-type chat room
const schema = createStateSchema({
users: { schema: userSchema, type: "user", primaryKey: "id" },
messages: { schema: messageSchema, type: "message", primaryKey: "id" },
reactions: { schema: reactionSchema, type: "reaction", primaryKey: "id" },
typing: { schema: typingSchema, type: "typing", primaryKey: "userId" },
})
await stream.append(JSON.stringify(schema.users.insert({ value: user })))
await stream.append(JSON.stringify(schema.messages.insert({ value: message })))
await stream.append(
JSON.stringify(schema.reactions.insert({ value: reaction }))
)Best practices
Use object values. StreamDB requires object values, not primitives, for the primary key pattern:
// Won't work
{ type: "count", key: "views", value: 42 }
// Works
{ type: "count", key: "views", value: { id: "views", count: 42 } }Always call close().
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
return () => db.close()
}, [])Use transaction IDs for critical operations.
const txid = crypto.randomUUID()
await stream.append(
JSON.stringify(schema.users.insert({ value: user, headers: { txid } }))
)
await db.utils.awaitTxId(txid, 10000)Validate at boundaries.
const userSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
age: z.number().min(0).max(150),
})Learn more
- Durable State for the underlying protocol
- Package README
- Examples
- TanStack DB