Files
2026-04-05 03:08:53 +02:00

2213 lines
84 KiB
JavaScript

// src/touch/interpreter_worker.ts
import { parentPort, workerData } from "node:worker_threads";
import { Result as Result4 } from "better-result";
// src/util/ds_error.ts
import { TaggedError } from "better-result";
/**
 * Error type used by this bundle, produced by better-result's `TaggedError`
 * factory with the tag "DurableStreamsError". `dsError` is the helper that
 * constructs instances.
 */
class DurableStreamsError extends TaggedError("DurableStreamsError")() {
}
/**
 * Creates a DurableStreamsError carrying `message`.
 *
 * `opts.cause` and `opts.code` are copied into the error payload only when they
 * are not `undefined`, so options that were not supplied never appear as keys.
 *
 * @param message Error message.
 * @param opts Optional `{ cause, code }` bag; may be omitted.
 * @returns A new DurableStreamsError.
 */
function dsError(message, opts) {
  const causeField = opts?.cause !== undefined ? { cause: opts.cause } : {};
  const codeField = opts?.code !== undefined ? { code: opts.code } : {};
  return new DurableStreamsError({ message, ...causeField, ...codeField });
}
// src/db/schema.ts
// Schema version this build expects. initSchema stamps new databases with it and
// migrates existing databases until the stored version equals it.
var SCHEMA_VERSION = 11;
// PRAGMAs that initSchema executes on every open, before any migration logic
// (they run even when migrations are skipped).
var DEFAULT_PRAGMAS_SQL = `
PRAGMA journal_mode = WAL;
PRAGMA synchronous = FULL;
PRAGMA foreign_keys = ON;
PRAGMA busy_timeout = 5000;
PRAGMA temp_store = MEMORY;
`;
// DDL that initSchema runs to bootstrap a database that has no schema_version row.
// Despite the "V4" in the name it describes the full current layout: streams
// (including wal_rows / wal_bytes and uploaded_segment_count), wal, segments,
// stream_segment_meta, manifests, schemas, producer_state, stream_interpreters and
// live_templates, plus their indexes. Every statement is IF NOT EXISTS.
// NOTE(review): wal_stream_offset_idx covers the same columns as the unique index
// wal_stream_offset_uniq — confirm whether the second index is still needed.
var CREATE_TABLES_V4_SQL = `
CREATE TABLE IF NOT EXISTS streams (
stream TEXT PRIMARY KEY,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
content_type TEXT NOT NULL,
stream_seq TEXT NULL,
closed INTEGER NOT NULL DEFAULT 0,
closed_producer_id TEXT NULL,
closed_producer_epoch INTEGER NULL,
closed_producer_seq INTEGER NULL,
ttl_seconds INTEGER NULL,
epoch INTEGER NOT NULL,
next_offset INTEGER NOT NULL,
sealed_through INTEGER NOT NULL,
uploaded_through INTEGER NOT NULL,
uploaded_segment_count INTEGER NOT NULL DEFAULT 0,
pending_rows INTEGER NOT NULL,
pending_bytes INTEGER NOT NULL,
-- Logical size of retained rows in the wal table for this stream (payload-only bytes).
-- This is explicitly tracked because SQLite file size is high-water and does not shrink
-- deterministically after DELETE-based GC/retention trimming.
wal_rows INTEGER NOT NULL DEFAULT 0,
wal_bytes INTEGER NOT NULL DEFAULT 0,
last_append_ms INTEGER NOT NULL,
last_segment_cut_ms INTEGER NOT NULL,
segment_in_progress INTEGER NOT NULL,
expires_at_ms INTEGER NULL,
stream_flags INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS streams_pending_bytes_idx ON streams(pending_bytes);
CREATE INDEX IF NOT EXISTS streams_last_cut_idx ON streams(last_segment_cut_ms);
CREATE INDEX IF NOT EXISTS streams_inprog_pending_idx ON streams(segment_in_progress, pending_bytes, last_segment_cut_ms);
CREATE TABLE IF NOT EXISTS wal (
id INTEGER PRIMARY KEY,
stream TEXT NOT NULL,
offset INTEGER NOT NULL,
ts_ms INTEGER NOT NULL,
payload BLOB NOT NULL,
payload_len INTEGER NOT NULL,
routing_key BLOB NULL,
content_type TEXT NULL,
flags INTEGER NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX IF NOT EXISTS wal_stream_offset_uniq ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_stream_offset_idx ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_ts_idx ON wal(ts_ms);
CREATE TABLE IF NOT EXISTS segments (
segment_id TEXT PRIMARY KEY,
stream TEXT NOT NULL,
segment_index INTEGER NOT NULL,
start_offset INTEGER NOT NULL,
end_offset INTEGER NOT NULL,
block_count INTEGER NOT NULL,
last_append_ms INTEGER NOT NULL,
size_bytes INTEGER NOT NULL,
local_path TEXT NOT NULL,
created_at_ms INTEGER NOT NULL,
uploaded_at_ms INTEGER NULL,
r2_etag TEXT NULL
);
CREATE TABLE IF NOT EXISTS stream_segment_meta (
stream TEXT PRIMARY KEY,
segment_count INTEGER NOT NULL,
segment_offsets BLOB NOT NULL,
segment_blocks BLOB NOT NULL,
segment_last_ts BLOB NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS segments_stream_index_uniq ON segments(stream, segment_index);
CREATE INDEX IF NOT EXISTS segments_stream_start_idx ON segments(stream, start_offset);
CREATE INDEX IF NOT EXISTS segments_pending_upload_idx ON segments(uploaded_at_ms);
CREATE TABLE IF NOT EXISTS manifests (
stream TEXT PRIMARY KEY,
generation INTEGER NOT NULL,
uploaded_generation INTEGER NOT NULL,
last_uploaded_at_ms INTEGER NULL,
last_uploaded_etag TEXT NULL
);
CREATE TABLE IF NOT EXISTS schemas (
stream TEXT PRIMARY KEY,
schema_json TEXT NOT NULL,
updated_at_ms INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS producer_state (
stream TEXT NOT NULL,
producer_id TEXT NOT NULL,
epoch INTEGER NOT NULL,
last_seq INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (stream, producer_id)
);
CREATE TABLE IF NOT EXISTS stream_interpreters (
stream TEXT PRIMARY KEY,
interpreted_through INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
);
-- Live dynamic template registry (per base stream).
CREATE TABLE IF NOT EXISTS live_templates (
stream TEXT NOT NULL,
template_id TEXT NOT NULL,
entity TEXT NOT NULL,
fields_json TEXT NOT NULL,
encodings_json TEXT NOT NULL,
state TEXT NOT NULL,
created_at_ms INTEGER NOT NULL,
last_seen_at_ms INTEGER NOT NULL,
inactivity_ttl_ms INTEGER NOT NULL,
active_from_source_offset INTEGER NOT NULL,
retired_at_ms INTEGER NULL,
retired_reason TEXT NULL,
PRIMARY KEY (stream, template_id)
);
CREATE INDEX IF NOT EXISTS live_templates_stream_entity_state_last_seen_idx
ON live_templates(stream, entity, state, last_seen_at_ms);
CREATE INDEX IF NOT EXISTS live_templates_stream_state_last_seen_idx
ON live_templates(stream, state, last_seen_at_ms);
`;
// DDL for the index bookkeeping tables (index_state, index_runs) and the
// index_runs lookup index. All statements are IF NOT EXISTS, so the script is safe
// to run repeatedly; it is executed by the fresh-database bootstrap and by the
// v1/v2/v3 -> v4 and v4 -> v5 migrations.
var CREATE_INDEX_TABLES_SQL = `
CREATE TABLE IF NOT EXISTS index_state (
stream TEXT PRIMARY KEY,
index_secret BLOB NOT NULL,
indexed_through INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS index_runs (
run_id TEXT PRIMARY KEY,
stream TEXT NOT NULL,
level INTEGER NOT NULL,
start_segment INTEGER NOT NULL,
end_segment INTEGER NOT NULL,
object_key TEXT NOT NULL,
filter_len INTEGER NOT NULL,
record_count INTEGER NOT NULL,
retired_gen INTEGER NULL,
retired_at_ms INTEGER NULL
);
CREATE INDEX IF NOT EXISTS index_runs_stream_idx ON index_runs(stream, level, start_segment);
`;
// Builds DDL for v4-shaped copies of the core tables (streams, wal, segments,
// manifests, schemas, producer_state) under suffixed names such as streams_v4.
// migrateV1ToV4 uses it to stage the new tables before swapping them in.
// Differences from CREATE_TABLES_V4_SQL: plain CREATE TABLE (no IF NOT EXISTS),
// no indexes, and no streams.wal_rows / wal_bytes columns.
// `suffix` is interpolated verbatim into the table names, so pass only trusted
// identifier text.
// NOTE(review): the streams template below already declares uploaded_segment_count,
// the column that migrateV5ToV6 introduces for the other upgrade paths — keep the
// two in agreement.
var CREATE_TABLES_V4_SUFFIX_SQL = (suffix) => `
CREATE TABLE streams_${suffix} (
stream TEXT PRIMARY KEY,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
content_type TEXT NOT NULL,
stream_seq TEXT NULL,
closed INTEGER NOT NULL DEFAULT 0,
closed_producer_id TEXT NULL,
closed_producer_epoch INTEGER NULL,
closed_producer_seq INTEGER NULL,
ttl_seconds INTEGER NULL,
epoch INTEGER NOT NULL,
next_offset INTEGER NOT NULL,
sealed_through INTEGER NOT NULL,
uploaded_through INTEGER NOT NULL,
uploaded_segment_count INTEGER NOT NULL DEFAULT 0,
pending_rows INTEGER NOT NULL,
pending_bytes INTEGER NOT NULL,
last_append_ms INTEGER NOT NULL,
last_segment_cut_ms INTEGER NOT NULL,
segment_in_progress INTEGER NOT NULL,
expires_at_ms INTEGER NULL,
stream_flags INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE wal_${suffix} (
id INTEGER PRIMARY KEY,
stream TEXT NOT NULL,
offset INTEGER NOT NULL,
ts_ms INTEGER NOT NULL,
payload BLOB NOT NULL,
payload_len INTEGER NOT NULL,
routing_key BLOB NULL,
content_type TEXT NULL,
flags INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE segments_${suffix} (
segment_id TEXT PRIMARY KEY,
stream TEXT NOT NULL,
segment_index INTEGER NOT NULL,
start_offset INTEGER NOT NULL,
end_offset INTEGER NOT NULL,
block_count INTEGER NOT NULL,
last_append_ms INTEGER NOT NULL,
size_bytes INTEGER NOT NULL,
local_path TEXT NOT NULL,
created_at_ms INTEGER NOT NULL,
uploaded_at_ms INTEGER NULL,
r2_etag TEXT NULL
);
CREATE TABLE manifests_${suffix} (
stream TEXT PRIMARY KEY,
generation INTEGER NOT NULL,
uploaded_generation INTEGER NOT NULL,
last_uploaded_at_ms INTEGER NULL,
last_uploaded_etag TEXT NULL
);
CREATE TABLE schemas_${suffix} (
stream TEXT PRIMARY KEY,
schema_json TEXT NOT NULL,
updated_at_ms INTEGER NOT NULL
);
CREATE TABLE producer_state_${suffix} (
stream TEXT NOT NULL,
producer_id TEXT NOT NULL,
epoch INTEGER NOT NULL,
last_seq INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (stream, producer_id)
);
`;
// Index DDL for the wal, streams and segments tables. migrateV1ToV4 runs it after
// renaming the staged *_v4 tables into place, since those are created without
// indexes. All statements are IF NOT EXISTS.
var CREATE_INDEXES_V4_SQL = `
CREATE UNIQUE INDEX IF NOT EXISTS wal_stream_offset_uniq ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_stream_offset_idx ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_ts_idx ON wal(ts_ms);
CREATE INDEX IF NOT EXISTS streams_pending_bytes_idx ON streams(pending_bytes);
CREATE INDEX IF NOT EXISTS streams_last_cut_idx ON streams(last_segment_cut_ms);
CREATE INDEX IF NOT EXISTS streams_inprog_pending_idx ON streams(segment_in_progress, pending_bytes, last_segment_cut_ms);
CREATE UNIQUE INDEX IF NOT EXISTS segments_stream_index_uniq ON segments(stream, segment_index);
CREATE INDEX IF NOT EXISTS segments_stream_start_idx ON segments(stream, start_offset);
CREATE INDEX IF NOT EXISTS segments_pending_upload_idx ON segments(uploaded_at_ms);
`;
/**
 * Prepares the database schema.
 *
 * Always applies DEFAULT_PRAGMAS_SQL. Unless `opts.skipMigrations` is truthy it
 * then ensures the schema_version table exists and either bootstraps an empty
 * database directly at SCHEMA_VERSION, or runs one migration at a time, re-reading
 * the stored version after each step, until SCHEMA_VERSION is reached.
 *
 * @param db Database adapter exposing `exec` and `query`.
 * @param opts Options; `skipMigrations` stops right after the PRAGMAs.
 * @throws DurableStreamsError when the stored version has no known migration
 *   step, or when the schema_version row is missing after a migration.
 */
