Neo4j-Compatible Streaming Driver + Multi-Language ORM Plan
Objective
Add true incremental result delivery to NornicDB while preserving the current Neo4j-compatible HTTP and Bolt behavior.
The end state is:
- Bolt `RUN` returns fields quickly, and `PULL n` drains rows from a live stream instead of a fully materialized result.
- HTTP has a new streaming endpoint for non-Bolt clients.
- Existing `/db/{db}/tx/commit`, existing Bolt clients, and existing Cypher callers keep working unchanged.
- Go, Rust, and TypeScript users get first-party client/ORM packages on top of the same streaming contracts.
Reality Check
These constraints come from the current code and from Neo4j/Bolt protocol behavior.
- Bolt cannot stream the query text itself. The client sends a complete framed `RUN` message. NornicDB can start execution immediately after decoding that message, but it cannot start before the full `RUN` frame arrives.
- Neo4j HTTP `/tx/commit` should remain materialized. It returns a single JSON transaction response, so true HTTP streaming needs a new endpoint. Changing `/tx/commit` to stream would break compatibility.
- Streaming all Cypher shapes immediately is not realistic. `ORDER BY`, global `DISTINCT`, aggregation, `collect`, some subqueries, writes with returned rows, and many APPLY shapes require buffering or commit-time guarantees. These must remain materialized until each shape gets a proven streaming operator.
- Read-only scans are feasible now. Storage already has `storage.StreamingEngine`, `StreamNodes`, `StreamEdges`, `StreamNodeChunks`, `PrefixStreamingEngine`, and `LabelNodeIDLookupEngine`. The executor currently uses these only to collect bounded slices, not to expose rows incrementally.
- Fabric already has a partial iterator layer. `pkg/fabric/result.go` defines `RowIterator`, bounded prefetch, concat, distinct, and row-view helpers. `pkg/fabric/executor.go` has `executeRows`, but top-level Fabric execution still returns `ResultStream{Rows [][]interface{}}`, and local/remote fragments still materialize through Cypher or remote transport APIs.
Current Code Facts
Materialized Result Boundary
- `pkg/cypher/types.go` defines `ExecuteResult` with `Rows [][]interface{}`.
- `pkg/cypher/executor.go` exposes `StorageExecutor.Execute(ctx, query, params) (*ExecuteResult, error)`.
- Most Cypher files append to `result.Rows`; streaming must be added as an alternate path, not by rewriting every executor first.
Storage Streaming Exists
- `pkg/storage/types.go` defines `StreamingEngine` and fallback helpers.
- `pkg/storage/label_nodeid_lookup.go` defines label-ID streaming helpers.
- `pkg/cypher/match_multi.go` has `collectNodesWithStreaming`, but it still returns `[]*storage.Node`.
Bolt Is Protocol-Streaming, Not Execution-Streaming Yet
- `pkg/bolt/server.go` stores `lastResult *QueryResult` and `resultIndex` on `Session`.
- `handleRun` executes the full query before sending `SUCCESS` with fields.
- `handlePull` slices `lastResult.Rows` and writes records, so fetch size only controls network batching, not executor memory.
HTTP Is Fully Materialized
- `pkg/server/server_db.go` handles `/db/{db}/tx/commit` by building a `TransactionResponse` in memory.
- `appendStatementResult` converts all `ExecuteResult.Rows` into `QueryResult.Data` rows before writing JSON.
Fabric Has Useful Building Blocks But Still Materializes At Boundaries
- `pkg/fabric/result.go` has `RowIterator` and bounded prefetch.
- `pkg/fabric/local_executor.go` exposes `ExecuteRows`, but it wraps the materialized `ExecuteWithRecord`.
- `pkg/fabric/remote_executor.go` exposes `ExecuteRows`, but it wraps materialized remote results.
- `pkg/cypher/executor_fabric.go` calls `fabricExecutor.Execute` and converts the returned `ResultStream` to `ExecuteResult`.
Architecture Decisions
1. Add A Shared Row Stream Package
Do not put the base stream interface in `pkg/cypher`; `pkg/fabric` intentionally avoids importing `pkg/cypher` to prevent import cycles. Add a small shared package:
Files:
- `pkg/rowstream/rowstream.go`
- `pkg/rowstream/materialized.go`
- `pkg/rowstream/channel.go`
- `pkg/rowstream/rowstream_test.go`
Target API:
```go
package rowstream

import "context"

// Row is one result record; Values is positionally aligned with Columns().
type Row struct {
	Values []interface{}
}

type Summary struct {
	Stats    map[string]int64
	Metadata map[string]interface{}
}

type RowStream interface {
	Columns() []string
	Next(ctx context.Context) (Row, error) // returns io.EOF on completion
	Summary() Summary                      // valid after EOF or Close
	Close() error
}
```
Adapters:
- `FromRows(columns []string, rows [][]interface{}, summary Summary) RowStream`
- `Drain(ctx context.Context, stream RowStream, maxRows int) (columns []string, rows [][]interface{}, summary Summary, err error)`
- `NewChannelStream(ctx context.Context, columns []string, buffer int, produce func(context.Context, EmitFunc) Summary) RowStream`
Reasoning:
- Cypher, Fabric, Bolt, Server, and RemoteEngine can all import this without cycles.
- Existing materialized paths can be adapted immediately.
- Channel streams give bounded backpressure when storage APIs are callback-based.
2. Add Streaming Beside Existing APIs
Keep existing public APIs stable first:
- Keep `StorageExecutor.Execute` unchanged.
- Add `StorageExecutor.ExecuteStream` in a new file.
- Keep `bolt.QueryExecutor.Execute` unchanged.
- Add an optional `bolt.StreamingQueryExecutor` interface.
- Keep `fabric.ResultStream` unchanged initially.
- Add Fabric streaming methods and bridge them back to `ResultStream` where old callers still need it.
This gives a safe migration path and keeps the first PR from churning the existing test suite.
3. Stream Read-Only Shapes First
Initial streaming-eligible shapes:
- `MATCH (n) RETURN n`
- `MATCH (n:Label) RETURN n`
- `MATCH (n) WHERE <simple node predicate> RETURN <simple projection>`
- `MATCH ... RETURN ... SKIP ... LIMIT ...` when no ordering or aggregation is required
- `UNWIND $rows AS row RETURN row` and simple projections from parameter lists
- `CALL db.labels()`-style catalog procedures, only after their procedure implementation is iterator-backed
Materialized fallback remains required for:
- `ORDER BY` without a bounded top-k optimization
- global `DISTINCT`
- aggregation and `collect`
- variable-length traversal returning paths
- `OPTIONAL MATCH` shapes not proven row-by-row
- writes that return rows
- mixed read/write queries
- Fabric APPLY shapes not covered by the existing batched lookup iterator path
4. Bolt PULL Must Own Backpressure
The session should read from the executor stream only when a `PULL` arrives. If a producer goroutine is needed under the stream, it must use a bounded channel. `PULL n` must send at most `n` records.
To produce a correct `has_more`, the Bolt session needs one-row lookahead:
- Drain up to `n` records from the stream.
- If exactly `n` rows were sent, read at most one extra row into `pendingRow`.
- If `pendingRow` exists, send `SUCCESS {has_more: true}`.
- If the stream returned `io.EOF`, send completion metadata.
- On the next `PULL`, send `pendingRow` before reading from the stream again.
5. HTTP Streaming Is A New Endpoint
Add `/db/{db}/tx/stream` and keep `/db/{db}/tx/commit` unchanged.
Initial endpoint limitations:
- One statement per request.
- Read-only queries first.
- Materialized fallback is allowed only when the caller opts in with `allowMaterializedFallback: true`.
- The response format is newline-delimited JSON (`application/x-ndjson`) because it is simple to consume from Go, Rust, and TypeScript clients.
Event model:
```json
{"type":"columns","columns":["n"]}
{"type":"row","row":[{"id":"1","labels":["Person"],"properties":{"name":"Alice"}}]}
{"type":"summary","stats":{},"metadata":{"db":"nornic"}}
{"type":"error","code":"Neo.ClientError.Statement.SyntaxError","message":"..."}
```
Server Implementation Plan
Phase 0: Baselines And Safety Gates
Files:
- `pkg/bolt/streaming_bench_test.go`
- `pkg/server/server_stream_bench_test.go` (new)
- `pkg/cypher/streaming_bench_test.go` (new or expanded)
- `docs/performance/streaming-baseline.md` (new)
Tasks:
- Add benchmarks for first-row latency, full-result latency, and peak allocated bytes.
- Benchmark at least:
  - `MATCH (n:Bench) RETURN n` with 10k and 100k rows
  - `MATCH (n:Bench) RETURN n LIMIT 100`
  - a materialized fallback shape with `ORDER BY`
- Add cancellation tests for:
  - Bolt disconnect during a stream
  - HTTP client cancellation during a stream
  - stream `Close` before EOF
Acceptance:
- Baseline numbers are committed in `docs/performance/streaming-baseline.md`.
- `go test ./pkg/rowstream ./pkg/cypher ./pkg/fabric ./pkg/bolt ./pkg/server` passes.
Phase 1: Shared RowStream Core
Files:
- `pkg/rowstream/rowstream.go` (new)
- `pkg/rowstream/materialized.go` (new)
- `pkg/rowstream/channel.go` (new)
- `pkg/rowstream/rowstream_test.go` (new)
- `pkg/cypher/stream.go` (new)
Tasks:
- Implement `rowstream.RowStream`, the materialized adapter, the drain adapter, and the bounded channel stream.
- Add `func (e *StorageExecutor) ExecuteStream(ctx context.Context, query string, params map[string]interface{}) (rowstream.RowStream, error)`.
- The first implementation may call `Execute` and wrap the rows. This is a compatibility bridge, not the optimized path.
- Add `func ExecuteResultFromStream(ctx context.Context, s rowstream.RowStream) (*ExecuteResult, error)` for old callers and tests.
Acceptance:
- `ExecuteStream` returns the same columns/rows/metadata as `Execute` through the materialized adapter.
- `Close` is idempotent.
- `Next` after `Close` returns `io.EOF` or a stable terminal error.
- Context cancellation unblocks a channel-backed stream.
Phase 2: Cypher Read Streaming
Files:
- `pkg/cypher/executor_stream.go` (new)
- `pkg/cypher/match_stream.go` (new)
- `pkg/cypher/match.go`
- `pkg/cypher/match_multi.go`
- `pkg/cypher/types.go`
Tasks:
- Add a stream classifier before materialized execution:
  - reject writes using the existing mutation detection
  - reject aggregation, `ORDER BY`, global `DISTINCT`, subquery, and complex traversal shapes
  - accept simple read scan shapes only
- Factor the simple `MATCH` parsing already used by `tryFastPathSimpleMatchReturnLimit` into reusable helpers.
- Implement the node scan stream using:
  - `storage.LabelNodeIDLookupEngine` for label scans where possible
  - `storage.StreamingEngine.StreamNodes` for full scans
  - `storage.StreamNodesWithFallback` only when the engine lacks native streaming
- Apply simple `WHERE`, `SKIP`, and `LIMIT` in the stream producer.
- Route `Execute` through `ExecuteResultFromStream` only for shapes handled by `ExecuteStream`; leave the current materialized code path in place for everything else.
Acceptance:
- `MATCH (n:Bench) RETURN n LIMIT 1` produces the first row without collecting all `Bench` nodes.
- `MATCH (n:Bench) RETURN n` streams with bounded memory on Badger and Async storage.
- Materialized and streaming results are byte-for-byte equivalent after Neo4j conversion for supported shapes.
Phase 3: Fabric Streaming Boundary
Files:
- `pkg/fabric/result.go`
- `pkg/fabric/executor.go`
- `pkg/fabric/local_executor.go`
- `pkg/fabric/remote_executor.go`
- `pkg/cypher/executor_fabric.go`
Tasks:
- Keep the existing `RowIterator`, but add bridges to `rowstream.RowStream`.
- Add `FabricExecutor.ExecuteStream(ctx, tx, fragment, params, authToken) (rowstream.RowStream, error)`.
- Promote the current unexported `executeRows` behavior to the stream path instead of materializing immediately at the top.
- Add an optional local interface:
```go
type StreamingCypherExecutor interface {
	ExecuteQueryStream(ctx context.Context, dbName string, engine storage.Engine, query string, params map[string]interface{}, recordBindings map[string]interface{}) (rowstream.RowStream, error)
}
```
- `LocalFragmentExecutor.ExecuteWithRecordRows` should use `StreamingCypherExecutor` when available, then fall back to materialized rows.
- Add remote streaming support in `RemoteFragmentExecutor.ExecuteRows` only after `RemoteEngine.QueryCypherStream` exists.
- Leave APPLY operators materialized except the existing batched lookup iterator path, then convert that path to emit a stream instead of accumulating into `ResultStream.Rows`.
Acceptance:
- Top-level `USE composite.alias MATCH ... RETURN ...` can stream when every leaf query is streamable.
- Fabric still enforces the many-read/one-write transaction rules from `pkg/fabric/transaction.go`.
- Unsupported APPLY and UNION shapes explicitly expose `metadata.streamingFallback = "materialized"`.
Phase 4: Remote Engine Streaming
Files:
- `pkg/storage/remote_engine.go`
- `pkg/storage/remote_engine_test.go`
Tasks:
- Add `QueryCypherStream(ctx, statement, params) (rowstream.RowStream, error)` to `RemoteEngine`.
- Bolt transport: use the Neo4j Go driver's `ResultWithContext.Next(ctx)` as the row source instead of collecting rows.
- HTTP transport: use `/db/{db}/tx/stream` when the remote server supports it.
- Fall back to `QueryCypher` materialization when the remote does not support streaming.
Acceptance:
- Remote Fabric constituents can participate in streaming for read-only leaf fragments.
- Closing the returned stream closes the underlying Neo4j session/transaction result resources.
Phase 5: Bolt Incremental Execution Streaming
Files:
- `pkg/bolt/server.go`
- `pkg/bolt/streaming_bench_test.go`
- `pkg/bolt/server_extra_test.go`
- `pkg/bolt/integration_test.go`
Tasks:
- Add an optional interface:
```go
type StreamingQueryExecutor interface {
	ExecuteStream(ctx context.Context, query string, params map[string]any) (rowstream.RowStream, error)
}
```
- Session state changes:
  - keep `lastResult *QueryResult` for fallback
  - add `lastStream rowstream.RowStream`
  - add `pendingRow *rowstream.Row`
  - add stream summary metadata captured at EOF
- `handleRun` should prefer `StreamingQueryExecutor` when the executor implements it.
- `handleRun` must send fields from `stream.Columns()` without draining the stream.
- `handlePull` should drain `lastStream` according to `n` and use one-row lookahead for `has_more`.
- `handleDiscard` should call `lastStream.Close()` and then emit completion metadata.
- Preserve the existing deferred flush behavior for write queries. The initial streaming path should reject writes so current write semantics remain unchanged.
- Preserve the existing Bolt reader-loop cancellation model: a normal pipelined `PULL` must not cancel the active stream; only `RESET`, `GOODBYE`, a read error/EOF, a timeout, or an explicit stream close should cancel it.
Acceptance:
- Neo4j Go and TypeScript drivers observe incremental row delivery with fetch size.
- `PULL {n: 1}` does not cause full result materialization.
- Disconnect, `RESET`, and `GOODBYE` cancel the active stream.
- Existing Bolt materialized tests still pass.
Phase 6: HTTP Streaming Endpoint
Files:
- `pkg/server/server_db.go`
- `pkg/server/server_db_stream.go` (new)
- `pkg/server/server_router.go`
- `pkg/server/server_stream_test.go` (new)
Routing detail:
- `/db/{db}/tx/stream` currently routes as `txId == "stream"` because `handleTransactionEndpoint` treats one path segment after `tx` as a transaction ID.
- Add the `stream` case before the generic `len(remaining) == 1` transaction-ID case.
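The ordering constraint can be illustrated with a hypothetical dispatcher over the Neo4j HTTP transaction paths. `routeTransaction` and `route` are not the existing `handleTransactionEndpoint` code; the point is only that the literal `stream` case must be matched before the generic single-segment transaction-ID case.

```go
package main

import "strings"

// route is a hypothetical dispatch result for paths under /db/{db}/tx.
type route struct {
	kind string // "begin", "commit", "stream", "tx", or "txCommit"
	txID string
}

func routeTransaction(path string) (string, route, bool) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) < 3 || parts[0] != "db" || parts[2] != "tx" {
		return "", route{}, false
	}
	db, remaining := parts[1], parts[3:]
	switch {
	case len(remaining) == 0:
		return db, route{kind: "begin"}, true
	case len(remaining) == 1 && remaining[0] == "commit":
		return db, route{kind: "commit"}, true
	// Must precede the generic case below, or "stream" becomes a transaction ID.
	case len(remaining) == 1 && remaining[0] == "stream":
		return db, route{kind: "stream"}, true
	case len(remaining) == 1:
		return db, route{kind: "tx", txID: remaining[0]}, true
	case len(remaining) == 2 && remaining[1] == "commit":
		return db, route{kind: "txCommit", txID: remaining[0]}, true
	}
	return "", route{}, false
}
```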
Request:
```json
{
  "statement": "MATCH (n:Person) RETURN n",
  "parameters": {},
  "format": "ndjson",
  "allowMaterializedFallback": false
}
```
Tasks:
- Reuse auth, RBAC, database resolution, query normalization, and mutation checks from `handleImplicitTransaction`.
- Write `columns` before rows.
- Flush after each row or after a small configurable batch.
- Write a terminal `summary` on success, and a terminal `error` on failure after streaming has started.
- Respect `r.Context()` cancellation and close the stream.
Acceptance:
- `curl -N` receives row events progressively.
- Existing `/db/{db}/tx/commit` tests are unchanged.
- HTTP clients can cancel without leaving executor goroutines running.
Client And ORM Architecture
The server should remain Neo4j-compatible at the wire level. The first-party clients should be thin, language-native layers over Bolt and HTTP streaming, not separate database engines.
Shared Client Concepts
All language clients expose the same conceptual layers:
- Driver layer:
  - `Driver`, `Session`, `Transaction`, `Result`, `Record`, `Summary`
  - compatible with common Neo4j driver workflows where each language has an established Neo4j API
- Streaming layer:
  - `RunStream` or equivalent returns an async/lazy row stream
  - Bolt is the default when available
  - the HTTP NDJSON stream is the fallback and works through proxies/firewalls
- ORM mapper layer:
  - maps `Record` values to typed structs/classes
  - does not hide Cypher or invent a new query language in v1
  - supports explicit query methods, parameters, and result mapping
- Schema helper layer:
  - optional helpers for constraints and indexes
  - no automatic destructive migrations in v1
Go Package
Files:
- `pkg/client/neo4jcompat/driver.go` (new)
- `pkg/client/neo4jcompat/session.go` (new)
- `pkg/client/neo4jcompat/result.go` (new)
- `pkg/client/orm/mapper.go` (new)
- `pkg/client/orm/query.go` (new)
- `pkg/client/neo4jcompat/*_test.go` (new)
Dependencies:
- Reuse `github.com/neo4j/neo4j-go-driver/v5`, already present in `go.mod`.
- Do not implement a second Bolt client unless the official driver cannot express a Nornic-specific feature.
API shape:
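```go
driver, err := neo4jcompat.NewDriver(ctx, neo4jcompat.Config{
	URI:      "bolt://localhost:7687",
	Username: "neo4j",
	Password: "password",
	Database: "nornic",
})
if err != nil {
	return err
}
defer driver.Close(ctx) // assumed to mirror neo4j-go-driver's DriverWithContext.Close
session := driver.NewSession(ctx, neo4jcompat.SessionConfig{Database: "nornic"})
defer session.Close(ctx) // assumed, as above
result, err := session.Run(ctx, "MATCH (p:Person) RETURN p.name AS name", nil)
if err != nil {
	return err
}
for result.Next(ctx) {
	name, _ := result.Record().Get("name")
	_ = name // use the value
}
// Consume returns the summary, or an error raised after partial row delivery.
summary, err := result.Consume(ctx)
```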
ORM mapper shape:
```go
type Person struct {
	ID   string `nornic:"id"`
	Name string `nornic:"name"`
}

people, err := orm.QueryAll[Person](ctx, session,
	"MATCH (p:Person) RETURN elementId(p) AS id, p.name AS name",
	nil,
)
```
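`orm.QueryAll` could be built on a per-record mapping step like the one below, which copies values into struct fields by their `nornic` tag using `reflect`. `mapRecord` is hypothetical, and it assumes a record has already been flattened to `map[string]any` (for example by pairing the driver record's keys and values). It assigns directly assignable values and converts only between numeric kinds, so Bolt's `int64` integers can fill an `int` field; anything else is reported as an error rather than coerced.

```go
package main

import (
	"fmt"
	"reflect"
)

// mapRecord fills a T from one record, matching `nornic:"key"` field tags.
// Missing or null keys leave the field at its zero value.
func mapRecord[T any](record map[string]any) (T, error) {
	var out T
	v := reflect.ValueOf(&out).Elem()
	if v.Kind() != reflect.Struct {
		return out, fmt.Errorf("orm: %T is not a struct", out)
	}
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		key := field.Tag.Get("nornic")
		if key == "" || !field.IsExported() {
			continue
		}
		raw, ok := record[key]
		if !ok || raw == nil {
			continue
		}
		rv, fv := reflect.ValueOf(raw), v.Field(i)
		switch {
		case rv.Type().AssignableTo(fv.Type()):
			fv.Set(rv)
		case isNumber(rv.Kind()) && isNumber(fv.Kind()):
			fv.Set(rv.Convert(fv.Type())) // note: float-to-int conversion truncates
		default:
			return out, fmt.Errorf("orm: column %q (%T) cannot be stored in field %s (%s)",
				key, raw, field.Name, fv.Type())
		}
	}
	return out, nil
}

// isNumber restricts conversions to numeric kinds; reflect would otherwise
// allow surprising ones such as int64 -> string (a rune conversion).
func isNumber(k reflect.Kind) bool {
	switch k {
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
		reflect.Float32, reflect.Float64:
		return true
	}
	return false
}
```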
Go acceptance:
- Works with current materialized Bolt path.
- Streams automatically once Bolt server streaming lands.
- Tests include first-row timing against a local NornicDB Bolt server.
TypeScript / npm Package
Files:
- `clients/typescript/package.json` (new)
- `clients/typescript/src/driver.ts` (new)
- `clients/typescript/src/httpStream.ts` (new)
- `clients/typescript/src/orm.ts` (new)
- `clients/typescript/test/*.test.ts` (new)
Package name options:
- Preferred: `@nornicdb/client`
- Fallback if the scope is unavailable: `nornicdb`
Dependencies:
- Use `neo4j-driver` for Bolt. The UI already depends on `neo4j-driver`.
- Use native `fetch` and `ReadableStream` for HTTP NDJSON streaming.
API shape:
```typescript
import { createDriver, queryAll } from "@nornicdb/client";

const driver = createDriver({
  uri: "bolt://localhost:7687",
  auth: { username: "neo4j", password: "password" },
  database: "nornic",
});

for await (const record of driver
  .session()
  .runStream("MATCH (p:Person) RETURN p.name AS name")) {
  console.log(record.get("name"));
}

type Person = { id: string; name: string };
const people = await queryAll<Person>(driver, {
  cypher: "MATCH (p:Person) RETURN elementId(p) AS id, p.name AS name",
});
```
TypeScript acceptance:
- ESM package with generated `.d.ts` types.
- Works in Node 20+.
- Optional browser HTTP-stream mode for the UI; Bolt remains Node/server-side.
Rust Crate
Files:
- `clients/rust/Cargo.toml` (new)
- `clients/rust/src/lib.rs` (new)
- `clients/rust/src/http_stream.rs` (new)
- `clients/rust/src/orm.rs` (new)
- `clients/rust/tests/*.rs` (new)
Crate name options:
- Preferred: `nornicdb`
- If unavailable: `nornicdb-client`
Dependencies:
- HTTP streaming first, with `reqwest`, `serde`, and `futures`.
- An optional `bolt` feature can use `neo4rs` or another maintained Bolt client, but Rust work should not block server streaming work.
API shape:
```rust
use futures::TryStreamExt;
use nornicdb::{Client, Query};

let client = Client::http("http://localhost:7474")
    .database("nornic")
    .basic_auth("neo4j", "password")
    .build()?;

let mut rows = client.stream(Query::new("MATCH (p:Person) RETURN p.name AS name")).await?;
while let Some(record) = rows.try_next().await? {
    let name: String = record.get("name")?;
}
```
ORM mapper shape:
```rust
#[derive(serde::Deserialize)]
struct Person {
    id: String,
    name: String,
}

let people: Vec<Person> = client
    .query_as("MATCH (p:Person) RETURN elementId(p) AS id, p.name AS name")
    .collect()
    .await?;
```
Rust acceptance:
- The async stream implements `futures::Stream<Item = Result<Record, Error>>`.
- HTTP streaming works before any Rust Bolt dependency is selected.
- The optional Bolt feature is separately gated in CI.
Compatibility Contract
Bolt
- Existing Neo4j drivers must continue to connect.
- Existing materialized query behavior must remain valid.
- Fetch size must affect memory once `StreamingQueryExecutor` is active.
- Errors after partial row delivery must surface through `Result.Err()`/`Consume()` semantics.
HTTP
- `/db/{db}/tx/commit` remains Neo4j HTTP-compatible and materialized.
- `/db/{db}/tx/stream` is NornicDB-specific and explicitly documented.
- HTTP stream errors are terminal events once response headers are sent.
Transactions
- Read-only autocommit streams can emit rows immediately.
- Explicit read transactions can stream until commit/rollback/close.
- Writes initially remain materialized.
- Streamed writes are out of scope until commit visibility and failure semantics are proven.
Fabric
- Preserve many-read/one-write enforcement.
- Preserve strict `USE` and composite scoping.
- Remote streaming is best-effort; fallback must be visible through metadata.
Testing Strategy
Unit Tests
- `pkg/rowstream`: EOF, cancellation, close idempotence, drain limits, channel backpressure.
- `pkg/cypher`: the classifier accepts only proven streamable shapes.
- `pkg/fabric`: iterator-to-stream adapters, close propagation, materialized fallback metadata.
- `pkg/bolt`: lookahead behavior, `PULL n`, `DISCARD`, EOF metadata.
- `pkg/server`: NDJSON framing and cancellation.
Integration Tests
- Large node scan over Bolt with small fetch size.
- Large node scan over HTTP stream with client cancellation after N rows.
- Composite query with local-only streamable leaves.
- Remote Fabric query using Bolt stream transport.
- Explicit transaction rollback while a read stream is open.
Compatibility Tests
- Neo4j Go driver: `Result.Next`, `Record`, `Err`, `Consume`.
- TypeScript `neo4j-driver`: async iteration or the `subscribe` path, depending on driver support.
- Existing HTTP `/tx/commit` tests unchanged.
Performance Tests
Targets for streaming-eligible read scans:
- First-row latency: at least 50% lower than materialized baseline.
- Peak memory: at least 40% lower on 100k-row scans.
- Full-result throughput: no regression greater than 10% compared with materialized delivery.
Rollout Order
1. `pkg/rowstream` plus the materialized adapter.
2. `StorageExecutor.ExecuteStream` bridge with no behavior change.
3. Simple Cypher read stream for `MATCH ... RETURN` scans.
4. Bolt `StreamingQueryExecutor` and `PULL`-driven stream drain.
5. HTTP `/tx/stream` endpoint.
6. Fabric top-level streaming for FragmentExec and streamable Union.
7. RemoteEngine streaming.
8. Go client package.
9. TypeScript npm package.
10. Rust crate.
11. More Cypher operators: bounded top-k `ORDER BY`, `UNWIND` streaming, procedure streaming, selected APPLY streaming.
Definition Of Done
- Existing `Execute`, Bolt, and HTTP materialized behavior is unchanged for unsupported shapes.
- Supported read-only query classes do not require full-result buffering in the Bolt path.
- HTTP streaming delivers valid NDJSON events and handles cancellation cleanly.
- Fabric can stream at least simple local composite reads without top-level materialization.
- Go, TypeScript, and Rust packages expose a consistent record-stream and typed-mapping story.
- Docs clearly state which query shapes stream and which intentionally fall back.