first commit

This commit is contained in:
Ichitux
2026-04-05 03:08:53 +02:00
commit 1082d36c12
28015 changed files with 3767672 additions and 0 deletions

174
node_modules/@electric-sql/pglite-socket/CHANGELOG.md generated vendored Normal file
View File

@@ -0,0 +1,174 @@
# @electric-sql/pglite-socket
## 0.1.1
### Patch Changes
- Updated dependencies [37fb39e]
- @electric-sql/pglite@0.4.1
## 0.1.0
### Minor Changes
- Updated dependencies [d848955]
- @electric-sql/pglite-postgis@0.0.1
- @electric-sql/pglite@0.4.0
## 0.0.22
### Patch Changes
- Updated dependencies [3dfa40f]
- @electric-sql/pglite@0.3.16
## 0.0.21
### Patch Changes
- 8a03647: Fix: Message buffering, connection handling, and concurrent connection support;
## 0.0.20
### Patch Changes
- 54a4873: allow extensions to be loaded via '-e/--extensions <list>' cmd line parameter'
- 45bff97: added pgcrypto extension
- Updated dependencies [45bff97]
- Updated dependencies [5ec474f]
- @electric-sql/pglite@0.3.15
## 0.0.19
### Patch Changes
- Updated dependencies [8785034]
- Updated dependencies [90cfee8]
- @electric-sql/pglite@0.3.14
## 0.0.18
### Patch Changes
- Updated dependencies [ad3d0d8]
- @electric-sql/pglite@0.3.13
## 0.0.17
### Patch Changes
- Updated dependencies [ce0e74e]
- @electric-sql/pglite@0.3.12
## 0.0.16
### Patch Changes
- Updated dependencies [9a104b9]
- @electric-sql/pglite@0.3.11
## 0.0.15
### Patch Changes
- Updated dependencies [ad765ed]
- @electric-sql/pglite@0.3.10
## 0.0.14
### Patch Changes
- e40ccad: Upgrade emsdk
- Updated dependencies [e40ccad]
- @electric-sql/pglite@0.3.9
## 0.0.13
### Patch Changes
- bd263aa: fix oom; other fixes
- Updated dependencies [f12a582]
- Updated dependencies [bd263aa]
- @electric-sql/pglite@0.3.8
## 0.0.12
### Patch Changes
- Updated dependencies [0936962]
- @electric-sql/pglite@0.3.7
## 0.0.11
### Patch Changes
- Updated dependencies [6898469]
- Updated dependencies [469be18]
- Updated dependencies [64e33c7]
- @electric-sql/pglite@0.3.6
## 0.0.10
### Patch Changes
- Updated dependencies [6653899]
- Updated dependencies [5f007fc]
- @electric-sql/pglite@0.3.5
## 0.0.9
### Patch Changes
- 38a55d0: fix cjs/esm misconfigurations
- Updated dependencies [1fcaa3e]
- Updated dependencies [38a55d0]
- Updated dependencies [aac7003]
- Updated dependencies [8ca254d]
- @electric-sql/pglite@0.3.4
## 0.0.8
### Patch Changes
- Updated dependencies [ea2c7c7]
- @electric-sql/pglite@0.3.3
## 0.0.7
### Patch Changes
- 5a47f4d: better handling of closing the socket
- 6f8dd08: with the `npx pglite-server` command, add the ability to pass a command to run after the server is ready, along with passing a new DATABASE_URL environment variable to the command. This allows for a command like `npx pglite-server -r "npm run dev:inner" --include-database-url` to run a dev server that uses the pglite server as the database.
## 0.0.6
### Patch Changes
- Updated dependencies [e2c654b]
- @electric-sql/pglite@0.3.2
## 0.0.5
### Patch Changes
- f975f77: Updated README
- d9b52d5: allows unix socket connections
## 0.0.4
### Patch Changes
- 027baed: missing shebang
## 0.0.3
### Patch Changes
- 1c2dc84: fix pglite-socket exports
## 0.0.2
### Patch Changes
- Updated dependencies [713364e]
- @electric-sql/pglite@0.3.1

176
node_modules/@electric-sql/pglite-socket/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,176 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

267
node_modules/@electric-sql/pglite-socket/README.md generated vendored Normal file
View File