function initSchema(db, opts = {}) {
  db.exec(DEFAULT_PRAGMAS_SQL);
  if (opts.skipMigrations) {
    return;
  }
  db.exec(`CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL);`);

  // The stored value may come back as a number, a bigint or something else
  // depending on the driver; always hand back a plain number (or null if no row).
  const currentVersion = () => {
    const found = db.query("SELECT version FROM schema_version LIMIT 1;").get();
    if (!found) {
      return null;
    }
    const stored = found.version;
    return typeof stored === "number" ? stored : Number(stored);
  };

  const initial = currentVersion();
  if (initial == null) {
    // Fresh database: create the current layout in one go and stamp it.
    db.exec(CREATE_TABLES_V4_SQL);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.query("INSERT INTO schema_version(version) VALUES (?);").run(SCHEMA_VERSION);
    return;
  }

  // Keyed by the version a step upgrades FROM. Each step updates schema_version itself.
  const migrations = new Map([
    [1, migrateV1ToV4],
    [2, migrateV2ToV4],
    [3, migrateV3ToV4],
    [4, migrateV4ToV5],
    [5, migrateV5ToV6],
    [6, migrateV6ToV7],
    [7, migrateV7ToV8],
    [8, migrateV8ToV9],
    [9, migrateV9ToV10],
    [10, migrateV10ToV11]
  ]);

  for (let version = initial; version !== SCHEMA_VERSION; ) {
    const migrate = migrations.get(version);
    if (migrate === undefined) {
      throw dsError(`unexpected schema version: ${version} (expected ${SCHEMA_VERSION})`);
    }
    migrate(db);
    const reached = currentVersion();
    if (reached == null) {
      throw dsError("schema_version row missing after migration");
    }
    version = reached;
  }
}
/**
 * Migrates a v1 database straight to v4 inside one transaction.
 *
 * Steps, in order: create the staged *_v4 tables, copy streams / wal / segments /
 * manifests / schemas into them (nanosecond timestamps are divided down to
 * milliseconds, `seq` columns become offsets, `deleted` becomes bit 1 of
 * stream_flags, `is_json` becomes a content type), drop the old tables, rename
 * the staged tables into place, create indexes and index tables, and finally set
 * schema_version to 4.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV1ToV4(db) {
  const droppedTables = ["wal", "streams", "segments", "manifests", "schemas"];
  const renamedTables = ["streams", "wal", "segments", "manifests", "schemas", "producer_state"];
  const runMigration = db.transaction(() => {
    const statements = [
      CREATE_TABLES_V4_SUFFIX_SQL("v4"),
      `
INSERT INTO streams_v4(
stream, created_at_ms, updated_at_ms,
content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
epoch,
next_offset, sealed_through, uploaded_through,
pending_rows, pending_bytes,
last_append_ms, last_segment_cut_ms, segment_in_progress,
expires_at_ms, stream_flags
)
SELECT
stream,
CAST(created_at_ns / 1000000 AS INTEGER),
CAST(updated_at_ns / 1000000 AS INTEGER),
'application/octet-stream',
NULL,
0,
NULL,
NULL,
NULL,
NULL,
epoch,
next_seq,
sealed_through_seq,
uploaded_through_seq,
pending_rows,
pending_bytes,
CAST(last_append_ns / 1000000 AS INTEGER),
CAST(last_segment_cut_ns / 1000000 AS INTEGER),
segment_in_progress,
CASE WHEN expires_at_ns IS NULL THEN NULL ELSE CAST(expires_at_ns / 1000000 AS INTEGER) END,
CASE WHEN deleted != 0 THEN 1 ELSE 0 END
FROM streams;
`,
      `
INSERT INTO wal_v4(
stream, offset, ts_ms, payload, payload_len, routing_key, content_type, flags
)
SELECT
stream,
seq,
CAST(append_ns / 1000000 AS INTEGER),
payload,
payload_len,
CASE WHEN routing_key IS NULL THEN NULL ELSE CAST(routing_key AS BLOB) END,
CASE WHEN is_json != 0 THEN 'application/json' ELSE NULL END,
0
FROM wal;
`,
      `
INSERT INTO segments_v4(
segment_id, stream, segment_index, start_offset, end_offset, block_count,
last_append_ms, size_bytes, local_path, created_at_ms, uploaded_at_ms, r2_etag
)
SELECT
segment_id,
stream,
segment_index,
start_seq,
end_seq,
block_count,
CAST(last_append_ns / 1000000 AS INTEGER),
size_bytes,
local_path,
CAST(created_at_ns / 1000000 AS INTEGER),
CASE WHEN uploaded_at_ns IS NULL THEN NULL ELSE CAST(uploaded_at_ns / 1000000 AS INTEGER) END,
NULL
FROM segments;
`,
      `
INSERT INTO manifests_v4(
stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag
)
SELECT
stream,
generation,
uploaded_generation,
CASE WHEN last_uploaded_at_ns IS NULL THEN NULL ELSE CAST(last_uploaded_at_ns / 1000000 AS INTEGER) END,
last_uploaded_etag
FROM manifests;
`,
      `
INSERT INTO schemas_v4(stream, schema_json, updated_at_ms)
SELECT stream, schema_json, CAST(updated_at_ns / 1000000 AS INTEGER)
FROM schemas;
`,
      // Old tables go away only after every copy above has succeeded.
      ...droppedTables.map((table) => `DROP TABLE ${table};`),
      // producer_state has no v1 counterpart; its staged table is simply renamed.
      ...renamedTables.map((table) => `ALTER TABLE ${table}_v4 RENAME TO ${table};`),
      CREATE_INDEXES_V4_SQL,
      CREATE_INDEX_TABLES_SQL,
      "UPDATE schema_version SET version = 4;"
    ];
    for (const sql of statements) {
      db.exec(sql);
    }
  });
  runMigration();
}
/**
 * Migrates a v2 database to v4 in one transaction: adds the missing segments and
 * streams columns, creates producer_state and the index tables if absent, then
 * sets schema_version to 4.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV2ToV4(db) {
  const segmentColumns = [
    "block_count INTEGER NOT NULL DEFAULT 0",
    "last_append_ms INTEGER NOT NULL DEFAULT 0"
  ];
  const streamColumns = [
    "content_type TEXT NOT NULL DEFAULT 'application/octet-stream'",
    "stream_seq TEXT NULL",
    "closed INTEGER NOT NULL DEFAULT 0",
    "closed_producer_id TEXT NULL",
    "closed_producer_epoch INTEGER NULL",
    "closed_producer_seq INTEGER NULL",
    "ttl_seconds INTEGER NULL"
  ];
  const runMigration = db.transaction(() => {
    for (const column of segmentColumns) {
      db.exec(`ALTER TABLE segments ADD COLUMN ${column};`);
    }
    for (const column of streamColumns) {
      db.exec(`ALTER TABLE streams ADD COLUMN ${column};`);
    }
    db.exec(`
CREATE TABLE IF NOT EXISTS producer_state (
stream TEXT NOT NULL,
producer_id TEXT NOT NULL,
epoch INTEGER NOT NULL,
last_seq INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (stream, producer_id)
);
`);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec("UPDATE schema_version SET version = 4;");
  });
  runMigration();
}
/**
 * Migrates a v3 database to v4 in one transaction: adds the missing streams
 * columns, creates producer_state and the index tables if absent, then sets
 * schema_version to 4.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV3ToV4(db) {
  const streamColumns = [
    "content_type TEXT NOT NULL DEFAULT 'application/octet-stream'",
    "stream_seq TEXT NULL",
    "closed INTEGER NOT NULL DEFAULT 0",
    "closed_producer_id TEXT NULL",
    "closed_producer_epoch INTEGER NULL",
    "closed_producer_seq INTEGER NULL",
    "ttl_seconds INTEGER NULL"
  ];
  const runMigration = db.transaction(() => {
    for (const column of streamColumns) {
      db.exec(`ALTER TABLE streams ADD COLUMN ${column};`);
    }
    db.exec(`
CREATE TABLE IF NOT EXISTS producer_state (
stream TEXT NOT NULL,
producer_id TEXT NOT NULL,
epoch INTEGER NOT NULL,
last_seq INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (stream, producer_id)
);
`);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec("UPDATE schema_version SET version = 4;");
  });
  runMigration();
}
/**
 * Migrates v4 to v5 in one transaction: ensures the index tables exist, then
 * sets schema_version to 5.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV4ToV5(db) {
  const statements = [CREATE_INDEX_TABLES_SQL, "UPDATE schema_version SET version = 5;"];
  const runMigration = db.transaction(() => {
    for (const sql of statements) {
      db.exec(sql);
    }
  });
  runMigration();
}
/**
 * Migrates v5 to v6 in one transaction: makes sure streams has the
 * uploaded_segment_count column, creates stream_segment_meta if absent, then sets
 * schema_version to 6.
 *
 * The column is added only when it is missing. Databases that were upgraded from
 * v1 already have it, because the v4 table template used by migrateV1ToV4
 * declares it; an unconditional ALTER TABLE would fail there with a
 * "duplicate column name" error and leave the database stuck at version 5.
 *
 * @param db Database adapter exposing `exec`, `query` and `transaction`.
 */
function migrateV5ToV6(db) {
  const tx = db.transaction(() => {
    // PRAGMA table_info yields one row per column; `name` holds the column name.
    const columns = db.query("PRAGMA table_info(streams);").all();
    const hasUploadedSegmentCount = columns.some(
      (column) => String(column.name) === "uploaded_segment_count"
    );
    if (!hasUploadedSegmentCount) {
      db.exec(`ALTER TABLE streams ADD COLUMN uploaded_segment_count INTEGER NOT NULL DEFAULT 0;`);
    }
    db.exec(`
CREATE TABLE IF NOT EXISTS stream_segment_meta (
stream TEXT PRIMARY KEY,
segment_count INTEGER NOT NULL,
segment_offsets BLOB NOT NULL,
segment_blocks BLOB NOT NULL,
segment_last_ts BLOB NOT NULL
);
`);
    db.exec(`UPDATE schema_version SET version = 6;`);
  });
  tx();
}
/**
 * Migrates v6 to v7 in one transaction: creates stream_interpreters if absent,
 * then sets schema_version to 7.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV6ToV7(db) {
  const statements = [
    `
CREATE TABLE IF NOT EXISTS stream_interpreters (
stream TEXT PRIMARY KEY,
interpreted_through INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
);
`,
    "UPDATE schema_version SET version = 7;"
  ];
  const runMigration = db.transaction(() => {
    for (const sql of statements) {
      db.exec(sql);
    }
  });
  runMigration();
}
/**
 * Migrates v7 to v8. This step makes no structural change; it only sets
 * schema_version to 8 inside a transaction.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV7ToV8(db) {
  const bumpVersion = () => {
    db.exec("UPDATE schema_version SET version = 8;");
  };
  db.transaction(bumpVersion)();
}
/**
 * Migrates v8 to v9 in one transaction: creates the live_templates table and its
 * two lookup indexes if absent, then sets schema_version to 9.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV8ToV9(db) {
  const statements = [
    `
CREATE TABLE IF NOT EXISTS live_templates (
stream TEXT NOT NULL,
template_id TEXT NOT NULL,
entity TEXT NOT NULL,
fields_json TEXT NOT NULL,
encodings_json TEXT NOT NULL,
state TEXT NOT NULL,
created_at_ms INTEGER NOT NULL,
last_seen_at_ms INTEGER NOT NULL,
inactivity_ttl_ms INTEGER NOT NULL,
active_from_source_offset INTEGER NOT NULL,
retired_at_ms INTEGER NULL,
retired_reason TEXT NULL,
PRIMARY KEY (stream, template_id)
);
`,
    `
CREATE INDEX IF NOT EXISTS live_templates_stream_entity_state_last_seen_idx
ON live_templates(stream, entity, state, last_seen_at_ms);
`,
    `
CREATE INDEX IF NOT EXISTS live_templates_stream_state_last_seen_idx
ON live_templates(stream, state, last_seen_at_ms);
`,
    "UPDATE schema_version SET version = 9;"
  ];
  const runMigration = db.transaction(() => {
    for (const sql of statements) {
      db.exec(sql);
    }
  });
  runMigration();
}
/**
 * Migrates v9 to v10 in one transaction: adds streams.wal_rows / wal_bytes and
 * backfills them from the wal table (row count and summed payload_len per
 * stream) through a temporary wal_stats table, then sets schema_version to 10.
 * Streams without wal rows are backfilled with 0.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV9ToV10(db) {
  const statements = [
    "ALTER TABLE streams ADD COLUMN wal_rows INTEGER NOT NULL DEFAULT 0;",
    "ALTER TABLE streams ADD COLUMN wal_bytes INTEGER NOT NULL DEFAULT 0;",
    // Clear any leftover temp table before rebuilding the aggregate.
    "DROP TABLE IF EXISTS temp.wal_stats;",
    `
CREATE TEMP TABLE wal_stats AS
SELECT stream, COUNT(*) as rows, COALESCE(SUM(payload_len), 0) as bytes
FROM wal
GROUP BY stream;
`,
    `
UPDATE streams
SET wal_rows = COALESCE((SELECT rows FROM wal_stats WHERE wal_stats.stream = streams.stream), 0),
wal_bytes = COALESCE((SELECT bytes FROM wal_stats WHERE wal_stats.stream = streams.stream), 0);
`,
    "DROP TABLE wal_stats;",
    "UPDATE schema_version SET version = 10;"
  ];
  const runMigration = db.transaction(() => {
    for (const sql of statements) {
      db.exec(sql);
    }
  });
  runMigration();
}
/**
 * Migrates v10 to v11 in one transaction: drops the
 * wal_touch_stream_rk_offset_idx index if it exists, then sets schema_version
 * to 11.
 *
 * The target version is the literal 11, like every other migration step, rather
 * than SCHEMA_VERSION. A step must record the version it actually produces;
 * stamping the newest version would make initSchema skip any later migrations
 * once SCHEMA_VERSION moves past 11.
 *
 * @param db Database adapter exposing `exec` and `transaction`.
 */
function migrateV10ToV11(db) {
  const tx = db.transaction(() => {
    db.exec(`DROP INDEX IF EXISTS wal_touch_stream_rk_offset_idx;`);
    db.exec(`UPDATE schema_version SET version = 11;`);
  });
  tx();
}
// src/sqlite/adapter.ts
import { createRequire } from "node:module";
// src/runtime/host_runtime.ts
/**
 * Reports which JavaScript runtime is hosting this code.
 *
 * @returns "bun" when a `Bun` global exists or `process.versions.bun` is truthy,
 *   otherwise "node".
 */
