Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 145 additions & 24 deletions src/ada/cadit/step/read/stream_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,41 @@ def _rep_rel_edge(pool_get, rep_rel_rec):
return rep_1, rep_2, idt.id


def _build_transform_map(pool_get, root_ids, cdsr_ids, srr_ids, absr_ids):
"""Map each root solid id -> list of world 4x4 matrices (one per placed instance).
def _build_product_name_map(pool_get, sdr_ids):
"""rep id -> product name, via SHAPE_DEFINITION_REPRESENTATION -> PRODUCT_DEFINITION_SHAPE
-> PRODUCT_DEFINITION -> FORMATION -> PRODUCT. Used to label assembly-tree group nodes;
any unresolvable link just leaves the rep unnamed (caller falls back to ``asm_<rep>``)."""
name_of_rep: dict[int, str] = {}
for sid in sdr_ids:
rec = pool_get(sid)
if rec is None or len(rec.args) < 2:
continue
defn, rep = rec.args[0], rec.args[1]
if not (isinstance(defn, _Ref) and isinstance(rep, _Ref)) or rep.id in name_of_rep:
continue
name = None
try:
pds = pool_get(defn.id) # PRODUCT_DEFINITION_SHAPE(name, desc, #pd)
pd = pool_get(pds.args[2].id) if pds and isinstance(pds.args[2], _Ref) else None
pdf = pool_get(pd.args[2].id) if pd and isinstance(pd.args[2], _Ref) else None
prod = pool_get(pdf.args[2].id) if pdf and isinstance(pdf.args[2], _Ref) else None
if prod is not None:
for cand in (prod.args[1], prod.args[0]):
if isinstance(cand, str) and cand.strip():
name = cand.strip()
break
except Exception: # noqa: BLE001 - naming is best-effort metadata
name = None
if name:
name_of_rep[rep.id] = name
return name_of_rep


def _build_transform_map(pool_get, root_ids, cdsr_ids, srr_ids, absr_ids, sdr_ids=()):
"""Map each root solid id -> (matrices, paths): one world 4x4 matrix per placed
instance, plus the matching assembly path — a root-first tuple of
``(rep_id, product_name)`` levels — so the scene graph can group instances the way
the STEP product tree does.

Resilient: any unresolved entity simply leaves a solid with the identity ([None]
handled by the caller); never raises.
Expand Down Expand Up @@ -366,14 +399,20 @@ def _build_transform_map(pool_get, root_ids, cdsr_ids, srr_ids, absr_ids):
continue
edges.setdefault(rep_1, []).append((rep_2, i1.id, i2.id))

name_of_rep = _build_product_name_map(pool_get, sdr_ids)

def _path_level(rep_id: int) -> tuple:
return (rep_id, name_of_rep.get(rep_id) or f"asm_{rep_id}")

def _world_matrices(rep_id: int, _seen: frozenset) -> list:
"""All world matrices reaching ``rep_id`` (a rep that is rep_1 of edges).
Each edge T_edge maps this rep's coords -> its parent rep; recurse to root."""
"""All ``(matrix, path)`` pairs reaching ``rep_id`` (a rep that is rep_1 of
edges); path is root-first. Each edge T_edge maps this rep's coords -> its
parent rep; recurse to root."""
out_edges = edges.get(rep_id)
if not out_edges:
return [np.eye(4)] # root rep: identity (its own coords ARE world)
return [(np.eye(4), (_path_level(rep_id),))] # root rep: its coords ARE world
if rep_id in _seen:
return [np.eye(4)] # cycle guard
return [(np.eye(4), (_path_level(rep_id),))] # cycle guard
seen2 = _seen | {rep_id}
mats: list = []
for rep_2, i1, i2 in out_edges:
Expand All @@ -383,11 +422,11 @@ def _world_matrices(rep_id: int, _seen: frozenset) -> list:
t_edge = np.linalg.inv(m_child) @ m_parent
except np.linalg.LinAlgError:
t_edge = np.eye(4)
for t_parent in _world_matrices(rep_2, seen2):
mats.append(t_parent @ t_edge)
for t_parent, parent_path in _world_matrices(rep_2, seen2):
mats.append((t_parent @ t_edge, parent_path + (_path_level(rep_id),)))
return mats

tmap: dict[int, list] = {}
tmap: dict[int, tuple] = {}
for sid in root_ids:
geom_rep = geomrep_of_solid.get(sid)
# Flat/baked file: no ABSR item linkage at all -> identity, yield once.
Expand All @@ -398,17 +437,91 @@ def _world_matrices(rep_id: int, _seen: frozenset) -> list:
# geom rep so its outgoing edges are found directly.
place_rep = place_rep_of_geom.get(geom_rep, geom_rep)
try:
mats = _world_matrices(place_rep, frozenset())
pairs = _world_matrices(place_rep, frozenset())
except Exception: # noqa: BLE001 - never crash a read over a placement chain
mats = [np.eye(4)]
pairs = [(np.eye(4), None)]
mats = [m for m, _p in pairs]
# Drop pure-identity lists to a no-op (single instance, transform=None).
nontrivial = [m for m in mats if not np.allclose(m, np.eye(4), atol=1e-12)]
if not nontrivial and len(mats) <= 1:
continue
tmap[sid] = mats
tmap[sid] = (mats, [p for _m, p in pairs])
return tmap