@@ -0,0 +1,267 @@
# pglite-socket
A socket implementation for PGlite enabling remote connections. This package is a simple wrapper around the `net` module to allow PGlite to be used as a PostgreSQL server.
There are two main components to this package:
- [`PGLiteSocketServer`](#pglitesocketserver) - A TCP server that allows PostgreSQL clients to connect to a PGlite database instance.
- [`PGLiteSocketHandler`](#pglitesockethandler) - A low-level handler for a single socket connection to PGlite. This class handles the raw protocol communication between a socket and PGlite, and can be used to create a custom server.
The package also includes a [CLI](#cli-usage) for quickly starting a PGlite socket server.
Note: Although PGlite is a single-connection database, it is possible to open and use multiple simultaneous connections with pglite-server. This is achieved through a multiplexer implemented in the server (see the parameter `-m, --max-connections`). This is different from a normal Postgres installation, so not all use cases are guaranteed to work.
## Installation
```bash
npm install @electric-sql/pglite-socket
# or
yarn add @electric-sql/pglite-socket
# or
pnpm add @electric-sql/pglite-socket
```
## Usage
```typescript
import { PGlite } from '@electric-sql/pglite'
import { PGLiteSocketServer } from '@electric-sql/pglite-socket'
// Create a PGlite instance
const db = await PGlite.create()
// Create and start a socket server
const server = new PGLiteSocketServer({
db,
port: 5432,
host: '127.0.0.1',
})
await server.start()
console.log('Server started on 127.0.0.1:5432')
// Handle graceful shutdown
process.on('SIGINT', async () => {
await server.stop()
await db.close()
console.log('Server stopped and database closed')
process.exit(0)
})
```
## API
### PGLiteSocketServer
Creates a TCP server that allows PostgreSQL clients to connect to a PGlite database instance.
#### Options
- `db: PGlite` - The PGlite database instance
- `port?: number` - The port to listen on (default: 5432). Use port 0 to let the OS assign an available port
- `host?: string` - The host to bind to (default: 127.0.0.1)
- `path?: string` - Unix socket path to bind to (takes precedence over host:port)
- `inspect?: boolean` - Print the incoming and outgoing data to the console (default: false)
#### Methods
- `start(): Promise<void>` - Start the socket server
- `stop(): Promise<void>` - Stop the socket server
#### Events
- `listening` - Emitted when the server starts listening
- `connection` - Emitted when a client connects
- `error` - Emitted when an error occurs
- `close` - Emitted when the server is closed
### PGLiteSocketHandler
Low-level handler for a single socket connection to PGlite. This class handles the raw protocol communication between a socket and PGlite.
#### Options
- `db: PGlite` - The PGlite database instance
- `closeOnDetach?: boolean` - Whether to close the socket when detached (default: false)
- `inspect?: boolean` - Print the incoming and outgoing data to the console in hex and ascii (default: false)
#### Methods
- `attach(socket: Socket): Promise<PGLiteSocketHandler>` - Attach a socket to this handler
- `detach(close?: boolean): PGLiteSocketHandler` - Detach the current socket from this handler
- `isAttached: boolean` - Check if a socket is currently attached
#### Events
- `data` - Emitted when data is processed through the handler
- `error` - Emitted when an error occurs
- `close` - Emitted when the socket is closed
#### Example
```typescript
import { PGlite } from '@electric-sql/pglite'
import { PGLiteSocketHandler } from '@electric-sql/pglite-socket'
import { createServer, Socket } from 'net'
// Create a PGlite instance
const db = await PGlite.create()
// Create a handler
const handler = new PGLiteSocketHandler({
db,
closeOnDetach: true,
inspect: false,
})
// Create a server that uses the handler
const server = createServer(async (socket: Socket) => {
try {
await handler.attach(socket)
console.log('Client connected')
} catch (err) {
console.error('Error attaching socket', err)
socket.end()
}
})
server.listen(5432, '127.0.0.1')
```
## Examples
See the [examples directory](./examples) for more usage examples.
## CLI Usage
This package provides a command-line interface for quickly starting a PGlite socket server.
```bash
# Install globally
npm install -g @electric-sql/pglite-socket
# Start a server with default settings (in-memory database, port 5432)
pglite-server
# Start a server with custom options
pglite-server --db=/path/to/database --port=5433 --host=0.0.0.0 --debug=1
# Using short options
pglite-server -d /path/to/database -p 5433 -h 0.0.0.0 -v 1
# Show help
pglite-server --help
```
### CLI Options
- `-d, --db=PATH` - Database path (default: memory://)
- `-p, --port=PORT` - Port to listen on (default: 5432). Use 0 to let the OS assign an available port
- `-h, --host=HOST` - Host to bind to (default: 127.0.0.1)
- `-u, --path=UNIX` - Unix socket to bind to (takes precedence over host:port)
- `-v, --debug=LEVEL` - Debug level 0-5 (default: 0)
- `-e, --extensions=LIST` - Comma-separated list of extensions to load (e.g., vector,pgcrypto)
- `-r, --run=COMMAND` - Command to run after server starts
- `--include-database-url` - Include DATABASE_URL in subprocess environment
- `--shutdown-timeout=MS` - Timeout for graceful subprocess shutdown in ms (default: 5000)
- `-m, --max-connections=N` - Maximum concurrent connections (default is no concurrency: 1)
### Development Server Integration
The `--run` option is particularly useful for development workflows where you want to use PGlite as a drop-in replacement for PostgreSQL. This allows you to wrap your development server and automatically provide it with a DATABASE_URL pointing to your PGlite instance.
```bash
# Start your Next.js dev server with PGlite
pglite-server --run "npm run dev" --include-database-url
# Start a Node.js app with PGlite
pglite-server --db=./dev-db --run "node server.js" --include-database-url
# Start multiple services (using a process manager like concurrently)
pglite-server --run "npx concurrently 'npm run dev' 'npm run worker'" --include-database-url
```
When using `--run` with `--include-database-url`, the subprocess will receive a `DATABASE_URL` environment variable with the correct connection string for your PGlite server. This enables seamless integration with applications that expect a PostgreSQL connection string.
### Using in npm scripts
You can add the CLI to your package.json scripts for convenient execution:
```json
{
"scripts": {
"db:start": "pglite-server --db=./data/mydb --port=5433",
"db:dev": "pglite-server --db=memory:// --debug=1",
"dev": "pglite-server --db=./dev-db --run 'npm run start:dev' --include-database-url",
"dev:clean": "pglite-server --run 'npm run start:dev' --include-database-url"
}
}
```
Then run with:
```bash
npm run dev # Start with persistent database
npm run dev:clean # Start with in-memory database
```
### Unix Socket Support
For better performance in local development, you can use Unix sockets instead of TCP:
```bash
# Start server on a Unix socket
pglite-server --path=/tmp/pglite.sock --run "npm run dev" --include-database-url
# The DATABASE_URL will be: postgresql://postgres:postgres@/postgres?host=/tmp
```
### Connecting to the server
Once the server is running, you can connect to it using any PostgreSQL client:
#### Using psql
```bash
PGSSLMODE=disable psql -h localhost -p 5432 -d template1
```
#### Using Node.js clients
```javascript
// Using node-postgres
import pg from 'pg'
const client = new pg.Client({
host: 'localhost',
port: 5432,
database: 'template1'
})
await client.connect()
// Using postgres.js
import postgres from 'postgres'
const sql = postgres({
host: 'localhost',
port: 5432,
database: 'template1'
})
// Using environment variable (when using --include-database-url)
const sql = postgres(process.env.DATABASE_URL)
```
### Limitations and Tips
- Multiple concurrent connections are supported through a **multiplexer** over the single conn, therefore not all cases might be covered.
- For development purposes, using an in-memory database (`--db=memory://`) is fastest but data won't persist after the server is stopped.
- For persistent storage, specify a file path for the database (e.g., `--db=./data/mydb`).
- When using debug mode (`--debug=1` or higher), additional protocol information will be displayed in the console.
- To allow connections from other machines, set the host to `0.0.0.0` with `--host=0.0.0.0`.
- SSL connections are **NOT** supported. For `psql`, set env var `PGSSLMODE=disable`.
- When using `--run`, the server will automatically shut down if the subprocess exits with a non-zero code.
- Use `--shutdown-timeout` to adjust how long to wait for graceful subprocess termination (default: 5 seconds).
- Use `--max-connections=10` to allow up to 10 concurrent connections (default: 1, no concurrent connections).
## License
Apache 2.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,118 @@
import { PGlite } from '@electric-sql/pglite';
import { Socket } from 'net';
declare const CONNECTION_QUEUE_TIMEOUT = 60000;
/**
* Global query queue manager
* Ensures only one query executes at a time in PGlite
*/
declare class QueryQueueManager {
private queue;
private processing;
private db;
private debug;
private lastHandlerId;
constructor(db: PGlite, debug?: boolean);
private log;
enqueue(handlerId: number, message: Uint8Array, onData: (data: Uint8Array) => void): Promise<number>;
private processQueue;
getQueueLength(): number;
clearQueueForHandler(handlerId: number): void;
clearTransactionIfNeeded(handlerId: number): Promise<void>;
}
/**
* Options for creating a PGLiteSocketHandler
*/
interface PGLiteSocketHandlerOptions {
/** The query queue manager */
queryQueue: QueryQueueManager;
/** Whether to close the socket when detached (default: false) */
closeOnDetach?: boolean;
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean;
/** Enable debug logging of method calls */
debug?: boolean;
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number;
}
/**
* Handler for a single socket connection to PGlite
* Each connection can remain open and send multiple queries
*/
declare class PGLiteSocketHandler extends EventTarget {
private queryQueue;
private socket;
private active;
private closeOnDetach;
private inspect;
private debug;
private readonly id;
private messageBuffer;
private idleTimer?;
private idleTimeout;
private lastActivityTime;
private static nextHandlerId;
constructor(options: PGLiteSocketHandlerOptions);
get handlerId(): number;
private log;
attach(socket: Socket): Promise<PGLiteSocketHandler>;
private resetIdleTimer;
detach(close?: boolean): Promise<PGLiteSocketHandler>;
get isAttached(): boolean;
private handleData;
private handleError;
private handleClose;
private inspectData;
}
/**
* Options for creating a PGLiteSocketServer
*/
interface PGLiteSocketServerOptions {
/** The PGlite database instance */
db: PGlite;
/** The port to listen on (default: 5432) */
port?: number;
/** The host to bind to (default: 127.0.0.1) */
host?: string;
/** Unix socket path to bind to (default: undefined) */
path?: string;
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean;
/** Enable debug logging of method calls */
debug?: boolean;
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number;
/** Maximum concurrent connections (default: 100) */
maxConnections?: number;
}
/**
* PGLite Socket Server with support for multiple concurrent connections
* Connections remain open and queries are queued at the query level
*/
declare class PGLiteSocketServer extends EventTarget {
readonly db: PGlite;
private server;
private port?;
private host?;
private path?;
private active;
private inspect;
private debug;
private idleTimeout;
private maxConnections;
private handlers;
private queryQueue;
constructor(options: PGLiteSocketServerOptions);
private log;
start(): Promise<void>;
getServerConn(): string;
stop(): Promise<void>;
private handleConnection;
getStats(): {
activeConnections: number;
queuedQueries: number;
maxConnections: number;
};
}
export { CONNECTION_QUEUE_TIMEOUT, PGLiteSocketHandler, type PGLiteSocketHandlerOptions, PGLiteSocketServer, type PGLiteSocketServerOptions };

View File

@@ -0,0 +1,118 @@
import { PGlite } from '@electric-sql/pglite';
import { Socket } from 'net';
declare const CONNECTION_QUEUE_TIMEOUT = 60000;
/**
* Global query queue manager
* Ensures only one query executes at a time in PGlite
*/
declare class QueryQueueManager {
private queue;
private processing;
private db;
private debug;
private lastHandlerId;
constructor(db: PGlite, debug?: boolean);
private log;
enqueue(handlerId: number, message: Uint8Array, onData: (data: Uint8Array) => void): Promise<number>;
private processQueue;
getQueueLength(): number;
clearQueueForHandler(handlerId: number): void;
clearTransactionIfNeeded(handlerId: number): Promise<void>;
}
/**
* Options for creating a PGLiteSocketHandler
*/
interface PGLiteSocketHandlerOptions {
/** The query queue manager */
queryQueue: QueryQueueManager;
/** Whether to close the socket when detached (default: false) */
closeOnDetach?: boolean;
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean;
/** Enable debug logging of method calls */
debug?: boolean;
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number;
}
/**
* Handler for a single socket connection to PGlite
* Each connection can remain open and send multiple queries
*/
declare class PGLiteSocketHandler extends EventTarget {
private queryQueue;
private socket;
private active;
private closeOnDetach;
private inspect;
private debug;
private readonly id;
private messageBuffer;
private idleTimer?;
private idleTimeout;
private lastActivityTime;
private static nextHandlerId;
constructor(options: PGLiteSocketHandlerOptions);
get handlerId(): number;
private log;
attach(socket: Socket): Promise<PGLiteSocketHandler>;
private resetIdleTimer;
detach(close?: boolean): Promise<PGLiteSocketHandler>;
get isAttached(): boolean;
private handleData;
private handleError;
private handleClose;
private inspectData;
}
/**
* Options for creating a PGLiteSocketServer
*/
interface PGLiteSocketServerOptions {
/** The PGlite database instance */
db: PGlite;
/** The port to listen on (default: 5432) */
port?: number;
/** The host to bind to (default: 127.0.0.1) */
host?: string;
/** Unix socket path to bind to (default: undefined) */
path?: string;
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean;
/** Enable debug logging of method calls */
debug?: boolean;
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number;
/** Maximum concurrent connections (default: 100) */
maxConnections?: number;
}
/**
* PGLite Socket Server with support for multiple concurrent connections
* Connections remain open and queries are queued at the query level
*/
declare class PGLiteSocketServer extends EventTarget {
readonly db: PGlite;
private server;
private port?;
private host?;
private path?;
private active;
private inspect;
private debug;
private idleTimeout;
private maxConnections;
private handlers;
private queryQueue;
constructor(options: PGLiteSocketServerOptions);
private log;
start(): Promise<void>;
getServerConn(): string;
stop(): Promise<void>;
private handleConnection;
getStats(): {
activeConnections: number;
queuedQueries: number;
maxConnections: number;
};
}
export { CONNECTION_QUEUE_TIMEOUT, PGLiteSocketHandler, type PGLiteSocketHandlerOptions, PGLiteSocketServer, type PGLiteSocketServerOptions };

View File

@@ -0,0 +1,2 @@
import{a,b,c}from"./chunk-NSUMFCRM.js";export{a as CONNECTION_QUEUE_TIMEOUT,b as PGLiteSocketHandler,c as PGLiteSocketServer};
//# sourceMappingURL=index.js.map

View File

@@ -0,0 +1 @@
{"version":3,"sources":[],"sourcesContent":[],"mappings":"","names":[]}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1 @@
#!/usr/bin/env node

View File

@@ -0,0 +1 @@
#!/usr/bin/env node

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env node
import{c as d}from"../chunk-NSUMFCRM.js";import{PGlite as u}from"@electric-sql/pglite";import{parseArgs as h}from"node:util";import{spawn as p}from"node:child_process";var r=h({options:{db:{type:"string",short:"d",default:"memory://",help:"Database path (relative or absolute). Use memory:// for in-memory database."},port:{type:"string",short:"p",default:"5432",help:"Port to listen on"},host:{type:"string",short:"h",default:"127.0.0.1",help:"Host to bind to"},path:{type:"string",short:"u",default:void 0,help:"unix socket to bind to. Takes precedence over host:port"},debug:{type:"string",short:"v",default:"0",help:"Debug level (0-5)"},extensions:{type:"string",short:"e",default:void 0,help:"Comma-separated list of extensions to load (e.g., vector,pgcrypto,postgis etc.)"},run:{type:"string",short:"r",default:void 0,help:"Command to run after server starts"},"include-database-url":{type:"boolean",default:!1,help:"Include DATABASE_URL in the environment of the subprocess"},"shutdown-timeout":{type:"string",default:"5000",help:"Timeout in milliseconds for graceful subprocess shutdown (default: 5000)"},"max-connections":{type:"string",short:"m",default:"1",help:"Maximum concurrent connections (default: 1)"},help:{type:"boolean",short:"?",default:!1,help:"Show help"}}}),g=`PGlite Socket Server
Usage: pglite-server [options]
Options:
-d, --db=PATH Database path (default: memory://)
-p, --port=PORT Port to listen on (default: 5432)
-h, --host=HOST Host to bind to (default: 127.0.0.1)
-u, --path=UNIX Unix socket to bind to (default: undefined). Takes precedence over host:port
-v, --debug=LEVEL Debug level 0-5 (default: 0)
-e, --extensions=LIST Comma-separated list of extensions to load
Formats: vector, pgcrypto (built-in/contrib)
@org/package/path:exportedName (npm package)
-r, --run=COMMAND Command to run after server starts
--include-database-url Include DATABASE_URL in subprocess environment
--shutdown-timeout=MS Timeout for graceful subprocess shutdown in ms (default: 5000)
-m, --max-connections=N Maximum concurrent connections (default is no concurrency: 1)
`,l=class{constructor(e){this.db=null;this.server=null;this.subprocessManager=null;this.config=e}static parseConfig(){let e=r.values.extensions;return{dbPath:r.values.db,port:parseInt(r.values.port,10),host:r.values.host,path:r.values.path,debugLevel:parseInt(r.values.debug,10),extensionNames:e?e.split(",").map(o=>o.trim()):void 0,runCommand:r.values.run,includeDatabaseUrl:r.values["include-database-url"],shutdownTimeout:parseInt(r.values["shutdown-timeout"],10),maxConnections:parseInt(r.values["max-connections"],10)}}createDatabaseUrl(){let{host:e,port:o,path:t}=this.config;if(t){let s=t.endsWith("/.s.PGSQL.5432")?t.slice(0,-13):t;return`postgresql://postgres:postgres@/postgres?host=${encodeURIComponent(s)}`}else return`postgresql://postgres:postgres@${e}:${o}/postgres`}async importExtensions(){if(!this.config.extensionNames?.length)return;let e={},o=["vector","live","pg_hashids","pg_ivm","pg_uuidv7","pgtap","age","pg_textsearch"];for(let t of this.config.extensionNames){let s=null;try{if(t.includes(":")){let[i,n]=t.split(":");if(!i||!n)throw new Error(`Invalid extension format '${t}'. Expected: package/path:exportedName`);s=(await import(i))[n],s&&(e[n]=s,console.log(`Imported extension '${n}' from '${i}'`))}else if(o.includes(t))s=(await import(`@electric-sql/pglite/${t}`))[t],s&&(e[t]=s,console.log(`Imported extension: ${t}`));else{try{s=(await import(`@electric-sql/pglite/contrib/${t}`))[t]}catch{s=(await import(`@electric-sql/pglite-${t}`))[t]}s&&(e[t]=s,console.log(`Imported extension: ${t}`))}}catch(i){throw console.error(`Failed to import extension '${t}':`,i),new Error(`Failed to import extension '${t}'`)}}return Object.keys(e).length>0?e:void 0}async initializeDatabase(){console.log(`Initializing PGLite with database: ${this.config.dbPath}`),console.log(`Debug level: ${this.config.debugLevel}`);let e=await this.importExtensions();this.db=new u(this.config.dbPath,{debug:this.config.debugLevel,extensions:e}),await this.db.waitReady,console.log("PGlite database initialized")}setupServerEventHandlers(){if(!this.server||!this.subprocessManager)throw new Error("Server or subprocess manager not initialized");this.server.addEventListener("listening",e=>{let o=e.detail;if(console.log(`PGLiteSocketServer listening on ${JSON.stringify(o)}`),this.config.runCommand&&this.subprocessManager){let t=this.createDatabaseUrl();this.subprocessManager.spawn(this.config.runCommand,t,this.config.includeDatabaseUrl)}}),this.server.addEventListener("connection",e=>{let{clientAddress:o,clientPort:t}=e.detail;console.log(`Client connected from ${o}:${t}`)}),this.server.addEventListener("error",e=>{let o=e.detail;console.error("Socket server error:",o)})}setupSignalHandlers(){process.on("SIGINT",()=>this.shutdown()),process.on("SIGTERM",()=>this.shutdown())}async start(){try{if(await this.initializeDatabase(),!this.db)throw new Error("Database initialization failed");this.server=new d({db:this.db,port:this.config.port,host:this.config.host,path:this.config.path,inspect:this.config.debugLevel>0,maxConnections:this.config.maxConnections}),this.subprocessManager=new c(e=>{this.shutdown(e)}),this.setupServerEventHandlers(),this.setupSignalHandlers(),await this.server.start()}catch(e){throw console.error("Failed to start PGLiteSocketServer:",e),e}}async shutdown(e=0){console.log(`
Shutting down PGLiteSocketServer...`),this.subprocessManager&&this.subprocessManager.terminate(this.config.shutdownTimeout),this.server&&await this.server.stop(),this.db&&await this.db.close(),console.log("Server stopped"),process.exit(e)}},c=class{constructor(e){this.childProcess=null;this.onExit=e}get process(){return this.childProcess}spawn(e,o,t){console.log(`Running command: ${e}`);let s={...process.env};t&&(s.DATABASE_URL=o,console.log(`Setting DATABASE_URL=${o}`));let i=e.trim().split(/\s+/);this.childProcess=p(i[0],i.slice(1),{env:s,stdio:"inherit"}),this.childProcess.on("error",n=>{console.error("Error running command:",n),console.log("Subprocess failed to start, shutting down..."),this.onExit(1)}),this.childProcess.on("close",n=>{console.log(`Command exited with code ${n}`),this.childProcess=null,n!==null&&n!==0&&(console.log(`Child process failed with exit code ${n}, shutting down...`),this.onExit(n))})}terminate(e){this.childProcess&&(console.log("Terminating child process..."),this.childProcess.kill("SIGTERM"),setTimeout(()=>{this.childProcess&&!this.childProcess.killed&&(console.log("Force killing child process..."),this.childProcess.kill("SIGKILL"))},e))}};async function m(){r.values.help&&(console.log(g),process.exit(0));try{let a=l.parseConfig();await new l(a).start()}catch(a){console.error("Unhandled error:",a),process.exit(1)}}m();
//# sourceMappingURL=server.js.map

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,29 @@
import globals from 'globals'
import rootConfig from '../../eslint.config.js'
export default [
...rootConfig,
{
ignores: ['release/**/*', 'examples/**/*', 'dist/**/*'],
},
{
languageOptions: {
globals: {
...globals.browser,
...globals.node,
},
},
rules: {
...rootConfig.rules,
'@typescript-eslint/no-explicit-any': 'off',
},
},
{
files: ['tests/targets/deno/**/*.js'],
languageOptions: {
globals: {
Deno: false,
},
},
},
]

View File

@@ -0,0 +1,76 @@
import { PGLiteSocketServer } from '../src'
import { PGlite, DebugLevel } from '@electric-sql/pglite'
/*
* This is a basic example of how to use the PGLiteSocketServer class.
* It creates a PGlite instance and a PGLiteSocketServer instance and starts the server.
* It also handles SIGINT to stop the server and close the database.
* You can run this example with the following command:
*
* ```bash
* pnpm tsx examples/basic-server.ts
* ```
* or with the handy script:
* ```bash
* pnpm example:basic-server
* ```
*
* You can set the host and port with the following environment variables:
*
* ```bash
* HOST=127.0.0.1 PORT=5432 DEBUG=1 pnpm tsx examples/basic-server.ts
* ```
*
* Debug level can be set to 0, 1, 2, 3, or 4.
*
* ```bash
* DEBUG=1 pnpm tsx examples/basic-server.ts
* ```
* You can also use a UNIX socket instead of the host:port
*
* ```bash
* UNIX=/tmp/.s.PGSQL.5432 DEBUG=1 pnpm tsx examples/basic-server.ts
* ```
*/
const UNIX = process.env.UNIX
const PORT = process.env.PORT ? parseInt(process.env.PORT) : 5432
const HOST = process.env.HOST ?? '127.0.0.1'
const DEBUG = process.env.DEBUG
? (parseInt(process.env.DEBUG) as DebugLevel)
: 0
// Create a PGlite instance
const db = await PGlite.create({
debug: DEBUG,
})
// Check if the database is working
console.log(await db.query('SELECT version()'))
// Create a PGLiteSocketServer instance
const server = new PGLiteSocketServer({
db,
port: PORT,
host: HOST,
path: UNIX,
inspect: !!DEBUG, // Print the incoming and outgoing data to the console
})
server.addEventListener('listening', (event) => {
const detail = (
event as CustomEvent<{ port: number; host: string } | { host: string }>
).detail
console.log(`Server listening on ${JSON.stringify(detail)}`)
})
// Start the server
await server.start()
// Handle SIGINT to stop the server and close the database
process.on('SIGINT', async () => {
await server.stop()
await db.close()
console.log('Server stopped and database closed')
process.exit(0)
})

69
node_modules/@electric-sql/pglite-socket/package.json generated vendored Normal file
View File

@@ -0,0 +1,69 @@
{
"name": "@electric-sql/pglite-socket",
"version": "0.1.1",
"description": "A socket implementation for PGlite enabling remote connections",
"author": "Electric DB Limited",
"homepage": "https://pglite.dev",
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "git+https://github.com/electric-sql/pglite",
"directory": "packages/pglite-socket"
},
"keywords": [
"postgres",
"sql",
"database",
"wasm",
"pglite",
"socket"
],
"private": false,
"publishConfig": {
"access": "public"
},
"type": "module",
"main": "dist/index.cjs",
"module": "dist/index.js",
"types": "dist/index.d.ts",
"exports": {
".": {
"import": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"require": {
"types": "./dist/index.d.cts",
"default": "./dist/index.cjs"
}
}
},
"bin": {
"pglite-server": "./dist/scripts/server.js"
},
"devDependencies": {
"@arethetypeswrong/cli": "^0.18.1",
"@types/emscripten": "^1.41.1",
"@types/node": "^20.16.11",
"pg": "^8.14.0",
"postgres": "^3.4.5",
"tsx": "^4.19.2",
"vitest": "^1.3.1",
"@electric-sql/pg-protocol": "0.0.4",
"@electric-sql/pglite": "0.4.1"
},
"peerDependencies": {
"@electric-sql/pglite": "0.4.1"
},
"scripts": {
"build": "tsup",
"check:exports": "attw . --pack --profile node16",
"lint": "eslint ./src ./tests --report-unused-disable-directives --max-warnings 0",
"format": "prettier --write ./src ./tests",
"typecheck": "tsc",
"stylecheck": "pnpm lint && prettier --check ./src ./tests",
"test": "vitest",
"example:basic-server": "tsx examples/basic-server.ts",
"pglite-server:dev": "tsx --watch src/scripts/server.ts"
}
}

771
node_modules/@electric-sql/pglite-socket/src/index.ts generated vendored Normal file
View File

@@ -0,0 +1,771 @@
import type { PGlite } from '@electric-sql/pglite'
import { type Server, type Socket, createServer } from 'net'
// Connection queue timeout in milliseconds
export const CONNECTION_QUEUE_TIMEOUT = 60000 // 60 seconds
/**
* Represents a queued query waiting for PGlite access
*/
interface QueuedQuery {
handlerId: number
message: Uint8Array
resolve: (resultSize: number) => void
reject: (error: Error) => void
timestamp: number
onData: (data: Uint8Array) => void
}
/**
* Global query queue manager
* Ensures only one query executes at a time in PGlite
*/
class QueryQueueManager {
private queue: QueuedQuery[] = []
private processing = false
private db: PGlite
private debug: boolean
private lastHandlerId: null | number = null
constructor(db: PGlite, debug = false) {
this.db = db
this.debug = debug
}
private log(message: string, ...args: any[]): void {
if (this.debug) {
console.log(`[QueryQueueManager] ${message}`, ...args)
}
}
async enqueue(
handlerId: number,
message: Uint8Array,
onData: (data: Uint8Array) => void,
): Promise<number> {
return new Promise((resolve, reject) => {
const query: QueuedQuery = {
handlerId,
message,
resolve,
reject,
timestamp: Date.now(),
onData,
}
this.queue.push(query)
this.log(
`enqueued query from handler #${handlerId}, queue size: ${this.queue.length}`,
)
// Process queue if not already processing
if (!this.processing) {
this.processQueue()
}
})
}
private async processQueue(): Promise<void> {
if (this.processing || this.queue.length === 0) {
return
}
this.processing = true
while (this.queue.length > 0) {
let query
if (this.db.isInTransaction() && this.lastHandlerId) {
const i = this.queue.findIndex(
(q) => q.handlerId === this.lastHandlerId,
)
if (i === -1) {
// we didn't find any other query from the same client!
this.log(
`transaction started, but no query from the same handler id found in queue`,
this.lastHandlerId,
)
query = null
} else {
query = this.queue.splice(i, 1)[0]
}
} else {
query = this.queue.shift()
}
if (!query) break
const waitTime = Date.now() - query.timestamp
this.log(
`processing query from handler #${query.handlerId} (waited ${waitTime}ms)`,
)
let result = 0
try {
// Execute the query with exclusive access to PGlite
await this.db.runExclusive(async () => {
return await this.db.execProtocolRawStream(query.message, {
onRawData: (data) => {
result += data.length
query.onData(data)
},
})
})
} catch (error) {
this.log(`query from handler #${query.handlerId} failed:`, error)
query.reject(error as Error)
return
}
this.log(
`query from handler #${query.handlerId} completed, ${result} bytes`,
)
this.lastHandlerId = query.handlerId
query.resolve(result)
}
this.processing = false
this.log(`queue processing complete, queue length is`, this.queue.length)
}
getQueueLength(): number {
return this.queue.length
}
clearQueueForHandler(handlerId: number): void {
const before = this.queue.length
this.queue = this.queue.filter((q) => {
if (q.handlerId === handlerId) {
q.reject(new Error('Handler disconnected'))
return false
}
return true
})
const removed = before - this.queue.length
if (removed > 0) {
this.log(`cleared ${removed} queries for handler #${handlerId}`)
}
}
async clearTransactionIfNeeded(handlerId: number): Promise<void> {
if (this.db.isInTransaction() && this.lastHandlerId === handlerId) {
await this.db.exec('ROLLBACK')
this.lastHandlerId = null
await this.processQueue()
}
}
}
/**
* Options for creating a PGLiteSocketHandler
*/
export interface PGLiteSocketHandlerOptions {
/** The query queue manager */
queryQueue: QueryQueueManager
/** Whether to close the socket when detached (default: false) */
closeOnDetach?: boolean
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean
/** Enable debug logging of method calls */
debug?: boolean
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number
}
/**
* Handler for a single socket connection to PGlite
* Each connection can remain open and send multiple queries
*/
export class PGLiteSocketHandler extends EventTarget {
private queryQueue: QueryQueueManager
private socket: Socket | null = null
private active = false
private closeOnDetach: boolean
private inspect: boolean
private debug: boolean
private readonly id: number
private messageBuffer: Buffer = Buffer.alloc(0)
private idleTimer?: NodeJS.Timeout
private idleTimeout: number
private lastActivityTime: number = Date.now()
// Static counter for generating unique handler IDs
private static nextHandlerId = 1
constructor(options: PGLiteSocketHandlerOptions) {
super()
this.queryQueue = options.queryQueue
this.closeOnDetach = options.closeOnDetach ?? false
this.inspect = options.inspect ?? false
this.debug = options.debug ?? false
this.idleTimeout = options.idleTimeout ?? 0
this.id = PGLiteSocketHandler.nextHandlerId++
this.log('constructor: created new handler')
}
public get handlerId(): number {
return this.id
}
private log(message: string, ...args: any[]): void {
if (this.debug) {
console.log(`[PGLiteSocketHandler#${this.id}] ${message}`, ...args)
}
}
public async attach(socket: Socket): Promise<PGLiteSocketHandler> {
this.log(
`attach: attaching socket from ${socket.remoteAddress}:${socket.remotePort}`,
)
if (this.socket) {
throw new Error('Socket already attached')
}
this.socket = socket
this.active = true
this.lastActivityTime = Date.now()
// Set up socket options
socket.setNoDelay(true)
// Set up idle timeout if configured
if (this.idleTimeout > 0) {
this.resetIdleTimer()
}
// Setup event handlers
this.log(`attach: setting up socket event handlers`)
socket.on('data', (data) => {
this.lastActivityTime = Date.now()
this.resetIdleTimer()
setImmediate(async () => {
try {
await this.handleData(data)
} catch (err) {
this.log('socket on data error: ', err)
this.handleError(err as Error)
}
})
})
socket.on('error', (err) => {
setImmediate(() => this.handleError(err))
})
socket.on('close', () => {
setImmediate(() => this.handleClose())
})
this.log(`attach: socket handler ready`)
return this
}
private resetIdleTimer(): void {
if (this.idleTimeout <= 0) return
if (this.idleTimer) {
clearTimeout(this.idleTimer)
}
this.idleTimer = setTimeout(() => {
const idleTime = Date.now() - this.lastActivityTime
this.log(`idle timeout after ${idleTime}ms`)
this.handleError(new Error('Idle timeout'))
}, this.idleTimeout)
}
public async detach(close?: boolean): Promise<PGLiteSocketHandler> {
this.log(`detach: detaching socket, close=${close ?? this.closeOnDetach}`)
if (this.idleTimer) {
clearTimeout(this.idleTimer)
this.idleTimer = undefined
}
// Clear any pending queries for this handler
this.queryQueue.clearQueueForHandler(this.id)
await this.queryQueue.clearTransactionIfNeeded(this.id)
if (!this.socket) {
this.log(`detach: no socket attached, nothing to do`)
return this
}
// Remove all listeners
this.socket.removeAllListeners('data')
this.socket.removeAllListeners('error')
this.socket.removeAllListeners('close')
// Close the socket if requested
if (close ?? this.closeOnDetach) {
if (this.socket.writable) {
this.log(`detach: closing socket`)
try {
this.socket.end()
this.socket.destroy()
} catch (err) {
this.log(`detach: error closing socket:`, err)
}
}
}
this.socket = null
this.active = false
this.messageBuffer = Buffer.alloc(0)
this.log(`detach: handler cleaned up`)
return this
}
public get isAttached(): boolean {
return this.socket !== null
}
private async handleData(data: Buffer): Promise<number> {
if (!this.socket || !this.active) {
this.log(`handleData: no active socket, ignoring data`)
return 0
}
this.log(`handleData: received ${data.length} bytes`)
// Append to buffer for message reassembly
this.messageBuffer = Buffer.concat([this.messageBuffer, data])
// Print the incoming data to the console
this.inspectData('incoming', data)
try {
let totalProcessed = 0
while (this.messageBuffer.length > 0) {
// Determine message length
let messageLength = 0
let isComplete = false
// Handle startup message (no type byte, just length)
if (this.messageBuffer.length >= 4) {
const firstInt = this.messageBuffer.readInt32BE(0)
if (this.messageBuffer.length >= 8) {
const secondInt = this.messageBuffer.readInt32BE(4)
// PostgreSQL 3.0 protocol version
if (secondInt === 196608 || secondInt === 0x00030000) {
messageLength = firstInt
isComplete = this.messageBuffer.length >= messageLength
}
}
// Regular message (type byte + length)
if (!isComplete && this.messageBuffer.length >= 5) {
const msgLength = this.messageBuffer.readInt32BE(1)
messageLength = 1 + msgLength
isComplete = this.messageBuffer.length >= messageLength
}
}
if (!isComplete || messageLength === 0) {
this.log(
`handleData: incomplete message, buffering ${this.messageBuffer.length} bytes`,
)
break
}
// Extract and process complete message
const message = this.messageBuffer.slice(0, messageLength)
this.messageBuffer = this.messageBuffer.slice(messageLength)
this.log(`handleData: processing message of ${message.length} bytes`)
// Check if socket is still active before processing
if (!this.active || !this.socket) {
this.log(`handleData: socket no longer active, stopping processing`)
break
}
let socketWriteError: any = undefined
// Queue the query for execution
// This allows multiple connections to queue queries simultaneously
await this.queryQueue.enqueue(
this.id,
new Uint8Array(message),
(data) => {
this.log(`handleData: received ${data.length} bytes from PGlite`)
// Print the outgoing data to the console
this.inspectData('outgoing', data)
// Send response if available
if (
data.length > 0 &&
this.socket &&
this.socket.writable &&
this.active
) {
// await new Promise<number>((resolve, reject) => {
this.log(`handleData: writing response to socket`)
if (this.socket?.writable) {
this.socket.write(Buffer.from(data), (err?: any) => {
if (err) {
this.log(`handleData: error writing to socket:`, err)
socketWriteError = err
} else {
this.log(`handleData: socket sent: ${data.length} bytes`)
}
})
} else {
this.log(`handleData: socket no longer writable`)
}
}
totalProcessed += data.length
},
)
if (socketWriteError) throw socketWriteError
}
// Emit data event with byte sizes
this.dispatchEvent(
new CustomEvent('data', {
detail: { incoming: data.length, outgoing: totalProcessed },
}),
)
return totalProcessed
} catch (err) {
this.log(`handleData: error processing data:`, err)
throw err
}
}
private handleError(err: Error): void {
if (!this.active) {
this.log(`handleError: handler not active, ignoring error`)
return
}
// ECONNRESET is expected behavior when clients disconnect
if (err.message?.includes('ECONNRESET')) {
this.log(
`handleError: client disconnected (ECONNRESET) - normal behavior`,
)
} else if (err.message?.includes('Idle timeout')) {
this.log(`handleError: connection idle timeout`)
} else {
this.log(`handleError:`, err)
}
this.active = false
// Emit error event
this.dispatchEvent(new CustomEvent('error', { detail: err }))
// Clean up
this.detach(true)
}
private handleClose(): void {
this.log(`handleClose: socket closed`)
this.active = false
this.dispatchEvent(new CustomEvent('close'))
this.detach(false)
}
private inspectData(
direction: 'incoming' | 'outgoing',
data: Buffer | Uint8Array,
): void {
if (!this.inspect) return
console.log('-'.repeat(75))
if (direction === 'incoming') {
console.log('-> incoming', data.length, 'bytes')
} else {
console.log('<- outgoing', data.length, 'bytes')
}
for (let offset = 0; offset < data.length; offset += 16) {
const chunkSize = Math.min(16, data.length - offset)
let hexPart = ''
for (let i = 0; i < 16; i++) {
if (i < chunkSize) {
const byte = data[offset + i]
hexPart += byte.toString(16).padStart(2, '0') + ' '
} else {
hexPart += ' '
}
}
let asciiPart = ''
for (let i = 0; i < chunkSize; i++) {
const byte = data[offset + i]
asciiPart += byte >= 32 && byte <= 126 ? String.fromCharCode(byte) : '.'
}
console.log(
`${offset.toString(16).padStart(8, '0')} ${hexPart} ${asciiPart}`,
)
}
}
}
/**
* Options for creating a PGLiteSocketServer
*/
export interface PGLiteSocketServerOptions {
/** The PGlite database instance */
db: PGlite
/** The port to listen on (default: 5432) */
port?: number
/** The host to bind to (default: 127.0.0.1) */
host?: string
/** Unix socket path to bind to (default: undefined) */
path?: string
/** Print the incoming and outgoing data to the console in hex and ascii */
inspect?: boolean
/** Enable debug logging of method calls */
debug?: boolean
/** Idle timeout in ms (0 to disable, default: 0) */
idleTimeout?: number
/** Maximum concurrent connections (default: 100) */
maxConnections?: number
}
/**
* PGLite Socket Server with support for multiple concurrent connections
* Connections remain open and queries are queued at the query level
*/
export class PGLiteSocketServer extends EventTarget {
readonly db: PGlite
private server: Server | null = null
private port?: number
private host?: string
private path?: string
private active = false
private inspect: boolean
private debug: boolean
private idleTimeout: number
private maxConnections: number
private handlers: Set<PGLiteSocketHandler> = new Set()
private queryQueue: QueryQueueManager
constructor(options: PGLiteSocketServerOptions) {
super()
this.db = options.db
if (options.path) {
this.path = options.path
} else {
if (typeof options.port === 'number') {
// Keep port undefined on port 0, will be set by the OS when we start the server.
this.port = options.port ?? options.port
} else {
this.port = 5432
}
this.host = options.host || '127.0.0.1'
}
this.inspect = options.inspect ?? false
this.debug = options.debug ?? false
this.idleTimeout = options.idleTimeout ?? 0
this.maxConnections = options.maxConnections ?? 1
// Create the shared query queue
this.queryQueue = new QueryQueueManager(this.db, this.debug)
this.log(`constructor: created server on ${this.getServerConn()}`)
this.log(`constructor: max connections: ${this.maxConnections}`)
if (this.idleTimeout > 0) {
this.log(`constructor: idle timeout: ${this.idleTimeout}ms`)
}
}
private log(message: string, ...args: any[]): void {
if (this.debug) {
console.log(`[PGLiteSocketServer] ${message}`, ...args)
}
}
public async start(): Promise<void> {
this.log(`start: starting server on ${this.getServerConn()}`)
if (this.server) {
throw new Error('Socket server already started')
}
// Ensure PGlite is ready before accepting connections
await this.db.waitReady
this.active = true
this.server = createServer((socket) => {
setImmediate(() => this.handleConnection(socket))
})
this.server.maxConnections = this.maxConnections
return new Promise<void>((resolve, reject) => {
if (!this.server) return reject(new Error('Server not initialized'))
this.server.on('error', (err) => {
this.log(`start: server error:`, err)
this.dispatchEvent(new CustomEvent('error', { detail: err }))
if (!this.active) {
reject(err)
}
})
if (this.path) {
this.server.listen(this.path, () => {
this.log(`start: server listening on ${this.getServerConn()}`)
this.dispatchEvent(
new CustomEvent('listening', {
detail: { path: this.path },
}),
)
resolve()
})
} else {
const server = this.server
server.listen(this.port, this.host, () => {
const address = server.address()
// We are not using pipes, so return type should be AddressInfo
if (address === null || typeof address !== 'object') {
throw Error('Expected address info')
}
// Assign the new port number
this.port = address.port
this.log(`start: server listening on ${this.getServerConn()}`)
this.dispatchEvent(
new CustomEvent('listening', {
detail: { port: this.port, host: this.host },
}),
)
resolve()
})
}
})
}
public getServerConn(): string {
if (this.path) return this.path
return `${this.host}:${this.port}`
}
public async stop(): Promise<void> {
this.log(`stop: stopping server`)
this.active = false
// Detach all handlers
this.log(`stop: detaching ${this.handlers.size} handlers`)
for (const handler of this.handlers) {
handler.detach(true)
}
this.handlers.clear()
if (!this.server) {
this.log(`stop: server not running, nothing to do`)
return Promise.resolve()
}
return new Promise<void>((resolve) => {
if (!this.server) return resolve()
this.server.close(() => {
this.log(`stop: server closed`)
this.server = null
this.dispatchEvent(new CustomEvent('close'))
resolve()
})
})
}
private async handleConnection(socket: Socket): Promise<void> {
const clientInfo = {
clientAddress: socket.remoteAddress || 'unknown',
clientPort: socket.remotePort || 0,
}
this.log(
`handleConnection: new connection from ${clientInfo.clientAddress}:${clientInfo.clientPort}`,
)
this.log(
`handleConnection: active connections: ${this.handlers.size}, queued queries: ${this.queryQueue.getQueueLength()}`,
)
if (!this.active) {
this.log(`handleConnection: server not active, closing connection`)
try {
socket.end()
} catch (err) {
this.log(`handleConnection: error closing socket:`, err)
}
return
}
// Check connection limit
if (this.handlers.size >= this.maxConnections) {
this.log(`handleConnection: max connections reached, rejecting`)
socket.write(Buffer.from('Too many connections\n'))
socket.end()
return
}
// Create a new handler for this connection
const handler = new PGLiteSocketHandler({
queryQueue: this.queryQueue,
closeOnDetach: true,
inspect: this.inspect,
debug: this.debug,
idleTimeout: this.idleTimeout,
})
// Track this handler
this.handlers.add(handler)
// Handle errors
handler.addEventListener('error', (event) => {
const error = (event as CustomEvent<Error>).detail
if (error?.message?.includes('ECONNRESET')) {
this.log(
`handler #${handler.handlerId}: client disconnected (ECONNRESET)`,
)
} else if (error?.message?.includes('Idle timeout')) {
this.log(`handler #${handler.handlerId}: idle timeout`)
} else {
this.log(`handler #${handler.handlerId}: error:`, error)
}
})
// Handle close event
handler.addEventListener('close', () => {
this.log(`handler #${handler.handlerId}: closed`)
this.handlers.delete(handler)
this.log(`handleConnection: active connections: ${this.handlers.size}`)
})
try {
await handler.attach(socket)
this.dispatchEvent(new CustomEvent('connection', { detail: clientInfo }))
} catch (err) {
this.log(`handleConnection: error attaching socket:`, err)
this.handlers.delete(handler)
this.dispatchEvent(new CustomEvent('error', { detail: err }))
try {
socket.end()
} catch (closeErr) {
this.log(`handleConnection: error closing socket:`, closeErr)
}
}
}
public getStats() {
return {
activeConnections: this.handlers.size,
queuedQueries: this.queryQueue.getQueueLength(),
maxConnections: this.maxConnections,
}
}
}

View File

@@ -0,0 +1,427 @@
#!/usr/bin/env node
import { PGlite, DebugLevel } from '@electric-sql/pglite'
import type { Extension, Extensions } from '@electric-sql/pglite'
import { PGLiteSocketServer } from '../index'
import { parseArgs } from 'node:util'
import { spawn, ChildProcess } from 'node:child_process'
// Define command line argument options
const args = parseArgs({
options: {
db: {
type: 'string',
short: 'd',
default: 'memory://',
help: 'Database path (relative or absolute). Use memory:// for in-memory database.',
},
port: {
type: 'string',
short: 'p',
default: '5432',
help: 'Port to listen on',
},
host: {
type: 'string',
short: 'h',
default: '127.0.0.1',
help: 'Host to bind to',
},
path: {
type: 'string',
short: 'u',
default: undefined,
help: 'unix socket to bind to. Takes precedence over host:port',
},
debug: {
type: 'string',
short: 'v',
default: '0',
help: 'Debug level (0-5)',
},
extensions: {
type: 'string',
short: 'e',
default: undefined,
help: 'Comma-separated list of extensions to load (e.g., vector,pgcrypto,postgis etc.)',
},
run: {
type: 'string',
short: 'r',
default: undefined,
help: 'Command to run after server starts',
},
'include-database-url': {
type: 'boolean',
default: false,
help: 'Include DATABASE_URL in the environment of the subprocess',
},
'shutdown-timeout': {
type: 'string',
default: '5000',
help: 'Timeout in milliseconds for graceful subprocess shutdown (default: 5000)',
},
'max-connections': {
type: 'string',
short: 'm',
default: '1',
help: 'Maximum concurrent connections (default: 1)',
},
help: {
type: 'boolean',
short: '?',
default: false,
help: 'Show help',
},
},
})
const help = `PGlite Socket Server
Usage: pglite-server [options]
Options:
-d, --db=PATH Database path (default: memory://)
-p, --port=PORT Port to listen on (default: 5432)
-h, --host=HOST Host to bind to (default: 127.0.0.1)
-u, --path=UNIX Unix socket to bind to (default: undefined). Takes precedence over host:port
-v, --debug=LEVEL Debug level 0-5 (default: 0)
-e, --extensions=LIST Comma-separated list of extensions to load
Formats: vector, pgcrypto (built-in/contrib)
@org/package/path:exportedName (npm package)
-r, --run=COMMAND Command to run after server starts
--include-database-url Include DATABASE_URL in subprocess environment
--shutdown-timeout=MS Timeout for graceful subprocess shutdown in ms (default: 5000)
-m, --max-connections=N Maximum concurrent connections (default is no concurrency: 1)
`
interface ServerConfig {
dbPath: string
port: number
host: string
path?: string
debugLevel: DebugLevel
extensionNames?: string[]
runCommand?: string
includeDatabaseUrl: boolean
shutdownTimeout: number
maxConnections: number
}
class PGLiteServerRunner {
private config: ServerConfig
private db: PGlite | null = null
private server: PGLiteSocketServer | null = null
private subprocessManager: SubprocessManager | null = null
constructor(config: ServerConfig) {
this.config = config
}
static parseConfig(): ServerConfig {
const extensionsArg = args.values.extensions as string | undefined
return {
dbPath: args.values.db as string,
port: parseInt(args.values.port as string, 10),
host: args.values.host as string,
path: args.values.path as string,
debugLevel: parseInt(args.values.debug as string, 10) as DebugLevel,
extensionNames: extensionsArg
? extensionsArg.split(',').map((e) => e.trim())
: undefined,
runCommand: args.values.run as string,
includeDatabaseUrl: args.values['include-database-url'] as boolean,
shutdownTimeout: parseInt(args.values['shutdown-timeout'] as string, 10),
maxConnections: parseInt(args.values['max-connections'] as string, 10),
}
}
private createDatabaseUrl(): string {
const { host, port, path } = this.config
if (path) {
// Unix socket connection
const socketDir = path.endsWith('/.s.PGSQL.5432')
? path.slice(0, -13)
: path
return `postgresql://postgres:postgres@/postgres?host=${encodeURIComponent(socketDir)}`
} else {
// TCP connection
return `postgresql://postgres:postgres@${host}:${port}/postgres`
}
}
private async importExtensions(): Promise<Extensions | undefined> {
if (!this.config.extensionNames?.length) {
return undefined
}
const extensions: Extensions = {}
// Built-in extensions that are not in contrib
const builtInExtensions = [
'vector',
'live',
'pg_hashids',
'pg_ivm',
'pg_uuidv7',
'pgtap',
'age',
'pg_textsearch',
]
for (const name of this.config.extensionNames) {
let ext: Extension | null = null
try {
// Check if this is a custom package path (contains ':')
// Format: @org/package/path:exportedName or package/path:exportedName
if (name.includes(':')) {
const [packagePath, exportName] = name.split(':')
if (!packagePath || !exportName) {
throw new Error(
`Invalid extension format '${name}'. Expected: package/path:exportedName`,
)
}
const mod = await import(packagePath)
ext = mod[exportName] as Extension
if (ext) {
extensions[exportName] = ext
console.log(
`Imported extension '${exportName}' from '${packagePath}'`,
)
}
} else if (builtInExtensions.includes(name)) {
// Built-in extension (e.g., @electric-sql/pglite/vector)
const mod = await import(`@electric-sql/pglite/${name}`)
ext = mod[name] as Extension
if (ext) {
extensions[name] = ext
console.log(`Imported extension: ${name}`)
}
} else {
// Try contrib first (e.g., @electric-sql/pglite/contrib/pgcrypto)
try {
const mod = await import(`@electric-sql/pglite/contrib/${name}`)
ext = mod[name] as Extension
} catch {
// Fall back to external package (e.g., @electric-sql/pglite-<extension>)
const mod = await import(`@electric-sql/pglite-${name}`)
ext = mod[name] as Extension
}
if (ext) {
extensions[name] = ext
console.log(`Imported extension: ${name}`)
}
}
} catch (error) {
console.error(`Failed to import extension '${name}':`, error)
throw new Error(`Failed to import extension '${name}'`)
}
}
return Object.keys(extensions).length > 0 ? extensions : undefined
}
private async initializeDatabase(): Promise<void> {
console.log(`Initializing PGLite with database: ${this.config.dbPath}`)
console.log(`Debug level: ${this.config.debugLevel}`)
const extensions = await this.importExtensions()
this.db = new PGlite(this.config.dbPath, {
debug: this.config.debugLevel,
extensions,
})
await this.db.waitReady
console.log('PGlite database initialized')
}
private setupServerEventHandlers(): void {
if (!this.server || !this.subprocessManager) {
throw new Error('Server or subprocess manager not initialized')
}
this.server.addEventListener('listening', (event) => {
const detail = (
event as CustomEvent<{ port: number; host: string } | { host: string }>
).detail
console.log(`PGLiteSocketServer listening on ${JSON.stringify(detail)}`)
// Run the command after server starts listening
if (this.config.runCommand && this.subprocessManager) {
const databaseUrl = this.createDatabaseUrl()
this.subprocessManager.spawn(
this.config.runCommand,
databaseUrl,
this.config.includeDatabaseUrl,
)
}
})
this.server.addEventListener('connection', (event) => {
const { clientAddress, clientPort } = (
event as CustomEvent<{ clientAddress: string; clientPort: number }>
).detail
console.log(`Client connected from ${clientAddress}:${clientPort}`)
})
this.server.addEventListener('error', (event) => {
const error = (event as CustomEvent<Error>).detail
console.error('Socket server error:', error)
})
}
private setupSignalHandlers(): void {
process.on('SIGINT', () => this.shutdown())
process.on('SIGTERM', () => this.shutdown())
}
async start(): Promise<void> {
try {
// Initialize database
await this.initializeDatabase()
if (!this.db) {
throw new Error('Database initialization failed')
}
// Create and setup the socket server
this.server = new PGLiteSocketServer({
db: this.db,
port: this.config.port,
host: this.config.host,
path: this.config.path,
inspect: this.config.debugLevel > 0,
maxConnections: this.config.maxConnections,
})
// Create subprocess manager
this.subprocessManager = new SubprocessManager((exitCode) => {
this.shutdown(exitCode)
})
// Setup event handlers
this.setupServerEventHandlers()
this.setupSignalHandlers()
// Start the server
await this.server.start()
} catch (error) {
console.error('Failed to start PGLiteSocketServer:', error)
throw error
}
}
async shutdown(exitCode: number = 0): Promise<void> {
console.log('\nShutting down PGLiteSocketServer...')
// Terminate subprocess if running
if (this.subprocessManager) {
this.subprocessManager.terminate(this.config.shutdownTimeout)
}
// Stop server
if (this.server) {
await this.server.stop()
}
// Close database
if (this.db) {
await this.db.close()
}
console.log('Server stopped')
process.exit(exitCode)
}
}
class SubprocessManager {
private childProcess: ChildProcess | null = null
private onExit: (code: number) => void
constructor(onExit: (code: number) => void) {
this.onExit = onExit
}
get process(): ChildProcess | null {
return this.childProcess
}
spawn(
command: string,
databaseUrl: string,
includeDatabaseUrl: boolean,
): void {
console.log(`Running command: ${command}`)
// Prepare environment variables
const env = { ...process.env }
if (includeDatabaseUrl) {
env.DATABASE_URL = databaseUrl
console.log(`Setting DATABASE_URL=${databaseUrl}`)
}
// Parse and spawn the command
const commandParts = command.trim().split(/\s+/)
this.childProcess = spawn(commandParts[0], commandParts.slice(1), {
env,
stdio: 'inherit',
})
this.childProcess.on('error', (error) => {
console.error('Error running command:', error)
// If subprocess fails to start, shutdown the server
console.log('Subprocess failed to start, shutting down...')
this.onExit(1)
})
this.childProcess.on('close', (code) => {
console.log(`Command exited with code ${code}`)
this.childProcess = null
// If child process exits with non-zero code, notify parent
if (code !== null && code !== 0) {
console.log(
`Child process failed with exit code ${code}, shutting down...`,
)
this.onExit(code)
}
})
}
terminate(timeout: number): void {
if (this.childProcess) {
console.log('Terminating child process...')
this.childProcess.kill('SIGTERM')
// Give it a moment to exit gracefully, then force kill if needed
setTimeout(() => {
if (this.childProcess && !this.childProcess.killed) {
console.log('Force killing child process...')
this.childProcess.kill('SIGKILL')
}
}, timeout)
}
}
}
// Main execution
async function main() {
// Show help and exit if requested
if (args.values.help) {
console.log(help)
process.exit(0)
}
try {
const config = PGLiteServerRunner.parseConfig()
const serverRunner = new PGLiteServerRunner(config)
await serverRunner.start()
} catch (error) {
console.error('Unhandled error:', error)
process.exit(1)
}
}
// Run the main function
main()

View File

@@ -0,0 +1,497 @@
import {
describe,
it,
expect,
beforeEach,
afterEach,
vi,
beforeAll,
afterAll,
} from 'vitest'
import { PGlite } from '@electric-sql/pglite'
import { PGLiteSocketHandler, PGLiteSocketServer } from '../src'
import { Socket, createConnection } from 'net'
import { existsSync } from 'fs'
import { unlink } from 'fs/promises'
// Mock timers for testing timeouts
beforeAll(() => {
vi.useFakeTimers()
})
afterAll(() => {
vi.useRealTimers()
})
async function testSocket(
fn: (socketOptions: {
host?: string
port?: number
path?: string
}) => Promise<void>,
) {
describe('TCP socket server', async () => {
await fn({ host: '127.0.0.1', port: 5433 })
})
describe('unix socket server', async () => {
await fn({ path: '/tmp/.s.PGSQL.5432' })
})
}
// Create a mock Socket for testing
const createMockSocket = () => {
const eventHandlers: Record<string, Array<(data: any) => void>> = {}
const mockSocket = {
// Socket methods we need for testing
removeAllListeners: vi.fn(),
end: vi.fn(),
destroy: vi.fn(),
write: vi.fn(),
writable: true,
remoteAddress: '127.0.0.1',
remotePort: 12345,
setNoDelay: vi.fn(),
// Mock on method with tracking of handlers
on: vi
.fn()
.mockImplementation((event: string, callback: (data: any) => void) => {
if (!eventHandlers[event]) {
eventHandlers[event] = []
}
eventHandlers[event].push(callback)
return mockSocket
}),
// Store event handlers for testing
eventHandlers,
// Helper to emit events
emit(event: string, data: any) {
if (eventHandlers[event]) {
eventHandlers[event].forEach((handler) => handler(data))
}
},
}
return mockSocket as unknown as Socket
}
// Create a mock QueryQueueManager for testing
const createMockQueryQueue = () => {
return {
enqueue: vi.fn().mockResolvedValue(new Uint8Array(0)),
clearQueueForHandler: vi.fn(),
clearTransactionIfNeeded: vi.fn(),
getQueueLength: vi.fn().mockReturnValue(0),
}
}
describe('PGLiteSocketHandler', () => {
let handler: PGLiteSocketHandler
let mockSocket: ReturnType<typeof createMockSocket> & {
eventHandlers: Record<string, Array<(data: any) => void>>
}
let mockQueryQueue: ReturnType<typeof createMockQueryQueue>
beforeEach(async () => {
// Create a mock query queue for testing
mockQueryQueue = createMockQueryQueue()
handler = new PGLiteSocketHandler({ queryQueue: mockQueryQueue as any })
mockSocket = createMockSocket() as any
})
afterEach(async () => {
// Ensure handler is detached
if (handler?.isAttached) {
await handler.detach(true)
}
})
it('should attach to a socket', async () => {
// Attach mock socket to handler
await handler.attach(mockSocket)
// Check that the socket is attached
expect(handler.isAttached).toBe(true)
expect(mockSocket.on).toHaveBeenCalledWith('data', expect.any(Function))
expect(mockSocket.on).toHaveBeenCalledWith('error', expect.any(Function))
expect(mockSocket.on).toHaveBeenCalledWith('close', expect.any(Function))
})
it('should detach from a socket', async () => {
// First attach
await handler.attach(mockSocket)
expect(handler.isAttached).toBe(true)
// Then detach
await handler.detach(false)
expect(handler.isAttached).toBe(false)
expect(mockSocket.removeAllListeners).toHaveBeenCalled()
})
it('should close socket when detaching with close option', async () => {
// Attach mock socket to handler
await handler.attach(mockSocket)
// Detach with close option
await handler.detach(true)
expect(handler.isAttached).toBe(false)
expect(mockSocket.end).toHaveBeenCalled()
})
it('should reject attaching multiple sockets', async () => {
// Attach first socket
await handler.attach(mockSocket)
// Trying to attach another socket should throw an error
const anotherMockSocket = createMockSocket()
await expect(handler.attach(anotherMockSocket)).rejects.toThrow(
'Socket already attached',
)
})
it('should emit error event when socket has error', async () => {
// Set up error listener
const errorHandler = vi.fn()
handler.addEventListener('error', errorHandler)
// Attach socket
await handler.attach(mockSocket)
// Mock the event handler logic directly instead of triggering actual error handlers
const customEvent = new CustomEvent('error', {
detail: { code: 'MOCK_ERROR', message: 'Test socket error' },
})
handler.dispatchEvent(customEvent)
// Verify error handler was called
expect(errorHandler).toHaveBeenCalled()
})
it('should emit close event when socket closes', async () => {
// Set up close listener
const closeHandler = vi.fn()
handler.addEventListener('close', closeHandler)
// Attach socket
await handler.attach(mockSocket)
// Mock the event handler logic directly instead of triggering actual socket handlers
const customEvent = new CustomEvent('close')
handler.dispatchEvent(customEvent)
// Verify close handler was called
expect(closeHandler).toHaveBeenCalled()
})
})
testSocket(async (connOptions) => {
describe('PGLiteSocketServer', () => {
let db: PGlite
let server: PGLiteSocketServer
beforeEach(async () => {
// Create a PGlite instance for testing
db = await PGlite.create()
if (connOptions.path) {
if (existsSync(connOptions.path)) {
try {
await unlink(connOptions.path)
console.log(`Removed old socket at ${connOptions.path}`)
} catch (err) {
console.log('')
}
}
}
})
afterEach(async () => {
// Stop server if running
try {
await server?.stop()
} catch (e) {
// Ignore errors during cleanup
}
// Close database
await db.close()
})
it('should start and stop server', async () => {
// Create server
server = new PGLiteSocketServer({
db,
host: connOptions.host,
port: connOptions.port,
path: connOptions.path,
})
// Start server
await server.start()
// Try to connect to confirm server is running
let client
if (connOptions.path) {
// unix socket
client = createConnection({ path: connOptions.path })
} else {
if (connOptions.port) {
// TCP socket
client = createConnection({
port: connOptions.port,
host: connOptions.host,
})
} else {
throw new Error(
'need to specify connOptions.path or connOptions.port',
)
}
}
client.on('error', () => {
// Ignore connection errors during test
})
await new Promise<void>((resolve) => {
client.on('connect', () => {
client.end()
resolve()
})
// Set timeout to resolve in case connection fails
setTimeout(resolve, 100)
})
// Stop server
await server.stop()
// Try to connect again - should fail
await expect(
new Promise<void>((resolve, reject) => {
let failClient
if (connOptions.path) {
// unix socket
failClient = createConnection({ path: connOptions.path })
} else {
if (connOptions.port) {
// TCP socket
failClient = createConnection({
port: connOptions.port,
host: connOptions.host,
})
} else {
throw new Error(
'need to specify connOptions.path or connOptions.port',
)
}
}
failClient.on('error', () => {
// Expected error - connection should fail
resolve()
})
failClient.on('connect', () => {
failClient.end()
reject(new Error('Connection should have failed'))
})
// Set timeout to resolve in case no events fire
setTimeout(resolve, 100)
}),
).resolves.not.toThrow()
})
describe('Connection multiplexing', () => {
beforeEach(() => {
// Create a server for testing
server = new PGLiteSocketServer({
db,
host: connOptions.host,
port: connOptions.port,
path: connOptions.path,
maxConnections: 100,
})
})
it('should create a handler for a new connection', async () => {
await server.start()
// Create mock socket
const socket1 = createMockSocket()
// Setup event listener
const connectionHandler = vi.fn()
server.addEventListener('connection', connectionHandler)
// Handle connection
await (server as any).handleConnection(socket1)
// Verify handler was created and tracked
expect((server as any).handlers.size).toBe(1)
expect(connectionHandler).toHaveBeenCalled()
})
it('should handle multiple simultaneous connections', async () => {
await server.start()
// Setup event listeners
const connectionHandler = vi.fn()
server.addEventListener('connection', connectionHandler)
// Create mock sockets
const socket1 = createMockSocket()
const socket2 = createMockSocket()
const socket3 = createMockSocket()
// Handle connections - all should be accepted simultaneously
await (server as any).handleConnection(socket1)
await (server as any).handleConnection(socket2)
await (server as any).handleConnection(socket3)
// All three sockets should have handlers (multiplexed)
expect((server as any).handlers.size).toBe(3)
expect(connectionHandler).toHaveBeenCalledTimes(3)
// None should be closed - they're all active
expect(socket1.end).not.toHaveBeenCalled()
expect(socket2.end).not.toHaveBeenCalled()
expect(socket3.end).not.toHaveBeenCalled()
})
it('should remove handler when connection closes', async () => {
await server.start()
// Create mock sockets
const socket1 = createMockSocket()
const socket2 = createMockSocket()
// Handle connections
await (server as any).handleConnection(socket1)
await (server as any).handleConnection(socket2)
// Both should be tracked
expect((server as any).handlers.size).toBe(2)
// Get the first handler and simulate close
const handlers = Array.from((server as any).handlers)
const handler1 = handlers[0] as PGLiteSocketHandler
handler1.dispatchEvent(new CustomEvent('close'))
// First handler should be removed, second still active
expect((server as any).handlers.size).toBe(1)
})
it('should reject connections when max connections reached', async () => {
// Create server with low max connections
server = new PGLiteSocketServer({
db,
host: connOptions.host,
port: connOptions.port,
path: connOptions.path,
maxConnections: 2,
})
await server.start()
// Create mock sockets
const socket1 = createMockSocket()
const socket2 = createMockSocket()
const socket3 = createMockSocket()
// Handle first two connections - should succeed
await (server as any).handleConnection(socket1)
await (server as any).handleConnection(socket2)
expect((server as any).handlers.size).toBe(2)
// Third connection should be rejected
await (server as any).handleConnection(socket3)
// Third socket should be closed
expect(socket3.end).toHaveBeenCalled()
expect((server as any).handlers.size).toBe(2)
})
it('should provide stats about active connections', async () => {
await server.start()
// Create mock sockets
const socket1 = createMockSocket()
const socket2 = createMockSocket()
// Check initial stats (maxConnections is set to 100 in beforeEach)
let stats = server.getStats()
expect(stats.activeConnections).toBe(0)
expect(stats.maxConnections).toBe(100)
// Handle connections
await (server as any).handleConnection(socket1)
await (server as any).handleConnection(socket2)
// Check updated stats
stats = server.getStats()
expect(stats.activeConnections).toBe(2)
})
it('should clean up all handlers when stopping the server', async () => {
await server.start()
// Create mock sockets
const socket1 = createMockSocket()
const socket2 = createMockSocket()
const socket3 = createMockSocket()
// Handle connections
await (server as any).handleConnection(socket1)
await (server as any).handleConnection(socket2)
await (server as any).handleConnection(socket3)
expect((server as any).handlers.size).toBe(3)
// Stop the server
await server.stop()
// All connections should be closed
expect(socket1.end).toHaveBeenCalled()
expect(socket2.end).toHaveBeenCalled()
expect(socket3.end).toHaveBeenCalled()
// Handlers should be cleared
expect((server as any).handlers.size).toBe(0)
})
it('should start server with OS-assigned port when port is 0', async () => {
server = new PGLiteSocketServer({
db,
host: connOptions.host,
port: 0, // Let OS assign port
})
await server.start()
const assignedPort = (server as any).port
expect(assignedPort).toBeGreaterThan(1024)
// Try to connect to confirm server is running
const client = createConnection({
port: assignedPort,
host: connOptions.host,
})
await new Promise<void>((resolve, reject) => {
client.on('error', () => {
reject(new Error('Connection should have failed'))
})
client.on('connect', () => {
client.end()
resolve()
})
setTimeout(resolve, 100)
})
await server.stop()
})
})
})
})

View File

@@ -0,0 +1,970 @@
import {
describe,
it,
expect,
beforeAll,
afterAll,
beforeEach,
afterEach,
} from 'vitest'
import { Client } from 'pg'
import { PGlite } from '@electric-sql/pglite'
import { PGLiteSocketServer } from '../src'
import { spawn, ChildProcess } from 'node:child_process'
import { fileURLToPath } from 'node:url'
import { dirname, join } from 'node:path'
import fs from 'fs'
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
/**
* Debug configuration for testing
*
* To test against a real PostgreSQL server:
* - Set DEBUG_TESTS=true as an environment variable
* - Optionally set DEBUG_TESTS_REAL_SERVER with a connection URL (defaults to localhost)
*
* Example:
* DEBUG_TESTS=true DEBUG_TESTS_REAL_SERVER=postgres://user:pass@host:port/db npm vitest ./tests/query-with-node-pg.test.ts
*/
const DEBUG_TESTS = process.env.DEBUG_TESTS === 'true'
const DEBUG_TESTS_REAL_SERVER =
process.env.DEBUG_TESTS_REAL_SERVER ||
'postgres://postgres:postgres@localhost:5432/postgres'
const TEST_PORT = 5434
describe(`PGLite Socket Server`, () => {
describe('with node-pg client', () => {
let db: PGlite
let server: PGLiteSocketServer
let client: typeof Client.prototype
let connectionConfig: any
beforeAll(async () => {
if (DEBUG_TESTS) {
console.log('TESTING WITH REAL POSTGRESQL SERVER')
console.log(`Connection URL: ${DEBUG_TESTS_REAL_SERVER}`)
} else {
console.log('TESTING WITH PGLITE SERVER')
// Create a PGlite instance
db = await PGlite.create()
// Wait for database to be ready
await db.waitReady
console.log('PGLite database ready')
// Create and start the server with explicit host
server = new PGLiteSocketServer({
db,
port: TEST_PORT,
host: '127.0.0.1',
maxConnections: 100,
})
// Add event listeners for debugging
server.addEventListener('error', (event) => {
console.error('Socket server error:', (event as CustomEvent).detail)
})
server.addEventListener('connection', (event) => {
console.log(
'Socket connection received:',
(event as CustomEvent).detail,
)
})
await server.start()
console.log(`PGLite Socket Server started on port ${TEST_PORT}`)
connectionConfig = {
host: '127.0.0.1',
port: TEST_PORT,
database: 'postgres',
user: 'postgres',
password: 'postgres',
// Connection timeout in milliseconds
connectionTimeoutMillis: 10000,
// Query timeout in milliseconds
statement_timeout: 5000,
}
}
})
afterAll(async () => {
if (!DEBUG_TESTS) {
// Stop server if running
if (server) {
await server.stop()
console.log('PGLite Socket Server stopped')
}
// Close database
if (db) {
await db.close()
console.log('PGLite database closed')
}
}
})
beforeEach(async () => {
// Create pg client instance before each test
if (DEBUG_TESTS) {
// Direct connection to real PostgreSQL server using URL
client = new Client({
connectionString: DEBUG_TESTS_REAL_SERVER,
connectionTimeoutMillis: 10000,
statement_timeout: 5000,
})
} else {
// Connection to PGLite Socket Server
client = new Client(connectionConfig)
}
// Connect the client
await client.connect()
})
afterEach(async () => {
// Clean up any tables created in tests
try {
await client.query('DROP TABLE IF EXISTS test_users')
} catch (e) {
console.error('Error cleaning up tables:', e)
}
// Disconnect the client after each test
if (client) {
await client.end()
}
})
it('should execute a basic SELECT query', async () => {
const result = await client.query('SELECT 1 as one')
expect(result.rows[0].one).toBe(1)
})
it('should create a table', async () => {
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`)
// Verify table exists by querying the schema
const tableCheck = await client.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = 'test_users'
`)
expect(tableCheck.rows.length).toBe(1)
expect(tableCheck.rows[0].table_name).toBe('test_users')
})
it('should insert rows into a table', async () => {
// Create table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`)
// Insert data
const insertResult = await client.query(`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com')
RETURNING *
`)
expect(insertResult.rows.length).toBe(2)
expect(insertResult.rows[0].name).toBe('Alice')
expect(insertResult.rows[1].name).toBe('Bob')
// Verify data is there
const count = await client.query(
'SELECT COUNT(*)::int as count FROM test_users',
)
expect(count.rows[0].count).toBe(2)
})
it('should update rows in a table', async () => {
// Create and populate table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`)
await client.query(`
INSERT INTO test_users (name, email)
VALUES ('Alice', 'alice@example.com')
`)
// Update
const updateResult = await client.query(`
UPDATE test_users
SET email = 'alice.new@example.com'
WHERE name = 'Alice'
RETURNING *
`)
expect(updateResult.rows.length).toBe(1)
expect(updateResult.rows[0].email).toBe('alice.new@example.com')
})
it('should delete rows from a table', async () => {
// Create and populate table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`)
await client.query(`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com')
`)
// Delete
const deleteResult = await client.query(`
DELETE FROM test_users
WHERE name = 'Alice'
RETURNING *
`)
expect(deleteResult.rows.length).toBe(1)
expect(deleteResult.rows[0].name).toBe('Alice')
// Verify only Bob remains
const remaining = await client.query('SELECT * FROM test_users')
expect(remaining.rows.length).toBe(1)
expect(remaining.rows[0].name).toBe('Bob')
})
it('should execute operations in a transaction', async () => {
// Create table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`)
// Insert initial data
await client.query(`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`)
// Start a transaction and perform operations
await client.query('BEGIN')
try {
// Deduct from Alice
await client.query(`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`)
// Add to Bob
await client.query(`
UPDATE test_users
SET balance = balance + 30
WHERE name = 'Bob'
`)
// Commit the transaction
await client.query('COMMIT')
} catch (error) {
// Rollback on error
await client.query('ROLLBACK')
throw error
}
// Verify both operations succeeded
const users = await client.query(
'SELECT name, balance FROM test_users ORDER BY name',
)
expect(users.rows.length).toBe(2)
expect(users.rows[0].name).toBe('Alice')
expect(users.rows[0].balance).toBe(70)
expect(users.rows[1].name).toBe('Bob')
expect(users.rows[1].balance).toBe(80)
})
it('should rollback a transaction on ROLLBACK', async () => {
// Create table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`)
// Insert initial data
await client.query(`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`)
// Get initial balance
const initialResult = await client.query(`
SELECT balance FROM test_users WHERE name = 'Alice'
`)
const initialBalance = initialResult.rows[0].balance
// Start a transaction
await client.query('BEGIN')
try {
// Deduct from Alice
await client.query(`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`)
// Verify balance is changed within transaction
const midResult = await client.query(`
SELECT balance FROM test_users WHERE name = 'Alice'
`)
expect(midResult.rows[0].balance).toBe(70)
// Explicitly roll back (cancel) the transaction
await client.query('ROLLBACK')
} catch (error) {
await client.query('ROLLBACK')
throw error
}
// Verify balance wasn't changed after rollback
const finalResult = await client.query(`
SELECT balance FROM test_users WHERE name = 'Alice'
`)
expect(finalResult.rows[0].balance).toBe(initialBalance)
})
it('should rollback a transaction on error', async () => {
// Create table
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`)
// Insert initial data
await client.query(`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`)
try {
// Start a transaction
await client.query('BEGIN')
// Deduct from Alice
await client.query(`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`)
// This will trigger an error
await client.query(`
UPDATE test_users_nonexistent
SET balance = balance + 30
WHERE name = 'Bob'
`)
// Should never get here
await client.query('COMMIT')
} catch (error) {
// Expected to fail - rollback transaction
await client.query('ROLLBACK').catch(() => {
// If the client connection is in a bad state, we just ignore
// the rollback error
})
}
// Verify Alice's balance was not changed due to rollback
const users = await client.query(
'SELECT name, balance FROM test_users ORDER BY name',
)
expect(users.rows.length).toBe(2)
expect(users.rows[0].name).toBe('Alice')
expect(users.rows[0].balance).toBe(100) // Should remain 100 after rollback
})
it('should handle a syntax error', async () => {
// Expect syntax error
let errorMessage = ''
try {
await client.query('THIS IS NOT VALID SQL;')
} catch (error) {
errorMessage = (error as Error).message
}
expect(errorMessage).not.toBe('')
expect(errorMessage.toLowerCase()).toContain('syntax error')
})
it('should support cursor-based pagination', async () => {
// Create a test table with many rows
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
value INTEGER
)
`)
// Insert 100 rows using generate_series (server-side generation)
await client.query(`
INSERT INTO test_users (name, value)
SELECT
'User ' || i as name,
i as value
FROM generate_series(1, 100) as i
`)
// Use a cursor to read data in smaller chunks
const chunkSize = 10
let results: any[] = []
let page = 0
try {
// Begin transaction
await client.query('BEGIN')
// Declare a cursor
await client.query(
'DECLARE user_cursor CURSOR FOR SELECT * FROM test_users ORDER BY id',
)
let hasMoreData = true
while (hasMoreData) {
// Fetch a batch of results
const chunk = await client.query('FETCH 10 FROM user_cursor')
// If no rows returned, we're done
if (chunk.rows.length === 0) {
hasMoreData = false
continue
}
// Process this chunk
page++
// Add to our results array
results = [...results, ...chunk.rows]
// Verify each chunk has correct data (except possibly the last one)
if (chunk.rows.length === chunkSize) {
expect(chunk.rows.length).toBe(chunkSize)
expect(chunk.rows[0].id).toBe((page - 1) * chunkSize + 1)
}
}
// Close the cursor
await client.query('CLOSE user_cursor')
// Commit transaction
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
}
// Verify we got all 100 records
expect(results.length).toBe(100)
expect(results[0].name).toBe('User 1')
expect(results[99].name).toBe('User 100')
// Verify we received the expected number of pages
expect(page).toBe(Math.ceil(100 / chunkSize))
})
it('should support LISTEN/NOTIFY for pub/sub messaging', async () => {
// Set up listener for notifications
let receivedPayload = ''
const notificationReceived = new Promise<void>((resolve) => {
client.on('notification', (msg) => {
receivedPayload = msg.payload || ''
resolve()
})
})
// Start listening
await client.query('LISTEN test_channel')
// Small delay to ensure listener is set up
await new Promise((resolve) => setTimeout(resolve, 100))
// Send a notification
await client.query("NOTIFY test_channel, 'Hello from PGlite!'")
// Wait for the notification to be received with an appropriate timeout
const timeoutPromise = new Promise<void>((_, reject) => {
setTimeout(() => reject(new Error('Notification timeout')), 2000)
})
await Promise.race([notificationReceived, timeoutPromise]).catch(
(error) => {
console.error('Notification error:', error)
},
)
// Verify the notification was received with the correct payload
expect(receivedPayload).toBe('Hello from PGlite!')
})
it('should handle large queries that split across TCP packets', async () => {
// Create a table
await client.query(`CREATE TABLE test_users (id SERIAL, data TEXT)`)
// Generate >64KB payload to force TCP fragmentation
const largeData = 'x'.repeat(100_000) // 100KB string
// Insert large data
const result = await client.query(`
INSERT INTO test_users (data) VALUES ('${largeData}') RETURNING *
`)
expect(result.rows[0].data).toBe(largeData)
})
it('should handle concurrent clients with interleaved transaction and query', async () => {
// Create a second client connecting to the same server
let client2: typeof Client.prototype
if (DEBUG_TESTS) {
client2 = new Client({
connectionString: DEBUG_TESTS_REAL_SERVER,
connectionTimeoutMillis: 10000,
statement_timeout: 5000,
})
} else {
client2 = new Client(connectionConfig)
}
await client2.connect()
try {
// Client 1 starts a transaction (don't await yet)
const beginResult = await client.query('BEGIN')
// Client 2 makes a simple SELECT 1 query (don't await yet)
const selectPromise = client2.query('SELECT 999999 as one')
// Small delay to ensure SELECT is sent before ROLLBACK
await new Promise((r) => setTimeout(r, 10))
// Client 1 rolls back the transaction (don't await yet)
const rollbackResult = await client.query('ROLLBACK')
const selectResult = await selectPromise
// Verify results
expect(beginResult.command).toBe('BEGIN')
expect(selectResult.rows[0].one).toBe(999999)
expect(rollbackResult.command).toBe('ROLLBACK')
} finally {
await client2.end()
}
}, 30000)
it('should process pending queries when transaction owner disconnects', async () => {
// Create a second client connecting to the same server
let client2: typeof Client.prototype
if (DEBUG_TESTS) {
client2 = new Client({
connectionString: DEBUG_TESTS_REAL_SERVER,
connectionTimeoutMillis: 10000,
statement_timeout: 5000,
})
} else {
client2 = new Client(connectionConfig)
}
await client2.connect()
// Suppress the expected "Connection terminated unexpectedly" error
client2.on('error', () => {
// Expected when we destroy the connection
})
try {
// Client starts a transaction
const beginResult = await client2.query('BEGIN')
expect(beginResult.command).toBe('BEGIN')
// Client 2 sends a query (will be blocked because client is in transaction)
const selectPromise = client.query('SELECT 123456 as val')
// Small delay to ensure SELECT is enqueued
await new Promise((r) => setTimeout(r, 10))
// Client abruptly disconnects (simulating connection abort)
// This should trigger clearTransactionIfNeeded which rolls back
// the transaction and processes pending queries
;(client2 as any).connection.stream.destroy()
// Client 2's query should complete successfully after transaction is cleared
const selectResult = await selectPromise
expect(selectResult.rows[0].val).toBe(123456)
} catch {
expect(false, 'Should not happen')
}
}, 30000)
it('interleaved transactions should work', async () => {
const bob = client
// table that will be accessed by both clients
await client.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`)
// Create a second bob connecting to the same server
let alice: typeof Client.prototype
if (DEBUG_TESTS) {
alice = new Client({
connectionString: DEBUG_TESTS_REAL_SERVER,
connectionTimeoutMillis: 10000,
statement_timeout: 5000,
})
} else {
alice = new Client(connectionConfig)
}
await alice.connect()
alice.on('error', () => {
// Suppress the expected "Connection terminated unexpectedly" error
})
// Client starts a transaction
const aliceBegin = await alice.query('BEGIN')
expect(aliceBegin.command).toBe('BEGIN')
// Client 2 begins its own transaction
const bobBegin = bob.query('BEGIN')
// Small delay to ensure client2.BEGIN is enqueued
await new Promise((r) => setTimeout(r, 10))
const aliceInsertPromise = alice.query(`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com')
RETURNING *
`)
const bobInsertPromise = bob.query(`
INSERT INTO test_users (name, email)
VALUES
('Bob', 'bob@example.com')
RETURNING *
`)
// Small delay to ensure both inserts are enqueued
await new Promise((r) => setTimeout(r, 10))
// bob commits
const bobCommit = bob.query('COMMIT')
// alice rolls back
const aliceRollback = alice.query('ROLLBACK')
await Promise.all([
bobBegin,
aliceInsertPromise,
bobInsertPromise,
aliceRollback,
bobCommit,
])
// Verify only Bob was commited
const testUsers = await bob.query('SELECT * FROM test_users')
expect(testUsers.rows.length).toBe(1)
expect(testUsers.rows[0].name).toBe('Bob')
}, 30000)
it('interleaved transactions should work when one client crashes', async () => {
const bob = client
// table that will be accessed by both clients
await bob.query(`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`)
// Create a second client connecting to the same server
let alice: typeof Client.prototype
if (DEBUG_TESTS) {
alice = new Client({
connectionString: DEBUG_TESTS_REAL_SERVER,
connectionTimeoutMillis: 10000,
statement_timeout: 5000,
})
} else {
alice = new Client(connectionConfig)
}
await alice.connect()
// Suppress the expected "Connection terminated unexpectedly" error
alice.on('error', () => {
// Expected when we destroy the connection
})
try {
// alice starts a transaction
const aliceBegin = await alice.query('BEGIN')
expect(aliceBegin.command).toBe('BEGIN')
// bob begins its own transaction
const bobBegin = bob.query('BEGIN')
// Small delay to ensure client2.BEGIN is enqueued
await new Promise((r) => setTimeout(r, 10))
// alice inserts data
alice.query(`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com')
RETURNING *
`)
// client inserts data
const bobInsert = bob.query(`
INSERT INTO test_users (name, email)
VALUES
('Bob', 'bob@example.com')
RETURNING *
`)
// Small delay to ensure both inserts are enqueued
await new Promise((r) => setTimeout(r, 10))
// Client2 abruptly disconnects (simulating connection abort)
// This should trigger clearTransactionIfNeeded which rolls back
// the transaction and processes pending queries
;(alice as any).connection.stream.destroy()
// bob commits
const bobCommit = bob.query('COMMIT')
await Promise.all([bobBegin, bobInsert, bobCommit])
// Verify only Bob was commited
const selectResult = await bob.query('SELECT * FROM test_users')
expect(selectResult.rows.length).toBe(1)
expect(selectResult.rows[0].name).toBe('Bob')
} catch {
// swallow
}
}, 30000)
})
describe('with extensions via CLI', () => {
const UNIX_SOCKET_DIR_PATH = `/tmp/${Date.now()}-${Math.random().toString(36).slice(2, 8)}`
fs.mkdirSync(UNIX_SOCKET_DIR_PATH)
const UNIX_SOCKET_PATH = `${UNIX_SOCKET_DIR_PATH}/.s.PGSQL.5432`
let serverProcess: ChildProcess | null = null
let client: typeof Client.prototype
beforeAll(async () => {
// Start the server with extensions via CLI using tsx for dev or node for dist
const serverScript = join(__dirname, '../src/scripts/server.ts')
serverProcess = spawn(
'npx',
[
'tsx',
serverScript,
'--path',
UNIX_SOCKET_PATH,
'--extensions',
'vector,pg_uuidv7,@electric-sql/pglite/pg_hashids:pg_hashids',
],
{
stdio: ['ignore', 'pipe', 'pipe'],
},
)
// Wait for server to be ready by checking for "listening" message
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Server startup timeout'))
}, 30000)
const onData = (data: Buffer) => {
const output = data.toString()
if (output.includes('listening')) {
clearTimeout(timeout)
resolve()
}
}
serverProcess!.stdout?.on('data', onData)
serverProcess!.stderr?.on('data', (data) => {
console.error('Server stderr:', data.toString())
})
serverProcess!.on('error', (err) => {
clearTimeout(timeout)
reject(err)
})
serverProcess!.on('exit', (code) => {
if (code !== 0 && code !== null) {
clearTimeout(timeout)
reject(new Error(`Server exited with code ${code}`))
}
})
})
console.log('Server with extensions started')
client = new Client({
host: UNIX_SOCKET_DIR_PATH,
database: 'postgres',
user: 'postgres',
password: 'postgres',
connectionTimeoutMillis: 10000,
})
await client.connect()
})
afterAll(async () => {
if (client) {
await client.end().catch(() => {})
}
if (serverProcess) {
serverProcess.kill('SIGTERM')
await new Promise<void>((resolve) => {
serverProcess!.on('exit', () => resolve())
setTimeout(resolve, 2000) // Force resolve after 2s
})
}
})
it('should load and use vector extension', async () => {
// Create the extension
await client.query('CREATE EXTENSION IF NOT EXISTS vector')
// Verify extension is loaded
const extCheck = await client.query(`
SELECT extname FROM pg_extension WHERE extname = 'vector'
`)
expect(extCheck.rows).toHaveLength(1)
expect(extCheck.rows[0].extname).toBe('vector')
// Create a table with vector column
await client.query(`
CREATE TABLE test_vectors (
id SERIAL PRIMARY KEY,
name TEXT,
vec vector(3)
)
`)
// Insert test data
await client.query(`
INSERT INTO test_vectors (name, vec) VALUES
('test1', '[1,2,3]'),
('test2', '[4,5,6]'),
('test3', '[7,8,9]')
`)
// Query with vector distance
const result = await client.query(`
SELECT name, vec, vec <-> '[3,1,2]' AS distance
FROM test_vectors
ORDER BY distance
`)
expect(result.rows).toHaveLength(3)
expect(result.rows[0].name).toBe('test1')
expect(result.rows[0].vec).toBe('[1,2,3]')
expect(parseFloat(result.rows[0].distance)).toBeCloseTo(2.449, 2)
})
it('should load and use pg_uuidv7 extension', async () => {
// Create the extension
await client.query('CREATE EXTENSION IF NOT EXISTS pg_uuidv7')
// Verify extension is loaded
const extCheck = await client.query(`
SELECT extname FROM pg_extension WHERE extname = 'pg_uuidv7'
`)
expect(extCheck.rows).toHaveLength(1)
expect(extCheck.rows[0].extname).toBe('pg_uuidv7')
// Generate a UUIDv7
const result = await client.query('SELECT uuid_generate_v7() as uuid')
expect(result.rows[0].uuid).toHaveLength(36)
// Test uuid_v7_to_timestamptz function
const tsResult = await client.query(`
SELECT uuid_v7_to_timestamptz('018570bb-4a7d-7c7e-8df4-6d47afd8c8fc') as ts
`)
const timestamp = new Date(tsResult.rows[0].ts)
expect(timestamp.toISOString()).toBe('2023-01-02T04:26:40.637Z')
})
it('should load and use pg_hashids extension from npm package path', async () => {
// Create the extension
await client.query('CREATE EXTENSION IF NOT EXISTS pg_hashids')
// Verify extension is loaded
const extCheck = await client.query(`
SELECT extname FROM pg_extension WHERE extname = 'pg_hashids'
`)
expect(extCheck.rows).toHaveLength(1)
expect(extCheck.rows[0].extname).toBe('pg_hashids')
// Test id_encode function
const result = await client.query(`
SELECT id_encode(1234567, 'salt', 10, 'abcdefghijABCDEFGHIJ1234567890') as hash
`)
expect(result.rows[0].hash).toBeTruthy()
expect(typeof result.rows[0].hash).toBe('string')
// Test id_decode function (round-trip)
const hash = result.rows[0].hash
const decodeResult = await client.query(`
SELECT id_decode('${hash}', 'salt', 10, 'abcdefghijABCDEFGHIJ1234567890') as id
`)
expect(decodeResult.rows[0].id[0]).toBe('1234567')
})
})
})

View File

@@ -0,0 +1,678 @@
import {
describe,
it,
expect,
beforeAll,
afterAll,
beforeEach,
afterEach,
} from 'vitest'
import postgres from 'postgres'
import { PGlite } from '@electric-sql/pglite'
import { PGLiteSocketServer } from '../src'
import { spawn, ChildProcess } from 'node:child_process'
import { fileURLToPath } from 'node:url'
import { dirname, join } from 'node:path'
import fs from 'fs'
const __filename = fileURLToPath(import.meta.url)
const __dirname = dirname(__filename)
/**
* Debug configuration for testing
*
* To test against a real PostgreSQL server:
* - Set DEBUG_TESTS=true as an environment variable
* - Optionally set DEBUG_TESTS_REAL_SERVER with a connection URL (defaults to localhost)
*
* Example:
* DEBUG_TESTS=true DEBUG_TESTS_REAL_SERVER=postgres://user:pass@host:port/db npm vitest ./tests/query-with-postgres-js.test.ts
*/
const DEBUG_LOCAL = process.env.DEBUG_LOCAL === 'true'
const DEBUG_TESTS = process.env.DEBUG_TESTS === 'true'
const DEBUG_TESTS_REAL_SERVER =
process.env.DEBUG_TESTS_REAL_SERVER ||
'postgres://postgres:postgres@localhost:5432/postgres'
const TEST_PORT = 5434
describe(`PGLite Socket Server`, () => {
describe('with postgres.js client', () => {
let db: PGlite
let server: PGLiteSocketServer
let sql: ReturnType<typeof postgres>
let connectionConfig: any
beforeAll(async () => {
if (DEBUG_TESTS) {
console.log('TESTING WITH REAL POSTGRESQL SERVER')
console.log(`Connection URL: ${DEBUG_TESTS_REAL_SERVER}`)
} else {
console.log('TESTING WITH PGLITE SERVER')
// Create a PGlite instance
if (DEBUG_LOCAL) db = await PGlite.create({ debug: '1' })
else db = await PGlite.create()
// Wait for database to be ready
await db.waitReady
console.log('PGLite database ready')
// Create and start the server with explicit host
server = new PGLiteSocketServer({
db,
port: TEST_PORT,
host: '127.0.0.1',
inspect: DEBUG_TESTS || DEBUG_LOCAL,
})
// Add event listeners for debugging
server.addEventListener('error', (event) => {
console.error('Socket server error:', (event as CustomEvent).detail)
})
server.addEventListener('connection', (event) => {
console.log(
'Socket connection received:',
(event as CustomEvent).detail,
)
})
await server.start()
console.log(`PGLite Socket Server started on port ${TEST_PORT}`)
connectionConfig = {
host: '127.0.0.1',
port: TEST_PORT,
database: 'postgres',
username: 'postgres',
password: 'postgres',
idle_timeout: 5,
connect_timeout: 10,
max: 1,
}
}
})
afterAll(async () => {
if (!DEBUG_TESTS) {
// Stop server if running
if (server) {
await server.stop()
console.log('PGLite Socket Server stopped')
}
// Close database
if (db) {
await db.close()
console.log('PGLite database closed')
}
}
})
beforeEach(() => {
// Create a postgres client instance before each test
if (DEBUG_TESTS) {
// Direct connection to real PostgreSQL server using URL
sql = postgres(DEBUG_TESTS_REAL_SERVER, {
idle_timeout: 5,
connect_timeout: 10,
max: 1,
})
} else {
// Connection to PGLite Socket Server
sql = postgres(connectionConfig)
}
})
afterEach(async () => {
// Clean up any tables created in tests
try {
await sql`DROP TABLE IF EXISTS test_users`
} catch (e) {
console.error('Error cleaning up tables:', e)
}
// Disconnect the client after each test
if (sql) {
await sql.end()
}
})
if (!DEBUG_LOCAL) {
it('should execute a basic SELECT query', async () => {
const result = await sql`SELECT 1 as one`
expect(result[0].one).toBe(1)
})
it('should create a table', async () => {
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
// Verify table exists by querying the schema
const tableCheck = await sql`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = 'test_users'
`
expect(tableCheck.length).toBe(1)
expect(tableCheck[0].table_name).toBe('test_users')
})
it('should insert rows into a table', async () => {
// Create table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`
// Insert data
const insertResult = await sql`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com')
RETURNING *
`
expect(insertResult.length).toBe(2)
expect(insertResult[0].name).toBe('Alice')
expect(insertResult[1].name).toBe('Bob')
// Verify data is there
const count = await sql`SELECT COUNT(*)::int as count FROM test_users`
expect(count[0].count).toBe(2)
})
it('should update rows in a table', async () => {
// Create and populate table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`
await sql`
INSERT INTO test_users (name, email)
VALUES ('Alice', 'alice@example.com')
`
// Update
const updateResult = await sql`
UPDATE test_users
SET email = 'alice.new@example.com'
WHERE name = 'Alice'
RETURNING *
`
expect(updateResult.length).toBe(1)
expect(updateResult[0].email).toBe('alice.new@example.com')
})
it('should delete rows from a table', async () => {
// Create and populate table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT
)
`
await sql`
INSERT INTO test_users (name, email)
VALUES
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com')
`
// Delete
const deleteResult = await sql`
DELETE FROM test_users
WHERE name = 'Alice'
RETURNING *
`
expect(deleteResult.length).toBe(1)
expect(deleteResult[0].name).toBe('Alice')
// Verify only Bob remains
const remaining = await sql`SELECT * FROM test_users`
expect(remaining.length).toBe(1)
expect(remaining[0].name).toBe('Bob')
})
it('should execute operations in a transaction', async () => {
// Create table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`
// Insert initial data
await sql`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`
// Start a transaction and perform operations
await sql.begin(async (tx) => {
// Deduct from Alice
await tx`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`
// Add to Bob
await tx`
UPDATE test_users
SET balance = balance + 30
WHERE name = 'Bob'
`
})
// Verify both operations succeeded
const users =
await sql`SELECT name, balance FROM test_users ORDER BY name`
expect(users.length).toBe(2)
expect(users[0].name).toBe('Alice')
expect(users[0].balance).toBe(70)
expect(users[1].name).toBe('Bob')
expect(users[1].balance).toBe(80)
})
it('should rollback a transaction on ROLLBACK', async () => {
// Create table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`
// Insert initial data
await sql`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`
// Get initial balance
const initialResult = await sql`
SELECT balance FROM test_users WHERE name = 'Alice'
`
const initialBalance = initialResult[0].balance
// Start a transaction
await sql
.begin(async (tx) => {
// Deduct from Alice
await tx`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`
// Verify balance is changed within transaction
const midResult = await tx`
SELECT balance FROM test_users WHERE name = 'Alice'
`
expect(midResult[0].balance).toBe(70)
// Explicitly roll back (cancel) the transaction
throw new Error('Triggering rollback')
})
.catch(() => {
// Expected error to trigger rollback
console.log('Transaction was rolled back as expected')
})
// Verify balance wasn't changed after rollback
const finalResult = await sql`
SELECT balance FROM test_users WHERE name = 'Alice'
`
expect(finalResult[0].balance).toBe(initialBalance)
})
it('should rollback a transaction on error', async () => {
// Create table
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
balance INTEGER DEFAULT 0
)
`
// Insert initial data
await sql`
INSERT INTO test_users (name, balance)
VALUES ('Alice', 100), ('Bob', 50)
`
// Start a transaction that will fail
try {
await sql.begin(async (tx) => {
// Deduct from Alice
await tx`
UPDATE test_users
SET balance = balance - 30
WHERE name = 'Alice'
`
// This will trigger an error
await tx`
UPDATE test_users_nonexistent
SET balance = balance + 30
WHERE name = 'Bob'
`
})
} catch (error) {
// Expected to fail
}
// Verify Alice's balance was not changed due to rollback
const users =
await sql`SELECT name, balance FROM test_users ORDER BY name`
expect(users.length).toBe(2)
expect(users[0].name).toBe('Alice')
expect(users[0].balance).toBe(100) // Should remain 100 after rollback
})
it('should handle a syntax error', async () => {
// Expect syntax error
let errorMessage = ''
try {
await sql`THIS IS NOT VALID SQL;`
} catch (error) {
errorMessage = (error as Error).message
}
expect(errorMessage).not.toBe('')
expect(errorMessage.toLowerCase()).toContain('syntax error')
})
it('should support cursor-based pagination', async () => {
// Create a test table with many rows
await sql`
CREATE TABLE test_users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
value INTEGER
)
`
// Insert 100 rows using generate_series (server-side generation)
await sql`
INSERT INTO test_users (name, value)
SELECT
'User ' || i as name,
i as value
FROM generate_series(1, 100) as i
`
// Use a cursor to read data in smaller chunks
const chunkSize = 10
let results: any[] = []
let page = 0
// Use a transaction for cursor operations (cursors must be in transactions)
await sql.begin(async (tx) => {
// Declare a cursor
await tx`DECLARE user_cursor CURSOR FOR SELECT * FROM test_users ORDER BY id`
let hasMoreData = true
while (hasMoreData) {
// Fetch a batch of results
const chunk = await tx`FETCH 10 FROM user_cursor`
// If no rows returned, we're done
if (chunk.length === 0) {
hasMoreData = false
continue
}
// Process this chunk
page++
// Add to our results array
results = [...results, ...chunk]
// Verify each chunk has correct data (except possibly the last one)
if (chunk.length === chunkSize) {
expect(chunk.length).toBe(chunkSize)
expect(chunk[0].id).toBe((page - 1) * chunkSize + 1)
}
}
// Close the cursor
await tx`CLOSE user_cursor`
})
// Verify we got all 100 records
expect(results.length).toBe(100)
expect(results[0].name).toBe('User 1')
expect(results[99].name).toBe('User 100')
// Verify we received the expected number of pages
expect(page).toBe(Math.ceil(100 / chunkSize))
})
} else {
it('should support LISTEN/NOTIFY for pub/sub messaging', async () => {
// Create a promise that will resolve when the notification is received
let receivedPayload = ''
const notificationPromise = new Promise<void>((resolve) => {
// Set up listener for the 'test_channel' notification
sql.listen('test_channel', (data) => {
receivedPayload = data
resolve()
})
})
// Small delay to ensure listener is set up
// await new Promise((resolve) => setTimeout(resolve, 100))
// Send a notification on the same connection
await sql`NOTIFY test_channel, 'Hello from PGlite!'`
// Wait for the notification to be received
await notificationPromise
// Verify the notification was received with the correct payload
expect(receivedPayload).toBe('Hello from PGlite!')
})
}
})
describe('with extensions via CLI', () => {
const UNIX_SOCKET_DIR_PATH = `/tmp/${Date.now().toString()}`
fs.mkdirSync(UNIX_SOCKET_DIR_PATH)
const UNIX_SOCKET_PATH = `${UNIX_SOCKET_DIR_PATH}/.s.PGSQL.5432`
let serverProcess: ChildProcess | null = null
let sql: ReturnType<typeof postgres>
beforeAll(async () => {
// Start the server with extensions via CLI using tsx for dev or node for dist
const serverScript = join(__dirname, '../src/scripts/server.ts')
serverProcess = spawn(
'npx',
[
'tsx',
serverScript,
'--path',
UNIX_SOCKET_PATH,
'--extensions',
'vector,pg_uuidv7,@electric-sql/pglite/pg_hashids:pg_hashids',
],
{
stdio: ['ignore', 'pipe', 'pipe'],
},
)
// Wait for server to be ready by checking for "listening" message
await new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error('Server startup timeout'))
}, 30000)
const onData = (data: Buffer) => {
const output = data.toString()
if (output.includes('listening')) {
clearTimeout(timeout)
resolve()
}
}
serverProcess!.stdout?.on('data', onData)
serverProcess!.stderr?.on('data', (data) => {
console.error('Server stderr:', data.toString())
})
serverProcess!.on('error', (err) => {
clearTimeout(timeout)
reject(err)
})
serverProcess!.on('exit', (code) => {
if (code !== 0 && code !== null) {
clearTimeout(timeout)
reject(new Error(`Server exited with code ${code}`))
}
})
})
console.log('Server with extensions started')
sql = postgres({
path: UNIX_SOCKET_PATH,
database: 'postgres',
username: 'postgres',
password: 'postgres',
idle_timeout: 5,
connect_timeout: 10,
max: 1,
})
})
afterAll(async () => {
if (sql) {
await sql.end().catch(() => {})
}
if (serverProcess) {
serverProcess.kill('SIGTERM')
await new Promise<void>((resolve) => {
serverProcess!.on('exit', () => resolve())
setTimeout(resolve, 2000) // Force resolve after 2s
})
}
})
it('should load and use vector extension', async () => {
// Create the extension
await sql`CREATE EXTENSION IF NOT EXISTS vector`
// Verify extension is loaded
const extCheck = await sql`
SELECT extname FROM pg_extension WHERE extname = 'vector'
`
expect(extCheck).toHaveLength(1)
expect(extCheck[0].extname).toBe('vector')
// Create a table with vector column
await sql`
CREATE TABLE test_vectors (
id SERIAL PRIMARY KEY,
name TEXT,
vec vector(3)
)
`
// Insert test data
await sql`
INSERT INTO test_vectors (name, vec) VALUES
('test1', '[1,2,3]'),
('test2', '[4,5,6]'),
('test3', '[7,8,9]')
`
// Query with vector distance
const result = await sql`
SELECT name, vec, vec <-> '[3,1,2]' AS distance
FROM test_vectors
ORDER BY distance
`
expect(result).toHaveLength(3)
expect(result[0].name).toBe('test1')
expect(result[0].vec).toBe('[1,2,3]')
expect(parseFloat(result[0].distance)).toBeCloseTo(2.449, 2)
})
it('should load and use pg_uuidv7 extension', async () => {
// Create the extension
await sql`CREATE EXTENSION IF NOT EXISTS pg_uuidv7`
// Verify extension is loaded
const extCheck = await sql`
SELECT extname FROM pg_extension WHERE extname = 'pg_uuidv7'
`
expect(extCheck).toHaveLength(1)
expect(extCheck[0].extname).toBe('pg_uuidv7')
// Generate a UUIDv7
const result = await sql`SELECT uuid_generate_v7() as uuid`
expect(result[0].uuid).toHaveLength(36)
// Test uuid_v7_to_timestamptz function
const tsResult = await sql`
SELECT uuid_v7_to_timestamptz('018570bb-4a7d-7c7e-8df4-6d47afd8c8fc') as ts
`
const timestamp = new Date(tsResult[0].ts)
expect(timestamp.toISOString()).toBe('2023-01-02T04:26:40.637Z')
})
it('should load and use pg_hashids extension from npm package path', async () => {
// Create the extension
await sql`CREATE EXTENSION IF NOT EXISTS pg_hashids`
// Verify extension is loaded
const extCheck = await sql`
SELECT extname FROM pg_extension WHERE extname = 'pg_hashids'
`
expect(extCheck).toHaveLength(1)
expect(extCheck[0].extname).toBe('pg_hashids')
// Test id_encode function
const result = await sql`
SELECT id_encode(1234567, 'salt', 10, 'abcdefghijABCDEFGHIJ1234567890') as hash
`
expect(result[0].hash).toBeTruthy()
expect(typeof result[0].hash).toBe('string')
// Test id_decode function (round-trip)
const hash = result[0].hash
const decodeResult = await sql`
SELECT id_decode(${hash}, 'salt', 10, 'abcdefghijABCDEFGHIJ1234567890') as id
`
expect(decodeResult[0].id[0]).toBe('1234567')
})
})
})

View File

@@ -0,0 +1,233 @@
import { describe, it, expect, afterEach } from 'vitest'
import { spawn, ChildProcess } from 'node:child_process'
import { createConnection } from 'net'
import path from 'node:path'
import { fileURLToPath } from 'node:url'
const __dirname = path.dirname(fileURLToPath(import.meta.url))
const serverScript = path.resolve(__dirname, '../src/scripts/server.ts')
// Helper to wait for a port to be available
async function waitForPort(port: number, timeout = 15000): Promise<boolean> {
const start = Date.now()
while (Date.now() - start < timeout) {
try {
const socket = createConnection({ port, host: '127.0.0.1' })
await new Promise<void>((resolve, reject) => {
socket.on('connect', () => {
socket.end()
resolve()
})
socket.on('error', reject)
})
return true
} catch {
await new Promise((resolve) => setTimeout(resolve, 100))
}
}
return false
}
describe('Server Script Tests', () => {
const TEST_PORT_BASE = 15500
let currentTestPort = TEST_PORT_BASE
// Get a unique port for each test
function getTestPort(): number {
return ++currentTestPort
}
describe('Help and Basic Functionality', () => {
it('should show help when --help flag is used', async () => {
const serverProcess = spawn('tsx', [serverScript, '--help'], {
stdio: ['pipe', 'pipe', 'pipe'],
})
let output = ''
serverProcess.stdout?.on('data', (data) => {
output += data.toString()
})
serverProcess.stderr?.on('data', (data) => {
console.error(data.toString())
})
await new Promise<void>((resolve) => {
serverProcess.on('exit', (code) => {
expect(code).toBe(0)
expect(output).toContain('PGlite Socket Server')
expect(output).toContain('Usage:')
expect(output).toContain('Options:')
expect(output).toContain('--db')
expect(output).toContain('--port')
expect(output).toContain('--host')
resolve()
})
})
}, 10000)
it('should accept and use debug level parameter', async () => {
const testPort = getTestPort()
const serverProcess = spawn(
'tsx',
[serverScript, '--port', testPort.toString(), '--debug', '2'],
{
stdio: ['pipe', 'pipe', 'pipe'],
},
)
let output = ''
serverProcess.stdout?.on('data', (data) => {
output += data.toString()
})
serverProcess.stderr?.on('data', (data) => {
console.error(data.toString())
})
// Wait for server to start
await waitForPort(testPort)
// Kill the server
serverProcess.kill('SIGTERM')
await new Promise<void>((resolve) => {
serverProcess.on('exit', () => {
expect(output).toContain('Debug level: 2')
resolve()
})
})
}, 10000)
})
describe('Server Startup and Connectivity', () => {
let serverProcess: ChildProcess | null = null
afterEach(async () => {
if (serverProcess) {
serverProcess.kill('SIGTERM')
await new Promise<void>((resolve) => {
if (serverProcess) {
serverProcess.on('exit', () => resolve())
} else {
resolve()
}
})
serverProcess = null
}
})
it('should start server on TCP port and accept connections', async () => {
const testPort = getTestPort()
serverProcess = spawn(
'tsx',
[serverScript, '--port', testPort.toString()],
{
stdio: ['pipe', 'pipe', 'pipe'],
},
)
let output = ''
serverProcess.stdout?.on('data', (data) => {
output += data.toString()
})
serverProcess.stderr?.on('data', (data) => {
console.error(data.toString())
})
// Wait for server to be ready
const isReady = await waitForPort(testPort)
expect(isReady).toBe(true)
// Check that we can connect
const socket = createConnection({ port: testPort, host: '127.0.0.1' })
await new Promise<void>((resolve, reject) => {
socket.on('connect', resolve)
socket.on('error', reject)
setTimeout(() => reject(new Error('Connection timeout')), 3000)
})
socket.end()
expect(output).toContain('PGlite database initialized')
expect(output).toContain(`"port":${testPort}`)
}, 10000)
it('should work with memory database', async () => {
const testPort = getTestPort()
serverProcess = spawn(
'tsx',
[serverScript, '--port', testPort.toString(), '--db', 'memory://'],
{
stdio: ['pipe', 'pipe', 'pipe'],
},
)
let output = ''
serverProcess.stdout?.on('data', (data) => {
output += data.toString()
})
serverProcess.stderr?.on('data', (data) => {
console.error(data.toString())
})
const isReady = await waitForPort(testPort)
expect(isReady).toBe(true)
expect(output).toContain('Initializing PGLite with database: memory://')
}, 10000)
})
describe('Configuration Options', () => {
let serverProcess: ChildProcess | null = null
afterEach(async () => {
if (serverProcess) {
serverProcess.kill('SIGTERM')
await new Promise<void>((resolve) => {
if (serverProcess) {
serverProcess.on('exit', () => resolve())
} else {
resolve()
}
})
serverProcess = null
}
})
it('should handle different hosts', async () => {
const testPort = getTestPort()
serverProcess = spawn(
'tsx',
[serverScript, '--port', testPort.toString(), '--host', '0.0.0.0'],
{
stdio: ['pipe', 'pipe', 'pipe'],
},
)
let output = ''
serverProcess.stdout?.on('data', (data) => {
output += data.toString()
})
serverProcess.stderr?.on('data', (data) => {
console.error(data.toString())
})
const isReady = await waitForPort(testPort)
expect(isReady).toBe(true)
serverProcess.kill()
await new Promise<void>((resolve) => {
serverProcess.on('exit', () => {
expect(output).toContain(`"host":"0.0.0.0"`)
serverProcess = null
resolve()
})
})
}, 10000)
})
})

10
node_modules/@electric-sql/pglite-socket/tsconfig.json generated vendored Normal file
View File

@@ -0,0 +1,10 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"types": [
"@types/emscripten",
"node"
]
},
"include": ["src", "examples", "tsup.config.ts", "vitest.config.ts"]
}

View File

@@ -0,0 +1,20 @@
import { defineConfig } from 'tsup'
const entryPoints = ['src/index.ts', 'src/scripts/server.ts']
const minify = process.env.DEBUG === 'true' ? false : true
export default defineConfig([
{
entry: entryPoints,
sourcemap: true,
dts: {
entry: entryPoints,
resolve: true,
},
clean: true,
minify: minify,
shims: true,
format: ['esm', 'cjs'],
},
])

View File

@@ -0,0 +1,16 @@
import { defineConfig } from 'vitest/config'
export default defineConfig({
test: {
name: 'integration tests',
globals: true,
typecheck: { enabled: true },
environment: 'node',
testTimeout: 30000,
watch: false,
dir: './tests',
maxWorkers: 1,
fileParallelism: false,
maxConcurrency: 1 // because we are running a TCP server on a port
},
})