function detectHostRuntime() {
  if (typeof globalThis.Bun !== "undefined") {
    return "bun";
  }
  if (Boolean(process.versions?.bun)) {
    return "bun";
  }
  return "node";
}
// src/sqlite/adapter.ts
/**
 * Wraps a prepared statement obtained from a Bun database's `query`, exposing
 * the statement surface the rest of this file uses: get / all / run / iterate /
 * finalize. Every call is forwarded unchanged to the wrapped statement.
 */
class BunStatementAdapter {
  stmt;
  /** @param stmt The driver's prepared statement. */
  constructor(stmt) {
    this.stmt = stmt;
  }
  /** Forwards to `stmt.get` with the given bind parameters. */
  get(...params) {
    const row = this.stmt.get(...params);
    return row;
  }
  /** Forwards to `stmt.all` with the given bind parameters. */
  all(...params) {
    const rows = this.stmt.all(...params);
    return rows;
  }
  /** Forwards to `stmt.run` with the given bind parameters. */
  run(...params) {
    const outcome = this.stmt.run(...params);
    return outcome;
  }
  /** Forwards to `stmt.iterate` with the given bind parameters. */
  iterate(...params) {
    const cursor = this.stmt.iterate(...params);
    return cursor;
  }
  /** Finalizes the statement when the driver provides `finalize`; otherwise a no-op. */
  finalize() {
    if (typeof this.stmt.finalize !== "function") {
      return;
    }
    this.stmt.finalize();
  }
}
/**
 * Adapts a Bun SQLite database to the exec / query / transaction / close
 * surface used by the rest of this file.
 */
class BunDatabaseAdapter {
  db;
  /** @param db The underlying Bun database handle. */
  constructor(db) {
    this.db = db;
  }
  /** Executes `sql` on the underlying database; returns nothing. */
  exec(sql) {
    this.db.exec(sql);
  }
  /** Prepares `sql` through the driver's `query` and wraps the statement. */
  query(sql) {
    const prepared = this.db.query(sql);
    return new BunStatementAdapter(prepared);
  }
  /** Returns the driver's own transaction wrapper around `fn`. */
  transaction(fn) {
    const wrapped = this.db.transaction(fn);
    return wrapped;
  }
  /** Closes the underlying database. */
  close() {
    this.db.close();
  }
}
/**
 * Wraps a prepared statement obtained from the Node SQLite driver's `prepare`,
 * exposing get / all / run / iterate / finalize. Every call is forwarded
 * unchanged to the wrapped statement.
 */
class NodeStatementAdapter {
  stmt;
  /** @param stmt The driver's prepared statement. */
  constructor(stmt) {
    this.stmt = stmt;
  }
  /** Forwards to `stmt.get` with the given bind parameters. */
  get(...params) {
    const row = this.stmt.get(...params);
    return row;
  }
  /** Forwards to `stmt.all` with the given bind parameters. */
  all(...params) {
    const rows = this.stmt.all(...params);
    return rows;
  }
  /** Forwards to `stmt.run` with the given bind parameters. */
  run(...params) {
    const outcome = this.stmt.run(...params);
    return outcome;
  }
  /** Forwards to `stmt.iterate` with the given bind parameters. */
  iterate(...params) {
    const cursor = this.stmt.iterate(...params);
    return cursor;
  }
  /** Finalizes the statement when the driver provides `finalize`; otherwise a no-op. */
  finalize() {
    if (typeof this.stmt.finalize !== "function") {
      return;
    }
    this.stmt.finalize();
  }
}
/**
 * Adapts a Node SQLite database handle (one exposing `exec`, `prepare` and
 * `close`) to the exec / query / transaction / close surface used by the rest of
 * this file. The transaction helper is implemented here with BEGIN/COMMIT, and
 * SAVEPOINTs for nested calls.
 */
class NodeDatabaseAdapter {
  // Number of transaction wrappers currently executing on this adapter.
  txDepth = 0;
  // Monotonic counter used to give every savepoint a distinct name.
  txCounter = 0;
  db;
  /** @param db The underlying database handle. */
  constructor(db) {
    this.db = db;
  }
  /** Executes `sql` on the underlying database; returns nothing. */
  exec(sql) {
    this.db.exec(sql);
  }
  /**
   * Prepares `sql` and wraps the statement. When the driver supports it, integer
   * columns are switched to bigint reads.
   */
  query(sql) {
    const stmt = this.db.prepare(sql);
    if (typeof stmt?.setReadBigInts === "function")
      stmt.setReadBigInts(true);
    return new NodeStatementAdapter(stmt);
  }
  /**
   * Wraps `fn` so that each call runs inside a transaction.
   *
   * The outermost call uses BEGIN/COMMIT; calls made while another wrapper is
   * running use a uniquely named SAVEPOINT that is released on success. When
   * `fn` (or the commit/release) throws, the work is rolled back and the error
   * is rethrown; a failure of the rollback itself is deliberately ignored so the
   * original error is the one the caller sees.
   *
   * Arguments given to the returned function are forwarded to `fn`, matching
   * the behaviour of the transaction function handed out by BunDatabaseAdapter.
   *
   * @param fn Work to run inside the transaction.
   * @returns A function that runs `fn` transactionally and returns its result.
   */
  transaction(fn) {
    return (...args) => {
      const nested = this.txDepth > 0;
      const savepoint = `ds_tx_${++this.txCounter}`;
      // Open the transaction before entering the guarded region: if this
      // statement throws, nothing was started here, so issuing a ROLLBACK could
      // only cancel a transaction that belongs to someone else.
      this.db.exec(nested ? `SAVEPOINT ${savepoint};` : "BEGIN;");
      this.txDepth += 1;
      try {
        const out = fn(...args);
        this.db.exec(nested ? `RELEASE SAVEPOINT ${savepoint};` : "COMMIT;");
        return out;
      } catch (err) {
        try {
          if (nested) {
            // ROLLBACK TO keeps the savepoint on the stack; release it as well.
            this.db.exec(`ROLLBACK TO SAVEPOINT ${savepoint};`);
            this.db.exec(`RELEASE SAVEPOINT ${savepoint};`);
          } else {
            this.db.exec("ROLLBACK;");
          }
        } catch {}
        throw err;
      } finally {
        this.txDepth = Math.max(0, this.txDepth - 1);
      }
    };
  }
  /** Closes the underlying database. */
  close() {
    this.db.close();
  }
}
// Cached database factory built by buildOpenImpl, or null until first use.
var openImpl = null;
// Runtime ("bun" or "node") the cached factory was built for, or null.
var openImplRuntime = null;
// Runtime forced through setSqliteRuntimeOverride; null means auto-detect.
var runtimeOverride = null;
// CommonJS-style require for this ES module; buildOpenImpl uses it to load the
// runtime-specific sqlite module.
var require2 = createRequire(import.meta.url);
/**
 * Returns the runtime whose sqlite driver should be used: the override when one
 * has been set (neither null nor undefined), otherwise the detected host runtime.
 */
function selectedRuntime() {
  if (runtimeOverride !== null && runtimeOverride !== undefined) {
    return runtimeOverride;
  }
  return detectHostRuntime();
}
/**
 * Builds the function that opens a database for `runtime`.
 *
 * The driver module is loaded once, here; the returned function only constructs
 * a database for the given path and wraps it in the matching adapter.
 *
 * @param runtime "bun" selects `bun:sqlite`; any other value selects `node:sqlite`.
 * @returns `(path) => adapter`.
 */
function buildOpenImpl(runtime) {
  if (runtime !== "bun") {
    const NodeDatabase = require2("node:sqlite").DatabaseSync;
    return (path) => {
      const handle = new NodeDatabase(path);
      return new NodeDatabaseAdapter(handle);
    };
  }
  const BunDatabase = require2("bun:sqlite").Database;
  return (path) => {
    const handle = new BunDatabase(path);
    return new BunDatabaseAdapter(handle);
  };
}
/**
 * Forces (or, with a falsy value, stops forcing) the runtime used to open
 * databases. If a factory is already cached for a different runtime than the
 * new override, the cache is dropped so the next open rebuilds it.
 *
 * @param runtime "bun", "node", or a falsy value to fall back to auto-detection.
 */
function setSqliteRuntimeOverride(runtime) {
  runtimeOverride = runtime;
  const cacheIsForOtherRuntime =
    Boolean(runtimeOverride) && Boolean(openImplRuntime) && runtimeOverride !== openImplRuntime;
  if (!cacheIsForOtherRuntime) {
    return;
  }
  openImpl = null;
  openImplRuntime = null;
}
/**
 * Opens the SQLite database at `path` with the driver of the selected runtime.
 * The driver factory is built on first use and rebuilt whenever the selected
 * runtime differs from the one the cached factory was built for.
 *
 * @param path Database location handed to the driver.
 * @returns A database adapter.
 * @throws DurableStreamsError if no factory is available after building one.
 */
function openSqliteDatabase(path) {
  const wanted = selectedRuntime();
  const cacheUsable = Boolean(openImpl) && openImplRuntime === wanted;
  if (!cacheUsable) {
    openImpl = buildOpenImpl(wanted);
    openImplRuntime = wanted;
  }
  if (!openImpl) {
    throw dsError("sqlite adapter not initialized");
  }
  return openImpl(path);
}
// src/db/db.ts
import { Result } from "better-result";
// Bit values for per-stream flags (presumably combined into streams.stream_flags —
// verify against the callers that bind them).
var STREAM_FLAG_DELETED = 1 << 0;
var STREAM_FLAG_TOUCH = 1 << 1;
/**
 * Parses the DS_BASE_WAL_GC_CHUNK_OFFSETS setting.
 *
 * Returns the value floored to an integer when that integer is finite and at
 * least 1; otherwise returns the default of 100000. Flooring happens BEFORE the
 * range check so that fractional inputs such as "0.5" cannot slip through as a
 * chunk size of 0.
 *
 * @param raw Raw environment value; may be undefined, null or blank.
 * @returns A positive integer chunk size.
 */
