diff --git a/doc/api.rst b/doc/api.rst index 07eebfeb..1fd5cb64 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -20,9 +20,9 @@ Creating a model model.Model.add_objective model.Model.add_piecewise_formulation piecewise.PiecewiseFormulation + piecewise.Slopes piecewise.breakpoints piecewise.segments - piecewise.slopes_to_points piecewise.tangent_lines model.Model.linexpr model.Model.remove_constraints diff --git a/doc/piecewise-linear-constraints.rst b/doc/piecewise-linear-constraints.rst index 78f4ecd7..e364988c 100644 --- a/doc/piecewise-linear-constraints.rst +++ b/doc/piecewise-linear-constraints.rst @@ -96,7 +96,9 @@ Two factories with distinct geometric meaning: linopy.breakpoints([0, 50, 100]) # connected linopy.breakpoints({"gen1": [0, 50], "gen2": [0, 80]}, dim="gen") # per-entity - linopy.breakpoints(slopes=[1.2, 1.4], x_points=[0, 30, 60], y0=0) # from slopes + linopy.Slopes( + [1.2, 1.4], y0=0 + ) # from slopes (deferred — pairs with a sibling tuple) linopy.segments([(0, 10), (50, 100)]) # two disjoint regions linopy.segments({"gen1": [(0, 10)], "gen2": [(0, 80)]}, dim="gen") @@ -240,21 +242,23 @@ Equivalent, but explicit about the DataArray construction: From slopes ~~~~~~~~~~~ -When you know marginal costs (slopes) rather than absolute values: +When you know marginal costs (slopes) rather than absolute values, wrap +them in :class:`linopy.Slopes`. The x grid is borrowed from the sibling +tuple — no need to repeat it: .. code-block:: python m.add_piecewise_formulation( (power, [0, 50, 100, 150]), - ( - cost, - linopy.breakpoints( - slopes=[1.1, 1.5, 1.9], x_points=[0, 50, 100, 150], y0=0 - ), - ), + (cost, linopy.Slopes([1.1, 1.5, 1.9], y0=0)), ) # cost breakpoints: [0, 55, 130, 225] +For standalone resolution outside of ``add_piecewise_formulation``, call +:meth:`linopy.Slopes.to_breakpoints` with an explicit x grid:: + + bp = linopy.Slopes([1.1, 1.5, 1.9], y0=0).to_breakpoints([0, 50, 100, 150]) + Per-entity breakpoints ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 52d7526a..88180844 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -16,7 +16,8 @@ Upcoming Version * Add unit-commitment gating via the ``active`` parameter on ``add_piecewise_formulation``: a binary variable that, when zero, forces all auxiliary variables (and thus the linked expressions) to zero. Works with the SOS2, incremental, and disjunctive methods. * Surface formulation metadata on the returned ``PiecewiseFormulation``: ``.method`` (resolved method name) and ``.convexity`` (``"convex"`` / ``"concave"`` / ``"linear"`` / ``"mixed"`` when well-defined). Both persist across netCDF round-trip. * Add ``tangent_lines()`` as a low-level helper that returns per-piece chord expressions as a ``LinearExpression`` — no variables created. Most users should prefer ``add_piecewise_formulation`` with a bounded tuple ``(y, y_pts, "<=")``, which builds on this helper and adds domain bounds and curvature validation. -* Add ``linopy.breakpoints()`` (lists/Series/DataFrame/DataArray/dict, plus a slopes-mode constructor), ``linopy.segments()`` (disjunctive operating regions), and ``slopes_to_points()`` (per-piece slopes → breakpoint y-coordinates) as breakpoint-construction helpers. +* Add ``linopy.breakpoints()`` (lists/Series/DataFrame/DataArray/dict) and ``linopy.segments()`` (disjunctive operating regions) as breakpoint-construction helpers. +* Add ``linopy.Slopes`` for specifying a piecewise curve by marginal costs / per-piece slopes instead of absolute y-values — ``(fuel, Slopes([1.2, 1.4, 1.7], y0=0))`` borrows the x grid from a sibling tuple in ``add_piecewise_formulation``. * Add the `sphinx-copybutton` to the documentation * Add SOS1 and SOS2 reformulations for solvers not supporting them. * Add semi-continous variables for solvers that support them diff --git a/examples/piecewise-linear-constraints.ipynb b/examples/piecewise-linear-constraints.ipynb index a7011935..392ca8f1 100644 --- a/examples/piecewise-linear-constraints.ipynb +++ b/examples/piecewise-linear-constraints.ipynb @@ -391,6 +391,36 @@ "m.solve(reformulate_sos=\"auto\")\n", "m.solution[[\"power\", \"fuel\"]].to_dataframe()" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Specifying with slopes — `Slopes`\n", + "\n", + "When marginal costs (slopes) are more natural than absolute y-values, wrap them in `linopy.Slopes`. The x grid is borrowed from the sibling tuple — no need to repeat it. Same curve as section 1:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "m = linopy.Model()\n", + "power = m.add_variables(name=\"power\", lower=0, upper=100, coords=[time])\n", + "fuel = m.add_variables(name=\"fuel\", lower=0, coords=[time])\n", + "\n", + "m.add_piecewise_formulation(\n", + " (power, [0, 30, 60, 100]),\n", + " (fuel, linopy.Slopes([1.2, 1.6, 2.15], y0=0)),\n", + ")\n", + "m.add_constraints(power == demand, name=\"demand\")\n", + "m.add_objective(fuel.sum())\n", + "m.solve(reformulate_sos=\"auto\")\n", + "\n", + "m.solution[[\"power\", \"fuel\"]].to_pandas()" + ] } ], "metadata": { diff --git a/linopy/__init__.py b/linopy/__init__.py index 220eee3c..d47d3aa7 100644 --- a/linopy/__init__.py +++ b/linopy/__init__.py @@ -27,9 +27,9 @@ from linopy.objective import Objective from linopy.piecewise import ( PiecewiseFormulation, + Slopes, breakpoints, segments, - slopes_to_points, tangent_lines, ) from linopy.remote import RemoteHandler @@ -53,6 +53,7 @@ "PiecewiseFormulation", "QuadraticExpression", "RemoteHandler", + "Slopes", "Variable", "Variables", "align", @@ -62,6 +63,5 @@ "options", "read_netcdf", "segments", - "slopes_to_points", "tangent_lines", ) diff --git a/linopy/piecewise.py b/linopy/piecewise.py index 5918fea7..7497c4bf 100644 --- a/linopy/piecewise.py +++ b/linopy/piecewise.py @@ -12,7 +12,7 @@ from collections.abc import Sequence from dataclasses import dataclass from numbers import Real -from typing import TYPE_CHECKING, Literal, TypeAlias +from typing import TYPE_CHECKING, Literal, TypeAlias, TypeGuard import numpy as np import pandas as pd @@ -63,32 +63,270 @@ # Each user-facing piecewise entry point fires its EvolvingAPIWarning at # most once per process. Without dedup, a single model build emits the # verbose warning hundreds of times and drowns out other output. -_EvolvingApiKey: TypeAlias = Literal["tangent_lines", "add_piecewise_formulation"] +_EvolvingApiKey: TypeAlias = Literal[ + "tangent_lines", "add_piecewise_formulation", "Slopes" +] _emitted_evolving_warnings: set[_EvolvingApiKey] = set() -def _warn_evolving_api(key: _EvolvingApiKey, message: str) -> None: - """Emit an :class:`EvolvingAPIWarning` at most once per session per ``key``.""" +def _warn_evolving_api(key: _EvolvingApiKey, message: str, stacklevel: int = 3) -> None: + """ + Emit an :class:`EvolvingAPIWarning` at most once per session per ``key``. + + ``stacklevel`` defaults to 3 (helper → entry-point function → user + code). Pass a larger value when called from one frame deeper than + a function — e.g. from a dataclass ``__post_init__``, which is + itself invoked by an auto-generated ``__init__``. + """ if key in _emitted_evolving_warnings: return _emitted_evolving_warnings.add(key) - warnings.warn(message, category=EvolvingAPIWarning, stacklevel=3) + warnings.warn(message, category=EvolvingAPIWarning, stacklevel=stacklevel) # Accepted input types for breakpoint-like data BreaksLike: TypeAlias = ( - Sequence[float] | DataArray | pd.Series | pd.DataFrame | dict[str, Sequence[float]] + Sequence[float] + | np.ndarray + | DataArray + | pd.Series + | pd.DataFrame + | dict[str, Sequence[float]] ) # Accepted input types for segment-like data (2D: segments × breakpoints) SegmentsLike: TypeAlias = ( Sequence[Sequence[float]] + | np.ndarray | DataArray | pd.DataFrame | dict[str, Sequence[Sequence[float]]] ) +# --------------------------------------------------------------------------- +# Deferred slopes spec +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True, repr=False, eq=False) +class Slopes: + """ + Per-piece slopes + initial y-value, deferred until an x grid is known. + + Used as the second element of a tuple in + :func:`add_piecewise_formulation`. When any :class:`Slopes` tuple is + present, **exactly one** other tuple must carry explicit breakpoints — + that tuple's values are the x grid against which all :class:`Slopes` + are integrated:: + + m.add_piecewise_formulation( + (power, [0, 30, 60, 100]), # the x grid + (fuel, Slopes([1.2, 1.4, 1.7], y0=0)), # integrated against power + ) + + With two or more non-:class:`Slopes` tuples there is no canonical x + axis, and the call raises :class:`ValueError`. Resolve the + :class:`Slopes` explicitly via :meth:`to_breakpoints` in that case, + or for any standalone use:: + + bp = Slopes([1.2, 1.4, 1.7], y0=0).to_breakpoints([0, 30, 60, 100]) + + Parameters + ---------- + values : BreaksLike + Per-piece slopes. 1D for shared breakpoints; 2D (DataFrame / + dict / DataArray with entity dim) for per-entity slopes. + y0 : float, dict, pd.Series, or DataArray, default 0.0 + y-value at the first breakpoint. Scalar broadcasts to all + entities; dict/Series/DataArray provides per-entity values. + align : {"pieces", "leading"}, default "pieces" + Alignment of ``values`` relative to the x grid. + + - ``"pieces"``: ``len(values) == len(x_points) - 1``; + ``values[i]`` is the slope between ``x[i]`` and ``x[i+1]``. + - ``"leading"``: ``len(values) == len(x_points)``; ``values[0]`` + must be NaN and is dropped, ``values[i]`` for ``i>=1`` is the + slope between ``x[i-1]`` and ``x[i]``. Useful when a marginal + value is tabulated alongside each breakpoint with the first + row's marginal undefined. + dim : str, optional + Entity dimension name. Required when ``values`` is a + ``pd.DataFrame`` or ``dict``. + + Warns + ----- + EvolvingAPIWarning + :class:`Slopes` is part of the newly-added piecewise API. Its + constructor signature and dispatch semantics may be refined. + Silence with ``warnings.filterwarnings("ignore", + category=linopy.EvolvingAPIWarning)``. + """ + + values: BreaksLike + y0: Real | dict[str, Real] | pd.Series | DataArray = 0.0 + align: Literal["pieces", "leading"] = "pieces" + dim: str | None = None + + def __post_init__(self) -> None: + # ``stacklevel=4``: warn → _warn_evolving_api → __post_init__ → + # dataclass-generated ``__init__`` → user code. + _warn_evolving_api( + "Slopes", + "piecewise: Slopes is a new API; the constructor signature and " + "the dispatch rules for inheriting an x grid from sibling tuples " + "may be refined in minor releases.", + stacklevel=4, + ) + + def to_breakpoints(self, x_points: BreaksLike) -> DataArray: + """ + Resolve to a breakpoint :class:`xarray.DataArray`, given an x grid. + + Rarely called directly — typically you pass the :class:`Slopes` + instance to :func:`add_piecewise_formulation` and the x grid is + inherited from a sibling tuple. Use this method for inspection + or when building breakpoints outside the formulation pipeline. + """ + return _breakpoints_from_slopes( + self.values, x_points, self.y0, self.dim, self.align + ) + + def __repr__(self) -> str: + bits = [_summarise_breakslike(self.values), f"y0={self.y0!r}"] + if self.align != "pieces": + bits.append(f"align={self.align!r}") + if self.dim is not None: + bits.append(f"dim={self.dim!r}") + return f"Slopes({', '.join(bits)})" + + def __eq__(self, other: object) -> bool: + """ + Value-equality across the field types accepted by the constructor. + + Two ``Slopes`` are equal iff every field matches: + + * ``align`` and ``dim`` compare with ``==`` (str / None). + * ``y0`` and ``values`` dispatch on type via :func:`_values_equal`: + numeric scalars compare by value across types (``int 0 == + float 0.0 == np.float64(0)``); ``list`` and ``tuple`` are + promoted to ndarray so NaN content compares element-wise + regardless of which NaN object was used; ndarrays use + ``np.array_equal(equal_nan=True)`` (with a fallback for + non-numeric dtypes); ``pd.Series`` / ``pd.DataFrame`` / + ``DataArray`` use ``.equals``; ``dict`` recurses on matching + keys. + + Non-``Slopes`` operands return ``NotImplemented`` per Python + convention. + + Caveats + ------- + * ``Series.equals`` / ``DataFrame.equals`` / ``DataArray.equals`` + are *order-sensitive*: two frames with the same content but + reordered rows / columns / coords compare unequal. + * Cross-container coercion is limited to ``list``/``tuple`` → + ndarray. A ``dict`` and a ``DataFrame`` describing the same + per-entity slopes still compare unequal. + + ``__hash__`` is set to ``None`` (unhashable) since the inner + ``values`` may be a mutable container. + """ + if not isinstance(other, Slopes): + return NotImplemented + return ( + self.align == other.align + and self.dim == other.dim + and _values_equal(self.y0, other.y0) + and _values_equal(self.values, other.values) + ) + + __hash__ = None # type: ignore[assignment] + + +def _is_numeric_scalar(x: object) -> TypeGuard[Real]: + return isinstance(x, Real) and not isinstance(x, bool) + + +def _values_equal(a: object, b: object) -> bool: + """ + Type-dispatched equality for ``Slopes`` field values (NaN-safe). + + Numeric scalars compare by value across types (``int 0 == float 0.0 == + np.float64(0)``); ``bool`` is excluded. Lists / tuples are promoted + to ndarray so in-place ``float('nan')`` content compares NaN-safe. + Non-numeric ndarray dtypes fall back to ``np.array_equal`` without + ``equal_nan``. ``DataFrame`` / ``Series`` / ``DataArray`` use + ``.equals``; ``dict`` recurses on matching keys. + """ + if _is_numeric_scalar(a) and _is_numeric_scalar(b): + af, bf = float(a), float(b) + return af == bf or (af != af and bf != bf) + + if isinstance(a, list | tuple): + a = np.asarray(a) + if isinstance(b, list | tuple): + b = np.asarray(b) + + if isinstance(a, np.ndarray): + if not isinstance(b, np.ndarray) or a.shape != b.shape: + return False + try: + return bool(np.array_equal(a, b, equal_nan=True)) + except TypeError: + return bool(np.array_equal(a, b)) + + if isinstance(a, pd.DataFrame): + return isinstance(b, pd.DataFrame) and bool(a.equals(b)) + if isinstance(a, pd.Series): + return isinstance(b, pd.Series) and bool(a.equals(b)) + if isinstance(a, DataArray): + return isinstance(b, DataArray) and bool(a.equals(b)) + + if isinstance(a, dict): + return ( + isinstance(b, dict) + and a.keys() == b.keys() + and all(_values_equal(a[k], b[k]) for k in a) + ) + + return type(a) is type(b) and bool(a == b) + + +def _summarise_breakslike(v: BreaksLike) -> str: + """Compact one-line summary of a BreaksLike value for use in reprs.""" + if isinstance(v, DataArray): + sizes = ", ".join(f"{d}: {s}" for d, s in v.sizes.items()) + return f"" + if isinstance(v, pd.DataFrame): + return f"" + if isinstance(v, pd.Series): + return f"" + if isinstance(v, dict): + return f"" + + arr = np.asarray(v) + if arr.ndim > 1: + return f"" + seq: list = arr.tolist() + if len(seq) <= 8: + return "[" + ", ".join(_short_num(x) for x in seq) + "]" + head = ", ".join(_short_num(x) for x in seq[:3]) + tail = ", ".join(_short_num(x) for x in seq[-2:]) + return f"[{head}, ..., {tail}] ({len(seq)} items)" + + +def _short_num(x: object) -> str: + """Compact number formatting for repr — ``g`` for floats, ``repr`` else.""" + if isinstance(x, float): + return f"{x:g}" + return repr(x) + + +# Tuple element type covering both eager (DataArray etc.) and deferred (Slopes) bps. +BreaksOrSlopes: TypeAlias = BreaksLike | Slopes + + # --------------------------------------------------------------------------- # Result type # --------------------------------------------------------------------------- @@ -227,7 +465,7 @@ def _rename_to_pieces(da: DataArray, piece_index: np.ndarray) -> DataArray: return da -def _sequence_to_array(values: Sequence[float]) -> DataArray: +def _sequence_to_array(values: Sequence[float] | np.ndarray | pd.Series) -> DataArray: arr = np.asarray(values, dtype=float) if arr.ndim != 1: raise ValueError( @@ -322,7 +560,7 @@ def _dict_segments_to_array( def _breakpoints_from_slopes( slopes: BreaksLike, x_points: BreaksLike, - y0: float | dict[str, float] | pd.Series | DataArray, + y0: Real | dict[str, Real] | pd.Series | DataArray, dim: str | None, slopes_align: Literal["pieces", "leading"] = "pieces", ) -> DataArray: @@ -345,7 +583,7 @@ def _breakpoints_from_slopes( if slopes_arr.ndim == 1: if not isinstance(y0, Real): raise TypeError("When 'slopes' is 1D, 'y0' must be a scalar float") - pts = slopes_to_points(list(xp_arr.values), list(slopes_arr.values), float(y0)) + pts = _slopes_to_points(list(xp_arr.values), list(slopes_arr.values), float(y0)) return _sequence_to_array(pts) # Multi-dim case: per-entity slopes @@ -379,7 +617,7 @@ def _breakpoints_from_slopes( xp = _strip_nan(xp_arr.sel({entity_dim: key}).values) else: xp = _strip_nan(xp_arr.values) - computed[sk] = slopes_to_points(xp, sl, y0_map[sk]) + computed[sk] = _slopes_to_points(xp, sl, y0_map[sk]) return _dict_to_array(computed, entity_dim) @@ -389,30 +627,14 @@ def _breakpoints_from_slopes( # --------------------------------------------------------------------------- -def slopes_to_points( +def _slopes_to_points( x_points: list[float], slopes: list[float], y0: float ) -> list[float]: """ Convert per-piece slopes + initial y-value to y-coordinates at each breakpoint. - Parameters - ---------- - x_points : list[float] - Breakpoint x-coordinates (length n). - slopes : list[float] - Slope of each piece (length n-1). - y0 : float - y-value at the first breakpoint. - - Returns - ------- - list[float] - y-coordinates at each breakpoint (length n). - - Raises - ------ - ValueError - If ``len(slopes) != len(x_points) - 1``. + Internal primitive used by ``Slopes.to_breakpoints``. Public callers + should use :class:`Slopes` (DataArray output) instead. """ if len(slopes) != len(x_points) - 1: raise ValueError( @@ -426,72 +648,34 @@ def slopes_to_points( def breakpoints( - values: BreaksLike | None = None, + values: BreaksLike, *, - slopes: BreaksLike | None = None, - x_points: BreaksLike | None = None, - y0: float | dict[str, float] | pd.Series | DataArray | None = None, dim: str | None = None, - slopes_align: Literal["pieces", "leading"] = "pieces", ) -> DataArray: """ Create a breakpoint DataArray for piecewise linear constraints. - Two modes (mutually exclusive): - - **Points mode**: ``breakpoints(values, ...)`` - - **Slopes mode**: ``breakpoints(slopes=..., x_points=..., y0=...)`` - Parameters ---------- - values : BreaksLike, optional + values : BreaksLike Breakpoint values. Accepted types: ``Sequence[float]``, ``pd.Series``, ``pd.DataFrame``, or ``xr.DataArray``. A 1D input (list, Series) creates 1D breakpoints. A 2D input (DataFrame, multi-dim DataArray) creates per-entity breakpoints (``dim`` is required for DataFrame). - slopes : BreaksLike, optional - Segment slopes. Mutually exclusive with ``values``. - x_points : BreaksLike, optional - Breakpoint x-coordinates. Required with ``slopes``. - y0 : float, dict, pd.Series, or DataArray, optional - Initial y-value. Required with ``slopes``. A scalar broadcasts to - all entities. A dict/Series/DataArray provides per-entity values. dim : str, optional - Entity dimension name. Required when ``values`` or ``slopes`` is a + Entity dimension name. Required when ``values`` is a ``pd.DataFrame`` or ``dict``. - slopes_align : {"pieces", "leading"}, default "pieces" - Alignment of ``slopes`` relative to ``x_points``. - - - ``"pieces"``: ``len(slopes) == len(x_points) - 1``. ``slopes[i]`` - is the slope between ``x[i]`` and ``x[i+1]``. - - ``"leading"``: ``len(slopes) == len(x_points)``. ``slopes[0]`` - must be NaN and is ignored; ``slopes[i]`` for ``i>=1`` is the - slope between ``x[i-1]`` and ``x[i]``. Useful when a marginal - value is tabulated alongside each breakpoint with the first - row's marginal undefined. Returns ------- DataArray - """ - # Validate mutual exclusivity - if values is not None and slopes is not None: - raise ValueError("'values' and 'slopes' are mutually exclusive") - if values is not None and (x_points is not None or y0 is not None): - raise ValueError("'x_points' and 'y0' are forbidden when 'values' is given") - if slopes_align != "pieces" and slopes is None: - raise ValueError("'slopes_align' is only valid in slopes mode") - if slopes is not None: - if x_points is None or y0 is None: - raise ValueError("'slopes' requires both 'x_points' and 'y0'") - return _breakpoints_from_slopes(slopes, x_points, y0, dim, slopes_align) - - # Points mode - if values is None: - raise ValueError("Must pass either 'values' or 'slopes'") + See Also + -------- + Slopes : per-piece slopes + ``y0`` (deferred or standalone via + :meth:`Slopes.to_breakpoints`). + """ return _coerce_breaks(values, dim) @@ -848,8 +1032,8 @@ def _broadcast_points( def add_piecewise_formulation( model: Model, - *pairs: tuple[LinExprLike, BreaksLike] - | tuple[LinExprLike, BreaksLike, Literal["==", "<=", ">="]], + *pairs: tuple[LinExprLike, BreaksOrSlopes] + | tuple[LinExprLike, BreaksOrSlopes, Literal["==", "<=", ">="]], method: PWL_METHOD = "auto", active: LinExprLike | None = None, name: str | None = None, @@ -984,7 +1168,7 @@ def add_piecewise_formulation( # Parse and normalise per-tuple signs. Each pair is either # (expr, bp) — sign defaults to "==" — or (expr, bp, sign). - parsed: list[tuple[LinExprLike, BreaksLike, str]] = [] + parsed: list[tuple[LinExprLike, BreaksOrSlopes, str]] = [] for i, pair in enumerate(pairs): if not isinstance(pair, tuple) or len(pair) not in (2, 3): raise TypeError( @@ -1004,6 +1188,34 @@ def add_piecewise_formulation( ) parsed.append((expr, bp, tuple_sign)) + slopes_set = {i for i, p in enumerate(parsed) if isinstance(p[1], Slopes)} + if slopes_set: + non_slopes_idx = [i for i in range(len(parsed)) if i not in slopes_set] + if not non_slopes_idx: + raise ValueError( + "All tuples are Slopes; at least one tuple must carry an " + "explicit x grid. Pass the x grid via a regular tuple " + "or call Slopes(...).to_breakpoints(x_pts) explicitly." + ) + if len(non_slopes_idx) > 1: + raise ValueError( + f"Slopes tuples present at positions {sorted(slopes_set)}, " + f"but {len(non_slopes_idx)} non-Slopes tuples carry their " + f"own breakpoint values (positions {non_slopes_idx}). " + "There is no canonical x grid for the Slopes to integrate " + "against — borrowing from any one of them would silently " + "depend on tuple order. Either reduce to a single non-Slopes " + "tuple, or resolve the Slopes explicitly by calling " + "Slopes(...).to_breakpoints(x_pts) before passing it in." + ) + x_grid = parsed[non_slopes_idx[0]][1] + parsed = [ + (expr, bp.to_breakpoints(x_grid), sign) + if isinstance(bp, Slopes) + else (expr, bp, sign) + for expr, bp, sign in parsed + ] + # At most one non-equality sign; with 3+ tuples, none. bounded_positions = [i for i, p in enumerate(parsed) if p[2] != EQUAL] if len(bounded_positions) > 1: diff --git a/test/test_piecewise_constraints.py b/test/test_piecewise_constraints.py index 2fa4e7bb..987336a4 100644 --- a/test/test_piecewise_constraints.py +++ b/test/test_piecewise_constraints.py @@ -18,7 +18,6 @@ available_solvers, breakpoints, segments, - slopes_to_points, tangent_lines, ) from linopy.constants import ( @@ -41,10 +40,11 @@ PWL_SELECT_SUFFIX, SEGMENT_DIM, ) +from linopy.piecewise import _slopes_to_points from linopy.solver_capabilities import SolverFeature, get_available_solvers_with_feature if TYPE_CHECKING: - from linopy.piecewise import _PwlInputs + from linopy.piecewise import BreaksLike, _PwlInputs Sign: TypeAlias = Literal["==", "<=", ">="] Method: TypeAlias = Literal["sos2", "incremental", "lp", "auto"] @@ -56,23 +56,32 @@ s for s in ["highs", "gurobi", "glpk", "cplex"] if s in available_solvers ] +# Solver-output tolerance for solution-value assertions in this file. Matches +# the convention in ``test_piecewise_feasibility.py``. +TOL = 1e-6 + # =========================================================================== -# slopes_to_points +# _slopes_to_points (private list utility) # =========================================================================== -class TestSlopesToPoints: +class TestSlopesToPointsPrivate: + """ + The list-level slopes→points primitive is private; the public path is + :class:`Slopes`. These tests exist so the math stays under test even + though the helper isn't user-facing. + """ + def test_basic(self) -> None: - assert slopes_to_points([0, 1, 2], [1, 2], 0) == [0, 1, 3] + assert _slopes_to_points([0, 1, 2], [1, 2], 0) == [0, 1, 3] def test_negative_slopes(self) -> None: - result = slopes_to_points([0, 10, 20], [-0.5, -1.0], 10) - assert result == [10, 5, -5] + assert _slopes_to_points([0, 10, 20], [-0.5, -1.0], 10) == [10, 5, -5] def test_wrong_length_raises(self) -> None: with pytest.raises(ValueError, match="len\\(slopes\\)"): - slopes_to_points([0, 1, 2], [1], 0) + _slopes_to_points([0, 1, 2], [1], 0) # =========================================================================== @@ -96,61 +105,10 @@ def test_dict_without_dim_raises(self) -> None: with pytest.raises(ValueError, match="'dim' is required"): breakpoints({"a": [0, 50], "b": [0, 30]}) - def test_slopes_list(self) -> None: - bp = breakpoints(slopes=[1, 2], x_points=[0, 1, 2], y0=0) - expected = breakpoints([0, 1, 3]) - xr.testing.assert_equal(bp, expected) - - def test_slopes_dict(self) -> None: - bp = breakpoints( - slopes={"a": [1, 0.5], "b": [2, 1]}, - x_points={"a": [0, 10, 50], "b": [0, 20, 80]}, - y0={"a": 0, "b": 10}, - dim="gen", - ) - assert set(bp.dims) == {"gen", BREAKPOINT_DIM} - # a: [0, 10, 30], b: [10, 50, 110] - np.testing.assert_allclose(bp.sel(gen="a").values, [0, 10, 30]) - np.testing.assert_allclose(bp.sel(gen="b").values, [10, 50, 110]) - - def test_slopes_dict_shared_xpoints(self) -> None: - bp = breakpoints( - slopes={"a": [1, 2], "b": [3, 4]}, - x_points=[0, 1, 2], - y0={"a": 0, "b": 0}, - dim="gen", - ) - np.testing.assert_allclose(bp.sel(gen="a").values, [0, 1, 3]) - np.testing.assert_allclose(bp.sel(gen="b").values, [0, 3, 7]) - - def test_slopes_dict_shared_y0(self) -> None: - bp = breakpoints( - slopes={"a": [1, 2], "b": [3, 4]}, - x_points={"a": [0, 1, 2], "b": [0, 1, 2]}, - y0=5.0, - dim="gen", - ) - np.testing.assert_allclose(bp.sel(gen="a").values, [5, 6, 8]) - - def test_values_and_slopes_raises(self) -> None: - with pytest.raises(ValueError, match="mutually exclusive"): - breakpoints([0, 1], slopes=[1], x_points=[0, 1], y0=0) - - def test_slopes_without_xpoints_raises(self) -> None: - with pytest.raises(ValueError, match="requires both"): - breakpoints(slopes=[1], y0=0) - - def test_slopes_without_y0_raises(self) -> None: - with pytest.raises(ValueError, match="requires both"): - breakpoints(slopes=[1], x_points=[0, 1]) - - def test_xpoints_with_values_raises(self) -> None: - with pytest.raises(ValueError, match="forbidden"): - breakpoints([0, 1], x_points=[0, 1]) - - def test_y0_with_values_raises(self) -> None: - with pytest.raises(ValueError, match="forbidden"): - breakpoints([0, 1], y0=5) + def test_slopes_kwargs_removed(self) -> None: + """The slopes mode of ``breakpoints`` was removed in favour of ``Slopes``.""" + with pytest.raises(TypeError): + breakpoints([0, 1], slopes=[1], x_points=[0, 1], y0=0) # type: ignore[call-arg] # --- pandas and xarray inputs --- @@ -188,98 +146,665 @@ def test_dataarray_missing_dim_raises(self) -> None: with pytest.raises(ValueError, match="must have a"): breakpoints(da) - def test_slopes_series(self) -> None: - bp = breakpoints( - slopes=pd.Series([1, 2]), - x_points=pd.Series([0, 1, 2]), - y0=0, - ) - expected = breakpoints([0, 1, 3]) - xr.testing.assert_equal(bp, expected) - def test_slopes_dataarray(self) -> None: - slopes_da = xr.DataArray( - [[1, 2], [3, 4]], - dims=["gen", BREAKPOINT_DIM], - coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1]}, - ) - xp_da = xr.DataArray( - [[0, 1, 2], [0, 1, 2]], - dims=["gen", BREAKPOINT_DIM], - coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1, 2]}, - ) - y0_da = xr.DataArray([0, 5], dims=["gen"], coords={"gen": ["a", "b"]}) - bp = breakpoints(slopes=slopes_da, x_points=xp_da, y0=y0_da, dim="gen") - np.testing.assert_allclose(bp.sel(gen="a").values, [0, 1, 3]) - np.testing.assert_allclose(bp.sel(gen="b").values, [5, 8, 12]) +# =========================================================================== +# Slopes class — deferred breakpoint spec +# =========================================================================== - def test_slopes_dataframe(self) -> None: - slopes_df = pd.DataFrame({"a": [1, 0.5], "b": [2, 1]}).T - xp_df = pd.DataFrame({"a": [0, 10, 50], "b": [0, 20, 80]}).T - y0_series = pd.Series({"a": 0, "b": 10}) - bp = breakpoints(slopes=slopes_df, x_points=xp_df, y0=y0_series, dim="gen") - np.testing.assert_allclose(bp.sel(gen="a").values, [0, 10, 30]) - np.testing.assert_allclose(bp.sel(gen="b").values, [10, 50, 110]) +class TestSlopesValueType: + """``Slopes`` is a frozen value type with a custom repr.""" -# =========================================================================== -# breakpoints(slopes_align="leading") -# =========================================================================== + def test_immutable(self) -> None: + from linopy import Slopes + + s = Slopes([1, 2], y0=0) + with pytest.raises((AttributeError, TypeError)): + s.y0 = 5 # type: ignore[misc] + + @pytest.mark.parametrize( + ("kwargs", "expected"), + [ + pytest.param( + {"values": [1.2, 1.6, 2.15], "y0": 0}, + "Slopes([1.2, 1.6, 2.15], y0=0)", + id="1d_list_defaults_hidden", + ), + pytest.param( + {"values": [np.nan, 1, 2], "y0": 0, "align": "leading"}, + "align='leading'", + id="non_default_align_shown", + ), + pytest.param( + {"values": [1, 2], "y0": 0, "dim": "gen"}, + "dim='gen'", + id="non_default_dim_shown", + ), + ], + ) + def test_repr_renders(self, kwargs: dict[str, Any], expected: str) -> None: + from linopy import Slopes + + r = repr(Slopes(**kwargs)) + if expected.startswith("Slopes("): + assert r == expected + else: + assert expected in r + + def test_repr_truncates_long_sequences(self) -> None: + """Lists/ndarrays over 8 entries must be summarised, not dumped.""" + from linopy import Slopes + + r = repr(Slopes(list(range(50)), y0=0)) + # No 50-element dump — must include the "(50 items)" suffix and + # contain at most a handful of explicit numbers. + assert "(50 items)" in r + assert "..." in r + assert len(r) < 80, f"repr unexpectedly long: {r!r}" + + def test_repr_normalises_numpy_scalars(self) -> None: + """``np.int64`` / ``np.float64`` must render as plain Python numbers.""" + from linopy import Slopes + + r_int = repr(Slopes(np.array([1, 2, 3], dtype=np.int64), y0=0)) + r_float = repr(Slopes(np.array([1.5, 2.5, 3.5]), y0=0)) + # No numpy type prefixes, no surprising precision. + assert "np." not in r_int and "int64" not in r_int + assert r_int == "Slopes([1, 2, 3], y0=0)" + assert r_float == "Slopes([1.5, 2.5, 3.5], y0=0)" + + @pytest.mark.parametrize( + ("a", "b", "expected"), + [ + pytest.param( + {"values": [1, 2], "y0": 0}, + {"values": [1, 2], "y0": 0}, + True, + id="lists_equal", + ), + pytest.param( + {"values": np.array([1, 2]), "y0": 0}, + {"values": np.array([1, 2]), "y0": 0}, + True, + id="ndarrays_equal_no_raise", + ), + pytest.param( + {"values": [1, 2], "y0": 0}, + {"values": [1, 3], "y0": 0}, + False, + id="different_values", + ), + pytest.param( + {"values": [1, 2], "y0": 0}, + {"values": [1, 2], "y0": 5}, + False, + id="different_y0", + ), + pytest.param( + # list and ndarray of same numeric content — list/tuple + # are promoted to ndarray, so they compare equal. + {"values": [1, 2], "y0": 0}, + {"values": np.array([1, 2]), "y0": 0}, + True, + id="list_and_ndarray_same_content", + ), + pytest.param( + # int and float y0 describe the same curve — Real scalars + # coerce numerically. + {"values": [1, 2], "y0": 0}, + {"values": [1, 2], "y0": 0.0}, + True, + id="int_and_float_y0", + ), + pytest.param( + # numpy scalar y0 vs Python float — same numeric value. + {"values": [1, 2], "y0": np.float64(0)}, + {"values": [1, 2], "y0": 0.0}, + True, + id="numpy_scalar_and_float_y0", + ), + pytest.param( + # In-place ``float('nan')`` (not the np.nan singleton) must + # still compare equal — the array-path promotion handles it. + {"values": [float("nan"), 1.0], "y0": 0, "align": "leading"}, + {"values": [float("nan"), 1.0], "y0": 0, "align": "leading"}, + True, + id="float_nan_in_list", + ), + pytest.param( + {"values": [np.nan, 1], "y0": 0, "align": "leading"}, + {"values": [np.nan, 1], "y0": 0, "align": "leading"}, + True, + id="np_nan_in_list", + ), + pytest.param( + {"values": [1, 2], "y0": float("nan")}, + {"values": [1, 2], "y0": float("nan")}, + True, + id="nan_in_scalar_y0", + ), + pytest.param( + {"values": {"a": [1, 2], "b": [3, 4]}, "y0": 0, "dim": "g"}, + {"values": {"a": [1, 2], "b": [3, 4]}, "y0": 0, "dim": "g"}, + True, + id="dict_equal", + ), + pytest.param( + {"values": {"a": [1, 2]}, "y0": 0, "dim": "g"}, + {"values": {"a": [9, 9]}, "y0": 0, "dim": "g"}, + False, + id="dict_different_inner_values", + ), + ], + ) + def test_equality( + self, a: dict[str, Any], b: dict[str, Any], expected: bool + ) -> None: + """Value-equality across the field types accepted by the constructor.""" + from linopy import Slopes + + assert (Slopes(**a) == Slopes(**b)) is expected + + def test_eq_against_non_slopes_returns_notimplemented(self) -> None: + from linopy import Slopes + + # Falls through to bool(False), not raising. + assert (Slopes([1, 2], y0=0) == "not a slopes") is False + assert (Slopes([1, 2], y0=0) == 42) is False + + def test_eq_dataframe_is_order_sensitive(self) -> None: + """``DataFrame.equals`` is order-sensitive — pin the documented caveat.""" + from linopy import Slopes + + df1 = pd.DataFrame({"a": [1, 0.5], "b": [2, 1]}).T + df2 = df1.loc[["b", "a"]] + assert (Slopes(df1, y0=0, dim="g") == Slopes(df2, y0=0, dim="g")) is False + + def test_eq_object_dtype_ndarray_does_not_raise(self) -> None: + """Object/string-dtype ndarrays fall back to plain array_equal.""" + from linopy import Slopes + + a = np.array(["x", "y"], dtype=object) + b = np.array(["x", "y"], dtype=object) + c = np.array(["x", "z"], dtype=object) + # Equal content -> True; different content -> False; neither raises. + assert (Slopes(a, y0=0) == Slopes(b, y0=0)) is True + assert (Slopes(a, y0=0) == Slopes(c, y0=0)) is False + + def test_unhashable(self) -> None: + """ + ``values`` may be a mutable container (list, ndarray, dict), so + ``Slopes`` is intentionally unhashable. Using one as a dict key + or set member must raise rather than silently using identity hash. + """ + from linopy import Slopes + + with pytest.raises(TypeError, match="unhashable"): + {Slopes([1, 2], y0=0): "x"} + + @pytest.mark.parametrize( + ("values", "fragment"), + [ + pytest.param( + pd.DataFrame({"a": [1, 2], "b": [3, 4]}).T, + "", + id="dict", + ), + pytest.param( + np.zeros((20, 5, 30)), + "", + id="multi_dim_ndarray", + ), + ], + ) + def test_repr_summarises_bulky_values( + self, values: BreaksLike, fragment: str + ) -> None: + """Bulky value types must not dump their full content into the repr.""" + from linopy import Slopes + + r = repr(Slopes(values, y0=0, dim="gen")) + assert fragment in r + + +class TestSlopesToBreakpoints1D: + """ + 1D inputs (single shared curve). All callable input types must + resolve to the same DataArray for the same data: slopes [1, 2] over + x = [0, 1, 2] with y0=0 yields y = [0, 1, 3]. + """ + + EXPECTED = [0.0, 1.0, 3.0] + @pytest.mark.parametrize( + ("slopes_in", "x_in"), + [ + pytest.param([1, 2], [0, 1, 2], id="list-list"), + pytest.param((1, 2), (0, 1, 2), id="tuple-tuple"), + pytest.param(np.array([1, 2]), np.array([0, 1, 2]), id="ndarray-ndarray"), + pytest.param(pd.Series([1, 2]), pd.Series([0, 1, 2]), id="series-series"), + pytest.param([1, 2], np.array([0, 1, 2]), id="list-ndarray-mixed"), + pytest.param( + xr.DataArray([1, 2], dims=[BREAKPOINT_DIM]), + xr.DataArray([0, 1, 2], dims=[BREAKPOINT_DIM]), + id="dataarray-dataarray", + ), + ], + ) + def test_resolves_to_expected_breakpoints( + self, slopes_in: BreaksLike, x_in: BreaksLike + ) -> None: + from linopy import Slopes + + bp = Slopes(slopes_in, y0=0).to_breakpoints(x_in) + assert bp.dims == (BREAKPOINT_DIM,) + np.testing.assert_allclose(bp.values, self.EXPECTED) + + @pytest.mark.parametrize( + ("slopes", "x_pts", "y0", "expected"), + [ + pytest.param([1, 2], [0, 1, 2], 0, [0, 1, 3], id="canonical"), + pytest.param( + [1.2, 1.4, 1.7], + [0, 30, 60, 100], + 0, + [0, 36, 78, 146], + id="non_unit_slopes", + ), + pytest.param([-0.5, -1.0], [0, 10, 20], 10, [10, 5, -5], id="negative"), + pytest.param([1, 2], [0, 1, 2], 5, [5, 6, 8], id="non_zero_y0"), + ], + ) + def test_arithmetic_anchors( + self, + slopes: list[float], + x_pts: list[float], + y0: float, + expected: list[float], + ) -> None: + """Hand-computable cases pinning the slopes→y arithmetic.""" + from linopy import Slopes -class TestSlopesAlignLeading: + bp = Slopes(slopes, y0=y0).to_breakpoints(x_pts) + np.testing.assert_allclose(bp.values, expected) + + +class TestSlopesToBreakpointsPerEntity: """ - `slopes_align="leading"` accepts slopes of length len(x_points), - where slopes[0] is a NaN sentinel that gets dropped. + Per-entity inputs (multiple curves along one entity dim). All input + container types must produce the same per-entity result. + + Reference data: gen=a slopes [1, 0.5] over x=[0, 10, 50] from y0=0 + → [0, 10, 30]; gen=b slopes [2, 1] over x=[0, 20, 80] from y0=10 + → [10, 50, 110]. """ - def test_1d_matches_pieces(self) -> None: - leading = breakpoints( - slopes=[np.nan, 1, 2], x_points=[0, 1, 2], y0=0, slopes_align="leading" + EXPECTED_A = [0.0, 10.0, 30.0] + EXPECTED_B = [10.0, 50.0, 110.0] + + @pytest.mark.parametrize( + ("slopes_in", "x_in"), + [ + pytest.param( + {"a": [1, 0.5], "b": [2, 1]}, + {"a": [0, 10, 50], "b": [0, 20, 80]}, + id="dict-dict", + ), + pytest.param( + pd.DataFrame({"a": [1, 0.5], "b": [2, 1]}).T, + pd.DataFrame({"a": [0, 10, 50], "b": [0, 20, 80]}).T, + id="dataframe-dataframe", + ), + pytest.param( + xr.DataArray( + [[1, 0.5], [2, 1]], + dims=["gen", BREAKPOINT_DIM], + coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1]}, + ), + xr.DataArray( + [[0, 10, 50], [0, 20, 80]], + dims=["gen", BREAKPOINT_DIM], + coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1, 2]}, + ), + id="dataarray-dataarray", + ), + ], + ) + def test_resolves_to_expected_per_entity( + self, slopes_in: BreaksLike, x_in: BreaksLike + ) -> None: + from linopy import Slopes + + bp = Slopes(slopes_in, y0={"a": 0, "b": 10}, dim="gen").to_breakpoints(x_in) + assert "gen" in bp.dims and BREAKPOINT_DIM in bp.dims + np.testing.assert_allclose(bp.sel(gen="a").values, self.EXPECTED_A) + np.testing.assert_allclose(bp.sel(gen="b").values, self.EXPECTED_B) + + def test_shared_x_grid_broadcasts(self) -> None: + """Per-entity slopes against a single shared x grid (1D x_points).""" + from linopy import Slopes + + bp = Slopes( + {"a": [1, 2], "b": [3, 4]}, y0={"a": 0, "b": 0}, dim="gen" + ).to_breakpoints([0, 1, 2]) + np.testing.assert_allclose(bp.sel(gen="a").values, [0, 1, 3]) + np.testing.assert_allclose(bp.sel(gen="b").values, [0, 3, 7]) + + @pytest.mark.parametrize( + ("y0", "id"), + [ + pytest.param(5.0, "scalar"), + pytest.param({"a": 5, "b": 5}, "dict"), + pytest.param(pd.Series({"a": 5, "b": 5}), "series"), + pytest.param( + xr.DataArray([5, 5], dims=["gen"], coords={"gen": ["a", "b"]}), + "dataarray", + ), + ], + ids=lambda x: x if isinstance(x, str) else None, + ) + def test_y0_input_types_broadcast_consistently(self, y0: object, id: str) -> None: + """All accepted ``y0`` shapes resolve to the same per-entity result.""" + from linopy import Slopes + + bp = Slopes({"a": [1, 2], "b": [3, 4]}, y0=y0, dim="gen").to_breakpoints( + {"a": [0, 1, 2], "b": [0, 1, 2]} ) - pieces = breakpoints(slopes=[1, 2], x_points=[0, 1, 2], y0=0) - xr.testing.assert_equal(leading, pieces) + np.testing.assert_allclose(bp.sel(gen="a").values, [5, 6, 8]) + np.testing.assert_allclose(bp.sel(gen="b").values, [5, 8, 12]) + - def test_dict_ragged(self) -> None: - bp = breakpoints( - slopes={"a": [np.nan, 1, 0.5], "b": [np.nan, 2]}, - x_points={"a": [0, 10, 50], "b": [0, 20]}, +class TestSlopesToBreakpointsAlignment: + """ + ``align="pieces"`` (n-1 slopes) and ``align="leading"`` (n slopes + with a NaN sentinel in position 0) describe the same curve. They + must produce the same breakpoint DataArray. + """ + + @pytest.mark.parametrize( + ("pieces_input", "leading_input"), + [ + pytest.param([1, 2], [np.nan, 1, 2], id="1d"), + pytest.param( + {"a": [1, 0.5], "b": [2, 1]}, + {"a": [np.nan, 1, 0.5], "b": [np.nan, 2, 1]}, + id="dict_per_entity", + ), + ], + ) + def test_pieces_and_leading_match( + self, pieces_input: BreaksLike, leading_input: BreaksLike + ) -> None: + from linopy import Slopes + + kwargs: dict[str, Any] = {"y0": 0} + if isinstance(pieces_input, dict): + kwargs.update(dim="gen", y0={"a": 0, "b": 10}) + x_pts: BreaksLike = {"a": [0, 10, 50], "b": [0, 20, 80]} + else: + x_pts = [0, 1, 2] + pieces_bp = Slopes(pieces_input, align="pieces", **kwargs).to_breakpoints(x_pts) + leading_bp = Slopes(leading_input, align="leading", **kwargs).to_breakpoints( + x_pts + ) + xr.testing.assert_allclose(pieces_bp, leading_bp) + + def test_leading_ragged_dict(self) -> None: + """``align='leading'`` with ragged per-entity input keeps NaN padding.""" + from linopy import Slopes + + bp = Slopes( + {"a": [np.nan, 1, 0.5], "b": [np.nan, 2]}, y0={"a": 0, "b": 10}, dim="gen", - slopes_align="leading", - ) + align="leading", + ).to_breakpoints({"a": [0, 10, 50], "b": [0, 20]}) np.testing.assert_allclose(bp.sel(gen="a").values, [0, 10, 30]) np.testing.assert_allclose( bp.sel(gen="b").dropna(BREAKPOINT_DIM).values, [10, 50] ) - def test_dataarray(self) -> None: - slopes_da = xr.DataArray( - [[np.nan, 1, 2], [np.nan, 3, 4]], - dims=["gen", BREAKPOINT_DIM], - coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1, 2]}, - ) - xp_da = xr.DataArray( - [[0, 1, 2], [0, 1, 2]], - dims=["gen", BREAKPOINT_DIM], - coords={"gen": ["a", "b"], BREAKPOINT_DIM: [0, 1, 2]}, + +class TestSlopesValidationErrors: + """``to_breakpoints`` rejects malformed specs with actionable messages.""" + + @pytest.mark.parametrize( + ("ctor_kwargs", "x_pts", "match"), + [ + pytest.param( + {"values": [1, 2, 3], "y0": 0, "align": "leading"}, + [0, 1, 2], + "first slope", + id="leading_first_not_nan", + ), + pytest.param( + {"values": [1, 2], "y0": {"a": 0}}, + [0, 10, 20], + "scalar float", + id="1d_with_dict_y0", + ), + pytest.param( + {"values": {"a": [1, 2], "b": [3, 4]}, "y0": "bad", "dim": "gen"}, + {"a": [0, 10, 20], "b": [0, 10, 20]}, + "y0", + id="bad_y0_type", + ), + ], + ) + def test_invalid_inputs_raise( + self, + ctor_kwargs: dict[str, Any], + x_pts: BreaksLike, + match: str, + ) -> None: + from linopy import Slopes + + with pytest.raises((TypeError, ValueError), match=match): + Slopes(**ctor_kwargs).to_breakpoints(x_pts) + + +class TestSlopesDispatch: + """Slopes inside ``add_piecewise_formulation`` — sibling resolution.""" + + def test_two_tuple_deferred(self) -> None: + from linopy import Slopes + + m = Model() + power = m.add_variables(lower=0, upper=100, name="power") + fuel = m.add_variables(lower=0, name="fuel") + # Slopes [1.2, 1.4, 1.7] resolved over the borrowed x grid + # [0, 30, 60, 100] -> fuel breakpoints [0, 36, 78, 146]. + # Equality-2-tuple convexity uses pinned_bps[1] as x; with + # increasing dy/dx slopes, the inverse view (power-vs-fuel) is + # concave — that's the label the formulation reports. + f = m.add_piecewise_formulation( + (power, [0, 30, 60, 100]), + (fuel, Slopes([1.2, 1.4, 1.7], y0=0)), ) - y0_da = xr.DataArray([0, 5], dims=["gen"], coords={"gen": ["a", "b"]}) - bp = breakpoints( - slopes=slopes_da, x_points=xp_da, y0=y0_da, slopes_align="leading" + assert f.method in ("sos2", "incremental") + assert f.convexity == "concave" + + def test_slopes_as_bounded_tuple(self) -> None: + from linopy import Slopes + + m = Model() + x = m.add_variables(lower=0, upper=30, name="x") + y = m.add_variables(lower=0, upper=100, name="y") + f = m.add_piecewise_formulation( + (y, Slopes([2, 1, 0.5], y0=0), "<="), # concave + (x, [0, 10, 20, 30]), ) - np.testing.assert_allclose(bp.sel(gen="a").values, [0, 1, 3]) - np.testing.assert_allclose(bp.sel(gen="b").values, [5, 8, 12]) + assert f.method == "lp" + assert f.convexity == "concave" + + def test_all_slopes_raises(self) -> None: + from linopy import Slopes - def test_non_nan_first_slope_raises(self) -> None: - with pytest.raises(ValueError, match="first slope"): - breakpoints( - slopes=[1, 2, 3], x_points=[0, 1, 2], y0=0, slopes_align="leading" + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + with pytest.raises(ValueError, match="All tuples are Slopes"): + m.add_piecewise_formulation( + (x, Slopes([1, 2], y0=0)), + (y, Slopes([1, 1], y0=0)), ) - def test_without_slopes_mode_raises(self) -> None: - with pytest.raises(ValueError, match="only valid in slopes mode"): - breakpoints([0, 1, 2], slopes_align="leading") + def test_multiple_non_slopes_with_slopes_raises(self) -> None: + """ + With Slopes present, two or more non-Slopes tuples is rejected: + each non-Slopes tuple is a y-vector for its own variable, so + there is no canonical x grid for the Slopes to integrate against. + """ + from linopy import Slopes + + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + z = m.add_variables(name="z") + with pytest.raises(ValueError, match="no canonical x grid"): + m.add_piecewise_formulation( + (x, [0, 10, 20, 30]), + (y, [0, 100, 200, 300]), + (z, Slopes([1, 1, 1], y0=0)), + ) + + def test_multiple_slopes_share_x_grid(self) -> None: + """ + Two Slopes tuples plus one non-Slopes — both Slopes resolve against + the same borrowed x grid. Pin via distinct slope sequences so the + two Slopes-derived variables end up with different breakpoint values. + """ + from linopy import Slopes + + m = Model() + x = m.add_variables(name="x") + y = m.add_variables(name="y") + z = m.add_variables(name="z") + f = m.add_piecewise_formulation( + (x, [0, 10, 20, 30]), + (y, Slopes([1, 1, 1], y0=0)), # → [0, 10, 20, 30] + (z, Slopes([2, 2, 2], y0=0)), # → [0, 20, 40, 60] + ) + # 3-var formulation -> convexity is None. + assert f.convexity is None + assert f.name in m._piecewise_formulations + + def test_slopes_align_leading_in_dispatch(self) -> None: + from linopy import Slopes + + m = Model() + x = m.add_variables(lower=0, upper=2, name="x") + y = m.add_variables(name="y") + f = m.add_piecewise_formulation( + (x, [0, 1, 2]), + (y, Slopes([np.nan, 1, 2], y0=0, align="leading")), + ) + # Resolved bp for y: [0, 1, 3]. As above, the equality-2-tuple + # convention reports the inverse view → concave. + assert f.convexity == "concave" + + +class TestSlopesDispatchEquivalence: + """ + Deferred Slopes dispatch builds the same model as eager breakpoints. + + The wiring tests in :class:`TestSlopesDispatch` verify dispatch attributes + (``method``/``convexity``). These tests pin the *outcome*: the deferred + form must produce a model byte-equal to the eagerly-resolved reference + (same auxiliary variables, same constraint coefficients/RHS). + """ + + def test_two_tuple_matches_eager(self) -> None: + from linopy import Slopes + from linopy.testing import assert_model_equal + + # Slopes([1.2, 1.4, 1.7], y0=0) over [0, 30, 60, 100] resolves to + # fuel breakpoints [0, 36, 78, 146]. + m_eager = Model() + p1 = m_eager.add_variables(lower=0, upper=100, name="power") + f1 = m_eager.add_variables(lower=0, name="fuel") + m_eager.add_piecewise_formulation( + (p1, [0, 30, 60, 100]), (f1, [0, 36, 78, 146]) + ) + + m_deferred = Model() + p2 = m_deferred.add_variables(lower=0, upper=100, name="power") + f2 = m_deferred.add_variables(lower=0, name="fuel") + m_deferred.add_piecewise_formulation( + (p2, [0, 30, 60, 100]), + (f2, Slopes([1.2, 1.4, 1.7], y0=0)), + ) + + assert_model_equal(m_eager, m_deferred) + + def test_multiple_slopes_resolved_breakpoints(self) -> None: + """ + Two Slopes tuples resolve against the same borrowed x grid: + y → [0, 10, 20, 30], z → [0, 20, 40, 60]. + """ + from linopy import Slopes + from linopy.testing import assert_model_equal + + m_eager = Model() + x1 = m_eager.add_variables(lower=0, upper=30, name="x") + y1 = m_eager.add_variables(lower=0, name="y") + z1 = m_eager.add_variables(lower=0, name="z") + m_eager.add_piecewise_formulation( + (x1, [0, 10, 20, 30]), + (y1, [0, 10, 20, 30]), + (z1, [0, 20, 40, 60]), + ) + + m_deferred = Model() + x2 = m_deferred.add_variables(lower=0, upper=30, name="x") + y2 = m_deferred.add_variables(lower=0, name="y") + z2 = m_deferred.add_variables(lower=0, name="z") + m_deferred.add_piecewise_formulation( + (x2, [0, 10, 20, 30]), + (y2, Slopes([1, 1, 1], y0=0)), + (z2, Slopes([2, 2, 2], y0=0)), + ) + + assert_model_equal(m_eager, m_deferred) + + def test_align_leading_matches_eager(self) -> None: + """``align='leading'`` dispatch resolves to bps [0, 1, 3].""" + from linopy import Slopes + from linopy.testing import assert_model_equal + + m_eager = Model() + x1 = m_eager.add_variables(lower=0, upper=2, name="x") + y1 = m_eager.add_variables(name="y") + m_eager.add_piecewise_formulation((x1, [0, 1, 2]), (y1, [0, 1, 3])) + + m_deferred = Model() + x2 = m_deferred.add_variables(lower=0, upper=2, name="x") + y2 = m_deferred.add_variables(name="y") + m_deferred.add_piecewise_formulation( + (x2, [0, 1, 2]), + (y2, Slopes([np.nan, 1, 2], y0=0, align="leading")), + ) + + assert_model_equal(m_eager, m_deferred) # =========================================================================== @@ -420,9 +945,11 @@ def test_with_slopes(self) -> None: y = m.add_variables(name="y") # slopes=[-0.3, 0.45, 1.2] with y0=5 -> y_points=[5, 2, 20, 80] # Non-monotonic y-breakpoints, so auto selects SOS2 + from linopy import Slopes + m.add_piecewise_formulation( (x, [0, 10, 50, 100]), - (y, breakpoints(slopes=[-0.3, 0.45, 1.2], x_points=[0, 10, 50, 100], y0=5)), + (y, Slopes([-0.3, 0.45, 1.2], y0=5)), ) assert f"pwl0{PWL_LAMBDA_SUFFIX}" in m.variables @@ -1041,10 +1568,12 @@ def test_slopes_equivalence(self, solver_name: str) -> None: m2 = Model() x2 = m2.add_variables(lower=0, upper=100, name="x") y2 = m2.add_variables(name="y") + from linopy import Slopes + env2 = tangent_lines( x2, [0, 50, 100], - breakpoints(slopes=[0.8, 0.4], x_points=[0, 50, 100], y0=0), + Slopes([0.8, 0.4], y0=0).to_breakpoints([0, 50, 100]), ) m2.add_constraints(y2 <= env2, name="pwl") m2.add_constraints(x2 <= 75, name="x_max") @@ -1369,37 +1898,10 @@ def test_non_1d_sequence_raises(self) -> None: with pytest.raises(ValueError, match="1D sequence"): breakpoints([[1, 2], [3, 4]]) - def test_breakpoints_no_values_no_slopes_raises(self) -> None: - """breakpoints() with neither values nor slopes raises.""" - with pytest.raises(ValueError, match="Must pass either"): - breakpoints() - - def test_slopes_1d_non_scalar_y0_raises(self) -> None: - """1D slopes with dict y0 raises TypeError.""" - with pytest.raises(TypeError, match="scalar float"): - breakpoints(slopes=[1, 2], x_points=[0, 10, 20], y0={"a": 0}) - - def test_slopes_bad_y0_type_raises(self) -> None: - """Slopes with unsupported y0 type raises TypeError.""" - with pytest.raises(TypeError, match="y0"): - breakpoints( - slopes={"a": [1, 2], "b": [3, 4]}, - x_points={"a": [0, 10, 20], "b": [0, 10, 20]}, - y0="bad", - dim="entity", - ) - - def test_slopes_dataarray_y0(self) -> None: - """Slopes mode with DataArray y0 works.""" - y0_da = xr.DataArray([0, 5], dims=["gen"], coords={"gen": ["a", "b"]}) - bp = breakpoints( - slopes={"a": [1, 2], "b": [3, 4]}, - x_points={"a": [0, 10, 20], "b": [0, 10, 20]}, - y0=y0_da, - dim="gen", - ) - assert BREAKPOINT_DIM in bp.dims - assert "gen" in bp.dims + def test_breakpoints_no_values_raises(self) -> None: + """breakpoints() with no positional argument raises TypeError.""" + with pytest.raises(TypeError): + breakpoints() # type: ignore[call-arg] def test_non_numeric_breakpoint_coords_raises(self) -> None: """SOS2 with string breakpoint coords raises ValueError.""" @@ -2492,6 +2994,40 @@ def test_tangent_lines_warns_and_dedups_independently(self) -> None: assert len(evolving) == 1 assert "tangent_lines" in str(evolving[0].message) + def test_slopes_construction_warns_and_dedups(self) -> None: + """ + ``Slopes(...)`` is part of the same evolving API surface and emits + on construction so that the standalone ``Slopes(...).to_breakpoints(...)`` + path doesn't silently bypass the signal. Per-key dedup keeps it + quiet for repeated use. + """ + from linopy import EvolvingAPIWarning, Slopes + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always", EvolvingAPIWarning) + Slopes([1, 2], y0=0) + Slopes([3, 4], y0=5) + Slopes([1, 1, 1], y0=0, align="leading") + evolving = [w for w in caught if issubclass(w.category, EvolvingAPIWarning)] + assert len(evolving) == 1 + assert "Slopes" in str(evolving[0].message) + + def test_slopes_warning_stacklevel_points_to_user_call(self) -> None: + """ + ``Slopes.__post_init__`` emits via a dataclass-generated ``__init__`` + — ``_warn_evolving_api`` needs ``stacklevel=4`` to skip the helper, + ``__post_init__``, and the synthetic init and land on the actual + user line. + """ + from linopy import EvolvingAPIWarning, Slopes + + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always", EvolvingAPIWarning) + Slopes([1, 2], y0=0) + evolving = [w for w in caught if issubclass(w.category, EvolvingAPIWarning)] + assert len(evolving) == 1 + assert evolving[0].filename.endswith("test_piecewise_constraints.py") + def test_warning_stacklevel_points_to_user_call(self) -> None: """ ``stacklevel=3`` in ``_warn_evolving_api`` should make the warning