Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8a66cc8
feat(gfql): native Polars cypher row pipeline (RETURN/LIMIT/SKIP/DIST…
lmeyerov Jun 25, 2026
e8beaac
feat(gfql): host-bridge fallback completes polars cypher row pipeline
lmeyerov Jun 26, 2026
1e99033
test(gfql): lower frame_ops.py coverage floor for polars-only branches
lmeyerov Jun 26, 2026
844f3ee
test(gfql): cover host-bridge helper branches; changelog for complete…
lmeyerov Jun 26, 2026
f4a1db5
perf(gfql): narrow host-bridge — skip base-graph for self-contained s…
lmeyerov Jun 26, 2026
0d5acf6
ci(gfql): run polars row-pipeline tests in the coverage step
lmeyerov Jun 26, 2026
6567066
perf(gfql): native polars select/order_by lowering (kills bridge for …
lmeyerov Jun 26, 2026
a88f787
perf(gfql): native polars group_by + unwind lowering
lmeyerov Jun 26, 2026
712a80f
test(gfql): differential cypher conformance lane for engine=polars
lmeyerov Jun 26, 2026
6808a64
perf(gfql): native polars result projection for property/expr columns
lmeyerov Jun 26, 2026
4238b1c
bench(gfql): fair per-engine native graph (no per-call input coercion)
lmeyerov Jun 26, 2026
2250559
perf(gfql): native polars entity-text projection for int/string/bool …
lmeyerov Jun 26, 2026
65bfe54
fix(gfql): host-bridge same-path WHERE route for engine='polars'
lmeyerov Jun 26, 2026
3d65aeb
test(gfql): lower gfql_unified/result_postprocess coverage floors for…
lmeyerov Jun 26, 2026
29dc0ec
refactor(gfql): move native polars projection out of the pandas-audit…
lmeyerov Jun 26, 2026
09865f7
test(gfql): adversarial NULL / 3-valued-logic conformance for the pol…
lmeyerov Jun 26, 2026
665e18b
docs(gfql): changelog reflects native polars cypher row pipeline + perf
lmeyerov Jun 26, 2026
e4a4f28
refactor(gfql/polars): de-cheat — native polars or honest NotImplemen…
lmeyerov Jun 26, 2026
5de97b2
docs(changelog): honest native-or-deferred for polars cypher row pipe…
lmeyerov Jun 26, 2026
5189d33
feat(gfql/polars): native where_rows (OR/NOT WHERE) — no pandas bridge
lmeyerov Jun 26, 2026
b1bbf03
feat(gfql/polars): native scalar functions coalesce + abs in expr low…
lmeyerov Jun 26, 2026
b9432bd
fix(gfql/polars): mypy narrowing in _lower_function (no None in args …
lmeyerov Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,9 @@ jobs:
--cov=graphistry/compute --cov-report= \
graphistry/tests/compute/test_polars.py \
graphistry/tests/compute/gfql/test_engine_polars_hop.py \
graphistry/tests/compute/gfql/test_engine_polars_chain.py
graphistry/tests/compute/gfql/test_engine_polars_chain.py \
graphistry/tests/compute/gfql/test_engine_polars_row_pipeline.py \
graphistry/tests/compute/gfql/test_engine_polars_cypher_conformance.py

- name: Upload polars coverage
if: ${{ matrix.python-version == '3.12' }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added
- **GFQL native Polars engine — traversals (`engine='polars'`)**: Added a native, vectorized Polars execution engine for the core GFQL traversals `hop()` and `chain()`, dispatched at the engine boundary so the production pandas/cuDF paths are untouched. `Engine.POLARS` is opt-in (explicit `engine='polars'`); `engine='auto'` with Polars input still coerces to pandas as before. Covers forward/reverse/undirected single-hop traversal, directed multi-hop chains, node/edge filter dicts and predicates (lowered to Polars expressions), `edge_match`/`source_node_match`/`destination_node_match`, `target_wave_front`, and alias names; the BFS advances via semi/anti joins (no per-row Python work). Validated by differential parity against the pandas engine (hop + chain test suites plus a randomized fuzzer) and benchmarked vs pandas (`benchmarks/gfql/pandas_vs_polars.py`) — Polars wins at scale (up to ~2.5x on multi-edge chains at millions of edges; crossover ~50–100k rows). Variable-length/multi-hop edges, undirected edges in multi-edge chains, hop labels, and node `query=` raise `NotImplementedError` for now (use `engine='pandas'`).
- **GFQL native Polars engine — cypher row pipeline (`engine='polars'`)**: Extended the Polars engine to the Cypher `MATCH … RETURN` row surface, natively vectorized. **NO CHEATING:** the polars engine never silently falls back to the pandas engine — every query runs natively on polars or raises an honest `NotImplementedError` pointing at `engine='pandas'` (falling back to pandas would misrepresent pandas performance as polars; only a human may consent to a bridge). `chain_polars` splits boundary `call()` ops (mirroring the pandas `_handle_boundary_calls`) and runs each trailing row op per-op native or raises. **Native polars** (no pandas round-trip): frame ops (`rows`/`limit`/`skip`/`distinct`/`drop_cols`), `select`/`with_`/`return_` projection (a conservative cypher-expr-AST → `pl.Expr` lowering covering property access, arithmetic, comparison, boolean, literals), `order_by` (`.sort`), `group_by` (`count`/`sum`/`avg`/`min`/`max`), `unwind` (literal-list cross-join), the result projection for property/expr columns, and entity-text `RETURN n` rendering for int/string/bool nodes (`pl.concat_str`). **Honestly deferred** (raise `NotImplementedError`, no pandas fallback): multi-entity `rows(binding_ops=…)`, cross-entity same-path `WHERE` (`DFSamePathExecutor`), float/temporal/nested entity-text, and exotic expressions (CASE/list/map/temporal, `collect` aggregates) — these are the forward native-engineering targets. Validated by differential parity vs pandas including a TCK-style conformance lane (`test_engine_polars_cypher_conformance.py`: native-only curated corpus + seeded fuzzer + NULL/3-valued-logic graph + entity-text escaping, plus a `DEFERRED` list asserting deferred queries raise rather than silently bridge) and benchmarked (`benchmarks/gfql/cypher_row_pipeline.py`). **Perf (interleaved, 1M nodes, each engine on its native-frame graph, all fully native):** polars wins **5.6–38×** across the surface — `RETURN n` ~38×, `ORDER BY` ~17×, `WHERE`+`ORDER BY`+`LIMIT` ~14×, traversals 6–7.5×, projections/aggregations/`DISTINCT` 5.6–6.9×. cuDF/pandas paths untouched.

### Changed
- **GFQL Cypher parse memoization (perf)**: `parse_cypher` now memoizes its result (LRU over the deterministic lark parse+transform → immutable frozen AST). Repeated identical Cypher queries skip the ~15 ms parse — the dominant per-call cost of small queries (~50% of a Cypher call at 100k rows) — making end-to-end query latency ~1.3–1.7× faster at small/interactive sizes across pandas/polars/cuDF. Safe to share the cached AST: every Cypher AST node is `@dataclass(frozen=True)` and `compile_cypher_query` does not mutate the parsed tree; validation errors still raise and are not cached.
Expand Down
182 changes: 182 additions & 0 deletions benchmarks/gfql/cypher_row_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""Benchmark the native polars GFQL row pipeline vs pandas for cypher queries.

Phase 2 of the polars engine enables cypher RETURN / LIMIT / SKIP / DISTINCT /
single-entity WHERE on ``engine='polars'`` (before this increment these raised
NotImplementedError on polars). The heavy traversal + frame ops (filter, dedup,
slice) run natively in polars; only the final row-wise entity-text projection is
host-bridged to pandas. So polars wins most where a row op reduces the set
before projection (LIMIT, selective WHERE, DISTINCT), and is closest to neutral
on a full-table whole-entity RETURN (projection dominates, bridge roundtrip).

Reports median latency and the polars speedup (pandas_ms / polars_ms; > 1 means
polars wins). On a shared host, interleave is implicit (pandas then polars
back-to-back per query); for regression-grade claims run several times and
compare distributions (see plans/gfql-polars-engine memory).

Example::

python benchmarks/gfql/cypher_row_pipeline.py --runs 7 --warmup 2 \
--sizes 10000,100000,1000000 --output /tmp/cypher-row.md
"""

from __future__ import annotations

import argparse
import statistics
import time
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

import numpy as np
import pandas as pd

import graphistry

# (name, cypher) — exercised on both engines via g.gfql(cypher, engine=...)
# Native = frame ops (rows/limit/skip/distinct) run in polars; Bridged = the
# cypher expression engine (select/order_by/group_by) runs host-bridged to pandas.
WORKLOADS: List[Tuple[str, str]] = [
# native frame-op path
("RETURN n LIMIT 10", "MATCH (n) RETURN n LIMIT 10"),
("RETURN n SKIP/LIMIT", "MATCH (n) RETURN n SKIP 5 LIMIT 100"),
("WHERE > RETURN LIMIT", "MATCH (n) WHERE n.score > 90 RETURN n LIMIT 50"),
("RETURN DISTINCT n", "MATCH (n) RETURN DISTINCT n"),
("WHERE > RETURN n", "MATCH (n) WHERE n.score > 50 RETURN n"),
("RETURN n (full)", "MATCH (n) RETURN n"),
("rel RETURN m LIMIT", "MATCH (n)-[e]->(m) RETURN m LIMIT 100"),
# host-bridged expression path
("select n.score", "MATCH (n) RETURN n.score"),
("select 2 cols", "MATCH (n) RETURN n.score, n.kind"),
("order_by", "MATCH (n) RETURN n.score ORDER BY n.score DESC"),
("where+select+limit", "MATCH (n) WHERE n.score > 50 RETURN n.score ORDER BY n.score LIMIT 100"),
("group_by count", "MATCH (n) RETURN n.kind, count(n) AS c"),
]


@dataclass
class ResultRow:
workload: str
n_nodes: int
n_edges: int
pandas_ms: Optional[float]
polars_ms: Optional[float]
error: Optional[str] = None

@property
def speedup(self) -> Optional[float]:
if self.pandas_ms and self.polars_ms:
return self.pandas_ms / self.polars_ms
return None


def make_graph(n_nodes: int, n_edges: int, seed: int = 0):
rng = np.random.default_rng(seed)
nodes = pd.DataFrame({
"id": np.arange(n_nodes),
"kind": rng.choice(["x", "y", "z"], size=n_nodes),
"score": rng.integers(0, 100, size=n_nodes),
})
edges = pd.DataFrame({
"s": rng.integers(0, n_nodes, size=n_edges),
"d": rng.integers(0, n_nodes, size=n_edges),
"rel": rng.choice(["r1", "r2", "r3"], size=n_edges),
})
return graphistry.nodes(nodes, "id").edges(edges, "s", "d")


def timeit(fn: Callable[[], object], runs: int, warmup: int) -> float:
for _ in range(warmup):
fn()
samples = []
for _ in range(runs):
t0 = time.perf_counter()
fn()
samples.append((time.perf_counter() - t0) * 1000.0)
return statistics.median(samples)


def _polars_graph(g):
"""Same graph with node/edge frames already in polars, so the polars runs
don't pay a per-call pandas->polars input coercion that the pandas runs avoid
(a real deployment keeps the graph in its engine's native frame type)."""
from graphistry.Engine import Engine, df_to_engine
return g.nodes(df_to_engine(g._nodes, Engine.POLARS), g._node).edges(
df_to_engine(g._edges, Engine.POLARS), g._source, g._destination)


def run(sizes: List[Tuple[int, int]], runs: int, warmup: int) -> List[ResultRow]:
rows: List[ResultRow] = []
for n_nodes, n_edges in sizes:
g_pd = make_graph(n_nodes, n_edges)
g_pl = _polars_graph(g_pd)
for name, query in WORKLOADS:
try:
pandas_ms = timeit(lambda: g_pd.gfql(query, engine="pandas"), runs, warmup)
polars_ms = timeit(lambda: g_pl.gfql(query, engine="polars"), runs, warmup)
rows.append(ResultRow(name, n_nodes, n_edges, pandas_ms, polars_ms))
except Exception as exc: # noqa: BLE001 - bench harness reports, never crashes the sweep
rows.append(ResultRow(name, n_nodes, n_edges, None, None, error=f"{type(exc).__name__}: {exc}"))
return rows


def to_markdown(rows: List[ResultRow]) -> str:
lines = [
"| workload | nodes | edges | pandas_ms | polars_ms | speedup |",
"|----------|-------|-------|-----------|-----------|---------|",
]
for r in rows:
if r.error:
lines.append(f"| {r.workload} | {r.n_nodes} | {r.n_edges} | ERROR | ERROR | {r.error} |")
else:
lines.append(
f"| {r.workload} | {r.n_nodes} | {r.n_edges} | "
f"{r.pandas_ms:.1f} | {r.polars_ms:.1f} | {r.speedup:.2f}x |"
)
return "\n".join(lines)


def _parse_sizes(text: str) -> List[Tuple[int, int]]:
# "nodes:edges,nodes:edges" or "nodes" (edges defaults to 5x nodes)
out: List[Tuple[int, int]] = []
for chunk in text.split(","):
chunk = chunk.strip()
if not chunk:
continue
if ":" in chunk:
nn, ne = chunk.split(":")
out.append((int(nn), int(ne)))
else:
nn = int(chunk)
out.append((nn, nn * 5))
return out


def main() -> None:
try:
import polars # noqa: F401
except ImportError:
raise SystemExit("polars is not installed; install with `pip install polars`")

parser = argparse.ArgumentParser(description="Benchmark GFQL cypher row pipeline pandas vs polars.")
parser.add_argument("--runs", type=int, default=7)
parser.add_argument("--warmup", type=int, default=2)
parser.add_argument(
"--sizes",
default="10000,100000,1000000",
help="Comma list of node counts (edges=5x) or nodes:edges pairs.",
)
parser.add_argument("--output", default="", help="Optional path to write the markdown table.")
args = parser.parse_args()

rows = run(_parse_sizes(args.sizes), args.runs, args.warmup)
table = to_markdown(rows)
print(table)
if args.output:
with open(args.output, "w") as fh:
fh.write(table + "\n")
print(f"\nwrote {args.output}")


if __name__ == "__main__":
main()
16 changes: 13 additions & 3 deletions benchmarks/gfql/pandas_vs_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,24 @@ def timeit(fn: Callable[[], object], runs: int, warmup: int) -> float:
return statistics.median(samples)


def _polars_graph(g):
"""Graph with frames already in polars so polars runs don't pay a per-call
pandas->polars input coercion the pandas runs avoid (real deployments keep
the graph in the engine's native frame type)."""
from graphistry.Engine import Engine, df_to_engine
return g.nodes(df_to_engine(g._nodes, Engine.POLARS), g._node).edges(
df_to_engine(g._edges, Engine.POLARS), g._source, g._destination)


def run(sizes: List[Tuple[int, int]], runs: int, warmup: int) -> List[ResultRow]:
rows: List[ResultRow] = []
for n_nodes, n_edges in sizes:
g = make_graph(n_nodes, n_edges)
g_pd = make_graph(n_nodes, n_edges)
g_pl = _polars_graph(g_pd)
for name, fn in WORKLOADS:
try:
pandas_ms = timeit(lambda: fn(g, "pandas"), runs, warmup)
polars_ms = timeit(lambda: fn(g, "polars"), runs, warmup)
pandas_ms = timeit(lambda: fn(g_pd, "pandas"), runs, warmup)
polars_ms = timeit(lambda: fn(g_pl, "polars"), runs, warmup)
rows.append(ResultRow(name, n_nodes, n_edges, pandas_ms, polars_ms))
except Exception as exc: # noqa: BLE001 - bench harness reports, never crashes the sweep
rows.append(ResultRow(name, n_nodes, n_edges, None, None, error=f"{type(exc).__name__}: {exc}"))
Expand Down
4 changes: 3 additions & 1 deletion bin/test-polars.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ python -m pytest --version
python -B -m pytest -vv \
graphistry/tests/compute/test_polars.py \
graphistry/tests/compute/gfql/test_engine_polars_hop.py \
graphistry/tests/compute/gfql/test_engine_polars_chain.py
graphistry/tests/compute/gfql/test_engine_polars_chain.py \
graphistry/tests/compute/gfql/test_engine_polars_row_pipeline.py \
graphistry/tests/compute/gfql/test_engine_polars_cypher_conformance.py
3 changes: 3 additions & 0 deletions graphistry/compute/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,9 @@ def chain(
if validate_schema:
Chain(ops if not isinstance(ops, Chain) else ops.chain).validate(collect_all=False)
from graphistry.compute.gfql.engine_polars.chain import chain_polars
# NO pandas fallback here (see plan.md NO-CHEATING): chain_polars raises
# NotImplementedError for deferred features (var-length/multi-hop edges,
# undirected multi-edge); that honest signal propagates to the caller.
return chain_polars(self, ops, start_nodes=start_nodes)

if policy:
Expand Down
11 changes: 11 additions & 0 deletions graphistry/compute/gfql/cypher/result_postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,17 @@ def _projection_alias_rows(


def apply_result_projection(result: Plottable, projection: ResultProjectionPlan) -> Plottable:
rows_df = getattr(result, "_nodes", None)
if rows_df is not None and "polars" in type(rows_df).__module__:
# Native polars projection lives in engine_polars (not this pandas-audited
# module); it renders natively or raises NotImplementedError — NO pandas
# bridge (see plans/gfql-polars-engine NO-CHEATING).
from graphistry.compute.gfql.engine_polars.projection import apply_result_projection_polars
return apply_result_projection_polars(result, projection)
return _apply_result_projection_pandas(result, projection)


def _apply_result_projection_pandas(result: Plottable, projection: ResultProjectionPlan) -> Plottable:
rows_df = cast(DataFrameT, getattr(result, "_nodes", None))
if rows_df is None:
return result
Expand Down
Loading
Loading