function parseBaseWalGcChunkOffsets(raw) {
  const fallback = 1e5;
  if (raw == null || raw.trim() === "")
    return fallback;
  const chunk = Math.floor(Number(raw));
  if (!Number.isFinite(chunk) || chunk < 1)
    return fallback;
  return chunk;
}
// Resolved once at module load from the environment.
var BASE_WAL_GC_CHUNK_OFFSETS = parseBaseWalGcChunkOffsets(process.env.DS_BASE_WAL_GC_CHUNK_OFFSETS);
class SqliteDurableStore {
db;
dbstatReady = null;
stmts;
constructor(path, opts = {}) {
this.db = openSqliteDatabase(path);
initSchema(this.db, { skipMigrations: opts.skipMigrations });
if (opts.cacheBytes && opts.cacheBytes > 0) {
const kb = Math.max(1, Math.floor(opts.cacheBytes / 1024));
this.db.exec(`PRAGMA cache_size = -${kb};`);
}
this.stmts = {
getStream: this.db.query(`SELECT stream, created_at_ms, updated_at_ms,
content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
expires_at_ms, stream_flags
FROM streams WHERE stream = ? LIMIT 1;`),
upsertStream: this.db.query(`INSERT INTO streams(stream, created_at_ms, updated_at_ms,
content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
expires_at_ms, stream_flags)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET
updated_at_ms=excluded.updated_at_ms,
expires_at_ms=excluded.expires_at_ms,
ttl_seconds=excluded.ttl_seconds,
content_type=excluded.content_type,
stream_flags=excluded.stream_flags;`),
listStreams: this.db.query(`SELECT stream, created_at_ms, updated_at_ms,
content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
expires_at_ms, stream_flags
FROM streams
WHERE (stream_flags & ?) = 0
AND (expires_at_ms IS NULL OR expires_at_ms > ?)
ORDER BY stream
LIMIT ? OFFSET ?;`),
setDeleted: this.db.query(`UPDATE streams SET stream_flags = (stream_flags | ?), updated_at_ms=? WHERE stream=?;`),
insertWal: this.db.query(`INSERT INTO wal(stream, offset, ts_ms, payload, payload_len, routing_key, content_type, flags)
VALUES(?, ?, ?, ?, ?, ?, ?, ?);`),
updateStreamAppend: this.db.query(`UPDATE streams
SET next_offset = ?, updated_at_ms = ?, last_append_ms = ?,
pending_rows = pending_rows + ?, pending_bytes = pending_bytes + ?,
wal_rows = wal_rows + ?, wal_bytes = wal_bytes + ?
WHERE stream = ? AND (stream_flags & ?) = 0;`),
updateStreamAppendSeqCheck: this.db.query(`UPDATE streams
SET next_offset = ?, updated_at_ms = ?, last_append_ms = ?,
pending_rows = pending_rows + ?, pending_bytes = pending_bytes + ?,
wal_rows = wal_rows + ?, wal_bytes = wal_bytes + ?
WHERE stream = ? AND (stream_flags & ?) = 0 AND next_offset = ?;`),
candidateStreams: this.db.query(`SELECT stream, pending_bytes, pending_rows, last_segment_cut_ms, sealed_through, next_offset, epoch
FROM streams
WHERE (stream_flags & ?) = 0
AND segment_in_progress = 0
AND (pending_bytes >= ? OR pending_rows >= ? OR (? - last_segment_cut_ms) >= ?)
ORDER BY pending_bytes DESC
LIMIT ?;`),
candidateStreamsNoInterval: this.db.query(`SELECT stream, pending_bytes, pending_rows, last_segment_cut_ms, sealed_through, next_offset, epoch
FROM streams
WHERE (stream_flags & ?) = 0
AND segment_in_progress = 0
AND (pending_bytes >= ? OR pending_rows >= ?)
ORDER BY pending_bytes DESC
LIMIT ?;`),
listExpiredStreams: this.db.query(`SELECT stream
FROM streams
WHERE (stream_flags & ?) = 0
AND expires_at_ms IS NOT NULL
AND expires_at_ms <= ?
ORDER BY expires_at_ms ASC
LIMIT ?;`),
streamWalRange: this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
FROM wal
WHERE stream = ? AND offset >= ? AND offset <= ?
ORDER BY offset ASC;`),
streamWalRangeByKey: this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
FROM wal
WHERE stream = ? AND offset >= ? AND offset <= ? AND routing_key = ?
ORDER BY offset ASC;`),
createSegment: this.db.query(`INSERT INTO segments(segment_id, stream, segment_index, start_offset, end_offset, block_count,
last_append_ms, size_bytes, local_path, created_at_ms, uploaded_at_ms, r2_etag)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL);`),
listSegmentsForStream: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
local_path, created_at_ms, uploaded_at_ms, r2_etag
FROM segments WHERE stream=? ORDER BY segment_index ASC;`),
getSegmentByIndex: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
local_path, created_at_ms, uploaded_at_ms, r2_etag
FROM segments WHERE stream=? AND segment_index=? LIMIT 1;`),
findSegmentForOffset: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
local_path, created_at_ms, uploaded_at_ms, r2_etag
FROM segments
WHERE stream=? AND start_offset <= ? AND end_offset >= ?
ORDER BY segment_index DESC
LIMIT 1;`),
nextSegmentIndex: this.db.query(`SELECT COALESCE(MAX(segment_index)+1, 0) as next_idx FROM segments WHERE stream=?;`),
markSegmentUploaded: this.db.query(`UPDATE segments SET r2_etag=?, uploaded_at_ms=? WHERE segment_id=?;`),
pendingUploadSegments: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
local_path, created_at_ms, uploaded_at_ms, r2_etag
FROM segments WHERE uploaded_at_ms IS NULL ORDER BY created_at_ms ASC LIMIT ?;`),
countPendingSegments: this.db.query(`SELECT COUNT(*) as cnt FROM segments WHERE uploaded_at_ms IS NULL;`),
countSegmentsForStream: this.db.query(`SELECT COUNT(*) as cnt FROM segments WHERE stream=?;`),
tryClaimSegment: this.db.query(`UPDATE streams SET segment_in_progress=1, updated_at_ms=? WHERE stream=? AND segment_in_progress=0;`),
getManifest: this.db.query(`SELECT stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag FROM manifests WHERE stream=? LIMIT 1;`),
upsertManifest: this.db.query(`INSERT INTO manifests(stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag)
VALUES(?, ?, ?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET
generation=excluded.generation,
uploaded_generation=excluded.uploaded_generation,
last_uploaded_at_ms=excluded.last_uploaded_at_ms,
last_uploaded_etag=excluded.last_uploaded_etag;`),
getIndexState: this.db.query(`SELECT stream, index_secret, indexed_through, updated_at_ms
FROM index_state WHERE stream=? LIMIT 1;`),
upsertIndexState: this.db.query(`INSERT INTO index_state(stream, index_secret, indexed_through, updated_at_ms)
VALUES(?, ?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET
index_secret=excluded.index_secret,
indexed_through=excluded.indexed_through,
updated_at_ms=excluded.updated_at_ms;`),
updateIndexedThrough: this.db.query(`UPDATE index_state SET indexed_through=?, updated_at_ms=? WHERE stream=?;`),
listIndexRuns: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
FROM index_runs WHERE stream=? AND retired_gen IS NULL
ORDER BY start_segment ASC, level ASC;`),
listIndexRunsAll: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
FROM index_runs WHERE stream=?
ORDER BY start_segment ASC, level ASC;`),
listRetiredIndexRuns: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
FROM index_runs WHERE stream=? AND retired_gen IS NOT NULL
ORDER BY retired_at_ms ASC;`),
insertIndexRun: this.db.query(`INSERT OR IGNORE INTO index_runs(run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL);`),
retireIndexRun: this.db.query(`UPDATE index_runs SET retired_gen=?, retired_at_ms=? WHERE run_id=?;`),
deleteIndexRun: this.db.query(`DELETE FROM index_runs WHERE run_id=?;`),
countUploadedSegments: this.db.query(`SELECT COALESCE(MAX(segment_index), -1) as max_idx
FROM segments WHERE stream=? AND r2_etag IS NOT NULL;`),
getSegmentMeta: this.db.query(`SELECT stream, segment_count, segment_offsets, segment_blocks, segment_last_ts
FROM stream_segment_meta WHERE stream=? LIMIT 1;`),
ensureSegmentMeta: this.db.query(`INSERT INTO stream_segment_meta(stream, segment_count, segment_offsets, segment_blocks, segment_last_ts)
VALUES(?, 0, x'', x'', x'')
ON CONFLICT(stream) DO NOTHING;`),
appendSegmentMeta: this.db.query(`UPDATE stream_segment_meta
SET segment_count = segment_count + 1,
segment_offsets = segment_offsets || ?,
segment_blocks = segment_blocks || ?,
segment_last_ts = segment_last_ts || ?
WHERE stream = ?;`),
upsertSegmentMeta: this.db.query(`INSERT INTO stream_segment_meta(stream, segment_count, segment_offsets, segment_blocks, segment_last_ts)
VALUES(?, ?, ?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET
segment_count=excluded.segment_count,
segment_offsets=excluded.segment_offsets,
segment_blocks=excluded.segment_blocks,
segment_last_ts=excluded.segment_last_ts;`),
setUploadedSegmentCount: this.db.query(`UPDATE streams SET uploaded_segment_count=?, updated_at_ms=? WHERE stream=?;`),
advanceUploadedThrough: this.db.query(`UPDATE streams SET uploaded_through=?, updated_at_ms=? WHERE stream=?;`),
deleteWalBeforeOffset: this.db.query(`DELETE FROM wal WHERE stream=? AND offset <= ?;`),
getSchemaRegistry: this.db.query(`SELECT stream, schema_json, updated_at_ms FROM schemas WHERE stream=? LIMIT 1;`),
upsertSchemaRegistry: this.db.query(`INSERT INTO schemas(stream, schema_json, updated_at_ms) VALUES(?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET schema_json=excluded.schema_json, updated_at_ms=excluded.updated_at_ms;`),
getStreamInterpreter: this.db.query(`SELECT stream, interpreted_through, updated_at_ms
FROM stream_interpreters WHERE stream=? LIMIT 1;`),
upsertStreamInterpreter: this.db.query(`INSERT INTO stream_interpreters(stream, interpreted_through, updated_at_ms)
VALUES(?, ?, ?)
ON CONFLICT(stream) DO UPDATE SET
interpreted_through=excluded.interpreted_through,
updated_at_ms=excluded.updated_at_ms;`),
deleteStreamInterpreter: this.db.query(`DELETE FROM stream_interpreters WHERE stream=?;`),
listStreamInterpreters: this.db.query(`SELECT stream, interpreted_through, updated_at_ms
FROM stream_interpreters
ORDER BY stream ASC;`),
countStreams: this.db.query(`SELECT COUNT(*) as cnt FROM streams WHERE (stream_flags & ?) = 0;`),
sumPendingBytes: this.db.query(`SELECT COALESCE(SUM(pending_bytes), 0) as total FROM streams;`),
sumPendingSegmentBytes: this.db.query(`SELECT COALESCE(SUM(size_bytes), 0) as total FROM segments WHERE uploaded_at_ms IS NULL;`)
};
}
toBigInt(v) {
return typeof v === "bigint" ? v : BigInt(v);
}
bindInt(v) {
const max = BigInt(Number.MAX_SAFE_INTEGER);
const min = BigInt(Number.MIN_SAFE_INTEGER);
if (v <= max && v >= min)
return Number(v);
return v.toString();
}
encodeU64Le(value) {
const buf = new Uint8Array(8);
const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
dv.setBigUint64(0, value, true);
return buf;
}
encodeU32Le(value) {
const buf = new Uint8Array(4);
const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
dv.setUint32(0, value >>> 0, true);
return buf;
}
coerceStreamRow(row) {
return {
stream: String(row.stream),
created_at_ms: this.toBigInt(row.created_at_ms),
updated_at_ms: this.toBigInt(row.updated_at_ms),
content_type: String(row.content_type),
stream_seq: row.stream_seq == null ? null : String(row.stream_seq),
closed: Number(row.closed),
closed_producer_id: row.closed_producer_id == null ? null : String(row.closed_producer_id),
closed_producer_epoch: row.closed_producer_epoch == null ? null : Number(row.closed_producer_epoch),
closed_producer_seq: row.closed_producer_seq == null ? null : Number(row.closed_producer_seq),
ttl_seconds: row.ttl_seconds == null ? null : Number(row.ttl_seconds),
epoch: Number(row.epoch),
next_offset: this.toBigInt(row.next_offset),
sealed_through: this.toBigInt(row.sealed_through),
uploaded_through: this.toBigInt(row.uploaded_through),
uploaded_segment_count: Number(row.uploaded_segment_count ?? 0),
pending_rows: this.toBigInt(row.pending_rows),
pending_bytes: this.toBigInt(row.pending_bytes),
wal_rows: this.toBigInt(row.wal_rows ?? 0),
wal_bytes: this.toBigInt(row.wal_bytes ?? 0),
last_append_ms: this.toBigInt(row.last_append_ms),
last_segment_cut_ms: this.toBigInt(row.last_segment_cut_ms),
segment_in_progress: Number(row.segment_in_progress),
expires_at_ms: row.expires_at_ms == null ? null : this.toBigInt(row.expires_at_ms),
stream_flags: Number(row.stream_flags)
};
}
coerceSegmentRow(row) {
return {
segment_id: String(row.segment_id),
stream: String(row.stream),
segment_index: Number(row.segment_index),
start_offset: this.toBigInt(row.start_offset),
end_offset: this.toBigInt(row.end_offset),
block_count: Number(row.block_count),
last_append_ms: this.toBigInt(row.last_append_ms),
size_bytes: Number(row.size_bytes),
local_path: String(row.local_path),
created_at_ms: this.toBigInt(row.created_at_ms),
uploaded_at_ms: row.uploaded_at_ms == null ? null : this.toBigInt(row.uploaded_at_ms),
r2_etag: row.r2_etag == null ? null : String(row.r2_etag)
};
}
close() {
this.db.close();
}
nowMs() {
return BigInt(Date.now());
}
isDeleted(row) {
return (row.stream_flags & STREAM_FLAG_DELETED) !== 0;
}
getStream(stream) {
const row = this.stmts.getStream.get(stream);
return row ? this.coerceStreamRow(row) : null;
}
/**
 * Returns the row for `stream`, creating it first when it does not exist.
 *
 * An existing row is returned as-is (no check of its flags is made here).
 * A new row starts at epoch 0 / next_offset 0 with sealed_through and
 * uploaded_through set to -1; a manifest row and a segment-meta row are
 * created alongside it.
 *
 * @param stream stream name
 * @param opts optional `contentType`, `closed`, `closedProducer` ({ id, epoch, seq }),
 *             `expiresAtMs`, `ttlSeconds`, `streamFlags`
 * @returns the stream row as read back by getStream
 */
ensureStream(stream, opts) {
const existing = this.getStream(stream);
if (existing)
return existing;
const now = this.nowMs();
const epoch = 0;
const nextOffset = 0n;
// Defaults for options the caller did not supply.
const contentType = opts?.contentType ?? "application/octet-stream";
const closed = opts?.closed ? 1 : 0;
const closedProducer = opts?.closedProducer ?? null;
const expiresAtMs = opts?.expiresAtMs ?? null;
const ttlSeconds = opts?.ttlSeconds ?? null;
const streamFlags = opts?.streamFlags ?? 0;
// Bind order must match the column list below. wal_rows / wal_bytes are not
// listed in the INSERT and are therefore left to the table definition.
this.db.query(`INSERT INTO streams(
stream, created_at_ms, updated_at_ms,
content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
pending_rows, pending_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
expires_at_ms, stream_flags
)
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`).run(stream, now, now, contentType, null, closed, closedProducer ? closedProducer.id : null, closedProducer ? closedProducer.epoch : null, closedProducer ? closedProducer.seq : null, ttlSeconds, epoch, nextOffset, -1n, -1n, 0, 0n, 0n, now, now, 0, expiresAtMs, streamFlags);
// Companion rows: a zeroed manifest and the segment-meta row.
this.stmts.upsertManifest.run(stream, 0, 0, null, null);
this.ensureSegmentMeta(stream);
return this.getStream(stream);
}
restoreStreamRow(row) {
this.stmts.upsertStream.run(row.stream, row.created_at_ms, row.updated_at_ms, row.content_type, row.stream_seq, row.closed, row.closed_producer_id, row.closed_producer_epoch, row.closed_producer_seq, row.ttl_seconds, row.epoch, row.next_offset, row.sealed_through, row.uploaded_through, row.uploaded_segment_count, row.pending_rows, row.pending_bytes, row.wal_rows, row.wal_bytes, row.last_append_ms, row.last_segment_cut_ms, row.segment_in_progress, row.expires_at_ms, row.stream_flags);
}
listStreams(limit, offset) {
const now = this.nowMs();
const rows = this.stmts.listStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, now, limit, offset);
return rows.map((r) => this.coerceStreamRow(r));
}
listExpiredStreams(limit) {
const now = this.nowMs();
const rows = this.stmts.listExpiredStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, now, limit);
return rows.map((r) => String(r.stream));
}
deleteStream(stream) {
const existing = this.getStream(stream);
if (!existing)
return false;
const now = this.nowMs();
this.stmts.setDeleted.run(STREAM_FLAG_DELETED, now, stream);
return true;
}
hardDeleteStream(stream) {
const tx = this.db.transaction(() => {
const existing = this.getStream(stream);
if (!existing)
return false;
this.db.query(`DELETE FROM wal WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM segments WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM manifests WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM schemas WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM stream_interpreters WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM live_templates WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM producer_state WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM index_state WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM index_runs WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM stream_segment_meta WHERE stream=?;`).run(stream);
this.db.query(`DELETE FROM streams WHERE stream=?;`).run(stream);
return true;
});
return tx();
}
getSchemaRegistry(stream) {
const row = this.stmts.getSchemaRegistry.get(stream);
if (!row)
return null;
return { stream: String(row.stream), registry_json: String(row.schema_json), updated_at_ms: this.toBigInt(row.updated_at_ms) };
}
upsertSchemaRegistry(stream, registryJson) {
this.stmts.upsertSchemaRegistry.run(stream, registryJson, this.nowMs());
}
getStreamInterpreter(stream) {
const row = this.stmts.getStreamInterpreter.get(stream);
if (!row)
return null;
return {
stream: String(row.stream),
interpreted_through: this.toBigInt(row.interpreted_through),
updated_at_ms: this.toBigInt(row.updated_at_ms)
};
}
listStreamInterpreters() {
const rows = this.stmts.listStreamInterpreters.all();
return rows.map((row) => ({
stream: String(row.stream),
interpreted_through: this.toBigInt(row.interpreted_through),
updated_at_ms: this.toBigInt(row.updated_at_ms)
}));
}
ensureStreamInterpreter(stream) {
const existing = this.getStreamInterpreter(stream);
if (existing)
return;
const srow = this.getStream(stream);
const initialThrough = srow ? srow.next_offset - 1n : -1n;
this.stmts.upsertStreamInterpreter.run(stream, this.bindInt(initialThrough), this.nowMs());
}
updateStreamInterpreterThrough(stream, interpretedThrough) {
this.stmts.upsertStreamInterpreter.run(stream, this.bindInt(interpretedThrough), this.nowMs());
}
deleteStreamInterpreter(stream) {
this.stmts.deleteStreamInterpreter.run(stream);
}
addStreamFlags(stream, flags) {
if (!Number.isFinite(flags) || flags <= 0)
return;
this.db.query(`UPDATE streams SET stream_flags = (stream_flags | ?), updated_at_ms=? WHERE stream=?;`).run(flags, this.nowMs(), stream);
}
getWalOldestOffset(stream) {
const row = this.db.query(`SELECT MIN(offset) as min_off FROM wal WHERE stream=?;`).get(stream);
if (!row || row.min_off == null)
return null;
return this.toBigInt(row.min_off);
}
/**
 * Deletes WAL rows of `stream` that are older than `maxAgeMs` (by `ts_ms`),
 * always retaining at least the newest row, and subtracts the removed rows
 * and payload bytes from the stream's pending_* and wal_* counters.
 * Everything runs inside one transaction.
 *
 * @param stream stream name
 * @param maxAgeMs maximum age in milliseconds; floored and clamped to >= 0.
 *                 NaN or infinite values make the call a no-op.
 * @returns { trimmedRows, trimmedBytes, keptFromOffset }: counts as JS numbers
 *          (capped at Number.MAX_SAFE_INTEGER) and the lowest retained offset
 *          as a bigint, or null when nothing was examined.
 */
trimWalByAge(stream, maxAgeMs) {
const ageMs = Math.max(0, Math.floor(maxAgeMs));
if (!Number.isFinite(ageMs))
return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: null };
const tx = this.db.transaction(() => {
// Newest row of the stream; with no rows there is nothing to trim.
const lastRow = this.db.query(`SELECT offset, ts_ms FROM wal WHERE stream=? ORDER BY offset DESC LIMIT 1;`).get(stream);
if (!lastRow || lastRow.offset == null)
return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: null };
const lastOffset = this.toBigInt(lastRow.offset);
let keepFromOffset;
if (ageMs === 0) {
// Age 0 keeps only the newest row.
keepFromOffset = lastOffset;
} else {
// Keep from the lowest-offset row whose timestamp is at or after the cutoff;
// when every row is older than the cutoff, fall back to keeping only the newest row.
const cutoff = this.nowMs() - BigInt(ageMs);
const keepRow = this.db.query(`SELECT offset FROM wal WHERE stream=? AND ts_ms >= ? ORDER BY offset ASC LIMIT 1;`).get(stream, this.bindInt(cutoff));
keepFromOffset = keepRow && keepRow.offset != null ? this.toBigInt(keepRow.offset) : lastOffset;
}
// NOTE(review): presumably offsets start at 0, so a keep point <= 0 leaves nothing below it — confirm.
if (keepFromOffset <= 0n)
return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: keepFromOffset };
// Measure what is about to be deleted so the counters can be adjusted.
const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
FROM wal WHERE stream=? AND offset < ?;`).get(stream, this.bindInt(keepFromOffset));
const bytes = this.toBigInt(stats?.bytes ?? 0);
const rows = this.toBigInt(stats?.rows ?? 0);
if (rows <= 0n)
return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: keepFromOffset };
this.db.query(`DELETE FROM wal WHERE stream=? AND offset < ?;`).run(stream, this.bindInt(keepFromOffset));
const now = this.nowMs();
// Each counter is decremented but never allowed to go below zero.
this.db.query(`UPDATE streams
SET pending_bytes = CASE WHEN pending_bytes >= ? THEN pending_bytes - ? ELSE 0 END,
pending_rows = CASE WHEN pending_rows >= ? THEN pending_rows - ? ELSE 0 END,
wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
updated_at_ms = ?
WHERE stream = ?;`).run(bytes, bytes, rows, rows, bytes, bytes, rows, rows, now, stream);
// Report as JS numbers, saturating at MAX_SAFE_INTEGER.
const trimmedBytes = bytes <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(bytes) : Number.MAX_SAFE_INTEGER;
const trimmedRows = rows <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(rows) : Number.MAX_SAFE_INTEGER;
return { trimmedRows, trimmedBytes, keptFromOffset: keepFromOffset };
});
return tx();
}
countStreams() {
const row = this.stmts.countStreams.get(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH);
return row ? Number(row.cnt) : 0;
}
sumPendingBytes() {
const row = this.stmts.sumPendingBytes.get();
const total = row?.total ?? 0;
return Number(this.toBigInt(total));
}
sumPendingSegmentBytes() {
const row = this.stmts.sumPendingSegmentBytes.get();
const total = row?.total ?? 0;
return Number(this.toBigInt(total));
}
ensureDbStat() {
if (this.dbstatReady != null)
return this.dbstatReady;
try {
this.db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS temp.dbstat USING dbstat;");
this.dbstatReady = true;
} catch {
this.dbstatReady = false;
}
return this.dbstatReady;
}
estimateWalBytes() {
try {
const row = this.db.query(`SELECT
COALESCE(SUM(payload_len), 0) as payload,
COALESCE(SUM(LENGTH(routing_key)), 0) as rk,
COALESCE(SUM(LENGTH(content_type)), 0) as ct
FROM wal;`).get();
return Number(row?.payload ?? 0) + Number(row?.rk ?? 0) + Number(row?.ct ?? 0);
} catch {
return 0;
}
}
estimateMetaBytes() {
try {
const streams = this.db.query(`SELECT
COALESCE(SUM(LENGTH(stream)), 0) as stream,
COALESCE(SUM(LENGTH(content_type)), 0) as content_type,
COALESCE(SUM(LENGTH(stream_seq)), 0) as stream_seq,
COALESCE(SUM(LENGTH(closed_producer_id)), 0) as closed_producer_id
FROM streams;`).get();
const segments = this.db.query(`SELECT
COALESCE(SUM(LENGTH(segment_id)), 0) as segment_id,
COALESCE(SUM(LENGTH(stream)), 0) as stream,
COALESCE(SUM(LENGTH(local_path)), 0) as local_path,
COALESCE(SUM(LENGTH(r2_etag)), 0) as r2_etag
FROM segments;`).get();
const manifests = this.db.query(`SELECT
COALESCE(SUM(LENGTH(stream)), 0) as stream,
COALESCE(SUM(LENGTH(last_uploaded_etag)), 0) as last_uploaded_etag
FROM manifests;`).get();
const schemas = this.db.query(`SELECT COALESCE(SUM(LENGTH(schema_json)), 0) as schema_json FROM schemas;`).get();
const producers = this.db.query(`SELECT
COALESCE(SUM(LENGTH(stream)), 0) as stream,
COALESCE(SUM(LENGTH(producer_id)), 0) as producer_id
FROM producer_state;`).get();
const total = Number(streams?.stream ?? 0) + Number(streams?.content_type ?? 0) + Number(streams?.stream_seq ?? 0) + Number(streams?.closed_producer_id ?? 0) + Number(segments?.segment_id ?? 0) + Number(segments?.stream ?? 0) + Number(segments?.local_path ?? 0) + Number(segments?.r2_etag ?? 0) + Number(manifests?.stream ?? 0) + Number(manifests?.last_uploaded_etag ?? 0) + Number(schemas?.schema_json ?? 0) + Number(producers?.stream ?? 0) + Number(producers?.producer_id ?? 0);
return total;
} catch {
return 0;
}
}
getWalDbSizeBytes() {
if (this.ensureDbStat()) {
try {
const row = this.db.query(`SELECT COALESCE(SUM(pgsize), 0) as total FROM temp.dbstat WHERE name = 'wal';`).get();
return Number(row?.total ?? 0);
} catch {}
}
return this.estimateWalBytes();
}
getMetaDbSizeBytes() {
if (this.ensureDbStat()) {
try {
const row = this.db.query(`SELECT COALESCE(SUM(pgsize), 0) as total FROM temp.dbstat WHERE name != 'wal';`).get();
return Number(row?.total ?? 0);
} catch {}
}
return this.estimateMetaBytes();
}
appendWalRows(args) {
const { stream, startOffset, expectedOffset, rows } = args;
if (rows.length === 0)
return Result.err({ kind: "no_rows" });
const tx = this.db.transaction(() => {
const st = this.getStream(stream);
if (!st || this.isDeleted(st))
return Result.err({ kind: "stream_missing" });
if (st.expires_at_ms != null && this.nowMs() > st.expires_at_ms)
return Result.err({ kind: "stream_expired" });
if (expectedOffset !== undefined && st.next_offset !== expectedOffset) {
return Result.err({ kind: "seq_mismatch", expectedNext: st.next_offset });
}
let totalBytes = 0n;
let offset = startOffset;
for (const r of rows) {
const payloadLen = r.payload.byteLength;
totalBytes += BigInt(payloadLen);
this.stmts.insertWal.run(stream, offset, r.appendMs, r.payload, payloadLen, r.routingKey, r.contentType, 0);
offset += 1n;
}
const lastOffset = offset - 1n;
const newNextOffset = lastOffset + 1n;
const now = this.nowMs();
const pendingRows = BigInt(rows.length);
const lastAppend = rows[rows.length - 1].appendMs;
this.stmts.updateStreamAppend.run(newNextOffset, now, lastAppend, pendingRows, totalBytes, pendingRows, totalBytes, stream, STREAM_FLAG_DELETED);
return Result.ok({ lastOffset });
});
return tx();
}
*iterWalRange(stream, startOffset, endOffset, routingKey) {
const start = this.bindInt(startOffset);
const end = this.bindInt(endOffset);
const stmt = routingKey ? this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
FROM wal
WHERE stream = ? AND offset >= ? AND offset <= ? AND routing_key = ?
ORDER BY offset ASC;`) : this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
FROM wal
WHERE stream = ? AND offset >= ? AND offset <= ?
ORDER BY offset ASC;`);
try {
const it = routingKey ? stmt.iterate(stream, start, end, routingKey) : stmt.iterate(stream, start, end);
for (const row of it) {
yield row;
}
} finally {
try {
stmt.finalize?.();
} catch {}
}
}
nextSegmentIndexForStream(stream) {
const row = this.stmts.nextSegmentIndex.get(stream);
return Number(row?.next_idx ?? 0);
}
createSegmentRow(row) {
this.stmts.createSegment.run(row.segmentId, row.stream, row.segmentIndex, row.startOffset, row.endOffset, row.blockCount, row.lastAppendMs, row.sizeBytes, row.localPath, this.nowMs());
}
commitSealedSegment(row) {
const tx = this.db.transaction(() => {
this.createSegmentRow(row);
this.appendSegmentMeta(row.stream, row.endOffset + 1n, row.blockCount, row.lastAppendMs * 1000000n);
this.setStreamSealedThrough(row.stream, row.endOffset, row.payloadBytes, row.rowsSealed);
});
tx();
}
listSegmentsForStream(stream) {
const rows = this.stmts.listSegmentsForStream.all(stream);
return rows.map((r) => this.coerceSegmentRow(r));
}
getSegmentByIndex(stream, segmentIndex) {
const row = this.stmts.getSegmentByIndex.get(stream, segmentIndex);
return row ? this.coerceSegmentRow(row) : null;
}
findSegmentForOffset(stream, offset) {
const bound = this.bindInt(offset);
const row = this.stmts.findSegmentForOffset.get(stream, bound, bound);
return row ? this.coerceSegmentRow(row) : null;
}
pendingUploadSegments(limit) {
const rows = this.stmts.pendingUploadSegments.all(limit);
return rows.map((r) => this.coerceSegmentRow(r));
}
countPendingSegments() {
const row = this.stmts.countPendingSegments.get();
return row ? Number(row.cnt) : 0;
}
countSegmentsForStream(stream) {
const row = this.stmts.countSegmentsForStream.get(stream);
return row ? Number(row.cnt) : 0;
}
getSegmentMeta(stream) {
const row = this.stmts.getSegmentMeta.get(stream);
if (!row)
return null;
const offsets = row.segment_offsets instanceof Uint8Array ? row.segment_offsets : new Uint8Array(row.segment_offsets);
const blocks = row.segment_blocks instanceof Uint8Array ? row.segment_blocks : new Uint8Array(row.segment_blocks);
const lastTs = row.segment_last_ts instanceof Uint8Array ? row.segment_last_ts : new Uint8Array(row.segment_last_ts);
return {
stream: String(row.stream),
segment_count: Number(row.segment_count),
segment_offsets: offsets,
segment_blocks: blocks,
segment_last_ts: lastTs
};
}
ensureSegmentMeta(stream) {
this.stmts.ensureSegmentMeta.run(stream);
}
appendSegmentMeta(stream, offsetPlusOne, blockCount, lastAppendNs) {
this.ensureSegmentMeta(stream);
const offsetBytes = this.encodeU64Le(offsetPlusOne);
const blockBytes = this.encodeU32Le(blockCount);
const tsBytes = this.encodeU64Le(lastAppendNs);
this.stmts.appendSegmentMeta.run(offsetBytes, blockBytes, tsBytes, stream);
}
upsertSegmentMeta(stream, count, offsets, blocks, lastTs) {
this.stmts.upsertSegmentMeta.run(stream, count, offsets, blocks, lastTs);
}
rebuildSegmentMeta(stream) {
const rows = this.db.query(`SELECT end_offset, block_count, last_append_ms
FROM segments WHERE stream=? ORDER BY segment_index ASC;`).all(stream);
const count = rows.length;
const offsets = new Uint8Array(count * 8);
const blocks = new Uint8Array(count * 4);
const lastTs = new Uint8Array(count * 8);
const dvOffsets = new DataView(offsets.buffer, offsets.byteOffset, offsets.byteLength);
const dvBlocks = new DataView(blocks.buffer, blocks.byteOffset, blocks.byteLength);
const dvLastTs = new DataView(lastTs.buffer, lastTs.byteOffset, lastTs.byteLength);
for (let i = 0;i < rows.length; i++) {
const endOffset = this.toBigInt(rows[i].end_offset);
const blockCount = Number(rows[i].block_count);
const lastAppendMs = this.toBigInt(rows[i].last_append_ms);
dvOffsets.setBigUint64(i * 8, endOffset + 1n, true);
dvBlocks.setUint32(i * 4, blockCount >>> 0, true);
dvLastTs.setBigUint64(i * 8, lastAppendMs * 1000000n, true);
}
this.upsertSegmentMeta(stream, count, offsets, blocks, lastTs);
return { stream, segment_count: count, segment_offsets: offsets, segment_blocks: blocks, segment_last_ts: lastTs };
}
setUploadedSegmentCount(stream, count) {
this.stmts.setUploadedSegmentCount.run(count, this.nowMs(), stream);
}
advanceUploadedSegmentCount(stream) {
const row = this.getStream(stream);
if (!row)
return 0;
let count = row.uploaded_segment_count ?? 0;
for (;; ) {
const seg = this.getSegmentByIndex(stream, count);
if (!seg || !seg.r2_etag)
break;
count += 1;
}
if (count !== row.uploaded_segment_count) {
this.stmts.setUploadedSegmentCount.run(count, this.nowMs(), stream);
}
return count;
}
markSegmentUploaded(segmentId, etag, uploadedAtMs) {
this.stmts.markSegmentUploaded.run(etag, uploadedAtMs, segmentId);
}
setStreamSealedThrough(stream, sealedThrough, bytesSealed, rowsSealed) {
const now = this.nowMs();
this.db.query(`UPDATE streams
SET sealed_through = ?,
pending_bytes = CASE WHEN pending_bytes >= ? THEN pending_bytes - ? ELSE 0 END,
pending_rows = CASE WHEN pending_rows >= ? THEN pending_rows - ? ELSE 0 END,
last_segment_cut_ms = ?,
updated_at_ms = ?
WHERE stream = ?;`).run(sealedThrough, bytesSealed, bytesSealed, rowsSealed, rowsSealed, now, now, stream);
}
setSegmentInProgress(stream, inProgress) {
this.db.query(`UPDATE streams SET segment_in_progress=?, updated_at_ms=? WHERE stream=?;`).run(inProgress, this.nowMs(), stream);
}
tryClaimSegment(stream) {
const res = this.stmts.tryClaimSegment.run(this.nowMs(), stream);
const changes = typeof res?.changes === "bigint" ? res.changes : BigInt(Number(res?.changes ?? 0));
return changes > 0n;
}
resetSegmentInProgress() {
this.db.query(`UPDATE streams SET segment_in_progress=0 WHERE segment_in_progress != 0;`).run();
}
advanceUploadedThrough(stream, uploadedThrough) {
this.stmts.advanceUploadedThrough.run(uploadedThrough, this.nowMs(), stream);
}
deleteWalThrough(stream, uploadedThrough) {
const through = this.bindInt(uploadedThrough);
const tx = this.db.transaction(() => {
const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
FROM wal WHERE stream=? AND offset <= ?;`).get(stream, through);
const bytes = this.toBigInt(stats?.bytes ?? 0);
const rows = this.toBigInt(stats?.rows ?? 0);
if (rows <= 0n)
return { deletedRows: 0, deletedBytes: 0 };
this.stmts.deleteWalBeforeOffset.run(stream, through);
const now = this.nowMs();
this.db.query(`UPDATE streams
SET wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
updated_at_ms = ?
WHERE stream = ?;`).run(bytes, bytes, rows, rows, now, stream);
const deletedBytes = bytes <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(bytes) : Number.MAX_SAFE_INTEGER;
const deletedRows = rows <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(rows) : Number.MAX_SAFE_INTEGER;
return { deletedRows, deletedBytes };
});
return tx();
}
getManifestRow(stream) {
const row = this.stmts.getManifest.get(stream);
if (!row) {
this.stmts.upsertManifest.run(stream, 0, 0, null, null);
const fresh = this.stmts.getManifest.get(stream);
return {
stream: String(fresh.stream),
generation: Number(fresh.generation),
uploaded_generation: Number(fresh.uploaded_generation),
last_uploaded_at_ms: fresh.last_uploaded_at_ms == null ? null : this.toBigInt(fresh.last_uploaded_at_ms),
last_uploaded_etag: fresh.last_uploaded_etag == null ? null : String(fresh.last_uploaded_etag)
};
}
return {
stream: String(row.stream),
generation: Number(row.generation),
uploaded_generation: Number(row.uploaded_generation),
last_uploaded_at_ms: row.last_uploaded_at_ms == null ? null : this.toBigInt(row.last_uploaded_at_ms),
last_uploaded_etag: row.last_uploaded_etag == null ? null : String(row.last_uploaded_etag)
};
}
upsertManifestRow(stream, generation, uploadedGeneration, uploadedAtMs, etag) {
this.stmts.upsertManifest.run(stream, generation, uploadedGeneration, uploadedAtMs, etag);
}
getIndexState(stream) {
const row = this.stmts.getIndexState.get(stream);
if (!row)
return null;
return {
stream: String(row.stream),
index_secret: row.index_secret instanceof Uint8Array ? row.index_secret : new Uint8Array(row.index_secret),
indexed_through: Number(row.indexed_through),
updated_at_ms: this.toBigInt(row.updated_at_ms)
};
}
upsertIndexState(stream, indexSecret, indexedThrough) {
this.stmts.upsertIndexState.run(stream, indexSecret, indexedThrough, this.nowMs());
}
updateIndexedThrough(stream, indexedThrough) {
this.stmts.updateIndexedThrough.run(indexedThrough, this.nowMs(), stream);
}
listIndexRuns(stream) {
const rows = this.stmts.listIndexRuns.all(stream);
return rows.map((r) => ({
run_id: String(r.run_id),
stream: String(r.stream),
level: Number(r.level),
start_segment: Number(r.start_segment),
end_segment: Number(r.end_segment),
object_key: String(r.object_key),
filter_len: Number(r.filter_len),
record_count: Number(r.record_count),
retired_gen: r.retired_gen == null ? null : Number(r.retired_gen),
retired_at_ms: r.retired_at_ms == null ? null : this.toBigInt(r.retired_at_ms)
}));
}
listIndexRunsAll(stream) {
const rows = this.stmts.listIndexRunsAll.all(stream);
return rows.map((r) => ({
run_id: String(r.run_id),
stream: String(r.stream),
level: Number(r.level),
start_segment: Number(r.start_segment),
end_segment: Number(r.end_segment),
object_key: String(r.object_key),
filter_len: Number(r.filter_len),
record_count: Number(r.record_count),
retired_gen: r.retired_gen == null ? null : Number(r.retired_gen),
retired_at_ms: r.retired_at_ms == null ? null : this.toBigInt(r.retired_at_ms)
}));
}
listRetiredIndexRuns(stream) {
const rows = this.stmts.listRetiredIndexRuns.all(stream);
return rows.map((r) => ({
run_id: String(r.run_id),
stream: String(r.stream),
level: Number(r.level),
start_segment: Number(r.start_segment),
end_segment: Number(r.end_segment),
object_key: String(r.object_key),
filter_len: Number(r.filter_len),
record_count: Number(r.record_count),
retired_gen: r.retired_gen == null ? null : Number(r.retired_gen),
retired_at_ms: r.retired_at_ms == null ? null : this.toBigInt(r.retired_at_ms)
}));
}
insertIndexRun(row) {
this.stmts.insertIndexRun.run(row.run_id, row.stream, row.level, row.start_segment, row.end_segment, row.object_key, row.filter_len, row.record_count);
}
retireIndexRuns(runIds, retiredGen, retiredAtMs) {
if (runIds.length === 0)
return;
const tx = this.db.transaction(() => {
for (const runId of runIds) {
this.stmts.retireIndexRun.run(retiredGen, retiredAtMs, runId);
}
});
tx();
}
deleteIndexRuns(runIds) {
if (runIds.length === 0)
return;
const tx = this.db.transaction(() => {
for (const runId of runIds) {
this.stmts.deleteIndexRun.run(runId);
}
});
tx();
}
countUploadedSegments(stream) {
const row = this.stmts.countUploadedSegments.get(stream);
const maxIdx = row ? Number(row.max_idx) : -1;
return maxIdx >= 0 ? maxIdx + 1 : 0;
}
/**
 * Records a successful manifest upload and garbage-collects WAL rows that
 * are no longer needed, all in one transaction.
 *
 * Steps: store the manifest (the same `generation` is written as both the
 * generation and the uploaded generation), advance `uploaded_through`, then
 * delete WAL rows up to the lower of `uploadedThrough` and the stream's
 * `interpreted_through` (when an interpreter row exists). When
 * BASE_WAL_GC_CHUNK_OFFSETS is positive the delete covers at most that many
 * offsets counted from the oldest WAL row. Finally `wal_bytes` / `wal_rows`
 * are reduced by what was removed (clamped at 0).
 *
 * @param stream stream name
 * @param generation manifest generation that was uploaded
 * @param etag etag of the uploaded manifest
 * @param uploadedAtMs upload timestamp
 * @param uploadedThrough highest offset covered by uploaded data (bigint)
 */
commitManifest(stream, generation, etag, uploadedAtMs, uploadedThrough) {
const tx = this.db.transaction(() => {
this.stmts.upsertManifest.run(stream, generation, generation, uploadedAtMs, etag);
this.stmts.advanceUploadedThrough.run(uploadedThrough, this.nowMs(), stream);
// GC may not pass what the interpreter has consumed, if one is registered.
let gcThrough = uploadedThrough;
const interp = this.stmts.getStreamInterpreter.get(stream);
if (interp) {
const interpretedThrough = this.toBigInt(interp.interpreted_through);
gcThrough = interpretedThrough < gcThrough ? interpretedThrough : gcThrough;
}
if (gcThrough < 0n)
return;
let deleteThrough = gcThrough;
// Bound the work of a single commit to one chunk of offsets from the oldest row.
if (BASE_WAL_GC_CHUNK_OFFSETS > 0) {
const oldest = this.getWalOldestOffset(stream);
if (oldest != null) {
const maxThrough = oldest + BigInt(BASE_WAL_GC_CHUNK_OFFSETS) - 1n;
if (deleteThrough > maxThrough)
deleteThrough = maxThrough;
}
}
if (deleteThrough < 0n)
return;
const bound = this.bindInt(deleteThrough);
// Measure first so the counters can be adjusted after the delete.
const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
FROM wal WHERE stream=? AND offset <= ?;`).get(stream, bound);
const bytes = this.toBigInt(stats?.bytes ?? 0);
const rows = this.toBigInt(stats?.rows ?? 0);
if (rows <= 0n)
return;
this.stmts.deleteWalBeforeOffset.run(stream, bound);
const now = this.nowMs();
this.db.query(`UPDATE streams
SET wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
updated_at_ms = ?
WHERE stream = ?;`).run(bytes, bytes, rows, rows, now, stream);
});
tx();
}
candidates(minPendingBytes, minPendingRows, maxIntervalMs, limit) {
if (maxIntervalMs <= 0n) {
return this.stmts.candidateStreamsNoInterval.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, minPendingBytes, minPendingRows, limit);
}
const now = this.nowMs();
return this.stmts.candidateStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, minPendingBytes, minPendingRows, now, maxIntervalMs, limit);
}
}
// src/util/log.ts
// Set to true by initConsoleLogging() on its first call so later calls return early.
var patched = false;
function wrapConsole(orig, level) {
return (...args) => {
const prefix = `[${new Date().toISOString()}] [${level}]`;
if (args.length === 0)
return orig(prefix);
return orig(prefix, ...args);
};
}
/**
 * Patches console.log/info/warn/error (and console.debug when present) so
 * each message carries a timestamp and level prefix. Safe to call more than
 * once: a module-level flag and a flag on `globalThis` both short-circuit
 * repeated calls.
 */
