Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
4163364
feat(client): add get_labels(), get_edge_types(), traverse() — R-SDK3
polaz Apr 12, 2026
0aab281
test(integration): verify get_labels, get_edge_types, traverse via li…
polaz Apr 12, 2026
80a73c6
fix(client): validate direction in traverse(), fix lint, guard test c…
polaz Apr 12, 2026
1e59f71
fix(client): correct schema type string representations
polaz Apr 12, 2026
4990122
fix(client): validate max_depth >= 1 in traverse(); xfail strict=True
polaz Apr 12, 2026
0d7ce75
test(integration): use unique names to avoid schema state false posit…
polaz Apr 12, 2026
c9d0d27
test(integration): use unique label name in has_property_definitions …
polaz Apr 12, 2026
df5f83d
test(integration): document why xfail uses strict=False for inbound t…
polaz Apr 12, 2026
ff59dd1
test(integration): use DETACH DELETE in label cleanup for consistency
polaz Apr 12, 2026
f69e7b7
test(unit): tighten repr assertions in test_repr_shows_counts
polaz Apr 12, 2026
004eb22
test(unit): replace magic number with named constant in TestEdgeTypeInfo
polaz Apr 12, 2026
c1f6ed9
test(unit): assert edge_id mapping in test_edges_are_wrapped_as_edge_…
polaz Apr 13, 2026
b8a626f
test: strengthen traverse assertions and add validation unit tests
polaz Apr 13, 2026
f97a0fe
test(integration): narrow xfail to AssertionError for inbound traverse
polaz Apr 13, 2026
cd3f3d7
refactor(client): validate direction/max_depth before proto import in…
polaz Apr 13, 2026
ae37106
fix(client): add type guards for direction and max_depth in traverse()
polaz Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions coordinode/coordinode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
AsyncCoordinodeClient,
CoordinodeClient,
EdgeResult,
EdgeTypeInfo,
LabelInfo,
NodeResult,
PropertyDefinitionInfo,
TraverseResult,
VectorResult,
)

Expand All @@ -36,4 +40,8 @@
"NodeResult",
"EdgeResult",
"VectorResult",
"LabelInfo",
"EdgeTypeInfo",
"PropertyDefinitionInfo",
"TraverseResult",
]
132 changes: 132 additions & 0 deletions coordinode/coordinode/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,54 @@ def __repr__(self) -> str:
return f"VectorResult(distance={self.distance:.4f}, node={self.node})"


class PropertyDefinitionInfo:
"""A property definition from the schema (name, type, required, unique)."""

def __init__(self, proto_def: Any) -> None:
self.name: str = proto_def.name
self.type: int = proto_def.type
self.required: bool = proto_def.required
self.unique: bool = proto_def.unique

def __repr__(self) -> str:
return f"PropertyDefinitionInfo(name={self.name!r}, type={self.type}, required={self.required}, unique={self.unique})"


class LabelInfo:
"""A node label returned from the schema registry."""

def __init__(self, proto_label: Any) -> None:
self.name: str = proto_label.name
self.version: int = proto_label.version
self.properties: list[PropertyDefinitionInfo] = [PropertyDefinitionInfo(p) for p in proto_label.properties]

def __repr__(self) -> str:
return f"LabelInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)})"


class EdgeTypeInfo:
"""An edge type returned from the schema registry."""

def __init__(self, proto_edge_type: Any) -> None:
self.name: str = proto_edge_type.name
self.version: int = proto_edge_type.version
self.properties: list[PropertyDefinitionInfo] = [PropertyDefinitionInfo(p) for p in proto_edge_type.properties]

def __repr__(self) -> str:
return f"EdgeTypeInfo(name={self.name!r}, version={self.version}, properties={len(self.properties)})"


class TraverseResult:
"""Result of a graph traversal: reached nodes and traversed edges."""

def __init__(self, proto_response: Any) -> None:
self.nodes: list[NodeResult] = [NodeResult(n) for n in proto_response.nodes]
self.edges: list[EdgeResult] = [EdgeResult(e) for e in proto_response.edges]

def __repr__(self) -> str:
return f"TraverseResult(nodes={len(self.nodes)}, edges={len(self.edges)})"


# ── Async client ─────────────────────────────────────────────────────────────


Expand Down Expand Up @@ -303,6 +351,72 @@ async def get_schema_text(self) -> str:

return "\n".join(lines)

async def get_labels(self) -> list[LabelInfo]:
"""Return all node labels defined in the schema."""
from coordinode._proto.coordinode.v1.graph.schema_pb2 import ListLabelsRequest # type: ignore[import]

resp = await self._schema_stub.ListLabels(ListLabelsRequest(), timeout=self._timeout)
return [LabelInfo(label) for label in resp.labels]

async def get_edge_types(self) -> list[EdgeTypeInfo]:
"""Return all edge types defined in the schema."""
from coordinode._proto.coordinode.v1.graph.schema_pb2 import ListEdgeTypesRequest # type: ignore[import]

