// src/touch/interpreter_worker.ts
import { parentPort, workerData } from "node:worker_threads";
import { Result as Result4 } from "better-result";

// src/util/ds_error.ts
import { TaggedError } from "better-result";

class DurableStreamsError extends TaggedError("DurableStreamsError")() {
}

function dsError(message, opts) {
  return new DurableStreamsError({
    message,
    ...(opts?.cause !== undefined ? { cause: opts.cause } : {}),
    ...(opts?.code !== undefined ? { code: opts.code } : {})
  });
}

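// Example (hypothetical usage sketch): `dsError` builds a tagged error and only
// attaches `cause`/`code` when they were actually provided, so serialized error
// payloads stay free of `undefined` fields. The call below is illustrative,
// not from this bundle.
/*
const err = dsError("segment upload failed", {
  code: "SEGMENT_UPLOAD",               // hypothetical code value
  cause: new Error("socket hang up"),
});
if (err instanceof DurableStreamsError) console.error(err.message);
*/
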
// src/db/schema.ts
var SCHEMA_VERSION = 11;
var DEFAULT_PRAGMAS_SQL = `
PRAGMA journal_mode = WAL;
PRAGMA synchronous = FULL;
PRAGMA foreign_keys = ON;
PRAGMA busy_timeout = 5000;
PRAGMA temp_store = MEMORY;
`;
var CREATE_TABLES_V4_SQL = `
CREATE TABLE IF NOT EXISTS streams (
  stream TEXT PRIMARY KEY,
  created_at_ms INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL,

  content_type TEXT NOT NULL,
  stream_seq TEXT NULL,
  closed INTEGER NOT NULL DEFAULT 0,
  closed_producer_id TEXT NULL,
  closed_producer_epoch INTEGER NULL,
  closed_producer_seq INTEGER NULL,
  ttl_seconds INTEGER NULL,

  epoch INTEGER NOT NULL,
  next_offset INTEGER NOT NULL,
  sealed_through INTEGER NOT NULL,
  uploaded_through INTEGER NOT NULL,
  uploaded_segment_count INTEGER NOT NULL DEFAULT 0,

  pending_rows INTEGER NOT NULL,
  pending_bytes INTEGER NOT NULL,

  -- Logical size of retained rows in the wal table for this stream (payload-only bytes).
  -- This is explicitly tracked because SQLite file size is high-water and does not shrink
  -- deterministically after DELETE-based GC/retention trimming.
  wal_rows INTEGER NOT NULL DEFAULT 0,
  wal_bytes INTEGER NOT NULL DEFAULT 0,

  last_append_ms INTEGER NOT NULL,
  last_segment_cut_ms INTEGER NOT NULL,
  segment_in_progress INTEGER NOT NULL,

  expires_at_ms INTEGER NULL,
  stream_flags INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS streams_pending_bytes_idx ON streams(pending_bytes);
CREATE INDEX IF NOT EXISTS streams_last_cut_idx ON streams(last_segment_cut_ms);
CREATE INDEX IF NOT EXISTS streams_inprog_pending_idx ON streams(segment_in_progress, pending_bytes, last_segment_cut_ms);

CREATE TABLE IF NOT EXISTS wal (
  id INTEGER PRIMARY KEY,
  stream TEXT NOT NULL,
  offset INTEGER NOT NULL,
  ts_ms INTEGER NOT NULL,
  payload BLOB NOT NULL,
  payload_len INTEGER NOT NULL,
  routing_key BLOB NULL,
  content_type TEXT NULL,
  flags INTEGER NOT NULL DEFAULT 0
);

CREATE UNIQUE INDEX IF NOT EXISTS wal_stream_offset_uniq ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_stream_offset_idx ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_ts_idx ON wal(ts_ms);

CREATE TABLE IF NOT EXISTS segments (
  segment_id TEXT PRIMARY KEY,
  stream TEXT NOT NULL,
  segment_index INTEGER NOT NULL,
  start_offset INTEGER NOT NULL,
  end_offset INTEGER NOT NULL,
  block_count INTEGER NOT NULL,
  last_append_ms INTEGER NOT NULL,
  size_bytes INTEGER NOT NULL,
  local_path TEXT NOT NULL,
  created_at_ms INTEGER NOT NULL,
  uploaded_at_ms INTEGER NULL,
  r2_etag TEXT NULL
);

CREATE TABLE IF NOT EXISTS stream_segment_meta (
  stream TEXT PRIMARY KEY,
  segment_count INTEGER NOT NULL,
  segment_offsets BLOB NOT NULL,
  segment_blocks BLOB NOT NULL,
  segment_last_ts BLOB NOT NULL
);

CREATE UNIQUE INDEX IF NOT EXISTS segments_stream_index_uniq ON segments(stream, segment_index);
CREATE INDEX IF NOT EXISTS segments_stream_start_idx ON segments(stream, start_offset);
CREATE INDEX IF NOT EXISTS segments_pending_upload_idx ON segments(uploaded_at_ms);

CREATE TABLE IF NOT EXISTS manifests (
  stream TEXT PRIMARY KEY,
  generation INTEGER NOT NULL,
  uploaded_generation INTEGER NOT NULL,
  last_uploaded_at_ms INTEGER NULL,
  last_uploaded_etag TEXT NULL
);

CREATE TABLE IF NOT EXISTS schemas (
  stream TEXT PRIMARY KEY,
  schema_json TEXT NOT NULL,
  updated_at_ms INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS producer_state (
  stream TEXT NOT NULL,
  producer_id TEXT NOT NULL,
  epoch INTEGER NOT NULL,
  last_seq INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL,
  PRIMARY KEY (stream, producer_id)
);

CREATE TABLE IF NOT EXISTS stream_interpreters (
  stream TEXT PRIMARY KEY,
  interpreted_through INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL
);

-- Live dynamic template registry (per base stream).
CREATE TABLE IF NOT EXISTS live_templates (
  stream TEXT NOT NULL,
  template_id TEXT NOT NULL,
  entity TEXT NOT NULL,
  fields_json TEXT NOT NULL,
  encodings_json TEXT NOT NULL,
  state TEXT NOT NULL,
  created_at_ms INTEGER NOT NULL,
  last_seen_at_ms INTEGER NOT NULL,
  inactivity_ttl_ms INTEGER NOT NULL,
  active_from_source_offset INTEGER NOT NULL,
  retired_at_ms INTEGER NULL,
  retired_reason TEXT NULL,
  PRIMARY KEY (stream, template_id)
);

CREATE INDEX IF NOT EXISTS live_templates_stream_entity_state_last_seen_idx
  ON live_templates(stream, entity, state, last_seen_at_ms);
CREATE INDEX IF NOT EXISTS live_templates_stream_state_last_seen_idx
  ON live_templates(stream, state, last_seen_at_ms);
`;
var CREATE_INDEX_TABLES_SQL = `
CREATE TABLE IF NOT EXISTS index_state (
  stream TEXT PRIMARY KEY,
  index_secret BLOB NOT NULL,
  indexed_through INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS index_runs (
  run_id TEXT PRIMARY KEY,
  stream TEXT NOT NULL,
  level INTEGER NOT NULL,
  start_segment INTEGER NOT NULL,
  end_segment INTEGER NOT NULL,
  object_key TEXT NOT NULL,
  filter_len INTEGER NOT NULL,
  record_count INTEGER NOT NULL,
  retired_gen INTEGER NULL,
  retired_at_ms INTEGER NULL
);

CREATE INDEX IF NOT EXISTS index_runs_stream_idx ON index_runs(stream, level, start_segment);
`;
var CREATE_TABLES_V4_SUFFIX_SQL = (suffix) => `
CREATE TABLE streams_${suffix} (
  stream TEXT PRIMARY KEY,
  created_at_ms INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL,

  content_type TEXT NOT NULL,
  stream_seq TEXT NULL,
  closed INTEGER NOT NULL DEFAULT 0,
  closed_producer_id TEXT NULL,
  closed_producer_epoch INTEGER NULL,
  closed_producer_seq INTEGER NULL,
  ttl_seconds INTEGER NULL,

  epoch INTEGER NOT NULL,
  next_offset INTEGER NOT NULL,
  sealed_through INTEGER NOT NULL,
  uploaded_through INTEGER NOT NULL,
  uploaded_segment_count INTEGER NOT NULL DEFAULT 0,

  pending_rows INTEGER NOT NULL,
  pending_bytes INTEGER NOT NULL,

  last_append_ms INTEGER NOT NULL,
  last_segment_cut_ms INTEGER NOT NULL,
  segment_in_progress INTEGER NOT NULL,

  expires_at_ms INTEGER NULL,
  stream_flags INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE wal_${suffix} (
  id INTEGER PRIMARY KEY,
  stream TEXT NOT NULL,
  offset INTEGER NOT NULL,
  ts_ms INTEGER NOT NULL,
  payload BLOB NOT NULL,
  payload_len INTEGER NOT NULL,
  routing_key BLOB NULL,
  content_type TEXT NULL,
  flags INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE segments_${suffix} (
  segment_id TEXT PRIMARY KEY,
  stream TEXT NOT NULL,
  segment_index INTEGER NOT NULL,
  start_offset INTEGER NOT NULL,
  end_offset INTEGER NOT NULL,
  block_count INTEGER NOT NULL,
  last_append_ms INTEGER NOT NULL,
  size_bytes INTEGER NOT NULL,
  local_path TEXT NOT NULL,
  created_at_ms INTEGER NOT NULL,
  uploaded_at_ms INTEGER NULL,
  r2_etag TEXT NULL
);

CREATE TABLE manifests_${suffix} (
  stream TEXT PRIMARY KEY,
  generation INTEGER NOT NULL,
  uploaded_generation INTEGER NOT NULL,
  last_uploaded_at_ms INTEGER NULL,
  last_uploaded_etag TEXT NULL
);

CREATE TABLE schemas_${suffix} (
  stream TEXT PRIMARY KEY,
  schema_json TEXT NOT NULL,
  updated_at_ms INTEGER NOT NULL
);

CREATE TABLE producer_state_${suffix} (
  stream TEXT NOT NULL,
  producer_id TEXT NOT NULL,
  epoch INTEGER NOT NULL,
  last_seq INTEGER NOT NULL,
  updated_at_ms INTEGER NOT NULL,
  PRIMARY KEY (stream, producer_id)
);
`;
var CREATE_INDEXES_V4_SQL = `
CREATE UNIQUE INDEX IF NOT EXISTS wal_stream_offset_uniq ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_stream_offset_idx ON wal(stream, offset);
CREATE INDEX IF NOT EXISTS wal_ts_idx ON wal(ts_ms);

CREATE INDEX IF NOT EXISTS streams_pending_bytes_idx ON streams(pending_bytes);
CREATE INDEX IF NOT EXISTS streams_last_cut_idx ON streams(last_segment_cut_ms);
CREATE INDEX IF NOT EXISTS streams_inprog_pending_idx ON streams(segment_in_progress, pending_bytes, last_segment_cut_ms);

CREATE UNIQUE INDEX IF NOT EXISTS segments_stream_index_uniq ON segments(stream, segment_index);
CREATE INDEX IF NOT EXISTS segments_stream_start_idx ON segments(stream, start_offset);
CREATE INDEX IF NOT EXISTS segments_pending_upload_idx ON segments(uploaded_at_ms);
`;
function initSchema(db, opts = {}) {
  db.exec(DEFAULT_PRAGMAS_SQL);
  if (opts.skipMigrations)
    return;
  db.exec(`CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL);`);
  const readSchemaVersion = () => {
    const row = db.query("SELECT version FROM schema_version LIMIT 1;").get();
    if (!row)
      return null;
    const raw = row.version;
    if (typeof raw === "bigint")
      return Number(raw);
    if (typeof raw === "number")
      return raw;
    return Number(raw);
  };
  const version0 = readSchemaVersion();
  if (version0 == null) {
    db.exec(CREATE_TABLES_V4_SQL);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.query("INSERT INTO schema_version(version) VALUES (?);").run(SCHEMA_VERSION);
    return;
  }
  if (version0 === SCHEMA_VERSION)
    return;
  let version = version0;
  while (version !== SCHEMA_VERSION) {
    if (version === 1) {
      migrateV1ToV4(db);
    } else if (version === 2) {
      migrateV2ToV4(db);
    } else if (version === 3) {
      migrateV3ToV4(db);
    } else if (version === 4) {
      migrateV4ToV5(db);
    } else if (version === 5) {
      migrateV5ToV6(db);
    } else if (version === 6) {
      migrateV6ToV7(db);
    } else if (version === 7) {
      migrateV7ToV8(db);
    } else if (version === 8) {
      migrateV8ToV9(db);
    } else if (version === 9) {
      migrateV9ToV10(db);
    } else if (version === 10) {
      migrateV10ToV11(db);
    } else {
      throw dsError(`unexpected schema version: ${version} (expected ${SCHEMA_VERSION})`);
    }
    const next = readSchemaVersion();
    if (next == null)
      throw dsError("schema_version row missing after migration");
    version = next;
  }
}

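// Example (hypothetical usage sketch): `initSchema` walks the migration chain
// one version at a time and re-reads `schema_version` after each step, so a v8
// database reaches v11 via v8 -> v9 -> v10 -> v11. A minimal open path might
// look like the following (the on-disk path is illustrative).
/*
const db = openSqliteDatabase("/var/lib/ds/meta.db"); // hypothetical path
initSchema(db);                           // no-op when already at SCHEMA_VERSION (11)
initSchema(db, { skipMigrations: true }); // pragmas only, e.g. for read-only tooling
*/
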
function migrateV1ToV4(db) {
  const tx = db.transaction(() => {
    db.exec(CREATE_TABLES_V4_SUFFIX_SQL("v4"));
    db.exec(`
      INSERT INTO streams_v4(
        stream, created_at_ms, updated_at_ms,
        content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
        epoch,
        next_offset, sealed_through, uploaded_through,
        pending_rows, pending_bytes,
        last_append_ms, last_segment_cut_ms, segment_in_progress,
        expires_at_ms, stream_flags
      )
      SELECT
        stream,
        CAST(created_at_ns / 1000000 AS INTEGER),
        CAST(updated_at_ns / 1000000 AS INTEGER),
        'application/octet-stream',
        NULL,
        0,
        NULL,
        NULL,
        NULL,
        NULL,
        epoch,
        next_seq,
        sealed_through_seq,
        uploaded_through_seq,
        pending_rows,
        pending_bytes,
        CAST(last_append_ns / 1000000 AS INTEGER),
        CAST(last_segment_cut_ns / 1000000 AS INTEGER),
        segment_in_progress,
        CASE WHEN expires_at_ns IS NULL THEN NULL ELSE CAST(expires_at_ns / 1000000 AS INTEGER) END,
        CASE WHEN deleted != 0 THEN 1 ELSE 0 END
      FROM streams;
    `);
    db.exec(`
      INSERT INTO wal_v4(
        stream, offset, ts_ms, payload, payload_len, routing_key, content_type, flags
      )
      SELECT
        stream,
        seq,
        CAST(append_ns / 1000000 AS INTEGER),
        payload,
        payload_len,
        CASE WHEN routing_key IS NULL THEN NULL ELSE CAST(routing_key AS BLOB) END,
        CASE WHEN is_json != 0 THEN 'application/json' ELSE NULL END,
        0
      FROM wal;
    `);
    db.exec(`
      INSERT INTO segments_v4(
        segment_id, stream, segment_index, start_offset, end_offset, block_count,
        last_append_ms, size_bytes, local_path, created_at_ms, uploaded_at_ms, r2_etag
      )
      SELECT
        segment_id,
        stream,
        segment_index,
        start_seq,
        end_seq,
        block_count,
        CAST(last_append_ns / 1000000 AS INTEGER),
        size_bytes,
        local_path,
        CAST(created_at_ns / 1000000 AS INTEGER),
        CASE WHEN uploaded_at_ns IS NULL THEN NULL ELSE CAST(uploaded_at_ns / 1000000 AS INTEGER) END,
        NULL
      FROM segments;
    `);
    db.exec(`
      INSERT INTO manifests_v4(
        stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag
      )
      SELECT
        stream,
        generation,
        uploaded_generation,
        CASE WHEN last_uploaded_at_ns IS NULL THEN NULL ELSE CAST(last_uploaded_at_ns / 1000000 AS INTEGER) END,
        last_uploaded_etag
      FROM manifests;
    `);
    db.exec(`
      INSERT INTO schemas_v4(stream, schema_json, updated_at_ms)
      SELECT stream, schema_json, CAST(updated_at_ns / 1000000 AS INTEGER)
      FROM schemas;
    `);
    db.exec(`DROP TABLE wal;`);
    db.exec(`DROP TABLE streams;`);
    db.exec(`DROP TABLE segments;`);
    db.exec(`DROP TABLE manifests;`);
    db.exec(`DROP TABLE schemas;`);
    db.exec(`ALTER TABLE streams_v4 RENAME TO streams;`);
    db.exec(`ALTER TABLE wal_v4 RENAME TO wal;`);
    db.exec(`ALTER TABLE segments_v4 RENAME TO segments;`);
    db.exec(`ALTER TABLE manifests_v4 RENAME TO manifests;`);
    db.exec(`ALTER TABLE schemas_v4 RENAME TO schemas;`);
    db.exec(`ALTER TABLE producer_state_v4 RENAME TO producer_state;`);
    db.exec(CREATE_INDEXES_V4_SQL);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec(`UPDATE schema_version SET version = 4;`);
  });
  tx();
}

function migrateV2ToV4(db) {
  const tx = db.transaction(() => {
    db.exec(`ALTER TABLE segments ADD COLUMN block_count INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`ALTER TABLE segments ADD COLUMN last_append_ms INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`ALTER TABLE streams ADD COLUMN content_type TEXT NOT NULL DEFAULT 'application/octet-stream';`);
    db.exec(`ALTER TABLE streams ADD COLUMN stream_seq TEXT NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_id TEXT NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_epoch INTEGER NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_seq INTEGER NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN ttl_seconds INTEGER NULL;`);
    db.exec(`
      CREATE TABLE IF NOT EXISTS producer_state (
        stream TEXT NOT NULL,
        producer_id TEXT NOT NULL,
        epoch INTEGER NOT NULL,
        last_seq INTEGER NOT NULL,
        updated_at_ms INTEGER NOT NULL,
        PRIMARY KEY (stream, producer_id)
      );
    `);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec(`UPDATE schema_version SET version = 4;`);
  });
  tx();
}

function migrateV3ToV4(db) {
  const tx = db.transaction(() => {
    db.exec(`ALTER TABLE streams ADD COLUMN content_type TEXT NOT NULL DEFAULT 'application/octet-stream';`);
    db.exec(`ALTER TABLE streams ADD COLUMN stream_seq TEXT NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_id TEXT NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_epoch INTEGER NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN closed_producer_seq INTEGER NULL;`);
    db.exec(`ALTER TABLE streams ADD COLUMN ttl_seconds INTEGER NULL;`);
    db.exec(`
      CREATE TABLE IF NOT EXISTS producer_state (
        stream TEXT NOT NULL,
        producer_id TEXT NOT NULL,
        epoch INTEGER NOT NULL,
        last_seq INTEGER NOT NULL,
        updated_at_ms INTEGER NOT NULL,
        PRIMARY KEY (stream, producer_id)
      );
    `);
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec(`UPDATE schema_version SET version = 4;`);
  });
  tx();
}

function migrateV4ToV5(db) {
  const tx = db.transaction(() => {
    db.exec(CREATE_INDEX_TABLES_SQL);
    db.exec(`UPDATE schema_version SET version = 5;`);
  });
  tx();
}

function migrateV5ToV6(db) {
  const tx = db.transaction(() => {
    db.exec(`ALTER TABLE streams ADD COLUMN uploaded_segment_count INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`
      CREATE TABLE IF NOT EXISTS stream_segment_meta (
        stream TEXT PRIMARY KEY,
        segment_count INTEGER NOT NULL,
        segment_offsets BLOB NOT NULL,
        segment_blocks BLOB NOT NULL,
        segment_last_ts BLOB NOT NULL
      );
    `);
    db.exec(`UPDATE schema_version SET version = 6;`);
  });
  tx();
}

function migrateV6ToV7(db) {
  const tx = db.transaction(() => {
    db.exec(`
      CREATE TABLE IF NOT EXISTS stream_interpreters (
        stream TEXT PRIMARY KEY,
        interpreted_through INTEGER NOT NULL,
        updated_at_ms INTEGER NOT NULL
      );
    `);
    db.exec(`UPDATE schema_version SET version = 7;`);
  });
  tx();
}

function migrateV7ToV8(db) {
  const tx = db.transaction(() => {
    db.exec(`UPDATE schema_version SET version = 8;`);
  });
  tx();
}

function migrateV8ToV9(db) {
  const tx = db.transaction(() => {
    db.exec(`
      CREATE TABLE IF NOT EXISTS live_templates (
        stream TEXT NOT NULL,
        template_id TEXT NOT NULL,
        entity TEXT NOT NULL,
        fields_json TEXT NOT NULL,
        encodings_json TEXT NOT NULL,
        state TEXT NOT NULL,
        created_at_ms INTEGER NOT NULL,
        last_seen_at_ms INTEGER NOT NULL,
        inactivity_ttl_ms INTEGER NOT NULL,
        active_from_source_offset INTEGER NOT NULL,
        retired_at_ms INTEGER NULL,
        retired_reason TEXT NULL,
        PRIMARY KEY (stream, template_id)
      );
    `);
    db.exec(`
      CREATE INDEX IF NOT EXISTS live_templates_stream_entity_state_last_seen_idx
        ON live_templates(stream, entity, state, last_seen_at_ms);
    `);
    db.exec(`
      CREATE INDEX IF NOT EXISTS live_templates_stream_state_last_seen_idx
        ON live_templates(stream, state, last_seen_at_ms);
    `);
    db.exec(`UPDATE schema_version SET version = 9;`);
  });
  tx();
}

function migrateV9ToV10(db) {
  const tx = db.transaction(() => {
    db.exec(`ALTER TABLE streams ADD COLUMN wal_rows INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`ALTER TABLE streams ADD COLUMN wal_bytes INTEGER NOT NULL DEFAULT 0;`);
    db.exec(`DROP TABLE IF EXISTS temp.wal_stats;`);
    db.exec(`
      CREATE TEMP TABLE wal_stats AS
      SELECT stream, COUNT(*) as rows, COALESCE(SUM(payload_len), 0) as bytes
      FROM wal
      GROUP BY stream;
    `);
    db.exec(`
      UPDATE streams
      SET wal_rows = COALESCE((SELECT rows FROM wal_stats WHERE wal_stats.stream = streams.stream), 0),
          wal_bytes = COALESCE((SELECT bytes FROM wal_stats WHERE wal_stats.stream = streams.stream), 0);
    `);
    db.exec(`DROP TABLE wal_stats;`);
    db.exec(`UPDATE schema_version SET version = 10;`);
  });
  tx();
}

function migrateV10ToV11(db) {
  const tx = db.transaction(() => {
    db.exec(`DROP INDEX IF EXISTS wal_touch_stream_rk_offset_idx;`);
    db.exec(`UPDATE schema_version SET version = ${SCHEMA_VERSION};`);
  });
  tx();
}

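// Example (hypothetical sketch): every migration above follows the same shape --
// DDL plus a `schema_version` bump inside one transaction, so a crash mid-step
// leaves the version untouched and the step reruns cleanly on the next open.
// A future v11 -> v12 step would slot into the dispatch chain in `initSchema`
// the same way (`migrateV11ToV12` and its column are illustrative, not part of
// this bundle).
/*
function migrateV11ToV12(db) {
  const tx = db.transaction(() => {
    db.exec(`ALTER TABLE streams ADD COLUMN example_col INTEGER NOT NULL DEFAULT 0;`); // hypothetical column
    db.exec(`UPDATE schema_version SET version = 12;`);
  });
  tx();
}
*/
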
// src/sqlite/adapter.ts
import { createRequire } from "node:module";

// src/runtime/host_runtime.ts
function detectHostRuntime() {
  return typeof globalThis.Bun !== "undefined" || Boolean(process.versions?.bun) ? "bun" : "node";
}

// src/sqlite/adapter.ts
class BunStatementAdapter {
  stmt;
  constructor(stmt) {
    this.stmt = stmt;
  }
  get(...params) {
    return this.stmt.get(...params);
  }
  all(...params) {
    return this.stmt.all(...params);
  }
  run(...params) {
    return this.stmt.run(...params);
  }
  iterate(...params) {
    return this.stmt.iterate(...params);
  }
  finalize() {
    if (typeof this.stmt.finalize === "function")
      this.stmt.finalize();
  }
}

class BunDatabaseAdapter {
  db;
  constructor(db) {
    this.db = db;
  }
  exec(sql) {
    this.db.exec(sql);
  }
  query(sql) {
    return new BunStatementAdapter(this.db.query(sql));
  }
  transaction(fn) {
    return this.db.transaction(fn);
  }
  close() {
    this.db.close();
  }
}

class NodeStatementAdapter {
  stmt;
  constructor(stmt) {
    this.stmt = stmt;
  }
  get(...params) {
    return this.stmt.get(...params);
  }
  all(...params) {
    return this.stmt.all(...params);
  }
  run(...params) {
    return this.stmt.run(...params);
  }
  iterate(...params) {
    return this.stmt.iterate(...params);
  }
  finalize() {
    if (typeof this.stmt.finalize === "function")
      this.stmt.finalize();
  }
}

class NodeDatabaseAdapter {
  txDepth = 0;
  txCounter = 0;
  db;
  constructor(db) {
    this.db = db;
  }
  exec(sql) {
    this.db.exec(sql);
  }
  query(sql) {
    const stmt = this.db.prepare(sql);
    if (typeof stmt?.setReadBigInts === "function")
      stmt.setReadBigInts(true);
    return new NodeStatementAdapter(stmt);
  }
  transaction(fn) {
    return () => {
      const nested = this.txDepth > 0;
      const savepoint = `ds_tx_${++this.txCounter}`;
      this.txDepth += 1;
      try {
        if (nested)
          this.db.exec(`SAVEPOINT ${savepoint};`);
        else
          this.db.exec("BEGIN;");
        const out = fn();
        if (nested)
          this.db.exec(`RELEASE SAVEPOINT ${savepoint};`);
        else
          this.db.exec("COMMIT;");
        return out;
      } catch (err) {
        try {
          if (nested) {
            this.db.exec(`ROLLBACK TO SAVEPOINT ${savepoint};`);
            this.db.exec(`RELEASE SAVEPOINT ${savepoint};`);
          } else {
            this.db.exec("ROLLBACK;");
          }
        } catch {}
        throw err;
      } finally {
        this.txDepth = Math.max(0, this.txDepth - 1);
      }
    };
  }
  close() {
    this.db.close();
  }
}

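// Example (hypothetical usage sketch): node:sqlite has no nested-transaction
// primitive, so the adapter emulates one with savepoints. An inner
// `transaction()` run while an outer one is active rolls back to its own
// savepoint on failure without aborting the outer work. `adapter` below is an
// illustrative NodeDatabaseAdapter instance.
/*
const outer = adapter.transaction(() => {
  adapter.exec(`INSERT INTO schemas(stream, schema_json, updated_at_ms) VALUES ('s', '{}', 0);`);
  const inner = adapter.transaction(() => {
    throw new Error("inner failure"); // rolls back only to the inner SAVEPOINT
  });
  try {
    inner();
  } catch {}
  // the outer INSERT is still pending and commits when this callback returns
});
outer();
*/
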
var openImpl = null;
var openImplRuntime = null;
var runtimeOverride = null;
var require2 = createRequire(import.meta.url);
function selectedRuntime() {
  return runtimeOverride ?? detectHostRuntime();
}
function buildOpenImpl(runtime) {
  if (runtime === "bun") {
    const { Database } = require2("bun:sqlite");
    return (path) => new BunDatabaseAdapter(new Database(path));
  }
  const { DatabaseSync } = require2("node:sqlite");
  return (path) => new NodeDatabaseAdapter(new DatabaseSync(path));
}
function setSqliteRuntimeOverride(runtime) {
  runtimeOverride = runtime;
  if (runtimeOverride && openImplRuntime && runtimeOverride !== openImplRuntime) {
    openImpl = null;
    openImplRuntime = null;
  }
}
function openSqliteDatabase(path) {
  const runtime = selectedRuntime();
  if (!openImpl || openImplRuntime !== runtime) {
    openImpl = buildOpenImpl(runtime);
    openImplRuntime = runtime;
  }
  if (!openImpl)
    throw dsError("sqlite adapter not initialized");
  return openImpl(path);
}

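// Example (hypothetical usage sketch): the opener lazily requires bun:sqlite
// or node:sqlite based on the detected (or overridden) host runtime and caches
// the constructor per runtime. The override is mainly useful in tests.
/*
setSqliteRuntimeOverride("node");          // force node:sqlite even under Bun
const db = openSqliteDatabase(":memory:");
db.exec("CREATE TABLE t (x INTEGER);");
db.close();
setSqliteRuntimeOverride(null);            // back to auto-detection
*/
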
// src/db/db.ts
import { Result } from "better-result";
var STREAM_FLAG_DELETED = 1 << 0;
var STREAM_FLAG_TOUCH = 1 << 1;
var BASE_WAL_GC_CHUNK_OFFSETS = (() => {
  const raw = process.env.DS_BASE_WAL_GC_CHUNK_OFFSETS;
  if (raw == null || raw.trim() === "")
    return 1e5;
  const n = Number(raw);
  if (!Number.isFinite(n) || n <= 0)
    return 1e5;
  return Math.floor(n);
})();

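// Example (hypothetical sketch): the GC chunk size defaults to 100000 offsets
// per pass and is tunable via the environment; blank, non-numeric, or
// non-positive values fall back to the default.
//
//   DS_BASE_WAL_GC_CHUNK_OFFSETS=250000  -> chunks of 250000 offsets
//   DS_BASE_WAL_GC_CHUNK_OFFSETS=abc     -> falls back to 100000
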
class SqliteDurableStore {
  db;
  dbstatReady = null;
  stmts;
  constructor(path, opts = {}) {
    this.db = openSqliteDatabase(path);
    initSchema(this.db, { skipMigrations: opts.skipMigrations });
    if (opts.cacheBytes && opts.cacheBytes > 0) {
      const kb = Math.max(1, Math.floor(opts.cacheBytes / 1024));
      this.db.exec(`PRAGMA cache_size = -${kb};`);
    }
    this.stmts = {
      getStream: this.db.query(`SELECT stream, created_at_ms, updated_at_ms,
        content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
        epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
        pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
        expires_at_ms, stream_flags
        FROM streams WHERE stream = ? LIMIT 1;`),
      upsertStream: this.db.query(`INSERT INTO streams(stream, created_at_ms, updated_at_ms,
        content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
        epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
        pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
        expires_at_ms, stream_flags)
        VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET
          updated_at_ms=excluded.updated_at_ms,
          expires_at_ms=excluded.expires_at_ms,
          ttl_seconds=excluded.ttl_seconds,
          content_type=excluded.content_type,
          stream_flags=excluded.stream_flags;`),
      listStreams: this.db.query(`SELECT stream, created_at_ms, updated_at_ms,
        content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
        epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
        pending_rows, pending_bytes, wal_rows, wal_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
        expires_at_ms, stream_flags
        FROM streams
        WHERE (stream_flags & ?) = 0
          AND (expires_at_ms IS NULL OR expires_at_ms > ?)
        ORDER BY stream
        LIMIT ? OFFSET ?;`),
      setDeleted: this.db.query(`UPDATE streams SET stream_flags = (stream_flags | ?), updated_at_ms=? WHERE stream=?;`),
      insertWal: this.db.query(`INSERT INTO wal(stream, offset, ts_ms, payload, payload_len, routing_key, content_type, flags)
        VALUES(?, ?, ?, ?, ?, ?, ?, ?);`),
      updateStreamAppend: this.db.query(`UPDATE streams
        SET next_offset = ?, updated_at_ms = ?, last_append_ms = ?,
            pending_rows = pending_rows + ?, pending_bytes = pending_bytes + ?,
            wal_rows = wal_rows + ?, wal_bytes = wal_bytes + ?
        WHERE stream = ? AND (stream_flags & ?) = 0;`),
      updateStreamAppendSeqCheck: this.db.query(`UPDATE streams
        SET next_offset = ?, updated_at_ms = ?, last_append_ms = ?,
            pending_rows = pending_rows + ?, pending_bytes = pending_bytes + ?,
            wal_rows = wal_rows + ?, wal_bytes = wal_bytes + ?
        WHERE stream = ? AND (stream_flags & ?) = 0 AND next_offset = ?;`),
      candidateStreams: this.db.query(`SELECT stream, pending_bytes, pending_rows, last_segment_cut_ms, sealed_through, next_offset, epoch
        FROM streams
        WHERE (stream_flags & ?) = 0
          AND segment_in_progress = 0
          AND (pending_bytes >= ? OR pending_rows >= ? OR (? - last_segment_cut_ms) >= ?)
        ORDER BY pending_bytes DESC
        LIMIT ?;`),
      candidateStreamsNoInterval: this.db.query(`SELECT stream, pending_bytes, pending_rows, last_segment_cut_ms, sealed_through, next_offset, epoch
        FROM streams
        WHERE (stream_flags & ?) = 0
          AND segment_in_progress = 0
          AND (pending_bytes >= ? OR pending_rows >= ?)
        ORDER BY pending_bytes DESC
        LIMIT ?;`),
      listExpiredStreams: this.db.query(`SELECT stream
        FROM streams
        WHERE (stream_flags & ?) = 0
          AND expires_at_ms IS NOT NULL
          AND expires_at_ms <= ?
        ORDER BY expires_at_ms ASC
        LIMIT ?;`),
      streamWalRange: this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
        FROM wal
        WHERE stream = ? AND offset >= ? AND offset <= ?
        ORDER BY offset ASC;`),
      streamWalRangeByKey: this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
        FROM wal
        WHERE stream = ? AND offset >= ? AND offset <= ? AND routing_key = ?
        ORDER BY offset ASC;`),
      createSegment: this.db.query(`INSERT INTO segments(segment_id, stream, segment_index, start_offset, end_offset, block_count,
        last_append_ms, size_bytes, local_path, created_at_ms, uploaded_at_ms, r2_etag)
        VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL);`),
      listSegmentsForStream: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
        local_path, created_at_ms, uploaded_at_ms, r2_etag
        FROM segments WHERE stream=? ORDER BY segment_index ASC;`),
      getSegmentByIndex: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
        local_path, created_at_ms, uploaded_at_ms, r2_etag
        FROM segments WHERE stream=? AND segment_index=? LIMIT 1;`),
      findSegmentForOffset: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
        local_path, created_at_ms, uploaded_at_ms, r2_etag
        FROM segments
        WHERE stream=? AND start_offset <= ? AND end_offset >= ?
        ORDER BY segment_index DESC
        LIMIT 1;`),
      nextSegmentIndex: this.db.query(`SELECT COALESCE(MAX(segment_index)+1, 0) as next_idx FROM segments WHERE stream=?;`),
      markSegmentUploaded: this.db.query(`UPDATE segments SET r2_etag=?, uploaded_at_ms=? WHERE segment_id=?;`),
      pendingUploadSegments: this.db.query(`SELECT segment_id, stream, segment_index, start_offset, end_offset, block_count, last_append_ms, size_bytes,
        local_path, created_at_ms, uploaded_at_ms, r2_etag
        FROM segments WHERE uploaded_at_ms IS NULL ORDER BY created_at_ms ASC LIMIT ?;`),
      countPendingSegments: this.db.query(`SELECT COUNT(*) as cnt FROM segments WHERE uploaded_at_ms IS NULL;`),
      countSegmentsForStream: this.db.query(`SELECT COUNT(*) as cnt FROM segments WHERE stream=?;`),
      tryClaimSegment: this.db.query(`UPDATE streams SET segment_in_progress=1, updated_at_ms=? WHERE stream=? AND segment_in_progress=0;`),
      getManifest: this.db.query(`SELECT stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag FROM manifests WHERE stream=? LIMIT 1;`),
      upsertManifest: this.db.query(`INSERT INTO manifests(stream, generation, uploaded_generation, last_uploaded_at_ms, last_uploaded_etag)
        VALUES(?, ?, ?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET
          generation=excluded.generation,
          uploaded_generation=excluded.uploaded_generation,
          last_uploaded_at_ms=excluded.last_uploaded_at_ms,
          last_uploaded_etag=excluded.last_uploaded_etag;`),
      getIndexState: this.db.query(`SELECT stream, index_secret, indexed_through, updated_at_ms
        FROM index_state WHERE stream=? LIMIT 1;`),
      upsertIndexState: this.db.query(`INSERT INTO index_state(stream, index_secret, indexed_through, updated_at_ms)
        VALUES(?, ?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET
          index_secret=excluded.index_secret,
          indexed_through=excluded.indexed_through,
          updated_at_ms=excluded.updated_at_ms;`),
      updateIndexedThrough: this.db.query(`UPDATE index_state SET indexed_through=?, updated_at_ms=? WHERE stream=?;`),
      listIndexRuns: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
        FROM index_runs WHERE stream=? AND retired_gen IS NULL
        ORDER BY start_segment ASC, level ASC;`),
      listIndexRunsAll: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
        FROM index_runs WHERE stream=?
        ORDER BY start_segment ASC, level ASC;`),
      listRetiredIndexRuns: this.db.query(`SELECT run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms
        FROM index_runs WHERE stream=? AND retired_gen IS NOT NULL
        ORDER BY retired_at_ms ASC;`),
      insertIndexRun: this.db.query(`INSERT OR IGNORE INTO index_runs(run_id, stream, level, start_segment, end_segment, object_key, filter_len, record_count, retired_gen, retired_at_ms)
        VALUES(?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL);`),
      retireIndexRun: this.db.query(`UPDATE index_runs SET retired_gen=?, retired_at_ms=? WHERE run_id=?;`),
      deleteIndexRun: this.db.query(`DELETE FROM index_runs WHERE run_id=?;`),
      countUploadedSegments: this.db.query(`SELECT COALESCE(MAX(segment_index), -1) as max_idx
        FROM segments WHERE stream=? AND r2_etag IS NOT NULL;`),
      getSegmentMeta: this.db.query(`SELECT stream, segment_count, segment_offsets, segment_blocks, segment_last_ts
        FROM stream_segment_meta WHERE stream=? LIMIT 1;`),
      ensureSegmentMeta: this.db.query(`INSERT INTO stream_segment_meta(stream, segment_count, segment_offsets, segment_blocks, segment_last_ts)
        VALUES(?, 0, x'', x'', x'')
        ON CONFLICT(stream) DO NOTHING;`),
      appendSegmentMeta: this.db.query(`UPDATE stream_segment_meta
        SET segment_count = segment_count + 1,
            segment_offsets = segment_offsets || ?,
            segment_blocks = segment_blocks || ?,
            segment_last_ts = segment_last_ts || ?
        WHERE stream = ?;`),
      upsertSegmentMeta: this.db.query(`INSERT INTO stream_segment_meta(stream, segment_count, segment_offsets, segment_blocks, segment_last_ts)
        VALUES(?, ?, ?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET
          segment_count=excluded.segment_count,
          segment_offsets=excluded.segment_offsets,
          segment_blocks=excluded.segment_blocks,
          segment_last_ts=excluded.segment_last_ts;`),
      setUploadedSegmentCount: this.db.query(`UPDATE streams SET uploaded_segment_count=?, updated_at_ms=? WHERE stream=?;`),
      advanceUploadedThrough: this.db.query(`UPDATE streams SET uploaded_through=?, updated_at_ms=? WHERE stream=?;`),
      deleteWalBeforeOffset: this.db.query(`DELETE FROM wal WHERE stream=? AND offset <= ?;`),
      getSchemaRegistry: this.db.query(`SELECT stream, schema_json, updated_at_ms FROM schemas WHERE stream=? LIMIT 1;`),
      upsertSchemaRegistry: this.db.query(`INSERT INTO schemas(stream, schema_json, updated_at_ms) VALUES(?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET schema_json=excluded.schema_json, updated_at_ms=excluded.updated_at_ms;`),
      getStreamInterpreter: this.db.query(`SELECT stream, interpreted_through, updated_at_ms
        FROM stream_interpreters WHERE stream=? LIMIT 1;`),
      upsertStreamInterpreter: this.db.query(`INSERT INTO stream_interpreters(stream, interpreted_through, updated_at_ms)
        VALUES(?, ?, ?)
        ON CONFLICT(stream) DO UPDATE SET
          interpreted_through=excluded.interpreted_through,
          updated_at_ms=excluded.updated_at_ms;`),
      deleteStreamInterpreter: this.db.query(`DELETE FROM stream_interpreters WHERE stream=?;`),
      listStreamInterpreters: this.db.query(`SELECT stream, interpreted_through, updated_at_ms
        FROM stream_interpreters
        ORDER BY stream ASC;`),
      countStreams: this.db.query(`SELECT COUNT(*) as cnt FROM streams WHERE (stream_flags & ?) = 0;`),
      sumPendingBytes: this.db.query(`SELECT COALESCE(SUM(pending_bytes), 0) as total FROM streams;`),
      sumPendingSegmentBytes: this.db.query(`SELECT COALESCE(SUM(size_bytes), 0) as total FROM segments WHERE uploaded_at_ms IS NULL;`)
    };
  }
  toBigInt(v) {
    return typeof v === "bigint" ? v : BigInt(v);
  }
  bindInt(v) {
    const max = BigInt(Number.MAX_SAFE_INTEGER);
    const min = BigInt(Number.MIN_SAFE_INTEGER);
    if (v <= max && v >= min)
      return Number(v);
    return v.toString();
  }
  encodeU64Le(value) {
    const buf = new Uint8Array(8);
    const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
    dv.setBigUint64(0, value, true);
    return buf;
  }
  encodeU32Le(value) {
    const buf = new Uint8Array(4);
    const dv = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
    dv.setUint32(0, value >>> 0, true);
    return buf;
  }
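  // Example (hypothetical sketch): `bindInt` keeps SQLite bindings lossless --
  // bigints within Number.MAX_SAFE_INTEGER bind as numbers, anything larger
  // binds as a decimal string. The little-endian encoders are the write side
  // of the packed arrays stored in stream_segment_meta.
  /*
  store.bindInt(42n);            // -> 42 (number)
  store.bindInt(2n ** 63n - 1n); // -> "9223372036854775807" (string)
  store.encodeU64Le(1n);         // -> Uint8Array [1, 0, 0, 0, 0, 0, 0, 0]
  */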
  coerceStreamRow(row) {
    return {
      stream: String(row.stream),
      created_at_ms: this.toBigInt(row.created_at_ms),
      updated_at_ms: this.toBigInt(row.updated_at_ms),
      content_type: String(row.content_type),
      stream_seq: row.stream_seq == null ? null : String(row.stream_seq),
      closed: Number(row.closed),
      closed_producer_id: row.closed_producer_id == null ? null : String(row.closed_producer_id),
      closed_producer_epoch: row.closed_producer_epoch == null ? null : Number(row.closed_producer_epoch),
      closed_producer_seq: row.closed_producer_seq == null ? null : Number(row.closed_producer_seq),
      ttl_seconds: row.ttl_seconds == null ? null : Number(row.ttl_seconds),
      epoch: Number(row.epoch),
      next_offset: this.toBigInt(row.next_offset),
      sealed_through: this.toBigInt(row.sealed_through),
      uploaded_through: this.toBigInt(row.uploaded_through),
      uploaded_segment_count: Number(row.uploaded_segment_count ?? 0),
      pending_rows: this.toBigInt(row.pending_rows),
      pending_bytes: this.toBigInt(row.pending_bytes),
      wal_rows: this.toBigInt(row.wal_rows ?? 0),
      wal_bytes: this.toBigInt(row.wal_bytes ?? 0),
      last_append_ms: this.toBigInt(row.last_append_ms),
      last_segment_cut_ms: this.toBigInt(row.last_segment_cut_ms),
      segment_in_progress: Number(row.segment_in_progress),
      expires_at_ms: row.expires_at_ms == null ? null : this.toBigInt(row.expires_at_ms),
      stream_flags: Number(row.stream_flags)
    };
  }
  coerceSegmentRow(row) {
    return {
      segment_id: String(row.segment_id),
      stream: String(row.stream),
      segment_index: Number(row.segment_index),
      start_offset: this.toBigInt(row.start_offset),
      end_offset: this.toBigInt(row.end_offset),
      block_count: Number(row.block_count),
      last_append_ms: this.toBigInt(row.last_append_ms),
      size_bytes: Number(row.size_bytes),
      local_path: String(row.local_path),
      created_at_ms: this.toBigInt(row.created_at_ms),
      uploaded_at_ms: row.uploaded_at_ms == null ? null : this.toBigInt(row.uploaded_at_ms),
      r2_etag: row.r2_etag == null ? null : String(row.r2_etag)
    };
  }
  close() {
    this.db.close();
  }
  nowMs() {
    return BigInt(Date.now());
  }
  isDeleted(row) {
    return (row.stream_flags & STREAM_FLAG_DELETED) !== 0;
  }
  getStream(stream) {
    const row = this.stmts.getStream.get(stream);
    return row ? this.coerceStreamRow(row) : null;
  }
  ensureStream(stream, opts) {
    const existing = this.getStream(stream);
    if (existing)
      return existing;
    const now = this.nowMs();
    const epoch = 0;
    const nextOffset = 0n;
    const contentType = opts?.contentType ?? "application/octet-stream";
    const closed = opts?.closed ? 1 : 0;
    const closedProducer = opts?.closedProducer ?? null;
    const expiresAtMs = opts?.expiresAtMs ?? null;
    const ttlSeconds = opts?.ttlSeconds ?? null;
    const streamFlags = opts?.streamFlags ?? 0;
    this.db.query(`INSERT INTO streams(
      stream, created_at_ms, updated_at_ms,
      content_type, stream_seq, closed, closed_producer_id, closed_producer_epoch, closed_producer_seq, ttl_seconds,
      epoch, next_offset, sealed_through, uploaded_through, uploaded_segment_count,
      pending_rows, pending_bytes, last_append_ms, last_segment_cut_ms, segment_in_progress,
      expires_at_ms, stream_flags
    )
    VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`).run(
      stream, now, now, contentType, null, closed,
      closedProducer ? closedProducer.id : null,
      closedProducer ? closedProducer.epoch : null,
      closedProducer ? closedProducer.seq : null,
      ttlSeconds, epoch, nextOffset, -1n, -1n, 0, 0n, 0n, now, now, 0, expiresAtMs, streamFlags
    );
    this.stmts.upsertManifest.run(stream, 0, 0, null, null);
    this.ensureSegmentMeta(stream);
    return this.getStream(stream);
  }
  restoreStreamRow(row) {
    this.stmts.upsertStream.run(
      row.stream, row.created_at_ms, row.updated_at_ms, row.content_type, row.stream_seq,
      row.closed, row.closed_producer_id, row.closed_producer_epoch, row.closed_producer_seq,
      row.ttl_seconds, row.epoch, row.next_offset, row.sealed_through, row.uploaded_through,
      row.uploaded_segment_count, row.pending_rows, row.pending_bytes, row.wal_rows, row.wal_bytes,
      row.last_append_ms, row.last_segment_cut_ms, row.segment_in_progress,
      row.expires_at_ms, row.stream_flags
    );
  }
  listStreams(limit, offset) {
    const now = this.nowMs();
    const rows = this.stmts.listStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, now, limit, offset);
    return rows.map((r) => this.coerceStreamRow(r));
  }
  listExpiredStreams(limit) {
    const now = this.nowMs();
    const rows = this.stmts.listExpiredStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, now, limit);
    return rows.map((r) => String(r.stream));
  }
  deleteStream(stream) {
    const existing = this.getStream(stream);
    if (!existing)
      return false;
    const now = this.nowMs();
    this.stmts.setDeleted.run(STREAM_FLAG_DELETED, now, stream);
    return true;
  }
  hardDeleteStream(stream) {
    const tx = this.db.transaction(() => {
      const existing = this.getStream(stream);
      if (!existing)
        return false;
      this.db.query(`DELETE FROM wal WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM segments WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM manifests WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM schemas WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM stream_interpreters WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM live_templates WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM producer_state WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM index_state WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM index_runs WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM stream_segment_meta WHERE stream=?;`).run(stream);
      this.db.query(`DELETE FROM streams WHERE stream=?;`).run(stream);
      return true;
    });
    return tx();
  }
  getSchemaRegistry(stream) {
    const row = this.stmts.getSchemaRegistry.get(stream);
    if (!row)
      return null;
    return { stream: String(row.stream), registry_json: String(row.schema_json), updated_at_ms: this.toBigInt(row.updated_at_ms) };
  }
  upsertSchemaRegistry(stream, registryJson) {
    this.stmts.upsertSchemaRegistry.run(stream, registryJson, this.nowMs());
  }
  getStreamInterpreter(stream) {
    const row = this.stmts.getStreamInterpreter.get(stream);
    if (!row)
      return null;
    return {
      stream: String(row.stream),
      interpreted_through: this.toBigInt(row.interpreted_through),
      updated_at_ms: this.toBigInt(row.updated_at_ms)
    };
  }
  listStreamInterpreters() {
    const rows = this.stmts.listStreamInterpreters.all();
    return rows.map((row) => ({
      stream: String(row.stream),
      interpreted_through: this.toBigInt(row.interpreted_through),
      updated_at_ms: this.toBigInt(row.updated_at_ms)
    }));
  }
  ensureStreamInterpreter(stream) {
    const existing = this.getStreamInterpreter(stream);
    if (existing)
      return;
    const srow = this.getStream(stream);
    const initialThrough = srow ? srow.next_offset - 1n : -1n;
    this.stmts.upsertStreamInterpreter.run(stream, this.bindInt(initialThrough), this.nowMs());
  }
  updateStreamInterpreterThrough(stream, interpretedThrough) {
    this.stmts.upsertStreamInterpreter.run(stream, this.bindInt(interpretedThrough), this.nowMs());
  }
  deleteStreamInterpreter(stream) {
    this.stmts.deleteStreamInterpreter.run(stream);
  }
  addStreamFlags(stream, flags) {
    if (!Number.isFinite(flags) || flags <= 0)
      return;
    this.db.query(`UPDATE streams SET stream_flags = (stream_flags | ?), updated_at_ms=? WHERE stream=?;`).run(flags, this.nowMs(), stream);
  }
  getWalOldestOffset(stream) {
    const row = this.db.query(`SELECT MIN(offset) as min_off FROM wal WHERE stream=?;`).get(stream);
    if (!row || row.min_off == null)
      return null;
    return this.toBigInt(row.min_off);
  }
  trimWalByAge(stream, maxAgeMs) {
    const ageMs = Math.max(0, Math.floor(maxAgeMs));
    if (!Number.isFinite(ageMs))
      return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: null };
    const tx = this.db.transaction(() => {
      const lastRow = this.db.query(`SELECT offset, ts_ms FROM wal WHERE stream=? ORDER BY offset DESC LIMIT 1;`).get(stream);
      if (!lastRow || lastRow.offset == null)
        return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: null };
      const lastOffset = this.toBigInt(lastRow.offset);
      let keepFromOffset;
      if (ageMs === 0) {
        keepFromOffset = lastOffset;
      } else {
        const cutoff = this.nowMs() - BigInt(ageMs);
        const keepRow = this.db.query(`SELECT offset FROM wal WHERE stream=? AND ts_ms >= ? ORDER BY offset ASC LIMIT 1;`).get(stream, this.bindInt(cutoff));
        keepFromOffset = keepRow && keepRow.offset != null ? this.toBigInt(keepRow.offset) : lastOffset;
      }
      if (keepFromOffset <= 0n)
        return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: keepFromOffset };
      const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
        FROM wal WHERE stream=? AND offset < ?;`).get(stream, this.bindInt(keepFromOffset));
      const bytes = this.toBigInt(stats?.bytes ?? 0);
      const rows = this.toBigInt(stats?.rows ?? 0);
      if (rows <= 0n)
        return { trimmedRows: 0, trimmedBytes: 0, keptFromOffset: keepFromOffset };
      this.db.query(`DELETE FROM wal WHERE stream=? AND offset < ?;`).run(stream, this.bindInt(keepFromOffset));
      const now = this.nowMs();
      this.db.query(`UPDATE streams
        SET pending_bytes = CASE WHEN pending_bytes >= ? THEN pending_bytes - ? ELSE 0 END,
            pending_rows = CASE WHEN pending_rows >= ? THEN pending_rows - ? ELSE 0 END,
            wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
            wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
            updated_at_ms = ?
        WHERE stream = ?;`).run(bytes, bytes, rows, rows, bytes, bytes, rows, rows, now, stream);
      const trimmedBytes = bytes <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(bytes) : Number.MAX_SAFE_INTEGER;
      const trimmedRows = rows <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(rows) : Number.MAX_SAFE_INTEGER;
      return { trimmedRows, trimmedBytes, keptFromOffset: keepFromOffset };
    });
    return tx();
  }
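  // Example (hypothetical usage sketch): age-based retention trimming.
  // `maxAgeMs = 0` keeps only the newest wal row; otherwise the first row with
  // ts_ms >= now - maxAgeMs becomes the retention boundary, and the streams
  // counters are decremented by exactly what was deleted.
  /*
  const r = store.trimWalByAge("touch/abc", 60_000); // stream name illustrative
  console.log(r.trimmedRows, r.trimmedBytes, r.keptFromOffset);
  */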
  countStreams() {
    const row = this.stmts.countStreams.get(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH);
    return row ? Number(row.cnt) : 0;
  }
  sumPendingBytes() {
    const row = this.stmts.sumPendingBytes.get();
    const total = row?.total ?? 0;
    return Number(this.toBigInt(total));
  }
  sumPendingSegmentBytes() {
    const row = this.stmts.sumPendingSegmentBytes.get();
    const total = row?.total ?? 0;
    return Number(this.toBigInt(total));
  }
  ensureDbStat() {
    if (this.dbstatReady != null)
      return this.dbstatReady;
    try {
      this.db.exec("CREATE VIRTUAL TABLE IF NOT EXISTS temp.dbstat USING dbstat;");
      this.dbstatReady = true;
    } catch {
      this.dbstatReady = false;
    }
    return this.dbstatReady;
  }
  estimateWalBytes() {
    try {
      const row = this.db.query(`SELECT
        COALESCE(SUM(payload_len), 0) as payload,
        COALESCE(SUM(LENGTH(routing_key)), 0) as rk,
        COALESCE(SUM(LENGTH(content_type)), 0) as ct
        FROM wal;`).get();
      return Number(row?.payload ?? 0) + Number(row?.rk ?? 0) + Number(row?.ct ?? 0);
    } catch {
      return 0;
    }
  }
  estimateMetaBytes() {
    try {
      const streams = this.db.query(`SELECT
        COALESCE(SUM(LENGTH(stream)), 0) as stream,
        COALESCE(SUM(LENGTH(content_type)), 0) as content_type,
        COALESCE(SUM(LENGTH(stream_seq)), 0) as stream_seq,
        COALESCE(SUM(LENGTH(closed_producer_id)), 0) as closed_producer_id
        FROM streams;`).get();
      const segments = this.db.query(`SELECT
        COALESCE(SUM(LENGTH(segment_id)), 0) as segment_id,
        COALESCE(SUM(LENGTH(stream)), 0) as stream,
        COALESCE(SUM(LENGTH(local_path)), 0) as local_path,
        COALESCE(SUM(LENGTH(r2_etag)), 0) as r2_etag
        FROM segments;`).get();
      const manifests = this.db.query(`SELECT
        COALESCE(SUM(LENGTH(stream)), 0) as stream,
        COALESCE(SUM(LENGTH(last_uploaded_etag)), 0) as last_uploaded_etag
        FROM manifests;`).get();
      const schemas = this.db.query(`SELECT COALESCE(SUM(LENGTH(schema_json)), 0) as schema_json FROM schemas;`).get();
      const producers = this.db.query(`SELECT
        COALESCE(SUM(LENGTH(stream)), 0) as stream,
        COALESCE(SUM(LENGTH(producer_id)), 0) as producer_id
        FROM producer_state;`).get();
      const total = Number(streams?.stream ?? 0) + Number(streams?.content_type ?? 0)
        + Number(streams?.stream_seq ?? 0) + Number(streams?.closed_producer_id ?? 0)
        + Number(segments?.segment_id ?? 0) + Number(segments?.stream ?? 0)
        + Number(segments?.local_path ?? 0) + Number(segments?.r2_etag ?? 0)
        + Number(manifests?.stream ?? 0) + Number(manifests?.last_uploaded_etag ?? 0)
        + Number(schemas?.schema_json ?? 0)
        + Number(producers?.stream ?? 0) + Number(producers?.producer_id ?? 0);
      return total;
    } catch {
      return 0;
    }
  }
  getWalDbSizeBytes() {
    if (this.ensureDbStat()) {
      try {
        const row = this.db.query(`SELECT COALESCE(SUM(pgsize), 0) as total FROM temp.dbstat WHERE name = 'wal';`).get();
        return Number(row?.total ?? 0);
      } catch {}
    }
    return this.estimateWalBytes();
  }
  getMetaDbSizeBytes() {
    if (this.ensureDbStat()) {
      try {
        const row = this.db.query(`SELECT COALESCE(SUM(pgsize), 0) as total FROM temp.dbstat WHERE name != 'wal';`).get();
        return Number(row?.total ?? 0);
      } catch {}
    }
    return this.estimateMetaBytes();
  }
  appendWalRows(args) {
    const { stream, startOffset, expectedOffset, rows } = args;
    if (rows.length === 0)
      return Result.err({ kind: "no_rows" });
    const tx = this.db.transaction(() => {
      const st = this.getStream(stream);
      if (!st || this.isDeleted(st))
        return Result.err({ kind: "stream_missing" });
      if (st.expires_at_ms != null && this.nowMs() > st.expires_at_ms)
        return Result.err({ kind: "stream_expired" });
      if (expectedOffset !== undefined && st.next_offset !== expectedOffset) {
        return Result.err({ kind: "seq_mismatch", expectedNext: st.next_offset });
      }
      let totalBytes = 0n;
      let offset = startOffset;
      for (const r of rows) {
        const payloadLen = r.payload.byteLength;
        totalBytes += BigInt(payloadLen);
        this.stmts.insertWal.run(stream, offset, r.appendMs, r.payload, payloadLen, r.routingKey, r.contentType, 0);
        offset += 1n;
      }
      const lastOffset = offset - 1n;
      const newNextOffset = lastOffset + 1n;
      const now = this.nowMs();
      const pendingRows = BigInt(rows.length);
      const lastAppend = rows[rows.length - 1].appendMs;
      this.stmts.updateStreamAppend.run(newNextOffset, now, lastAppend, pendingRows, totalBytes, pendingRows, totalBytes, stream, STREAM_FLAG_DELETED);
      return Result.ok({ lastOffset });
    });
    return tx();
  }
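  // Example (hypothetical usage sketch): appends are all-or-nothing inside one
  // transaction and report failures as Result errors rather than throws.
  // Passing `expectedOffset` makes the call a compare-and-append: on a
  // mismatch the error is { kind: "seq_mismatch", expectedNext }, so the
  // caller can resync and retry from the authoritative next offset.
  /*
  const res = store.appendWalRows({
    stream: "events",   // stream name illustrative
    startOffset: 0n,
    expectedOffset: 0n, // omit to append unconditionally
    rows: [{ payload: new Uint8Array([1, 2, 3]), appendMs: store.nowMs(), routingKey: null, contentType: null }],
  });
  // inspect res for Result.ok({ lastOffset }) or one of the error kinds above
  */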
  *iterWalRange(stream, startOffset, endOffset, routingKey) {
    const start = this.bindInt(startOffset);
    const end = this.bindInt(endOffset);
    const stmt = routingKey ? this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
      FROM wal
      WHERE stream = ? AND offset >= ? AND offset <= ? AND routing_key = ?
      ORDER BY offset ASC;`) : this.db.query(`SELECT offset, ts_ms, routing_key, content_type, payload
      FROM wal
      WHERE stream = ? AND offset >= ? AND offset <= ?
      ORDER BY offset ASC;`);
    try {
      const it = routingKey ? stmt.iterate(stream, start, end, routingKey) : stmt.iterate(stream, start, end);
      for (const row of it) {
        yield row;
      }
    } finally {
      try {
        stmt.finalize?.();
      } catch {}
    }
  }
  nextSegmentIndexForStream(stream) {
    const row = this.stmts.nextSegmentIndex.get(stream);
    return Number(row?.next_idx ?? 0);
  }
  createSegmentRow(row) {
    this.stmts.createSegment.run(row.segmentId, row.stream, row.segmentIndex, row.startOffset, row.endOffset, row.blockCount, row.lastAppendMs, row.sizeBytes, row.localPath, this.nowMs());
  }
  commitSealedSegment(row) {
    const tx = this.db.transaction(() => {
      this.createSegmentRow(row);
      this.appendSegmentMeta(row.stream, row.endOffset + 1n, row.blockCount, row.lastAppendMs * 1000000n);
      this.setStreamSealedThrough(row.stream, row.endOffset, row.payloadBytes, row.rowsSealed);
    });
    tx();
  }
  listSegmentsForStream(stream) {
    const rows = this.stmts.listSegmentsForStream.all(stream);
    return rows.map((r) => this.coerceSegmentRow(r));
  }
  getSegmentByIndex(stream, segmentIndex) {
    const row = this.stmts.getSegmentByIndex.get(stream, segmentIndex);
    return row ? this.coerceSegmentRow(row) : null;
  }
  findSegmentForOffset(stream, offset) {
    const bound = this.bindInt(offset);
    const row = this.stmts.findSegmentForOffset.get(stream, bound, bound);
    return row ? this.coerceSegmentRow(row) : null;
  }
  pendingUploadSegments(limit) {
    const rows = this.stmts.pendingUploadSegments.all(limit);
    return rows.map((r) => this.coerceSegmentRow(r));
  }
  countPendingSegments() {
    const row = this.stmts.countPendingSegments.get();
    return row ? Number(row.cnt) : 0;
  }
  countSegmentsForStream(stream) {
    const row = this.stmts.countSegmentsForStream.get(stream);
    return row ? Number(row.cnt) : 0;
  }
  getSegmentMeta(stream) {
    const row = this.stmts.getSegmentMeta.get(stream);
    if (!row)
      return null;
    const offsets = row.segment_offsets instanceof Uint8Array ? row.segment_offsets : new Uint8Array(row.segment_offsets);
    const blocks = row.segment_blocks instanceof Uint8Array ? row.segment_blocks : new Uint8Array(row.segment_blocks);
    const lastTs = row.segment_last_ts instanceof Uint8Array ? row.segment_last_ts : new Uint8Array(row.segment_last_ts);
    return {
      stream: String(row.stream),
      segment_count: Number(row.segment_count),
      segment_offsets: offsets,
      segment_blocks: blocks,
      segment_last_ts: lastTs
    };
  }
  ensureSegmentMeta(stream) {
    this.stmts.ensureSegmentMeta.run(stream);
  }
  appendSegmentMeta(stream, offsetPlusOne, blockCount, lastAppendNs) {
    this.ensureSegmentMeta(stream);
    const offsetBytes = this.encodeU64Le(offsetPlusOne);
    const blockBytes = this.encodeU32Le(blockCount);
    const tsBytes = this.encodeU64Le(lastAppendNs);
    this.stmts.appendSegmentMeta.run(offsetBytes, blockBytes, tsBytes, stream);
  }
  upsertSegmentMeta(stream, count, offsets, blocks, lastTs) {
    this.stmts.upsertSegmentMeta.run(stream, count, offsets, blocks, lastTs);
  }
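  // Example (hypothetical sketch): stream_segment_meta stores three parallel
  // packed arrays -- u64 LE end_offset+1 values, u32 LE block counts, and u64
  // LE last-append timestamps in nanoseconds. Entry i of a fetched meta row
  // can be decoded like this (`readSegmentMetaEntry` is illustrative):
  /*
  function readSegmentMetaEntry(meta, i) {
    const dvOff = new DataView(meta.segment_offsets.buffer, meta.segment_offsets.byteOffset);
    const dvBlk = new DataView(meta.segment_blocks.buffer, meta.segment_blocks.byteOffset);
    const dvTs = new DataView(meta.segment_last_ts.buffer, meta.segment_last_ts.byteOffset);
    return {
      endOffsetPlusOne: dvOff.getBigUint64(i * 8, true),
      blockCount: dvBlk.getUint32(i * 4, true),
      lastAppendNs: dvTs.getBigUint64(i * 8, true),
    };
  }
  */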
  rebuildSegmentMeta(stream) {
    const rows = this.db.query(`SELECT end_offset, block_count, last_append_ms
      FROM segments WHERE stream=? ORDER BY segment_index ASC;`).all(stream);
    const count = rows.length;
    const offsets = new Uint8Array(count * 8);
    const blocks = new Uint8Array(count * 4);
    const lastTs = new Uint8Array(count * 8);
    const dvOffsets = new DataView(offsets.buffer, offsets.byteOffset, offsets.byteLength);
    const dvBlocks = new DataView(blocks.buffer, blocks.byteOffset, blocks.byteLength);
    const dvLastTs = new DataView(lastTs.buffer, lastTs.byteOffset, lastTs.byteLength);
    for (let i = 0; i < rows.length; i++) {
      const endOffset = this.toBigInt(rows[i].end_offset);
      const blockCount = Number(rows[i].block_count);
      const lastAppendMs = this.toBigInt(rows[i].last_append_ms);
      dvOffsets.setBigUint64(i * 8, endOffset + 1n, true);
      dvBlocks.setUint32(i * 4, blockCount >>> 0, true);
      dvLastTs.setBigUint64(i * 8, lastAppendMs * 1000000n, true);
    }
    this.upsertSegmentMeta(stream, count, offsets, blocks, lastTs);
    return { stream, segment_count: count, segment_offsets: offsets, segment_blocks: blocks, segment_last_ts: lastTs };
  }
  setUploadedSegmentCount(stream, count) {
    this.stmts.setUploadedSegmentCount.run(count, this.nowMs(), stream);
  }
  advanceUploadedSegmentCount(stream) {
    const row = this.getStream(stream);
    if (!row)
      return 0;
    let count = row.uploaded_segment_count ?? 0;
    for (;;) {
      const seg = this.getSegmentByIndex(stream, count);
      if (!seg || !seg.r2_etag)
        break;
      count += 1;
    }
    if (count !== row.uploaded_segment_count) {
      this.stmts.setUploadedSegmentCount.run(count, this.nowMs(), stream);
    }
    return count;
  }
markSegmentUploaded(segmentId, etag, uploadedAtMs) {
|
|
this.stmts.markSegmentUploaded.run(etag, uploadedAtMs, segmentId);
|
|
}
|
|
setStreamSealedThrough(stream, sealedThrough, bytesSealed, rowsSealed) {
|
|
const now = this.nowMs();
|
|
this.db.query(`UPDATE streams
|
|
SET sealed_through = ?,
|
|
pending_bytes = CASE WHEN pending_bytes >= ? THEN pending_bytes - ? ELSE 0 END,
|
|
pending_rows = CASE WHEN pending_rows >= ? THEN pending_rows - ? ELSE 0 END,
|
|
last_segment_cut_ms = ?,
|
|
updated_at_ms = ?
|
|
WHERE stream = ?;`).run(sealedThrough, bytesSealed, bytesSealed, rowsSealed, rowsSealed, now, now, stream);
|
|
}
|
|
setSegmentInProgress(stream, inProgress) {
|
|
this.db.query(`UPDATE streams SET segment_in_progress=?, updated_at_ms=? WHERE stream=?;`).run(inProgress, this.nowMs(), stream);
|
|
}
|
|
tryClaimSegment(stream) {
|
|
const res = this.stmts.tryClaimSegment.run(this.nowMs(), stream);
|
|
const changes = typeof res?.changes === "bigint" ? res.changes : BigInt(Number(res?.changes ?? 0));
|
|
return changes > 0n;
|
|
}
|
|
resetSegmentInProgress() {
|
|
this.db.query(`UPDATE streams SET segment_in_progress=0 WHERE segment_in_progress != 0;`).run();
|
|
}
|
|
advanceUploadedThrough(stream, uploadedThrough) {
|
|
this.stmts.advanceUploadedThrough.run(uploadedThrough, this.nowMs(), stream);
|
|
}
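  // WAL garbage collection: the stats query, the DELETE, and the counter
  // decrement on the streams row all run inside a single transaction, so the
  // tracked wal_rows/wal_bytes cannot drift from the wal table's contents if
  // the process dies between steps.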
  deleteWalThrough(stream, uploadedThrough) {
    const through = this.bindInt(uploadedThrough);
    const tx = this.db.transaction(() => {
      const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
        FROM wal WHERE stream=? AND offset <= ?;`).get(stream, through);
      const bytes = this.toBigInt(stats?.bytes ?? 0);
      const rows = this.toBigInt(stats?.rows ?? 0);
      if (rows <= 0n)
        return { deletedRows: 0, deletedBytes: 0 };
      this.stmts.deleteWalBeforeOffset.run(stream, through);
      const now = this.nowMs();
      this.db.query(`UPDATE streams
        SET wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
            wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
            updated_at_ms = ?
        WHERE stream = ?;`).run(bytes, bytes, rows, rows, now, stream);
      // Clamp to Number.MAX_SAFE_INTEGER so callers always get a plain number.
      const deletedBytes = bytes <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(bytes) : Number.MAX_SAFE_INTEGER;
      const deletedRows = rows <= BigInt(Number.MAX_SAFE_INTEGER) ? Number(rows) : Number.MAX_SAFE_INTEGER;
      return { deletedRows, deletedBytes };
    });
    return tx();
  }
  getManifestRow(stream) {
    const row = this.stmts.getManifest.get(stream);
    if (!row) {
      // Lazily create a zeroed manifest row on first access.
      this.stmts.upsertManifest.run(stream, 0, 0, null, null);
      const fresh = this.stmts.getManifest.get(stream);
      return {
        stream: String(fresh.stream),
        generation: Number(fresh.generation),
        uploaded_generation: Number(fresh.uploaded_generation),
        last_uploaded_at_ms: fresh.last_uploaded_at_ms == null ? null : this.toBigInt(fresh.last_uploaded_at_ms),
        last_uploaded_etag: fresh.last_uploaded_etag == null ? null : String(fresh.last_uploaded_etag)
      };
    }
    return {
      stream: String(row.stream),
      generation: Number(row.generation),
      uploaded_generation: Number(row.uploaded_generation),
      last_uploaded_at_ms: row.last_uploaded_at_ms == null ? null : this.toBigInt(row.last_uploaded_at_ms),
      last_uploaded_etag: row.last_uploaded_etag == null ? null : String(row.last_uploaded_etag)
    };
  }
  upsertManifestRow(stream, generation, uploadedGeneration, uploadedAtMs, etag) {
    this.stmts.upsertManifest.run(stream, generation, uploadedGeneration, uploadedAtMs, etag);
  }
  getIndexState(stream) {
    const row = this.stmts.getIndexState.get(stream);
    if (!row)
      return null;
    return {
      stream: String(row.stream),
      index_secret: row.index_secret instanceof Uint8Array ? row.index_secret : new Uint8Array(row.index_secret),
      indexed_through: Number(row.indexed_through),
      updated_at_ms: this.toBigInt(row.updated_at_ms)
    };
  }
  upsertIndexState(stream, indexSecret, indexedThrough) {
    this.stmts.upsertIndexState.run(stream, indexSecret, indexedThrough, this.nowMs());
  }
  updateIndexedThrough(stream, indexedThrough) {
    this.stmts.updateIndexedThrough.run(indexedThrough, this.nowMs(), stream);
  }
  // Shared row coercion for the three listIndexRuns* variants below, which
  // previously duplicated this mapping verbatim.
  coerceIndexRunRow(r) {
    return {
      run_id: String(r.run_id),
      stream: String(r.stream),
      level: Number(r.level),
      start_segment: Number(r.start_segment),
      end_segment: Number(r.end_segment),
      object_key: String(r.object_key),
      filter_len: Number(r.filter_len),
      record_count: Number(r.record_count),
      retired_gen: r.retired_gen == null ? null : Number(r.retired_gen),
      retired_at_ms: r.retired_at_ms == null ? null : this.toBigInt(r.retired_at_ms)
    };
  }
  listIndexRuns(stream) {
    return this.stmts.listIndexRuns.all(stream).map((r) => this.coerceIndexRunRow(r));
  }
  listIndexRunsAll(stream) {
    return this.stmts.listIndexRunsAll.all(stream).map((r) => this.coerceIndexRunRow(r));
  }
  listRetiredIndexRuns(stream) {
    return this.stmts.listRetiredIndexRuns.all(stream).map((r) => this.coerceIndexRunRow(r));
  }
  insertIndexRun(row) {
    this.stmts.insertIndexRun.run(row.run_id, row.stream, row.level, row.start_segment, row.end_segment, row.object_key, row.filter_len, row.record_count);
  }
  retireIndexRuns(runIds, retiredGen, retiredAtMs) {
    if (runIds.length === 0)
      return;
    const tx = this.db.transaction(() => {
      for (const runId of runIds) {
        this.stmts.retireIndexRun.run(retiredGen, retiredAtMs, runId);
      }
    });
    tx();
  }
  deleteIndexRuns(runIds) {
    if (runIds.length === 0)
      return;
    const tx = this.db.transaction(() => {
      for (const runId of runIds) {
        this.stmts.deleteIndexRun.run(runId);
      }
    });
    tx();
  }
  countUploadedSegments(stream) {
    const row = this.stmts.countUploadedSegments.get(stream);
    const maxIdx = row ? Number(row.max_idx) : -1;
    return maxIdx >= 0 ? maxIdx + 1 : 0;
  }
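  // Commits an uploaded manifest generation, advances uploaded_through, then
  // opportunistically garbage-collects the WAL. GC never trims past what the
  // stream's interpreter has processed, and BASE_WAL_GC_CHUNK_OFFSETS (when
  // positive) bounds how many offsets one commit may delete so a single commit
  // cannot stall on a huge backlog.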
  commitManifest(stream, generation, etag, uploadedAtMs, uploadedThrough) {
    const tx = this.db.transaction(() => {
      this.stmts.upsertManifest.run(stream, generation, generation, uploadedAtMs, etag);
      this.stmts.advanceUploadedThrough.run(uploadedThrough, this.nowMs(), stream);
      let gcThrough = uploadedThrough;
      const interp = this.stmts.getStreamInterpreter.get(stream);
      if (interp) {
        const interpretedThrough = this.toBigInt(interp.interpreted_through);
        gcThrough = interpretedThrough < gcThrough ? interpretedThrough : gcThrough;
      }
      if (gcThrough < 0n)
        return;
      let deleteThrough = gcThrough;
      if (BASE_WAL_GC_CHUNK_OFFSETS > 0) {
        const oldest = this.getWalOldestOffset(stream);
        if (oldest != null) {
          const maxThrough = oldest + BigInt(BASE_WAL_GC_CHUNK_OFFSETS) - 1n;
          if (deleteThrough > maxThrough)
            deleteThrough = maxThrough;
        }
      }
      if (deleteThrough < 0n)
        return;
      const bound = this.bindInt(deleteThrough);
      const stats = this.db.query(`SELECT COALESCE(SUM(payload_len), 0) as bytes, COUNT(*) as rows
        FROM wal WHERE stream=? AND offset <= ?;`).get(stream, bound);
      const bytes = this.toBigInt(stats?.bytes ?? 0);
      const rows = this.toBigInt(stats?.rows ?? 0);
      if (rows <= 0n)
        return;
      this.stmts.deleteWalBeforeOffset.run(stream, bound);
      const now = this.nowMs();
      this.db.query(`UPDATE streams
        SET wal_bytes = CASE WHEN wal_bytes >= ? THEN wal_bytes - ? ELSE 0 END,
            wal_rows = CASE WHEN wal_rows >= ? THEN wal_rows - ? ELSE 0 END,
            updated_at_ms = ?
        WHERE stream = ?;`).run(bytes, bytes, rows, rows, now, stream);
    });
    tx();
  }
  candidates(minPendingBytes, minPendingRows, maxIntervalMs, limit) {
    if (maxIntervalMs <= 0n) {
      return this.stmts.candidateStreamsNoInterval.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, minPendingBytes, minPendingRows, limit);
    }
    const now = this.nowMs();
    return this.stmts.candidateStreams.all(STREAM_FLAG_DELETED | STREAM_FLAG_TOUCH, minPendingBytes, minPendingRows, now, maxIntervalMs, limit);
  }
}

// src/util/log.ts
var patched = false;
function wrapConsole(orig, level) {
  return (...args) => {
    const prefix = `[${new Date().toISOString()}] [${level}]`;
    if (args.length === 0)
      return orig(prefix);
    return orig(prefix, ...args);
  };
}
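// Patching is guarded twice: `patched` stops re-entry within this module, and
// the __ds_console_patched flag on globalThis presumably stops a second
// bundled copy of this file (e.g. loaded in a worker) from stacking prefixes.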
function initConsoleLogging() {
  if (patched)
    return;
  patched = true;
  const globalAny = globalThis;
  if (globalAny.__ds_console_patched)
    return;
  globalAny.__ds_console_patched = true;
  console.log = wrapConsole(console.log.bind(console), "INFO");
  console.info = wrapConsole(console.info.bind(console), "INFO");
  console.warn = wrapConsole(console.warn.bind(console), "WARN");
  console.error = wrapConsole(console.error.bind(console), "ERROR");
  if (console.debug)
    console.debug = wrapConsole(console.debug.bind(console), "DEBUG");
}

// src/touch/engine.ts
function interpretRecordToChanges(record, _cfg) {
  return interpretStateProtocolRecord(record);
}
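// Accepts records shaped like the state protocol checked below and returns no
// changes for everything else. An illustrative record (field values are
// hypothetical, not from this codebase):
//
//   { "headers": { "operation": "update" },
//     "type": "orders", "key": "o_17",
//     "oldValue": { "status": "open" }, "value": { "status": "paid" } }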
function interpretStateProtocolRecord(record) {
  if (!record || typeof record !== "object" || Array.isArray(record))
    return [];
  const headers = record.headers;
  if (!headers || typeof headers !== "object" || Array.isArray(headers))
    return [];
  // Control records carry no data changes.
  if (typeof headers.control === "string")
    return [];
  const opRaw = headers.operation;
  if (typeof opRaw !== "string")
    return [];
  const op = opRaw;
  if (op !== "insert" && op !== "update" && op !== "delete")
    return [];
  const type = record.type;
  const key = record.key;
  if (typeof type !== "string" || type.trim() === "")
    return [];
  if (typeof key !== "string" || key.trim() === "")
    return [];
  // Accept both camelCase and snake_case spellings of the pre-image field.
  const before = Object.prototype.hasOwnProperty.call(record, "oldValue") ? record.oldValue : Object.prototype.hasOwnProperty.call(record, "old_value") ? record.old_value : undefined;
  const after = Object.prototype.hasOwnProperty.call(record, "value") ? record.value : undefined;
  return [{ entity: type, key, op, before, after }];
}

// src/runtime/hash.ts
import { Result as Result2 } from "better-result";
import { createRequire as createRequire2 } from "node:module";
import { fileURLToPath } from "node:url";
var xxh3Hasher = null;
var xxh64Hasher = null;
var xxh32Hasher = null;
var isBunRuntime = typeof globalThis.Bun !== "undefined";
var require3 = createRequire2(import.meta.url);
function loadVendoredModule(name) {
  const path = fileURLToPath(new URL(`./hash_vendor/${name}`, import.meta.url));
  return require3(path);
}
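// Bun exposes native xxHash implementations on Bun.hash, so the vendored WASM
// hashers are only instantiated under Node. The top-level await below is valid
// because the bundle is an ES module.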
if (!isBunRuntime) {
  const xxh3Module = loadVendoredModule("xxhash3.umd.min.cjs");
  const xxh64Module = loadVendoredModule("xxhash64.umd.min.cjs");
  const xxh32Module = loadVendoredModule("xxhash32.umd.min.cjs");
  xxh3Hasher = await xxh3Module.createXXHash3();
  xxh64Hasher = await xxh64Module.createXXHash64();
  xxh32Hasher = await xxh32Module.createXXHash32();
}
function toBigIntDigest(value) {
  if (typeof value === "bigint")
    return value;
  if (typeof value === "number")
    return BigInt(value >>> 0);
  const hex = value.startsWith("0x") ? value.slice(2) : value;
  if (hex.length === 0)
    return 0n;
  return BigInt(`0x${hex}`);
}
function bunHash64(input, fn) {
  return fn(input);
}
function nodeHash64Result(input, hasher, label) {
  if (!hasher)
    return Result2.err({ kind: "hasher_not_initialized", message: `${label} hasher not initialized` });
  hasher.init();
  hasher.update(input);
  const digest = hasher.digest("hex");
  return Result2.ok(toBigIntDigest(digest));
}
function xxh3BigIntResult(input) {
  if (isBunRuntime)
    return Result2.ok(bunHash64(input, (x) => Bun.hash.xxHash3(x)));
  return nodeHash64Result(input, xxh3Hasher, "xxh3");
}
function xxh3BigInt(input) {
  const res = xxh3BigIntResult(input);
  if (Result2.isError(res))
    throw dsError(res.error.message);
  return res.value;
}

// src/touch/live_keys.ts
function utf8(s) {
  return new TextEncoder().encode(s);
}
function encodeU64Be(v) {
  const out = new Uint8Array(8);
  let x = v;
  for (let i = 7; i >= 0; i--) {
    out[i] = Number(x & 0xffn);
    x >>= 8n;
  }
  return out;
}
function xxh3Low32(bytes) {
  const h = xxh3BigInt(bytes);
  return Number(h & 0xffffffffn) >>> 0;
}
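// Key-id derivation: each id is the low 32 bits of XXH3 over a
// domain-separated byte string. The NUL-terminated prefixes ("tbl\0", "tpl\0",
// "key\0") keep the three key spaces from colliding even when their payload
// bytes happen to match.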
function tableKeyIdFor(entity) {
  return xxh3Low32(concat([utf8("tbl\x00"), utf8(entity)]));
}
function templateKeyIdFor(templateIdHex16) {
  const tplBytes = encodeU64Be(BigInt(`0x${templateIdHex16}`));
  return xxh3Low32(concat([utf8("tpl\x00"), tplBytes]));
}
function watchKeyIdFor(templateIdHex16, encodedArgs) {
  const tplBytes = encodeU64Be(BigInt(`0x${templateIdHex16}`));
  const parts = [utf8("key\x00"), tplBytes];
  for (const a of encodedArgs) {
    parts.push(utf8("\x00"));
    parts.push(utf8(a));
  }
  return xxh3Low32(concat(parts));
}
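// Canonicalizes a template argument to the string form hashed into watch keys,
// or returns null when the value cannot be represented in the declared
// encoding (in which case the caller skips the fine-grained touch for that
// template rather than hashing a lossy value).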
function encodeTemplateArg(value, encoding) {
  if (value === null || value === undefined)
    return null;
  switch (encoding) {
    case "string": {
      if (typeof value === "string")
        return value;
      if (typeof value === "number" && Number.isFinite(value))
        return String(value);
      if (typeof value === "boolean")
        return value ? "true" : "false";
      return null;
    }
    case "int64": {
      if (typeof value === "bigint")
        return value.toString();
      if (typeof value === "number" && Number.isFinite(value) && Number.isInteger(value))
        return String(value);
      if (typeof value === "string" && /^-?(0|[1-9][0-9]*)$/.test(value.trim()))
        return value.trim();
      return null;
    }
    case "bool": {
      if (typeof value !== "boolean")
        return null;
      return value ? "1" : "0";
    }
    case "datetime": {
      if (typeof value !== "string")
        return null;
      const d = new Date(value);
      if (!Number.isFinite(d.getTime()))
        return null;
      return d.toISOString();
    }
    case "bytes": {
      if (typeof value !== "string")
        return null;
      return value;
    }
  }
  // Unknown encodings are rejected like unrepresentable values.
  return null;
}
function concat(parts) {
  let total = 0;
  for (const p of parts)
    total += p.byteLength;
  const out = new Uint8Array(total);
  let off = 0;
  for (const p of parts) {
    out.set(p, off);
    off += p.byteLength;
  }
  return out;
}

// src/touch/spec.ts
import { Result as Result3 } from "better-result";
function isTouchEnabled(cfg) {
  return !!cfg?.touch?.enabled;
}

// src/touch/interpreter_worker.ts
initConsoleLogging();
var data = workerData;
var cfg = data.config;
setSqliteRuntimeOverride(data.hostRuntime ?? null);
var db = new SqliteDurableStore(cfg.dbPath, { cacheBytes: cfg.sqliteCacheBytes, skipMigrations: true });
var decoder = new TextDecoder();
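// Worker entry point: for each "process" request it replays a WAL range,
// interprets each record into canonical changes, derives coarse (per-table)
// and fine (per-template / per-watch-key) touch key ids, coalesces them in
// time windows, and posts the sorted batch back to the host thread.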
async function handleProcess(msg) {
  const { stream, fromOffset, toOffset, interpreter, maxRows, maxBytes } = msg;
  const failProcess = (message) => {
    const err = Result4.err({ kind: "missing_old_value", message });
    parentPort?.postMessage({
      type: "error",
      id: msg.id,
      stream,
      message: err.error.message
    });
  };
  if (!isTouchEnabled(interpreter)) {
    parentPort?.postMessage({
      type: "error",
      id: msg.id,
      stream,
      message: "touch not enabled for interpreter"
    });
    return;
  }
  const touch = interpreter.touch;
  const fineBudgetRaw = msg.fineTouchBudget ?? touch.fineTouchBudgetPerBatch;
  const fineBudget = fineBudgetRaw == null ? null : Math.max(0, Math.floor(fineBudgetRaw));
  const fineGranularity = msg.fineGranularity === "template" ? "template" : "key";
  const interpretMode = msg.interpretMode === "hotTemplatesOnly" ? "hotTemplatesOnly" : "full";
  const hotTemplatesOnly = fineGranularity === "template" && interpretMode === "hotTemplatesOnly";
  const emitFineTouches = msg.emitFineTouches !== false && fineBudget !== 0;
  let fineBudgetExhausted = fineBudget != null && fineBudget <= 0;
  let fineKeysBudgetRemaining = fineBudget;
  let fineTouchesSuppressedDueToBudget = false;
  const filterHotTemplates = msg.filterHotTemplates === true;
  const hotTemplateIdsRaw = filterHotTemplates ? msg.hotTemplateIds ?? [] : [];
  const hotTemplateIds = filterHotTemplates ? new Set(hotTemplateIdsRaw.filter((x) => typeof x === "string" && /^[0-9a-f]{16}$/.test(x))) : null;
  const coarseIntervalMs = Math.max(1, Math.floor(touch.coarseIntervalMs ?? 100));
  const coalesceWindowMs = Math.max(1, Math.floor(touch.touchCoalesceWindowMs ?? 100));
  const onMissingBefore = touch.onMissingBefore ?? "coarse";
  const templatesByEntity = new Map();
  const coldTemplateCountByEntity = new Map();
  if (emitFineTouches) {
    try {
      const rows = db.db.query(`SELECT template_id, entity, fields_json, encodings_json, active_from_source_offset
        FROM live_templates
        WHERE stream=? AND state='active';`).all(stream);
      for (const row of rows) {
        const templateId = String(row.template_id ?? "");
        if (!/^[0-9a-f]{16}$/.test(templateId))
          continue;
        const entity = String(row.entity ?? "");
        if (entity.trim() === "")
          continue;
        let fields;
        let encodings;
        try {
          fields = JSON.parse(String(row.fields_json ?? "[]"));
          encodings = JSON.parse(String(row.encodings_json ?? "[]"));
        } catch {
          continue;
        }
        if (!Array.isArray(fields) || !Array.isArray(encodings) || fields.length !== encodings.length)
          continue;
        const f = fields.map(String);
        const e = encodings.map(String);
        if (f.length === 0 || f.length > 3)
          continue;
        if (!e.every((x) => x === "string" || x === "int64" || x === "bool" || x === "datetime" || x === "bytes"))
          continue;
        if (hotTemplateIds && !hotTemplateIds.has(templateId)) {
          coldTemplateCountByEntity.set(entity, (coldTemplateCountByEntity.get(entity) ?? 0) + 1);
          continue;
        }
        const activeFromSourceOffset = typeof row.active_from_source_offset === "bigint" ? row.active_from_source_offset : BigInt(row.active_from_source_offset ?? 0);
        const tpl = { templateId, entity, fields: f, encodings: e, activeFromSourceOffset };
        const arr = templatesByEntity.get(entity) ?? [];
        arr.push(tpl);
        templatesByEntity.set(entity, arr);
      }
    } catch {}
  }
  let rowsRead = 0;
  let bytesRead = 0;
  let changes = 0;
  let maxSourceTsMs = 0;
  let processedThrough = fromOffset - 1n;
  const pending = new Map();
  const templateOnlyEntityTouch = new Map();
  const touches = [];
  let fineTouchesDroppedDueToBudget = 0;
  let fineTouchesSkippedColdTemplate = 0;
  const flush = (_mapKey, p) => {
    touches.push({ keyId: p.keyId >>> 0, watermark: p.watermark, entity: p.entity, kind: p.kind, templateId: p.templateId });
  };
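  // Time-window coalescing: the first touch for a key opens a window anchored
  // at its timestamp. Touches inside the window only advance the stored
  // watermark; the first touch past the window flushes the old entry and opens
  // a new one. Table touches are exempt from the fine-touch budget.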
  const queueTouch = (args) => {
    const mapKey = `i:${args.keyId >>> 0}`;
    const prev = pending.get(mapKey);
    if (args.kind !== "table" && fineBudget != null && !fineBudgetExhausted && !prev) {
      const remaining = fineKeysBudgetRemaining ?? 0;
      if (remaining <= 0) {
        fineBudgetExhausted = true;
        fineTouchesSuppressedDueToBudget = true;
        fineTouchesDroppedDueToBudget += 1;
        return;
      }
      fineKeysBudgetRemaining = remaining - 1;
    } else if (args.kind !== "table" && fineBudget != null && !prev && fineBudgetExhausted) {
      fineTouchesSuppressedDueToBudget = true;
      fineTouchesDroppedDueToBudget += 1;
      return;
    }
    if (!prev) {
      pending.set(mapKey, {
        keyId: args.keyId >>> 0,
        windowStartMs: args.tsMs,
        watermark: args.watermark,
        entity: args.entity,
        kind: args.kind,
        templateId: args.templateId
      });
      return;
    }
    if (args.tsMs - prev.windowStartMs < args.windowMs) {
      prev.watermark = args.watermark;
      return;
    }
    flush(mapKey, prev);
    pending.set(mapKey, {
      keyId: args.keyId >>> 0,
      windowStartMs: args.tsMs,
      watermark: args.watermark,
      entity: args.entity,
      kind: args.kind,
      templateId: args.templateId
    });
  };
  for (const row of db.iterWalRange(stream, fromOffset, toOffset)) {
    const payload = row.payload;
    const payloadLen = payload.byteLength;
    // Always process at least one row, even if it alone exceeds the byte budget.
    if (rowsRead > 0 && (rowsRead >= maxRows || bytesRead + payloadLen > maxBytes))
      break;
    rowsRead++;
    bytesRead += payloadLen;
    const offset = typeof row.offset === "bigint" ? row.offset : BigInt(row.offset);
    processedThrough = offset;
    const tsMs = Number(row.ts_ms);
    if (!Number.isFinite(tsMs))
      continue;
    if (tsMs > maxSourceTsMs)
      maxSourceTsMs = tsMs;
    let value;
    try {
      value = JSON.parse(decoder.decode(payload));
    } catch {
      continue;
    }
    const canonical = interpretRecordToChanges(value, interpreter);
    changes += canonical.length;
    if (canonical.length === 0)
      continue;
    const watermark = offset.toString();
    for (const ch of canonical) {
      const entity = ch.entity;
      const coarseKeyId = tableKeyIdFor(entity);
      queueTouch({
        keyId: coarseKeyId,
        tsMs,
        watermark,
        entity,
        kind: "table",
        windowMs: coarseIntervalMs
      });
      if (!emitFineTouches)
        continue;
      if (fineBudgetExhausted)
        continue;
      const tpls = templatesByEntity.get(entity);
      if (filterHotTemplates) {
        fineTouchesSkippedColdTemplate += coldTemplateCountByEntity.get(entity) ?? 0;
      }
      if (!tpls || tpls.length === 0)
        continue;
      if (hotTemplatesOnly) {
        // Defer: record only the latest offset per entity, expand after the scan.
        const prev = templateOnlyEntityTouch.get(entity);
        if (!prev || offset > prev.offset)
          templateOnlyEntityTouch.set(entity, { offset, tsMs, watermark });
        continue;
      }
      for (const tpl of tpls) {
        if (fineBudgetExhausted)
          break;
        if (offset < tpl.activeFromSourceOffset)
          continue;
        if (fineGranularity === "template") {
          queueTouch({
            keyId: templateKeyIdFor(tpl.templateId) >>> 0,
            tsMs,
            watermark,
            entity,
            kind: "template",
            templateId: tpl.templateId,
            windowMs: coalesceWindowMs
          });
          if (fineBudgetExhausted)
            break;
          continue;
        }
        const afterObj = ch.after;
        const beforeObj = ch.before;
        const watchKeyIds = new Set();
        const compute = (obj) => {
          if (!obj || typeof obj !== "object" || Array.isArray(obj))
            return null;
          const args = [];
          for (let i = 0; i < tpl.fields.length; i++) {
            const name = tpl.fields[i];
            const enc = tpl.encodings[i];
            const v = obj[name];
            const encoded = encodeTemplateArg(v, enc);
            if (encoded == null)
              return null;
            args.push(encoded);
          }
          return watchKeyIdFor(tpl.templateId, args) >>> 0;
        };
        if (ch.op === "insert") {
          const k = compute(afterObj);
          if (k != null)
            watchKeyIds.add(k >>> 0);
        } else if (ch.op === "delete") {
          const k = compute(beforeObj);
          if (k != null)
            watchKeyIds.add(k >>> 0);
        } else {
          const kAfter = compute(afterObj);
          const kBefore = compute(beforeObj);
          if (kBefore != null) {
            watchKeyIds.add(kBefore >>> 0);
            if (kAfter != null)
              watchKeyIds.add(kAfter >>> 0);
          } else {
            if (beforeObj === undefined) {
              if (onMissingBefore === "error") {
                failProcess(`missing oldValue for update (entity=${entity}, templateId=${tpl.templateId})`);
                return;
              }
            } else {
              if (onMissingBefore === "error") {
                failProcess(`oldValue missing required fields for update (entity=${entity}, templateId=${tpl.templateId})`);
                return;
              }
            }
            if (onMissingBefore === "skipBefore") {
              if (kAfter != null)
                watchKeyIds.add(kAfter >>> 0);
            }
            // onMissingBefore === "coarse": no fine touch; the table touch
            // queued above already covers this change.
          }
        }
        for (const watchKeyId of watchKeyIds) {
          queueTouch({
            keyId: watchKeyId >>> 0,
            tsMs,
            watermark,
            entity,
            kind: "template",
            templateId: tpl.templateId,
            windowMs: coalesceWindowMs
          });
          if (fineBudgetExhausted)
            break;
        }
      }
    }
  }
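  // hotTemplatesOnly mode deferred fine touches during the scan; expand each
  // entity's single latest-offset aggregate into one touch per active template.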
  if (emitFineTouches && hotTemplatesOnly && !fineBudgetExhausted && templateOnlyEntityTouch.size > 0) {
    for (const [entity, agg] of templateOnlyEntityTouch.entries()) {
      if (fineBudgetExhausted)
        break;
      const tpls = templatesByEntity.get(entity);
      if (!tpls || tpls.length === 0)
        continue;
      for (const tpl of tpls) {
        if (fineBudgetExhausted)
          break;
        if (agg.offset < tpl.activeFromSourceOffset)
          continue;
        queueTouch({
          keyId: templateKeyIdFor(tpl.templateId) >>> 0,
          tsMs: agg.tsMs,
          watermark: agg.watermark,
          entity,
          kind: "template",
          templateId: tpl.templateId,
          windowMs: coalesceWindowMs
        });
      }
    }
  }
  for (const [key, p] of pending.entries()) {
    flush(key, p);
  }
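  // Sort by keyId, then by numeric watermark, presumably so the host receives
  // a deterministic batch regardless of map iteration order.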
  touches.sort((a, b) => {
    const ak = a.keyId >>> 0;
    const bk = b.keyId >>> 0;
    if (ak < bk)
      return -1;
    if (ak > bk)
      return 1;
    const aw = BigInt(a.watermark);
    const bw = BigInt(b.watermark);
    if (aw < bw)
      return -1;
    if (aw > bw)
      return 1;
    return 0;
  });
  let tableTouchesEmitted = 0;
  let templateTouchesEmitted = 0;
  for (const t of touches) {
    if (t.kind === "table")
      tableTouchesEmitted++;
    else
      templateTouchesEmitted++;
  }
  parentPort?.postMessage({
    type: "result",
    id: msg.id,
    stream,
    processedThrough,
    touches,
    stats: {
      rowsRead,
      bytesRead,
      changes,
      touchesEmitted: touches.length,
      tableTouchesEmitted,
      templateTouchesEmitted,
      maxSourceTsMs,
      fineTouchesDroppedDueToBudget,
      fineTouchesSuppressedDueToBudget,
      fineTouchesSkippedColdTemplate
    }
  });
}
parentPort?.on("message", (msg) => {
  if (!msg || typeof msg !== "object")
    return;
  if (msg.type === "stop") {
    try {
      db.close();
    } catch {}
    try {
      parentPort?.postMessage({ type: "stopped" });
    } catch {}
    return;
  }
  if (msg.type === "process") {
    handleProcess(msg).catch((e) => {
      try {
        parentPort?.postMessage({
          type: "error",
          id: msg.id,
          stream: msg.stream,
          message: String(e?.message ?? e),
          stack: e?.stack ? String(e.stack) : undefined
        });
      } catch {}
    });
  }
});