function initConsoleLogging() {
  if (patched) {
    return;
  }
  patched = true;
  const shared = globalThis;
  if (shared.__ds_console_patched) {
    return;
  }
  shared.__ds_console_patched = true;
  const levelByMethod = [
    ["log", "INFO"],
    ["info", "INFO"],
    ["warn", "WARN"],
    ["error", "ERROR"]
  ];
  for (const [method, label] of levelByMethod) {
    console[method] = wrapConsole(console[method].bind(console), label);
  }
  if (console.debug) {
    console.debug = wrapConsole(console.debug.bind(console), "DEBUG");
  }
}
// src/touch/engine.ts
/**
 * Interprets a record into a list of changes.
 * `_cfg` is accepted but not used; the work is delegated to
 * interpretStateProtocolRecord.
 */
function interpretRecordToChanges(record, _cfg) {
  const changes = interpretStateProtocolRecord(record);
  return changes;
}
/**
 * Turns one record into zero or one change descriptors.
 *
 * A change is produced only when the record is a plain object with a
 * `headers` object that has no string `control` field and whose `operation`
 * is "insert", "update" or "delete", and when `type` and `key` are
 * non-blank strings.
 *
 * @returns `[{ entity, key, op, before, after }]`, or `[]` when the record
 *          does not qualify. `before` comes from `oldValue` (preferred) or
 *          `old_value`; `after` from `value`; each is undefined when absent.
 */