resp = await self._schema_stub.ListEdgeTypes(ListEdgeTypesRequest(), timeout=self._timeout)
return [EdgeTypeInfo(et) for et in resp.edge_types]

async def traverse(
self,
start_node_id: int,
edge_type: str,
direction: str = "outbound",
max_depth: int = 1,
) -> TraverseResult:
"""Traverse the graph from *start_node_id* following *edge_type* edges.

Args:
start_node_id: ID of the node to start from.
edge_type: Edge type label to follow (e.g. ``"KNOWS"``).
direction: ``"outbound"`` (default), ``"inbound"``, or ``"both"``.
max_depth: Maximum hop count (default 1).

Returns:
:class:`TraverseResult` with ``nodes`` and ``edges`` lists.
"""
# Validate pure string/int inputs before importing proto stubs — ensures ValueError
# is raised even when proto stubs have not been generated yet.
# Type guards come first so that wrong types raise ValueError, not AttributeError/TypeError.
if not isinstance(direction, str):
raise ValueError(f"direction must be a str, got {type(direction).__name__!r}.")
_valid_directions = {"outbound", "inbound", "both"}
key = direction.lower()
if key not in _valid_directions:
raise ValueError(f"Invalid direction {direction!r}. Must be one of: 'outbound', 'inbound', 'both'.")
# bool is a subclass of int in Python, so `isinstance(True, int)` is True — exclude it.
if not isinstance(max_depth, int) or isinstance(max_depth, bool) or max_depth < 1:
raise ValueError(f"max_depth must be an integer >= 1, got {max_depth!r}.")

from coordinode._proto.coordinode.v1.graph.graph_pb2 import ( # type: ignore[import]
TraversalDirection,
TraverseRequest,
)

Comment thread
polaz marked this conversation as resolved.
_direction_map = {
"outbound": TraversalDirection.TRAVERSAL_DIRECTION_OUTBOUND,
"inbound": TraversalDirection.TRAVERSAL_DIRECTION_INBOUND,
"both": TraversalDirection.TRAVERSAL_DIRECTION_BOTH,
}
direction_value = _direction_map[key]