# SI prefix -> factor relative to the unprefixed unit (we only resolve METRE).
_SI_PREFIX_SCALE = {
"KILO": 1e3,
"DECI": 1e-1,
"CENTI": 1e-2,
"MILLI": 1e-3,
"MICRO": 1e-6,
"NANO": 1e-9,
}
# CONVERSION_BASED_UNIT names -> metres. The exact factor lives in a referenced
# MEASURE_WITH_UNIT record; resolving it cross-statement isn't worth it when the
# unit *name* already pins the factor exactly. Some writers (e.g. Abaqus) express
# even plain millimetres this way rather than via an SI prefix.
_CONV_UNIT_SCALE = {
"MILLIMETRE": 1e-3,
"MILLIMETER": 1e-3,
"MM": 1e-3,
"CENTIMETRE": 1e-2,
"CENTIMETER": 1e-2,
"CM": 1e-2,
"METRE": 1.0,
"METER": 1.0,
"M": 1.0,
"INCH": 0.0254,
"INCHES": 0.0254,
"IN": 0.0254,
"FOOT": 0.3048,
"FEET": 0.3048,
"FT": 0.3048,
"YARD": 0.9144,
"MILE": 1609.344,
}

# Both arg forms occur in the wild: SI_UNIT(.MILLI.,.METRE.) and SI_UNIT(.METRE.).
_SI_LEN_RE = re.compile(r"SI_UNIT\(\s*(?:(\.\w+\.|\$)\s*,\s*)?\.METRE\.\s*\)")
_CONV_NAME_RE = re.compile(r"CONVERSION_BASED_UNIT\(\s*'([^']*)'")


def detect_step_length_unit_scale(filepath) -> float:
"""Factor converting the file's declared length unit to METRES, read from the
first ``LENGTH_UNIT`` record in the data section (e.g. the ubiquitous
``( LENGTH_UNIT() NAMED_UNIT(*) SI_UNIT(.MILLI.,.METRE.) )`` -> 0.001).

glTF mandates metres, so the streaming GLB path multiplies positions by this;
the OCC reader does the same conversion internally via ``xstep.cascade.unit``.
Returns 1.0 when the file is already in metres or the unit is undetectable
(logged at warning in the latter case)."""
import mmap as _mmap

try:
with open(filepath, "rb") as fh, _mmap.mmap(fh.fileno(), 0, access=_mmap.ACCESS_READ) as mm:
i = mm.find(b"LENGTH_UNIT")
if i < 0:
return 1.0
start = mm.rfind(b";", 0, i) + 1
stmt = mm[start : _stmt_end(mm, start, len(mm))].decode("ascii", "replace")
except (OSError, ValueError):
return 1.0
m = _SI_LEN_RE.search(stmt)
if m is not None:
prefix = m.group(1)
if prefix is None or prefix == "$":
return 1.0
return _SI_PREFIX_SCALE.get(prefix.strip("."), 1.0)
m = _CONV_NAME_RE.search(stmt)
if m is not None:
scale = _CONV_UNIT_SCALE.get(m.group(1).strip().upper())
if scale is not None:
return scale
logger.warning("detect_step_length_unit_scale: unrecognised LENGTH_UNIT record %r — assuming metres", stmt[:120])
return 1.0


_HEADER_RE = re.compile(r"^\s*#(\d+)\s*=\s*([A-Z0-9_]+)\s*\(", re.S)
_COMPLEX_RE = re.compile(r"^\s*#(\d+)\s*=\s*\(", re.S) # #id=(NAME(..)NAME(..)..) complex record
_COMPLEX = "__COMPLEX__"
Expand Down Expand Up @@ -948,18 +1061,20 @@ def _solid_name(args: list, n_solids: int) -> str:
return args[0] if args and isinstance(args[0], str) and args[0] else f"solid_{n_solids + 1}"