function interpretStateProtocolRecord(record) {
  const isPlainObject = (x) => Boolean(x) && typeof x === "object" && !Array.isArray(x);
  const owns = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop);
  const isNonBlankString = (x) => typeof x === "string" && x.trim() !== "";

  if (!isPlainObject(record)) {
    return [];
  }
  const headers = record.headers;
  if (!isPlainObject(headers)) {
    return [];
  }
  // Control records never describe a data change.
  if (typeof headers.control === "string") {
    return [];
  }
  const op = headers.operation;
  if (typeof op !== "string") {
    return [];
  }
  const isKnownOp = op === "insert" || op === "update" || op === "delete";
  if (!isKnownOp) {
    return [];
  }
  const entity = record.type;
  const key = record.key;
  if (!isNonBlankString(entity)) {
    return [];
  }
  if (!isNonBlankString(key)) {
    return [];
  }
  let before;
  if (owns(record, "oldValue")) {
    before = record.oldValue;
  } else if (owns(record, "old_value")) {
    before = record.old_value;
  }
  let after;
  if (owns(record, "value")) {
    after = record.value;
  }
  return [{ entity, key, op, before, after }];
}
// src/runtime/hash.ts
import { Result as Result2 } from "better-result";
import { createRequire as createRequire2 } from "node:module";
import { fileURLToPath } from "node:url";
// Hasher instances for the non-Bun code path; they remain null under Bun and
// are assigned by the initialisation block further down otherwise.
var xxh3Hasher = null;
var xxh64Hasher = null;
var xxh32Hasher = null;
// True when a global `Bun` object is defined.
var isBunRuntime = typeof globalThis.Bun !== "undefined";
// CommonJS-style `require` anchored at this module's URL; used to load the vendored hash builds.
var require3 = createRequire2(import.meta.url);
/**
 * Loads a vendored module from the `hash_vendor` directory next to this
 * module, resolving the file URL to a filesystem path before requiring it.
 */