req = TraverseRequest(
start_node_id=start_node_id,
edge_type=edge_type,
direction=direction_value,
max_depth=max_depth,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
resp = await self._graph_stub.Traverse(req, timeout=self._timeout)
return TraverseResult(resp)

async def health(self) -> bool:
from coordinode._proto.coordinode.v1.health.health_pb2 import ( # type: ignore[import]
HealthCheckRequest,
Expand Down Expand Up @@ -422,6 +536,24 @@ def create_edge(
def get_schema_text(self) -> str:
return self._run(self._async.get_schema_text())

def get_labels(self) -> list[LabelInfo]:
"""Return all node labels defined in the schema."""
return self._run(self._async.get_labels())

def get_edge_types(self) -> list[EdgeTypeInfo]:
"""Return all edge types defined in the schema."""
return self._run(self._async.get_edge_types())

def traverse(
self,
start_node_id: int,
edge_type: str,
direction: str = "outbound",
max_depth: int = 1,
) -> TraverseResult:
"""Traverse the graph from *start_node_id* following *edge_type* edges."""
return self._run(self._async.traverse(start_node_id, edge_type, direction, max_depth))

def health(self) -> bool:
return self._run(self._async.health())

Expand Down
128 changes: 127 additions & 1 deletion tests/integration/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import pytest

from coordinode import AsyncCoordinodeClient, CoordinodeClient
from coordinode import AsyncCoordinodeClient, CoordinodeClient, EdgeTypeInfo, LabelInfo, TraverseResult

ADDR = os.environ.get("COORDINODE_ADDR", "localhost:7080")

Expand Down Expand Up @@ -208,6 +208,132 @@ def test_get_schema_text(client):
client.cypher("MATCH (n:SchemaTestLabel {tag: $tag}) DETACH DELETE n", params={"tag": tag})


# ── get_labels / get_edge_types / traverse ────────────────────────────────────


def test_get_labels_returns_list(client):
"""get_labels() returns a non-empty list of LabelInfo after data is present."""
tag = uid()
label_name = f"GetLabelsTest{uid()}"
client.cypher(f"CREATE (n:{label_name} {{tag: $tag}})", params={"tag": tag})
try:
labels = client.get_labels()
assert isinstance(labels, list)
assert len(labels) > 0
assert all(isinstance(lbl, LabelInfo) for lbl in labels)
names = [lbl.name for lbl in labels]
assert label_name in names, f"{label_name} not in {names}"
finally:
client.cypher(f"MATCH (n:{label_name} {{tag: $tag}}) DETACH DELETE n", params={"tag": tag})


def test_get_labels_has_property_definitions(client):
"""LabelInfo.properties is a list (may be empty for schema-free labels)."""
tag = uid()
label_name = f"PropLabel{uid()}"
client.cypher(f"CREATE (n:{label_name} {{tag: $tag}})", params={"tag": tag})
try:
labels = client.get_labels()
found = next((lbl for lbl in labels if lbl.name == label_name), None)
assert found is not None, f"{label_name} not returned by get_labels()"
# Intentionally only check the type — CoordiNode is schema-free and may return
# an empty properties list even when the node was created with properties.
assert isinstance(found.properties, list)
Comment thread
polaz marked this conversation as resolved.
finally:
client.cypher(f"MATCH (n:{label_name} {{tag: $tag}}) DETACH DELETE n", params={"tag": tag})


def test_get_edge_types_returns_list(client):
"""get_edge_types() returns a non-empty list of EdgeTypeInfo after data is present."""
tag = uid()
edge_type = f"GET_EDGE_TYPE_TEST_{uid()}".upper()
client.cypher(
f"CREATE (a:EdgeTypeTestNode {{tag: $tag}})-[:{edge_type}]->(b:EdgeTypeTestNode {{tag: $tag}})",
params={"tag": tag},
)
try:
edge_types = client.get_edge_types()
assert isinstance(edge_types, list)
assert len(edge_types) > 0
assert all(isinstance(et, EdgeTypeInfo) for et in edge_types)
type_names = [et.name for et in edge_types]
assert edge_type in type_names, f"{edge_type} not in {type_names}"
finally:
client.cypher("MATCH (n:EdgeTypeTestNode {tag: $tag}) DETACH DELETE n", params={"tag": tag})


def test_traverse_returns_neighbours(client):
"""traverse() returns adjacent nodes reachable via the given edge type."""
tag = uid()
client.cypher(
"CREATE (a:TraverseRPC {tag: $tag, role: 'hub'})-[:TRAVERSE_TEST]->(b:TraverseRPC {tag: $tag, role: 'leaf1'})",
params={"tag": tag},
)
try:
rows = client.cypher(
"MATCH (a:TraverseRPC {tag: $tag, role: 'hub'}) RETURN a AS node_id",
params={"tag": tag},
)
assert len(rows) >= 1, "hub node not found"
start_id = rows[0]["node_id"]

# Fetch the leaf1 node ID so we can assert it specifically appears in the result.
leaf_rows = client.cypher(
"MATCH (b:TraverseRPC {tag: $tag, role: 'leaf1'}) RETURN b AS node_id",
params={"tag": tag},
)
assert len(leaf_rows) >= 1, "leaf1 node not found"
leaf1_id = leaf_rows[0]["node_id"]

result = client.traverse(start_id, "TRAVERSE_TEST", direction="outbound", max_depth=1)
assert isinstance(result, TraverseResult)
assert len(result.nodes) >= 1, "traverse() returned no neighbour nodes"
Comment thread
polaz marked this conversation as resolved.
node_ids = {n.id for n in result.nodes}
assert leaf1_id in node_ids, f"traverse() did not return the expected leaf1 node ({leaf1_id}); got: {node_ids}"
finally:
client.cypher("MATCH (n:TraverseRPC {tag: $tag}) DETACH DELETE n", params={"tag": tag})


@pytest.mark.xfail(
strict=False,
Comment thread
polaz marked this conversation as resolved.
raises=AssertionError,
# strict=False: XPASS is good news (server gained inbound support), not an error.
# strict=True would break CI exactly when the server improves, which is undesirable.
# The XPASS report in pytest output is the signal to remove this marker.
# raises=AssertionError: narrows xfail to the known failure mode (empty result set →
# assertion fails). Unexpected errors (gRPC RpcError, wrong enum, etc.) are NOT covered
# and will still propagate as CI failures.
reason="CoordiNode Traverse RPC does not yet support inbound direction — server returns empty result set",
)
Comment thread
polaz marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
def test_traverse_inbound_direction(client):
"""traverse() with direction='inbound' reaches nodes that point TO start_id."""
tag = uid()
client.cypher(
"CREATE (src:TraverseIn {tag: $tag})-[:INBOUND_TEST]->(dst:TraverseIn {tag: $tag})",
params={"tag": tag},
)
try:
# Capture both src and dst so that when the server gains inbound support
# (XPASS), the assertion verifies the *correct* node was returned, not just any node.
rows = client.cypher(
"MATCH (src:TraverseIn {tag: $tag})-[:INBOUND_TEST]->(dst:TraverseIn {tag: $tag}) "
"RETURN src AS src_id, dst AS dst_id",
params={"tag": tag},
)
assert len(rows) >= 1
src_id = rows[0]["src_id"]
dst_id = rows[0]["dst_id"]
result = client.traverse(dst_id, "INBOUND_TEST", direction="inbound", max_depth=1)
assert isinstance(result, TraverseResult)
assert len(result.nodes) >= 1, "inbound traverse returned no nodes"
node_ids = {n.id for n in result.nodes}
assert src_id in node_ids, (
f"inbound traverse did not return the expected source node ({src_id}); got: {node_ids}"
)
finally:
client.cypher("MATCH (n:TraverseIn {tag: $tag}) DETACH DELETE n", params={"tag": tag})


# ── Hybrid search ─────────────────────────────────────────────────────────────


Expand Down
Loading
Loading