def _yield_instances(name: str, geom, color, mats):
def _yield_instances(name: str, geom, color, tmap_entry):
"""Yield ONE Geometry for this (single) solid, carrying its list of world-placement
matrices. ``mats`` is a list of 4x4 matrices (one per placed instance) or None/empty
for the single, no-transform case. The downstream tessellator meshes the local shell
ONCE and applies each matrix to that mesh, so a part instanced N times meshes once.
A lone identity matrix collapses to ``transforms=None`` so flat files and
single-instance solids are byte-for-byte unchanged."""
matrices plus the matching assembly paths. ``tmap_entry`` is ``(mats, paths)`` (one
4x4 + one root-first path per placed instance) or None for the single, no-transform
case. The downstream tessellator meshes the local shell ONCE and applies each matrix
to that mesh, so a part instanced N times meshes once. A lone identity matrix
collapses to ``transforms=None`` so flat files and single-instance solids are
byte-for-byte unchanged."""
import numpy as np

mats, paths = tmap_entry if tmap_entry else (None, None)
if mats and len(mats) == 1 and np.allclose(mats[0], np.eye(4), atol=1e-12):
mats = None
yield Geometry(id=name, geometry=geom, color=color, transforms=(mats or None))
mats, paths = None, None
yield Geometry(id=name, geometry=geom, color=color, transforms=(mats or None), instance_paths=(paths or None))


def _short_reason(ex: Exception) -> str:
Expand Down Expand Up @@ -1032,6 +1147,7 @@ def _read_two_pass_dict(filepath: Path, *, tolerant: bool, skipped, on_total=Non
cdsr_ids: list[int] = []
srr_ids: list[int] = []
absr_ids: list[int] = []
sdr_ids: list[int] = []
with filepath.open("r", encoding="utf-8", errors="replace") as fh:
for stmt in _iter_statements(fh):
parsed = _parse_statement(stmt)
Expand All @@ -1049,9 +1165,11 @@ def _read_two_pass_dict(filepath: Path, *, tolerant: bool, skipped, on_total=Non
srr_ids.append(inst_id)
elif etype == "ADVANCED_BREP_SHAPE_REPRESENTATION":
absr_ids.append(inst_id)
elif etype == "SHAPE_DEFINITION_REPRESENTATION":
sdr_ids.append(inst_id)

colour_map = _build_colour_map(pool.get, styled_ids)
tmap = _build_transform_map(pool.get, root_ids, cdsr_ids, srr_ids, absr_ids)
tmap = _build_transform_map(pool.get, root_ids, cdsr_ids, srr_ids, absr_ids, sdr_ids)
if on_total is not None:
on_total(len(root_ids))
resolver = _Resolver(pool)
Expand Down Expand Up @@ -1103,6 +1221,7 @@ def _scan_offset_index(mm):
cdsr: list[int] = [] # CONTEXT_DEPENDENT_SHAPE_REPRESENTATION ids — assembly transforms
srr: list[int] = [] # standalone SHAPE_REPRESENTATION_RELATIONSHIP ids
absr: list[int] = [] # ADVANCED_BREP_SHAPE_REPRESENTATION ids (solid -> geom rep)
sdr: list[int] = [] # SHAPE_DEFINITION_REPRESENTATION ids (rep -> product, for tree names)
n = len(mm)
pos = 0
while pos < n:
Expand Down Expand Up @@ -1144,8 +1263,10 @@ def _scan_offset_index(mm):
srr.append(rid)
elif kw == "ADVANCED_BREP_SHAPE_REPRESENTATION":
absr.append(rid)
elif kw == "SHAPE_DEFINITION_REPRESENTATION":
sdr.append(rid)
pos = end + 1
return ids, offs, roots, styled, cdsr, srr, absr
return ids, offs, roots, styled, cdsr, srr, absr, sdr


class _OffsetPool:
Expand Down Expand Up @@ -1184,7 +1305,7 @@ def _read_two_pass_lazy(filepath: Path, *, tolerant: bool, skipped, on_total=Non
mm = mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ)
idx_tmp: list[str] = [] # sorted-index temp files, unlinked in finally
try:
ids_arr, offs_arr, roots, styled, cdsr, srr, absr = _scan_offset_index(mm)
ids_arr, offs_arr, roots, styled, cdsr, srr, absr, sdr = _scan_offset_index(mm)
ids_np = np.frombuffer(ids_arr, dtype=np.int64)
offs_np = np.frombuffer(offs_arr, dtype=np.int64)
order = np.argsort(ids_np, kind="stable")
Expand All @@ -1210,7 +1331,7 @@ def _read_two_pass_lazy(filepath: Path, *, tolerant: bool, skipped, on_total=Non
ids_mm, offs_mm = ids_sorted, offs_sorted
pool = _OffsetPool(mm, ids_mm, offs_mm)
colour_map = _build_colour_map(pool.get, styled)
tmap = _build_transform_map(pool.get, roots, cdsr, srr, absr)
tmap = _build_transform_map(pool.get, roots, cdsr, srr, absr, sdr)
if on_total is not None:
on_total(len(roots))
resolver = _Resolver(pool)
Expand Down
4 changes: 4 additions & 0 deletions src/ada/geom/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ class Geometry(Generic[T]):
# The solid is tessellated ONCE in its local frame; each transform is applied to the
# resulting mesh (not the B-rep), so a part instanced N times meshes once.
transforms: list[np.ndarray] | None = None
# Aligned 1:1 with ``transforms``: each instance's assembly path, a root-first tuple
# of (rep_id, product_name) levels, so a scene builder can group instances the way
# the source assembly tree does. None = no hierarchy information.
instance_paths: list[tuple] | None = None
Loading
Loading