function loadVendoredModule(name) {
  const moduleUrl = new URL(`./hash_vendor/${name}`, import.meta.url);
  const modulePath = fileURLToPath(moduleUrl);
  return require3(modulePath);
}
// Outside Bun, load the vendored xxhash builds and create one hasher of each
// kind at module load time (top-level await). Under Bun this block is skipped
// and the hashers stay null.
if (!isBunRuntime) {
const xxh3Module = loadVendoredModule("xxhash3.umd.min.cjs");
const xxh64Module = loadVendoredModule("xxhash64.umd.min.cjs");
const xxh32Module = loadVendoredModule("xxhash32.umd.min.cjs");
xxh3Hasher = await xxh3Module.createXXHash3();
xxh64Hasher = await xxh64Module.createXXHash64();
xxh32Hasher = await xxh32Module.createXXHash32();
}
/**
 * Normalizes a hash digest to a bigint.
 * - bigint: returned unchanged
 * - number: coerced to an unsigned 32-bit integer first
 * - anything else: treated as a hex string with an optional "0x" prefix;
 *   an empty string yields 0n
 */
function toBigIntDigest(value) {
  switch (typeof value) {
    case "bigint":
      return value;
    case "number":
      return BigInt(value >>> 0);
    default: {
      const digits = value.startsWith("0x") ? value.slice(2) : value;
      return digits.length === 0 ? 0n : BigInt(`0x${digits}`);
    }
  }
}
function bunHash64(input, fn) {
return fn(input);
}
/**
 * Hashes `input` with a hasher that exposes init/update/digest.
 * @returns Result.ok(bigint digest), or Result.err of kind
 *          "hasher_not_initialized" when `hasher` is falsy
 */
function nodeHash64Result(input, hasher, label) {
  if (hasher) {
    hasher.init();
    hasher.update(input);
    const hexDigest = hasher.digest("hex");
    return Result2.ok(toBigIntDigest(hexDigest));
  }
  return Result2.err({ kind: "hasher_not_initialized", message: `${label} hasher not initialized` });
}
/**
 * Computes the xxh3 hash of `input` as a Result: via `Bun.hash.xxHash3`
 * under Bun, otherwise via the module's xxh3 hasher.
 */
function xxh3BigIntResult(input) {
  if (!isBunRuntime) {
    return nodeHash64Result(input, xxh3Hasher, "xxh3");
  }
  const digest = bunHash64(input, (x) => Bun.hash.xxHash3(x));
  return Result2.ok(digest);
}
/**
 * Computes the xxh3 hash of `input` as a bigint.
 * Throws a DurableStreamsError (via dsError) when hashing fails.
 */
function xxh3BigInt(input) {
  const outcome = xxh3BigIntResult(input);
  if (!Result2.isError(outcome)) {
    return outcome.value;
  }
  throw dsError(outcome.error.message);
}
// src/touch/live_keys.ts
/** Encodes a string as UTF-8 bytes. */
function utf8(s) {
  const encoder = new TextEncoder();
  return encoder.encode(s);
}
/**
 * Encodes the low 64 bits of a bigint as 8 big-endian bytes.
 * @returns a fresh 8-byte Uint8Array
 */
function encodeU64Be(v) {
  const out = new Uint8Array(8);
  // setBigUint64 keeps the value modulo 2^64, the same bits byte-wise masking yields.
  new DataView(out.buffer).setBigUint64(0, v, false);
  return out;
}
/** Low 32 bits of the xxh3 hash of `bytes`, as an unsigned JS number. */
function xxh3Low32(bytes) {
  const fullHash = xxh3BigInt(bytes);
  const lowBits = fullHash & 0xffffffffn;
  return Number(lowBits) >>> 0;
}
/** 32-bit key id for an entity: hash of the bytes "tbl\0" followed by the entity name. */
function tableKeyIdFor(entity) {
  const tag = utf8("tbl\x00");
  const name = utf8(entity);
  return xxh3Low32(concat([tag, name]));
}
/**
 * 32-bit key id for a template: hash of the bytes "tpl\0" followed by the
 * template id (parsed from hex) as 8 big-endian bytes.
 */
function templateKeyIdFor(templateIdHex16) {
  const templateId = BigInt(`0x${templateIdHex16}`);
  const idBytes = encodeU64Be(templateId);
  const tag = utf8("tpl\x00");
  return xxh3Low32(concat([tag, idBytes]));
}
/**
 * 32-bit key id for a template plus arguments: hash of the bytes "key\0",
 * the template id (parsed from hex) as 8 big-endian bytes, and then each
 * encoded argument preceded by a single NUL byte.
 */
function watchKeyIdFor(templateIdHex16, encodedArgs) {
  const templateId = BigInt(`0x${templateIdHex16}`);
  const pieces = [utf8("key\x00"), encodeU64Be(templateId)];
  for (const arg of encodedArgs) {
    pieces.push(utf8("\x00"), utf8(arg));
  }
  const material = concat(pieces);
  return xxh3Low32(material);
}
/**
 * Encodes one template argument value to its canonical string form for the
 * given encoding, or returns null when the value cannot be represented.
 *
 * Accepted inputs per encoding:
 * - "string":   strings as-is; finite numbers via String(); booleans as "true"/"false".
 * - "int64":    bigints; finite integer numbers; strings that (after trimming) are a
 *               canonical decimal integer with no leading zeros.
 * - "bool":     booleans only, encoded as "1"/"0".
 * - "datetime": strings that `Date` can parse, normalized with toISOString().
 * - "bytes":    strings only, passed through unchanged.
 *
 * null/undefined values and unrecognized encodings always yield null, so callers
 * get a single "not encodable" sentinel regardless of the failure reason.
 *
 * @param value    Raw field value taken from a change record.
 * @param encoding One of "string" | "int64" | "bool" | "datetime" | "bytes".
 * @returns The encoded string, or null if the value is missing/unsupported.
 */
function encodeTemplateArg(value, encoding) {
  if (value === null || value === undefined)
    return null;
  switch (encoding) {
    case "string": {
      if (typeof value === "string")
        return value;
      if (typeof value === "number" && Number.isFinite(value))
        return String(value);
      if (typeof value === "boolean")
        return value ? "true" : "false";
      return null;
    }
    case "int64": {
      if (typeof value === "bigint")
        return value.toString();
      // NOTE(review): integers >= 1e21 stringify in exponent form here; left as-is
      // because the output feeds key hashing and must stay stable.
      if (typeof value === "number" && Number.isFinite(value) && Number.isInteger(value))
        return String(value);
      if (typeof value === "string" && /^-?(0|[1-9][0-9]*)$/.test(value.trim()))
        return value.trim();
      return null;
    }
    case "bool": {
      if (typeof value !== "boolean")
        return null;
      return value ? "1" : "0";
    }
    case "datetime": {
      if (typeof value !== "string")
        return null;
      const d = new Date(value);
      // Invalid dates have a NaN timestamp.
      if (!Number.isFinite(d.getTime()))
        return null;
      return d.toISOString();
    }
    case "bytes": {
      if (typeof value !== "string")
        return null;
      return value;
    }
    default:
      // Previously fell off the end of the switch and returned undefined.
      return null;
  }
}
/**
 * Joins byte chunks into one freshly allocated Uint8Array, in order.
 *
 * `parts` is walked twice: once to size the output, once to copy.
 */
function concat(parts) {
  let size = 0;
  for (const chunk of parts) {
    size += chunk.byteLength;
  }
  const joined = new Uint8Array(size);
  let cursor = 0;
  for (const chunk of parts) {
    joined.set(chunk, cursor);
    cursor = cursor + chunk.byteLength;
  }
  return joined;
}
// src/touch/spec.ts
import { Result as Result3 } from "better-result";
/**
 * True when the given interpreter config has a truthy `touch.enabled` flag;
 * a missing config or missing `touch` section counts as disabled.
 */
function isTouchEnabled(cfg) {
  const flag = cfg?.touch?.enabled;
  return Boolean(flag);
}
// src/touch/interpreter_worker.ts
// Worker bootstrap: these statements run once, when the worker module is loaded.
initConsoleLogging();
// Startup parameters handed to this worker via worker_threads' workerData.
var data = workerData;
var cfg = data.config;
// Apply the host runtime override (null when none was supplied) before the store is constructed.
setSqliteRuntimeOverride(data.hostRuntime ?? null);
// Open the store at cfg.dbPath with migrations skipped.
// NOTE(review): presumably the spawning thread owns schema migration — confirm.
var db = new SqliteDurableStore(cfg.dbPath, { cacheBytes: cfg.sqliteCacheBytes, skipMigrations: true });
// Shared decoder used to turn WAL payload bytes into text before JSON parsing.
var decoder = new TextDecoder;
/**
 * Handles one "process" request from the parent thread.
 *
 * Iterates the WAL rows yielded by `db.iterWalRange(stream, fromOffset, toOffset)`, decodes each
 * payload as JSON, converts it to canonical changes with `interpretRecordToChanges`, and derives
 * touch records from those changes: one coarse table-level touch per changed entity and, when
 * fine touches are enabled, additional touches per active live template (either one per template
 * or one per concrete watch key). Touches for the same key id are coalesced within a time window.
 *
 * On normal completion one message is posted to `parentPort` (if present):
 *  - `type: "error"` when touch is not enabled for the interpreter, or when `onMissingBefore` is
 *    "error" and an update has no usable before image for some template;
 *  - otherwise `type: "result"` with `processedThrough`, the sorted touches and batch stats.
 * Exceptions are not caught here; they reject the returned promise.
 *
 * @param msg Request object. Fields read: id, stream, fromOffset (bigint), toOffset, interpreter,
 *   maxRows, maxBytes, fineTouchBudget, fineGranularity, interpretMode, emitFineTouches,
 *   filterHotTemplates, hotTemplateIds.
 */
