StreamDB
StreamDB is a type-safe reactive database in a durable stream.
Pass in a StandardSchema and get typed collections, reactive queries, and optimistic actions on top of Durable State.
Installation
npm install @durable-streams/state @tanstack/db@tanstack/db is a peer dependency required for StreamDB collections and queries.
Define a StandardSchema
Define your state structure with createStateSchema. Each collection maps an entity type to a Standard Schema validator and a primary key field:
import { createStateSchema, createStreamDB } from "@durable-streams/state"
import { z } from "zod"
const userSchema = z.object({
id: z.string(),
name: z.string(),
email: z.string().email(),
})
const messageSchema = z.object({
id: z.string(),
userId: z.string(),
text: z.string(),
timestamp: z.string(),
})
const schema = createStateSchema({
users: {
schema: userSchema,
type: "user",
primaryKey: "id",
},
messages: {
schema: messageSchema,
type: "message",
primaryKey: "id",
},
})Any Standard Schema library works, including Zod, Valibot, ArkType, or a manual implementation.
The schema also provides typed event helpers:
schema.users.insert({
value: { id: "1", name: "Alice", email: "alice@example.com" },
})
schema.users.update({ value: updatedUser, oldValue: previousUser })
schema.users.delete({ key: "1" })Create a StreamDB
createStreamDB connects your schema to a Durable Stream and creates a reactive, stream-backed database:
const db = createStreamDB({
streamOptions: {
url: "https://api.example.com/streams/my-stream",
contentType: "application/json",
},
state: schema,
})
await db.preload()Calling preload() reads the stream from the beginning, materializes the current state, and then stays connected for live updates.
Reactive queries
StreamDB collections are TanStack DB collections. Use useLiveQuery for queries that update automatically when data changes:
import { useLiveQuery } from "@tanstack/react-db"
import { eq, count } from "@tanstack/db"
const allUsers = useLiveQuery((q) => q.from({ users: db.collections.users }))
const activeUsers = useLiveQuery((q) =>
q
.from({ users: db.collections.users })
.where(({ users }) => eq(users.active, true))
)
const messagesWithAuthors = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.join({ users: db.collections.users }, ({ messages, users }) =>
eq(messages.userId, users.id)
)
.select(({ messages, users }) => ({
text: messages.text,
userName: users.name,
}))
)
const messageCount = useLiveQuery((q) =>
q
.from({ messages: db.collections.messages })
.select(({ messages }) => ({ total: count(messages.id) }))
)TanStack DB uses differential dataflow, so queries update incrementally instead of recomputing everything from scratch.
Framework adapters are available for React, Solid, and Vue.
Lifecycle
await db.preload()
db.close()
await db.utils.awaitTxId("txid-uuid", 5000)Optimistic actions
StreamDB supports optimistic mutations through TanStack DB's action system. Actions update local state immediately while persisting changes to the stream asynchronously:
const db = createStreamDB({
streamOptions: { url: streamUrl, contentType: "application/json" },
state: schema,
actions: ({ db, stream }) => ({
addUser: {
onMutate: (user) => {
db.collections.users.insert(user)
},
mutationFn: async (user) => {
const txid = crypto.randomUUID()
await stream.append(
schema.users.insert({ value: user, headers: { txid } })
)
await db.utils.awaitTxId(txid)
},
},
}),
})
await db.actions.addUser({ id: "1", name: "Alice", email: "alice@example.com" })If the server mutation fails, TanStack DB rolls back the optimistic update.
Common patterns
Key/value store
const schema = createStateSchema({
config: {
schema: configSchema,
type: "config",
primaryKey: "key",
},
})
await stream.append(
schema.config.insert({ value: { key: "theme", value: "dark" } })
)Presence tracking
const schema = createStateSchema({
presence: {
schema: presenceSchema,
type: "presence",
primaryKey: "userId",
},
})
await stream.append(
schema.presence.update({
value: { userId: "alice", status: "online", lastSeen: Date.now() },
})
)Multi-type chat room
const schema = createStateSchema({
users: { schema: userSchema, type: "user", primaryKey: "id" },
messages: { schema: messageSchema, type: "message", primaryKey: "id" },
reactions: { schema: reactionSchema, type: "reaction", primaryKey: "id" },
typing: { schema: typingSchema, type: "typing", primaryKey: "userId" },
})
await stream.append(schema.users.insert({ value: user }))
await stream.append(schema.messages.insert({ value: message }))
await stream.append(schema.reactions.insert({ value: reaction }))Best practices
Use object values. StreamDB requires object values, not primitives, for the primary key pattern:
// Won't work
{ type: "count", key: "views", value: 42 }
// Works
{ type: "count", key: "views", value: { id: "views", count: 42 } }Always call close().
useEffect(() => {
const db = createStreamDB({ streamOptions, state: schema })
return () => db.close()
}, [])Use transaction IDs for critical operations.
const txid = crypto.randomUUID()
await stream.append(schema.users.insert({ value: user, headers: { txid } }))
await db.utils.awaitTxId(txid, 10000)Validate at boundaries.
const userSchema = z.object({
id: z.string().uuid(),
email: z.string().email(),
age: z.number().min(0).max(150),
})Learn more
- Durable State for the underlying protocol
- Package README
- Examples
- TanStack DB