async function handleProcess(msg) {
const { stream, fromOffset, toOffset, interpreter, maxRows, maxBytes } = msg;
// Reports a failure for this request to the parent. The Result4.err value is only used
// as a carrier for the message text.
const failProcess = (message) => {
const err = Result4.err({ kind: "missing_old_value", message });
parentPort?.postMessage({
type: "error",
id: msg.id,
stream,
message: err.error.message
});
};
if (!isTouchEnabled(interpreter)) {
parentPort?.postMessage({
type: "error",
id: msg.id,
stream,
message: "touch not enabled for interpreter"
});
return;
}
const touch = interpreter.touch;
// Per-request budget overrides the configured per-batch budget. null means "no budget";
// otherwise the value is floored and clamped to be non-negative.
const fineBudgetRaw = msg.fineTouchBudget ?? touch.fineTouchBudgetPerBatch;
const fineBudget = fineBudgetRaw == null ? null : Math.max(0, Math.floor(fineBudgetRaw));
// Any value other than "template" is treated as "key"; any value other than
// "hotTemplatesOnly" is treated as "full".
const fineGranularity = msg.fineGranularity === "template" ? "template" : "key";
const interpretMode = msg.interpretMode === "hotTemplatesOnly" ? "hotTemplatesOnly" : "full";
// hotTemplatesOnly mode only takes effect together with template granularity.
const hotTemplatesOnly = fineGranularity === "template" && interpretMode === "hotTemplatesOnly";
// Fine touches are off when explicitly disabled or when the budget is exactly zero.
const emitFineTouches = msg.emitFineTouches !== false && fineBudget !== 0;
let fineBudgetExhausted = fineBudget != null && fineBudget <= 0;
let fineKeysBudgetRemaining = fineBudget;
let fineTouchesSuppressedDueToBudget = false;
const filterHotTemplates = msg.filterHotTemplates === true;
const hotTemplateIdsRaw = filterHotTemplates ? msg.hotTemplateIds ?? [] : [];
// Only well-formed ids (16 lowercase hex characters) are kept; null means "no filtering".
const hotTemplateIds = filterHotTemplates ? new Set(hotTemplateIdsRaw.filter((x) => typeof x === "string" && /^[0-9a-f]{16}$/.test(x))) : null;
// Both windows default to 100 ms and are at least 1 ms.
const coarseIntervalMs = Math.max(1, Math.floor(touch.coarseIntervalMs ?? 100));
const coalesceWindowMs = Math.max(1, Math.floor(touch.touchCoalesceWindowMs ?? 100));
const onMissingBefore = touch.onMissingBefore ?? "coarse";
// entity -> list of usable templates for that entity.
const templatesByEntity = new Map;
// entity -> number of valid templates excluded by the hot-template filter.
const coldTemplateCountByEntity = new Map;
if (emitFineTouches) {
try {
const rows = db.db.query(`SELECT template_id, entity, fields_json, encodings_json, active_from_source_offset
FROM live_templates
WHERE stream=? AND state='active';`).all(stream);
for (const row of rows) {
// Rows that fail any validation below are skipped silently.
const templateId = String(row.template_id ?? "");
if (!/^[0-9a-f]{16}$/.test(templateId))
continue;
const entity = String(row.entity ?? "");
if (entity.trim() === "")
continue;
let fields;
let encodings;
try {
fields = JSON.parse(String(row.fields_json ?? "[]"));
encodings = JSON.parse(String(row.encodings_json ?? "[]"));
} catch {
continue;
}
if (!Array.isArray(fields) || !Array.isArray(encodings) || fields.length !== encodings.length)
continue;
const f = fields.map(String);
const e = encodings.map(String);
// Templates must have between 1 and 3 fields.
if (f.length === 0 || f.length > 3)
continue;
if (!e.every((x) => x === "string" || x === "int64" || x === "bool" || x === "datetime" || x === "bytes"))
continue;
// Valid but not in the hot set: count it per entity instead of loading it.
if (hotTemplateIds && !hotTemplateIds.has(templateId)) {
coldTemplateCountByEntity.set(entity, (coldTemplateCountByEntity.get(entity) ?? 0) + 1);
continue;
}
const activeFromSourceOffset = typeof row.active_from_source_offset === "bigint" ? row.active_from_source_offset : BigInt(row.active_from_source_offset ?? 0);
const tpl = { templateId, entity, fields: f, encodings: e, activeFromSourceOffset };
const arr = templatesByEntity.get(entity) ?? [];
arr.push(tpl);
templatesByEntity.set(entity, arr);
}
// Any error while loading templates is swallowed; processing continues with whatever
// templates were collected so far (possibly none, in which case only table touches are produced).
} catch {}
}
let rowsRead = 0;
let bytesRead = 0;
let changes = 0;
let maxSourceTsMs = 0;
// Starts one below fromOffset, so this is the reported value if no row is read.
let processedThrough = fromOffset - 1n;
// Open coalescing windows, keyed by `i:<keyId>`.
const pending = new Map;
// hotTemplatesOnly mode: entity -> the change with the highest offset seen for it.
const templateOnlyEntityTouch = new Map;
// Touches already finalized for output.
const touches = [];
let fineTouchesDroppedDueToBudget = 0;
let fineTouchesSkippedColdTemplate = 0;
// Moves a pending window into the output list (the map key itself is not used).
const flush = (_mapKey, p) => {
touches.push({ keyId: p.keyId >>> 0, watermark: p.watermark, entity: p.entity, kind: p.kind, templateId: p.templateId });
};
// Queues a touch for a key id, applying the fine-touch budget and time-window coalescing.
const queueTouch = (args) => {
const mapKey = `i:${args.keyId >>> 0}`;
const prev = pending.get(mapKey);
// The budget is charged only for non-table touches on keys that have no pending window yet.
if (args.kind !== "table" && fineBudget != null && !fineBudgetExhausted && !prev) {
const remaining = fineKeysBudgetRemaining ?? 0;
if (remaining <= 0) {
fineBudgetExhausted = true;
fineTouchesSuppressedDueToBudget = true;
fineTouchesDroppedDueToBudget += 1;
return;
}
fineKeysBudgetRemaining = remaining - 1;
} else if (args.kind !== "table" && fineBudget != null && !prev && fineBudgetExhausted) {
// Budget already exhausted: drop new fine keys, but keys with a pending window still fall through.
fineTouchesSuppressedDueToBudget = true;
fineTouchesDroppedDueToBudget += 1;
return;
}
if (!prev) {
pending.set(mapKey, {
keyId: args.keyId >>> 0,
windowStartMs: args.tsMs,
watermark: args.watermark,
entity: args.entity,
kind: args.kind,
templateId: args.templateId
});
return;
}
// Still inside the current window: only advance the watermark.
if (args.tsMs - prev.windowStartMs < args.windowMs) {
prev.watermark = args.watermark;
return;
}
// Window elapsed: emit the previous window and start a new one at this timestamp.
flush(mapKey, prev);
pending.set(mapKey, {
keyId: args.keyId >>> 0,
windowStartMs: args.tsMs,
watermark: args.watermark,
entity: args.entity,
kind: args.kind,
templateId: args.templateId
});
};
for (const row of db.iterWalRange(stream, fromOffset, toOffset)) {
const payload = row.payload;
const payloadLen = payload.byteLength;
// Row/byte limits are only enforced after the first row, so at least one row is always processed.
if (rowsRead > 0 && (rowsRead >= maxRows || bytesRead + payloadLen > maxBytes))
break;
rowsRead++;
bytesRead += payloadLen;
const offset = typeof row.offset === "bigint" ? row.offset : BigInt(row.offset);
processedThrough = offset;
const tsMsRaw = row.ts_ms;
const tsMs = typeof tsMsRaw === "bigint" ? Number(tsMsRaw) : Number(tsMsRaw);
// Rows with an unusable timestamp or a non-JSON payload are skipped, but they have already
// been counted in rowsRead/bytesRead and have advanced processedThrough.
if (!Number.isFinite(tsMs))
continue;
if (tsMs > maxSourceTsMs)
maxSourceTsMs = tsMs;
let value;
try {
value = JSON.parse(decoder.decode(payload));
} catch {
continue;
}
const canonical = interpretRecordToChanges(value, interpreter);
changes += canonical.length;
if (canonical.length === 0)
continue;
// The row offset, as a decimal string, is the watermark of every touch derived from this row.
const watermark = offset.toString();
for (const ch of canonical) {
const entity = ch.entity;
// Every change produces a coarse table-level touch, regardless of fine-touch settings.
const coarseKeyId = tableKeyIdFor(entity);
queueTouch({
keyId: coarseKeyId,
tsMs,
watermark,
entity,
kind: "table",
windowMs: coarseIntervalMs
});
if (!emitFineTouches)
continue;
if (fineBudgetExhausted)
continue;
const tpls = templatesByEntity.get(entity);
// Added once per change: the number of cold templates filtered out for this entity.
if (filterHotTemplates) {
fineTouchesSkippedColdTemplate += coldTemplateCountByEntity.get(entity) ?? 0;
}
if (!tpls || tpls.length === 0)
continue;
// hotTemplatesOnly: just remember the highest-offset change per entity; the template
// touches are queued once after the row loop.
if (hotTemplatesOnly) {
const prev = templateOnlyEntityTouch.get(entity);
if (!prev || offset > prev.offset)
templateOnlyEntityTouch.set(entity, { offset, tsMs, watermark });
continue;
}
for (const tpl of tpls) {
if (fineBudgetExhausted)
break;
// Changes at offsets before the template's activation offset are ignored for that template.
if (offset < tpl.activeFromSourceOffset)
continue;
if (fineGranularity === "template") {
queueTouch({
keyId: templateKeyIdFor(tpl.templateId) >>> 0,
tsMs,
watermark,
entity,
kind: "template",
templateId: tpl.templateId,
windowMs: coalesceWindowMs
});
if (fineBudgetExhausted)
break;
continue;
}
// Key granularity: compute the watch key(s) from the change's before/after images.
const afterObj = ch.after;
const beforeObj = ch.before;
const watchKeyIds = new Set;
// Returns the watch key id for a row image, or null if the image is not a plain object
// or any template field cannot be encoded.
const compute = (obj) => {
if (!obj || typeof obj !== "object" || Array.isArray(obj))
return null;
const args = [];
for (let i = 0;i < tpl.fields.length; i++) {
const name = tpl.fields[i];
const enc = tpl.encodings[i];
const v = obj[name];
const encoded = encodeTemplateArg(v, enc);
if (encoded == null)
return null;
args.push(encoded);
}
return watchKeyIdFor(tpl.templateId, args) >>> 0;
};
if (ch.op === "insert") {
const k = compute(afterObj);
if (k != null)
watchKeyIds.add(k >>> 0);
} else if (ch.op === "delete") {
const k = compute(beforeObj);
if (k != null)
watchKeyIds.add(k >>> 0);
} else {
// Any other op is handled as an update: touch both the old and the new key when possible.
const kAfter = compute(afterObj);
const kBefore = compute(beforeObj);
if (kBefore != null) {
watchKeyIds.add(kBefore >>> 0);
if (kAfter != null)
watchKeyIds.add(kAfter >>> 0);
} else {
// No usable before key. With onMissingBefore === "error" the whole request fails;
// the two branches differ only in the error message.
if (beforeObj === undefined) {
if (onMissingBefore === "error") {
failProcess(`missing oldValue for update (entity=${entity}, templateId=${tpl.templateId})`);
return;
}
} else {
if (onMissingBefore === "error") {
failProcess(`oldValue missing required fields for update (entity=${entity}, templateId=${tpl.templateId})`);
return;
}
}
// "skipBefore" touches only the after key; any other setting (default "coarse") queues no
// fine touch for this template, leaving only the table touch queued above.
if (onMissingBefore === "skipBefore") {
if (kAfter != null)
watchKeyIds.add(kAfter >>> 0);
} else {}
}
}
for (const watchKeyId of watchKeyIds) {
queueTouch({
keyId: watchKeyId >>> 0,
tsMs,
watermark,
entity,
kind: "template",
templateId: tpl.templateId,
windowMs: coalesceWindowMs
});
if (fineBudgetExhausted)
break;
}
}
}
}
// hotTemplatesOnly: queue one template touch per loaded template for each entity recorded
// above, using that entity's highest-offset change.
if (emitFineTouches && hotTemplatesOnly && !fineBudgetExhausted && templateOnlyEntityTouch.size > 0) {
for (const [entity, agg] of templateOnlyEntityTouch.entries()) {
if (fineBudgetExhausted)
break;
const tpls = templatesByEntity.get(entity);
if (!tpls || tpls.length === 0)
continue;
for (const tpl of tpls) {
if (fineBudgetExhausted)
break;
if (agg.offset < tpl.activeFromSourceOffset)
continue;
queueTouch({
keyId: templateKeyIdFor(tpl.templateId) >>> 0,
tsMs: agg.tsMs,
watermark: agg.watermark,
entity,
kind: "template",
templateId: tpl.templateId,
windowMs: coalesceWindowMs
});
}
}
}
// Emit every window that is still open.
for (const [key, p] of pending.entries()) {
flush(key, p);
}
// Order the output by key id, then by watermark compared numerically.
touches.sort((a, b) => {
const ak = a.keyId >>> 0;
const bk = b.keyId >>> 0;
if (ak < bk)
return -1;
if (ak > bk)
return 1;
const aw = BigInt(a.watermark);
const bw = BigInt(b.watermark);
if (aw < bw)
return -1;
if (aw > bw)
return 1;
return 0;
});
// Count touches by kind for the stats payload.
let tableTouchesEmitted = 0;
let templateTouchesEmitted = 0;
for (const t of touches) {
if (t.kind === "table")
tableTouchesEmitted++;
else
templateTouchesEmitted++;
}
parentPort?.postMessage({
type: "result",
id: msg.id,
stream,
processedThrough,
touches,
stats: {
rowsRead,
bytesRead,
changes,
touchesEmitted: touches.length,
tableTouchesEmitted,
templateTouchesEmitted,
maxSourceTsMs,
fineTouchesDroppedDueToBudget,
fineTouchesSuppressedDueToBudget,
fineTouchesSkippedColdTemplate
}
});
}
// Message pump for the worker. Non-object messages and unknown types are ignored.
//  - "stop":    closes the store, then acknowledges with { type: "stopped" }; both steps are best-effort.
//  - "process": runs handleProcess and reports a rejected promise back as an "error" message.
parentPort?.on("message", (msg) => {
  if (!msg || typeof msg !== "object")
    return;
  switch (msg.type) {
    case "stop": {
      try {
        db.close();
      } catch {}
      try {
        parentPort?.postMessage({ type: "stopped" });
      } catch {}
      return;
    }
    case "process": {
      // Posting the failure is itself best-effort: an error while posting is ignored.
      const reportFailure = (e) => {
        try {
          parentPort?.postMessage({
            type: "error",
            id: msg.id,
            stream: msg.stream,
            message: String(e?.message ?? e),
            stack: e?.stack ? String(e.stack) : undefined
          });
        } catch {}
      };
      handleProcess(msg).catch(reportFailure);
      return;
    }
    default:
      return;
  }
});