From a36edb1e1ac34d177a869a6bce0cd7b90b678eb9 Mon Sep 17 00:00:00 2001
From: ivan-digital
Date: Sat, 18 Apr 2026 13:08:24 +0200
Subject: [PATCH 1/4] Add physicsShedder filter for latency-aware load shedding

Introduces physicsShedder, a route-level filter that catches gray
failures (slow-but-200 backends) by modeling incoming traffic as
resistance against a learned baseline.

  R         = avgLatency/latencyTarget + errorWeight * errorRate
  threshold = mu + k*sigma  (EWMA baseline + adaptive deviation)
  pReject   = max(0, (R - threshold) / R), clamped to 0.95

Complements admissionControl, which reacts only to error rate. Uses the
same Admission-Control header convention so the two filters compose on
one route without double-counting.

Known v1 tradeoff: a sudden step in latency inflates the adaptive
variance, which can keep the threshold above R during the transient and
let the new latency become the baseline. Collect data in logInactive or
inactive mode first; refine threshold formulation based on feedback.

Closes #3828

Tests include unit math, warmup gate, ring buffer rotation, mode
behavior, pre/post processor, response error counting, metrics emission,
tracing spans, fuzz on the math, randomized invariants, concurrent hot
path, admissionControl chain composition, and a local load-test script
under skptesting/.
Signed-off-by: ivan-digital --- docs/reference/filters.md | 79 ++ filters/filters.go | 1 + filters/shedder/physics.go | 486 +++++++++ filters/shedder/physics_test.go | 1166 ++++++++++++++++++++++ skipper.go | 11 + skptesting/benchmark-shedder.sh | 70 ++ skptesting/physics-shedder-backend.eskip | 1 + skptesting/physics-shedder-slow.eskip | 1 + skptesting/physics-shedder.eskip | 1 + 9 files changed, 1816 insertions(+) create mode 100644 filters/shedder/physics.go create mode 100644 filters/shedder/physics_test.go create mode 100755 skptesting/benchmark-shedder.sh create mode 100644 skptesting/physics-shedder-backend.eskip create mode 100644 skptesting/physics-shedder-slow.eskip create mode 100644 skptesting/physics-shedder.eskip diff --git a/docs/reference/filters.md b/docs/reference/filters.md index 3a98689658..8c4792c9c3 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2793,6 +2793,85 @@ probability you have to use values lower than 1: * 1/2: quadratic * 1/3: cubic +### physicsShedder + +Implements a self-tuning, route-level load shedder that uses latency as +a first-class signal. Unlike `admissionControl`, which only reacts to +error rate, `physicsShedder` also catches gray failures: backends that +keep returning HTTP 200 while getting slower. + +Each route carries a "resistance" score built from observed latency and +error rate. A per-route EWMA baseline of that score is learned +automatically, so there is no manual threshold to tune and the filter +adapts to daily traffic patterns on its own. When the current resistance +exceeds the learned baseline by a configurable number of standard +deviations, the filter rejects a share of traffic with status 503, using +the same probabilistic style as `admissionControl`. 
+ +The resistance is computed per observation window as: + +$$ R = { avgLatency \over latencyTarget } + errorWeight \cdot errorRate $$ + +The shed threshold is $\mu + k \cdot \sigma$, where $\mu$ and $\sigma$ +come from an exponentially weighted moving average of past resistance +values. The reject probability is $(R - threshold) / R$, clamped to a +maximum of 0.95. + +Examples: + + physicsShedder(metricSuffix, mode, latencyTarget) + physicsShedder(metricSuffix, mode, latencyTarget, window) + physicsShedder("myapp", "active", "200ms") + physicsShedder("myapp", "active", "200ms", "5s") + +Parameters: + +* metric suffix (string) +* mode (enum) +* latency target (time.Duration) +* window (time.Duration, optional, default `"5s"`) + +Metric suffix is the suffix key used to expose the reject and resistance +metrics. It should be unique per filter instance. + +Mode has 3 different possible values: + +* "active" will reject traffic +* "inactive" will never reject traffic, but collect metrics +* "logInactive" will not reject traffic, but log the filter's decisions + for tuning and dry-run deployments + +Latency target is the expected per-request latency for this route. It is +the single knob operators tune: the filter considers the route healthy +when average latency sits at or below this value. The baseline and +variance are learned relative to it. + +Window is the observation window, i.e. how much history contributes to +the current resistance calculation. It must be between `200ms` and `60s`. + +During filter startup the shedder does not reject any traffic until its +baseline has primed, so newly loaded routes behave safely even under +immediate load. 
+ +Exposed metrics per metric suffix: + +* `shedder.physics.total.` — counter of requests observed +* `shedder.physics.reject.` — counter of requests rejected with + 503 in active mode +* `shedder.physics.would_reject.` — counter of requests that + would have been rejected in `inactive`/`logInactive` modes +* `shedder.physics.resistance.` — current resistance gauge +* `shedder.physics.baseline.` — EWMA baseline gauge +* `shedder.physics.threshold.` — current shed threshold gauge + +`physicsShedder` composes cleanly with `admissionControl`: both filters +honor the `Admission-Control` response header so a filter upstream in +the chain does not double-count 503s produced by a filter downstream. + +During load tests of the backend, run this filter in `inactive` or +`logInactive` mode so the deliberately induced latency does not trigger +shedding. + ## lua See [the scripts page](scripts.md) diff --git a/filters/filters.go b/filters/filters.go index d1f3107ba4..8159157ed2 100644 --- a/filters/filters.go +++ b/filters/filters.go @@ -330,6 +330,7 @@ const ( RateBreakerName = "rateBreaker" DisableBreakerName = "disableBreaker" AdmissionControlName = "admissionControl" + PhysicsShedderName = "physicsShedder" ClientRatelimitName = "clientRatelimit" RatelimitName = "ratelimit" ClusterClientRatelimitName = "clusterClientRatelimit" diff --git a/filters/shedder/physics.go b/filters/shedder/physics.go new file mode 100644 index 0000000000..690810bc5d --- /dev/null +++ b/filters/shedder/physics.go @@ -0,0 +1,486 @@ +package shedder + +import ( + "context" + "math" + "math/rand/v2" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + log "github.com/sirupsen/logrus" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/metrics" + "github.com/zalando/skipper/routing" +) + +const ( + physicsCounterPrefix = "shedder.physics." 
+ physicsSpanName = "physics_shedder" + physicsStateBagKey = "shedder:physics" + physicsStateBagReject = "reject" + + physicsTickDuration = 100 * time.Millisecond + physicsEwmaAlpha = 0.01 + physicsSigmaMultiplier = 3.0 + physicsErrorWeight = 5.0 + physicsMaxRejectProb = 0.95 + physicsWarmupTicks = 50 + + physicsMinWindow = 2 * physicsTickDuration + physicsMaxWindow = 60 * time.Second +) + +type PhysicsShedderOptions struct { + Tracer opentracing.Tracer + testRand bool +} + +// PhysicsShedderSpec is exported so tests and wiring code can type-assert +// the returned filters.Spec to call PreProcessor/PostProcessor. +type PhysicsShedderSpec struct { + tracer opentracing.Tracer + testRand bool +} + +type physicsShedderPre struct{} + +// Do removes duplicate physicsShedder filters per route. Only the last +// instance survives so a chain always has at most one. +func (*physicsShedderPre) Do(routes []*eskip.Route) []*eskip.Route { + for _, r := range routes { + foundAt := -1 + toDelete := make(map[int]struct{}) + + for i, f := range r.Filters { + if f.Name == filters.PhysicsShedderName { + if foundAt != -1 { + toDelete[foundAt] = struct{}{} + } + foundAt = i + } + } + + if len(toDelete) == 0 { + continue + } + + rf := make([]*eskip.Filter, 0, len(r.Filters)-len(toDelete)) + for i, f := range r.Filters { + if _, ok := toDelete[i]; !ok { + rf = append(rf, f) + } + } + r.Filters = rf + } + + return routes +} + +type physicsShedderPost struct { + filters map[string]*physicsShedder +} + +// Do implements routing.PostProcessor so we can stop tick goroutines when +// a route is replaced or removed. 
+func (p *physicsShedderPost) Do(routes []*routing.Route) []*routing.Route { + inUse := make(map[string]struct{}) + + for _, r := range routes { + for _, f := range r.Filters { + if ps, ok := f.Filter.(*physicsShedder); ok { + if old, okOld := p.filters[r.Id]; okOld && old != ps { + old.Close() + } + p.filters[r.Id] = ps + inUse[r.Id] = struct{}{} + } + } + } + + for id, f := range p.filters { + if _, ok := inUse[id]; !ok { + f.Close() + delete(p.filters, id) + } + } + return routes +} + +type physicsShedder struct { + once sync.Once + quit chan struct{} + done chan struct{} + closed bool + + metrics metrics.Metrics + metricSuffix string + tracer opentracing.Tracer + + mode mode + tickDuration time.Duration + windowSize int + latencyTarget time.Duration + + // hot path counters, swapped by tickWindows + counter atomic.Int64 + errorCounter atomic.Int64 + latencyNs atomic.Int64 + + mu sync.Mutex + totals []int64 + errors []int64 + latencySumNs []int64 + bucketIdx int + ticksSeen int + + // EWMA baseline of R, owned by tick goroutine. + ewmaMu float64 + ewmaVar float64 + ewmaPrimed bool + + // Published reject probability (float64 bits) read lock-free from hot path. + pRejectBits atomic.Uint64 + + muRand sync.Mutex + rand func() float64 +} + +// NewPhysicsShedder creates the filter spec. A single spec is shared +// across routes; per-route state lives on the filter instance. 
+func NewPhysicsShedder(o PhysicsShedderOptions) filters.Spec { + tracer := o.Tracer + if tracer == nil { + tracer = &opentracing.NoopTracer{} + } + return &PhysicsShedderSpec{ + tracer: tracer, + testRand: o.testRand, + } +} + +func (*PhysicsShedderSpec) PreProcessor() *physicsShedderPre { + return &physicsShedderPre{} +} + +func (*PhysicsShedderSpec) PostProcessor() *physicsShedderPost { + return &physicsShedderPost{ + filters: make(map[string]*physicsShedder), + } +} + +func (*PhysicsShedderSpec) Name() string { return filters.PhysicsShedderName } + +// CreateFilter parses the filter arguments: +// +// physicsShedder(metricSuffix, mode, latencyTarget) +// physicsShedder(metricSuffix, mode, latencyTarget, window) +// +// metricSuffix identifies this filter in metrics. +// mode is one of "active", "inactive", "logInactive". +// latencyTarget is the expected per-request latency (e.g. "200ms"). +// window is the observation window (default "5s"). +func (spec *PhysicsShedderSpec) CreateFilter(args []interface{}) (filters.Filter, error) { + if len(args) < 3 || len(args) > 4 { + return nil, filters.ErrInvalidFilterParameters + } + + metricSuffix, ok := args[0].(string) + if !ok || metricSuffix == "" { + log.Warn("physicsShedder: metricSuffix must be a non-empty string") + return nil, filters.ErrInvalidFilterParameters + } + + md, err := getModeArg(args[1]) + if err != nil { + log.Warnf("physicsShedder: mode failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + + latencyTarget, err := getDurationArg(args[2]) + if err != nil || latencyTarget <= 0 { + log.Warnf("physicsShedder: latencyTarget must be a positive duration: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + + window := 5 * time.Second + if len(args) == 4 { + window, err = getDurationArg(args[3]) + if err != nil { + log.Warnf("physicsShedder: window failed: %v", err) + return nil, filters.ErrInvalidFilterParameters + } + } + if window < physicsMinWindow || window > 
physicsMaxWindow { + log.Warnf("physicsShedder: window out of range [%s, %s], got %s", + physicsMinWindow, physicsMaxWindow, window) + return nil, filters.ErrInvalidFilterParameters + } + + windowSize := int(window / physicsTickDuration) + + r := rand.Float64 + if spec.testRand { + r = randWithSeed() + } + + ps := &physicsShedder{ + quit: make(chan struct{}), + done: make(chan struct{}), + metrics: metrics.Default, + metricSuffix: metricSuffix, + tracer: spec.tracer, + mode: md, + tickDuration: physicsTickDuration, + windowSize: windowSize, + latencyTarget: latencyTarget, + totals: make([]int64, windowSize), + errors: make([]int64, windowSize), + latencySumNs: make([]int64, windowSize), + rand: r, + } + go ps.tickWindows() + return ps, nil +} + +// Close stops the tick goroutine and waits for it to exit. Safe to call +// multiple times. +func (ps *physicsShedder) Close() error { + ps.once.Do(func() { + ps.closed = true + close(ps.quit) + }) + <-ps.done + return nil +} + +func (ps *physicsShedder) tickWindows() { + defer close(ps.done) + t := time.NewTicker(ps.tickDuration) + defer t.Stop() + + for { + select { + case <-ps.quit: + return + case <-t.C: + } + + reqs := ps.counter.Swap(0) + errs := ps.errorCounter.Swap(0) + lat := ps.latencyNs.Swap(0) + + ps.mu.Lock() + ps.totals[ps.bucketIdx] = reqs + ps.errors[ps.bucketIdx] = errs + ps.latencySumNs[ps.bucketIdx] = lat + ps.bucketIdx = (ps.bucketIdx + 1) % ps.windowSize + if ps.ticksSeen < math.MaxInt32 { + ps.ticksSeen++ + } + sumReqs := sum(ps.totals) + sumErrs := sum(ps.errors) + sumLat := sum(ps.latencySumNs) + ticksSeen := ps.ticksSeen + ps.mu.Unlock() + + r := ps.computeResistance(sumReqs, sumErrs, sumLat) + threshold := ps.updateBaseline(r) + rejectP := ps.computeRejectProbability(r, threshold, ticksSeen) + + ps.pRejectBits.Store(math.Float64bits(rejectP)) + + ps.publishGauges(r, ps.ewmaMu, threshold) + + if ps.mode == logInactive { + log.Infof("%s[%s]: R=%.3f mu=%.3f sigma=%.3f threshold=%.3f pReject=%.3f 
ticks=%d reqs=%d errs=%d", + filters.PhysicsShedderName, ps.metricSuffix, + r, ps.ewmaMu, math.Sqrt(ps.ewmaVar), threshold, rejectP, ticksSeen, sumReqs, sumErrs) + } + } +} + +// computeResistance collapses window-aggregated metrics into a single R +// value. Pressure is the only component in v1; momentum/scar may be added +// behind flags if scenario tests motivate them. +func (ps *physicsShedder) computeResistance(sumReqs, sumErrs, sumLatNs int64) float64 { + if sumReqs <= 0 { + return 0 + } + avgLatNs := float64(sumLatNs) / float64(sumReqs) + latencyRatio := avgLatNs / float64(ps.latencyTarget) + errorRate := float64(sumErrs) / float64(sumReqs) + return latencyRatio + physicsErrorWeight*errorRate +} + +// updateBaseline advances the EWMA running mean and variance of R and +// returns the shed threshold (mu + k*sigma). Called only from the tick +// goroutine. +func (ps *physicsShedder) updateBaseline(r float64) float64 { + if !ps.ewmaPrimed { + ps.ewmaMu = r + ps.ewmaVar = 0 + ps.ewmaPrimed = true + } else { + diff := r - ps.ewmaMu + ps.ewmaMu += physicsEwmaAlpha * diff + ps.ewmaVar = (1 - physicsEwmaAlpha) * (ps.ewmaVar + physicsEwmaAlpha*diff*diff) + } + return ps.ewmaMu + physicsSigmaMultiplier*math.Sqrt(ps.ewmaVar) +} + +// computeRejectProbability returns the probability to reject an incoming +// request given the current resistance and adaptive threshold. It is 0 +// during warmup or when R is at or below threshold, and is clamped to +// physicsMaxRejectProb otherwise. 
+func (ps *physicsShedder) computeRejectProbability(r, threshold float64, ticksSeen int) float64 { + if ticksSeen < physicsWarmupTicks { + return 0 + } + if r <= threshold || r <= 0 { + return 0 + } + p := (r - threshold) / r + if p > physicsMaxRejectProb { + p = physicsMaxRejectProb + } + if p < 0 { + p = 0 + } + return p +} + +func (ps *physicsShedder) publishGauges(r, baseline, threshold float64) { + if ps.metrics == nil { + return + } + ps.metrics.UpdateGauge(physicsCounterPrefix+"resistance."+ps.metricSuffix, r) + ps.metrics.UpdateGauge(physicsCounterPrefix+"baseline."+ps.metricSuffix, baseline) + ps.metrics.UpdateGauge(physicsCounterPrefix+"threshold."+ps.metricSuffix, threshold) +} + +func (ps *physicsShedder) pReject() float64 { + b := ps.pRejectBits.Load() + return math.Float64frombits(b) +} + +func (ps *physicsShedder) shouldReject() bool { + p := ps.pReject() + if p <= 0 { + return false + } + ps.muRand.Lock() + r := ps.rand() + ps.muRand.Unlock() + return p > r +} + +func (ps *physicsShedder) setCommonTags(span opentracing.Span, r, threshold, p float64) { + span.SetTag("physicsShedder.group", ps.metricSuffix) + span.SetTag("physicsShedder.mode", ps.mode.String()) + span.SetTag("physicsShedder.latencyTarget", ps.latencyTarget.String()) + span.SetTag("physicsShedder.R", r) + span.SetTag("physicsShedder.threshold", threshold) + span.SetTag("physicsShedder.pReject", p) +} + +func (ps *physicsShedder) startSpan(ctx context.Context) opentracing.Span { + parent := opentracing.SpanFromContext(ctx) + if parent == nil { + return nil + } + span := ps.tracer.StartSpan(physicsSpanName, opentracing.ChildOf(parent.Context())) + ext.Component.Set(span, "skipper") + span.SetTag("mode", ps.mode.String()) + return span +} + +// Request implements the hot-path rejection decision. 
+func (ps *physicsShedder) Request(ctx filters.FilterContext) { + span := ps.startSpan(ctx.Request().Context()) + + ctx.StateBag()[physicsStartTimeKey] = time.Now() + + if ps.metrics != nil { + ps.metrics.IncCounter(physicsCounterPrefix + "total." + ps.metricSuffix) + } + + p := ps.pReject() + reject := false + if p > 0 { + ps.muRand.Lock() + r := ps.rand() + ps.muRand.Unlock() + reject = p > r + } + + if span != nil { + ps.setCommonTags(span, 0, 0, p) + defer span.Finish() + } + + if !reject { + return + } + + if ps.metrics != nil { + if ps.mode == active { + ps.metrics.IncCounter(physicsCounterPrefix + "reject." + ps.metricSuffix) + } else { + ps.metrics.IncCounter(physicsCounterPrefix + "would_reject." + ps.metricSuffix) + } + } + + if span != nil { + ext.Error.Set(span, true) + } + + if ps.mode != active { + return + } + + ctx.StateBag()[physicsStateBagKey] = physicsStateBagReject + + header := make(http.Header) + header.Set(admissionSignalHeaderKey, admissionSignalHeaderValue) + ctx.Serve(&http.Response{ + Header: header, + StatusCode: http.StatusServiceUnavailable, + }) +} + +// Response records outcome for the tick loop to consume. +func (ps *physicsShedder) Response(ctx filters.FilterContext) { + // Skip our own short-cut responses. + if ctx.StateBag()[physicsStateBagKey] == physicsStateBagReject { + return + } + // Skip responses produced by upstream shedders. + if ctx.Response().Header.Get(admissionSignalHeaderKey) == admissionSignalHeaderValue { + return + } + + ps.counter.Add(1) + if ctx.Response().StatusCode >= 500 { + ps.errorCounter.Add(1) + } + + // Latency: use StateBag start time if a predicate/filter set it, + // otherwise leave latency contribution to zero for this request. + if startAny, ok := ctx.StateBag()[physicsStartTimeKey]; ok { + if start, ok := startAny.(time.Time); ok { + ps.latencyNs.Add(time.Since(start).Nanoseconds()) + } + } +} + +// HandleErrorResponse opts in for Response callbacks on proxy errors. 
+func (*physicsShedder) HandleErrorResponse() bool { return true } + +// physicsStartTimeKey stores request start time in StateBag so Response +// can measure request latency without allocating a context key. +const physicsStartTimeKey = "shedder:physics:start" diff --git a/filters/shedder/physics_test.go b/filters/shedder/physics_test.go new file mode 100644 index 0000000000..4c246abe05 --- /dev/null +++ b/filters/shedder/physics_test.go @@ -0,0 +1,1166 @@ +package shedder + +import ( + "math" + mrand "math/rand/v2" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/zalando/skipper/eskip" + "github.com/zalando/skipper/filters" + "github.com/zalando/skipper/filters/builtin" + "github.com/zalando/skipper/filters/filtertest" + "github.com/zalando/skipper/metrics" + "github.com/zalando/skipper/metrics/metricstest" + "github.com/zalando/skipper/proxy/proxytest" + "github.com/zalando/skipper/routing" + "github.com/zalando/skipper/routing/testdataclient" + "github.com/zalando/skipper/tracing/tracingtest" +) + +func TestPhysicsShedderCreateFilter(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + + cases := []struct { + name string + args []interface{} + wantErr bool + }{ + {"ok minimal", []interface{}{"app", "active", "200ms"}, false}, + {"ok with window", []interface{}{"app", "active", "200ms", "3s"}, false}, + {"ok inactive", []interface{}{"app", "inactive", "200ms"}, false}, + {"ok logInactive", []interface{}{"app", "logInactive", "200ms"}, false}, + {"too few args", []interface{}{"app", "active"}, true}, + {"too many args", []interface{}{"app", "active", "200ms", "3s", "extra"}, true}, + {"empty suffix", []interface{}{"", "active", "200ms"}, true}, + {"non-string suffix", []interface{}{123, "active", "200ms"}, true}, + {"bad mode", []interface{}{"app", "bogus", "200ms"}, true}, + {"bad latency target", []interface{}{"app", 
"active", "not-a-duration"}, true}, + {"zero latency target", []interface{}{"app", "active", "0ms"}, true}, + {"negative latency target", []interface{}{"app", "active", "-100ms"}, true}, + {"bad window", []interface{}{"app", "active", "200ms", "nope"}, true}, + {"window too small", []interface{}{"app", "active", "200ms", "50ms"}, true}, + {"window too large", []interface{}{"app", "active", "200ms", "120s"}, true}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + f, err := spec.CreateFilter(tc.args) + if tc.wantErr { + if err == nil { + t.Fatalf("expected error, got filter %v", f) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + f.(*physicsShedder).Close() + }) + } +} + +func TestPhysicsShedderComputeResistance(t *testing.T) { + ps := &physicsShedder{latencyTarget: 200 * time.Millisecond} + + t.Run("no traffic", func(t *testing.T) { + if got := ps.computeResistance(0, 0, 0); got != 0 { + t.Fatalf("want 0, got %v", got) + } + }) + + t.Run("on target no errors", func(t *testing.T) { + // 100 reqs averaging exactly latencyTarget → latencyRatio = 1, errorRate = 0 + r := ps.computeResistance(100, 0, int64(100*200*time.Millisecond)) + if math.Abs(r-1.0) > 1e-9 { + t.Fatalf("want 1.0, got %v", r) + } + }) + + t.Run("double latency", func(t *testing.T) { + // 100 reqs averaging 2 * latencyTarget + r := ps.computeResistance(100, 0, int64(100*400*time.Millisecond)) + if math.Abs(r-2.0) > 1e-9 { + t.Fatalf("want 2.0, got %v", r) + } + }) + + t.Run("errors contribute via errorWeight", func(t *testing.T) { + // 100 reqs at target with 10% errors → 1.0 + 5.0*0.1 = 1.5 + r := ps.computeResistance(100, 10, int64(100*200*time.Millisecond)) + if math.Abs(r-1.5) > 1e-9 { + t.Fatalf("want 1.5, got %v", r) + } + }) +} + +func TestPhysicsShedderUpdateBaselinePrimes(t *testing.T) { + ps := &physicsShedder{} + threshold := ps.updateBaseline(1.0) + if !ps.ewmaPrimed { + t.Fatal("expected primed after first update") + } + if ps.ewmaMu 
!= 1.0 { + t.Fatalf("want mu 1.0, got %v", ps.ewmaMu) + } + if ps.ewmaVar != 0 { + t.Fatalf("want var 0 on prime, got %v", ps.ewmaVar) + } + if threshold != 1.0 { + t.Fatalf("want threshold 1.0, got %v", threshold) + } +} + +func TestPhysicsShedderUpdateBaselineConverges(t *testing.T) { + ps := &physicsShedder{} + // Feed steady R=1.0 for many ticks; mu should converge to 1.0 and var to 0. + for range 500 { + ps.updateBaseline(1.0) + } + if math.Abs(ps.ewmaMu-1.0) > 1e-6 { + t.Fatalf("mu not converged: %v", ps.ewmaMu) + } + if ps.ewmaVar > 1e-6 { + t.Fatalf("var not converged: %v", ps.ewmaVar) + } +} + +func TestPhysicsShedderComputeRejectProbability(t *testing.T) { + ps := &physicsShedder{} + + t.Run("warmup blocks shedding", func(t *testing.T) { + p := ps.computeRejectProbability(10.0, 1.0, physicsWarmupTicks-1) + if p != 0 { + t.Fatalf("want 0 during warmup, got %v", p) + } + }) + + t.Run("below threshold no shed", func(t *testing.T) { + p := ps.computeRejectProbability(0.5, 1.0, physicsWarmupTicks) + if p != 0 { + t.Fatalf("want 0 below threshold, got %v", p) + } + }) + + t.Run("above threshold sheds", func(t *testing.T) { + // R=2, threshold=1 → (2-1)/2 = 0.5 + p := ps.computeRejectProbability(2.0, 1.0, physicsWarmupTicks) + if math.Abs(p-0.5) > 1e-9 { + t.Fatalf("want 0.5, got %v", p) + } + }) + + t.Run("clamped to max", func(t *testing.T) { + // Very high R should saturate at physicsMaxRejectProb + p := ps.computeRejectProbability(1e9, 1.0, physicsWarmupTicks) + if p != physicsMaxRejectProb { + t.Fatalf("want %v, got %v", physicsMaxRejectProb, p) + } + }) +} + +// TestPhysicsShedderTickPipeline drives the tick loop directly and verifies +// the full pipeline (counters → ring buffer → R → EWMA → pReject). +func TestPhysicsShedderTickPipeline(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + // Healthy phase: 100 reqs at 50ms (well below target) for windowSize ticks. + // R ≈ 0.25, no errors, baseline primes low. 
+ for range ps.windowSize { + ps.counter.Store(10) + ps.errorCounter.Store(0) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + if ps.pReject() != 0 { + t.Fatalf("healthy baseline should not shed, got p=%v", ps.pReject()) + } + + // Continue healthy for warmup to elapse. + for ps.ticksSeen < physicsWarmupTicks { + ps.counter.Store(10) + ps.errorCounter.Store(0) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + if ps.pReject() != 0 { + t.Fatalf("after warmup on healthy traffic should not shed, got p=%v", ps.pReject()) + } + + // Latency spike: 100 reqs averaging 2s (10x target) — pure gray failure. + for range 5 { + ps.counter.Store(10) + ps.errorCounter.Store(0) + ps.latencyNs.Store(int64(10 * 2 * time.Second)) + ps.runOneTick() + } + if ps.pReject() <= 0 { + t.Fatalf("gray failure should shed, got p=%v", ps.pReject()) + } + if ps.pReject() > physicsMaxRejectProb { + t.Fatalf("p exceeds clamp: %v", ps.pReject()) + } +} + +func TestPhysicsShedderErrorBurstSheds(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + // Prime with healthy traffic past warmup. + for ps.ticksSeen < physicsWarmupTicks { + ps.counter.Store(10) + ps.errorCounter.Store(0) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + + // Fast 5xx burst: latency is fine but everything fails. + for range 5 { + ps.counter.Store(10) + ps.errorCounter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + if ps.pReject() <= 0 { + t.Fatalf("error burst should shed, got p=%v", ps.pReject()) + } +} + +func TestPhysicsShedderRecoveryDropsRejects(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + // Prime with healthy traffic past warmup. + for ps.ticksSeen < physicsWarmupTicks { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + + // Spike. 
+ for range 5 { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 2 * time.Second)) + ps.runOneTick() + } + if ps.pReject() <= 0 { + t.Fatalf("spike should shed first") + } + + // Recovery: latency back to target for many ticks. + for range 200 { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + if ps.pReject() > 0 { + t.Fatalf("expected 0 rejects after recovery, got p=%v", ps.pReject()) + } +} + +// TestPhysicsShedderModesDoNotServe503 verifies that inactive/logInactive +// modes never serve a 503 even when shouldReject returns true. +func TestPhysicsShedderModesDoNotServe503(t *testing.T) { + for _, md := range []mode{inactive, logInactive} { + t.Run(md.String(), func(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"app", md.String(), "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + defer ps.Close() + + // Force the filter to "want to reject". 
+ ps.pRejectBits.Store(math.Float64bits(1.0)) + + ctx := newFakeFilterContext() + ps.Request(ctx) + if ctx.FServed { + t.Fatalf("%s mode should not serve a response", md) + } + }) + } +} + +func TestPhysicsShedderActiveServes503(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"app", "active", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + defer ps.Close() + + ps.pRejectBits.Store(math.Float64bits(1.0)) + + ctx := newFakeFilterContext() + ps.Request(ctx) + if !ctx.FServed { + t.Fatal("active mode should serve a 503") + } + if ctx.FResponse.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("want 503, got %d", ctx.FResponse.StatusCode) + } + if ctx.FResponse.Header.Get(admissionSignalHeaderKey) != admissionSignalHeaderValue { + t.Fatal("active mode must set Admission-Control header") + } + if ctx.FStateBag[physicsStateBagKey] != physicsStateBagReject { + t.Fatal("state bag should mark reject") + } +} + +func TestPhysicsShedderResponseSkipsShortcut(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + ctx := newFakeFilterContext() + ctx.FStateBag[physicsStateBagKey] = physicsStateBagReject + ctx.FResponse = &http.Response{StatusCode: 503, Header: http.Header{}} + ps.Response(ctx) + + if ps.counter.Load() != 0 { + t.Fatalf("shortcut response should not be counted, got %d", ps.counter.Load()) + } +} + +func TestPhysicsShedderResponseSkipsUpstreamShedder(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + ctx := newFakeFilterContext() + ctx.FResponse = &http.Response{StatusCode: 503, Header: http.Header{}} + ctx.FResponse.Header.Set(admissionSignalHeaderKey, admissionSignalHeaderValue) + ps.Response(ctx) + + if ps.counter.Load() != 0 { + t.Fatalf("upstream shedder response should not be counted, got %d", ps.counter.Load()) + } +} + +func 
TestPhysicsShedderResponseCountsLatency(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + ctx := newFakeFilterContext() + ctx.FStateBag[physicsStartTimeKey] = time.Now().Add(-50 * time.Millisecond) + ctx.FResponse = &http.Response{StatusCode: 200, Header: http.Header{}} + ps.Response(ctx) + + if ps.counter.Load() != 1 { + t.Fatalf("expected 1 req counted, got %d", ps.counter.Load()) + } + if ps.latencyNs.Load() <= 0 { + t.Fatal("expected latency counted from StateBag start time") + } +} + +func TestPhysicsShedderCreateFilterStartsAndStopsGoroutine(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"app", "inactive", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + + // Let the tick goroutine run a bit then close. + time.Sleep(3 * physicsTickDuration) + if err := ps.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + // Double close must be safe. 
+ if err := ps.Close(); err != nil { + t.Fatalf("Close again: %v", err) + } +} + +func TestPhysicsShedderPreProcessorDedupes(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + pre := spec.PreProcessor() + + route := &eskip.Route{ + Filters: []*eskip.Filter{ + {Name: "setRequestHeader", Args: []interface{}{"X-A", "a"}}, + {Name: filters.PhysicsShedderName, Args: []interface{}{"a", "active", "200ms"}}, + {Name: filters.PhysicsShedderName, Args: []interface{}{"b", "active", "200ms"}}, + {Name: "setRequestHeader", Args: []interface{}{"X-B", "b"}}, + {Name: filters.PhysicsShedderName, Args: []interface{}{"c", "active", "200ms"}}, + }, + } + out := pre.Do([]*eskip.Route{route}) + if len(out) != 1 { + t.Fatalf("expected 1 route, got %d", len(out)) + } + count := 0 + for _, f := range out[0].Filters { + if f.Name == filters.PhysicsShedderName { + count++ + } + } + if count != 1 { + t.Fatalf("expected 1 physicsShedder remaining after dedupe, got %d", count) + } +} + +func TestPhysicsShedderPostProcessorClosesStale(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + + f1, _ := spec.CreateFilter([]interface{}{"a", "inactive", "200ms"}) + f2, _ := spec.CreateFilter([]interface{}{"b", "inactive", "200ms"}) + ps1 := f1.(*physicsShedder) + ps2 := f2.(*physicsShedder) + + post := spec.PostProcessor() + routes := []*routing.Route{ + { + Route: eskip.Route{Id: "r1"}, + Filters: []*routing.RouteFilter{{Filter: ps1, Name: filters.PhysicsShedderName}}, + }, + } + post.Do(routes) + + // Replace r1's filter with ps2; postprocessor should close ps1. + routes[0].Filters[0].Filter = ps2 + post.Do(routes) + if !ps1.closed { + t.Fatal("old filter should be closed after replacement") + } + + // Drop the route entirely; postprocessor should close ps2. 
+ post.Do(nil) + if !ps2.closed { + t.Fatal("filter should be closed when route removed") + } +} + +// TestPhysicsShedderEndToEnd wires the filter into a real proxytest and +// checks a small traffic run completes successfully in inactive mode. +// The goal here is wiring/smoke, not algorithm correctness. +func TestPhysicsShedderEndToEnd(t *testing.T) { + prev := metrics.Default + m := &metricstest.MockMetrics{} + metrics.Default = m + t.Cleanup(func() { metrics.Default = prev; m.Close() }) + + var backendHits int64 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt64(&backendHits, 1) + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + + args := []interface{}{"e2e", "inactive", "200ms"} + route := &eskip.Route{ + Id: "r1", + Filters: []*eskip.Filter{{Name: spec.Name(), Args: args}}, + Backend: backend.URL, + } + + fr := make(filters.Registry) + fr.Register(spec) + fr.Register(builtin.NewSetRequestHeader()) + + dc := testdataclient.New([]*eskip.Route{route}) + defer dc.Close() + + proxy := proxytest.WithRoutingOptions(fr, routing.Options{ + DataClients: []routing.DataClient{dc}, + PreProcessors: []routing.PreProcessor{spec.PreProcessor()}, + PostProcessors: []routing.PostProcessor{spec.PostProcessor()}, + }) + defer proxy.Close() + + client := proxy.Client() + req, err := http.NewRequest("GET", proxy.URL, nil) + if err != nil { + t.Fatalf("NewRequest: %v", err) + } + + const N = 50 + for range N { + rsp, err := client.Do(req) + if err != nil { + t.Fatalf("roundtrip: %v", err) + } + rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("want 200, got %d", rsp.StatusCode) + } + } + + if got := atomic.LoadInt64(&backendHits); got != N { + t.Fatalf("expected all %d requests to hit backend, got %d", N, got) + } + + var total int64 + m.WithCounters(func(c map[string]int64) { + total = 
c["shedder.physics.total.e2e"] + }) + if total != N { + t.Fatalf("expected total counter %d, got %d", N, total) + } +} + +// TestPhysicsShedderConcurrentHotPath hammers Request/Response from many +// goroutines while the real tick goroutine runs. Intended to catch data +// races (go test -race) and to smoke-test lock contention on the reject +// path. It also forces pReject non-zero so the shed branch (StateBag + +// Serve) is exercised. +func TestPhysicsShedderConcurrentHotPath(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"race", "active", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + ps.metrics = nil // avoid metrics.Default side effects + defer ps.Close() + + const workers = 16 + stop := make(chan struct{}) + var wg sync.WaitGroup + + // Keep pReject non-zero so the shed branch (StateBag write + Serve) + // races against the tick goroutine's pRejectBits.Store. + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + ps.pRejectBits.Store(math.Float64bits(0.3)) + time.Sleep(time.Millisecond) + } + }() + + for range workers { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + ctx := newFakeFilterContext() + ctx.FResponse = &http.Response{StatusCode: 200, Header: http.Header{}} + ps.Request(ctx) + if !ctx.FServed { + ps.Response(ctx) + } + } + }() + } + + // 3 full tick cycles is enough to cover swap + ring buffer rotation. + time.Sleep(3*physicsTickDuration + 50*time.Millisecond) + close(stop) + wg.Wait() +} + +// TestPhysicsShedderSustainedGrayFailure verifies shedding engages +// strongly when a backend turns slow-but-200. 
v1 has no scar/momentum +// term, so EWMA + 3-sigma eventually normalizes a sustained failure to +// the new baseline — by design, the filter focuses on the transition, +// not on punishing a steady state. This test asserts that the transition +// shedding is strong (peak ≥ 0.5) and lasts long enough to matter +// (≥ 10 ticks above 0.1), which is what catches a degrading deployment. +func TestPhysicsShedderSustainedGrayFailure(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms", "500ms") // 5-bucket window + defer ps.Close() + + // Warmup at healthy 50ms latency. + for ps.ticksSeen < physicsWarmupTicks { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + if ps.pReject() != 0 { + t.Fatalf("warmup should leave pReject=0, got %v", ps.pReject()) + } + + // Gray failure: latency jumps to 5x target (1s vs 200ms). + var peakP float64 + engagedTicks := 0 + for range 100 { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 1 * time.Second)) + ps.runOneTick() + p := ps.pReject() + if p > peakP { + peakP = p + } + if p > 0.1 { + engagedTicks++ + } + } + if peakP < 0.5 { + t.Fatalf("expected peak shedding >= 0.5 during gray failure, got %v", peakP) + } + if engagedTicks < 10 { + t.Fatalf("expected >=10 ticks of pReject>0.1 during gray failure, got %d", engagedTicks) + } +} + +// TestPhysicsShedderBaselineAdaptsToNewNormal verifies the opposite side +// of the tradeoff: when the "new normal" is still healthy (below target), +// shedding from the transient eventually abates as EWMA catches up. +func TestPhysicsShedderBaselineAdaptsToNewNormal(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + // Warmup at 50ms (R ≈ 0.25). + for ps.ticksSeen < physicsWarmupTicks { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + + // Step up to a new, slower but still-healthy baseline of 150ms + // (R ≈ 0.75, still below target). 
Run enough ticks for EWMA to + // track (~5x time constant). + for range 500 { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 150 * time.Millisecond)) + ps.runOneTick() + } + + if ps.ewmaMu < 0.6 { + t.Fatalf("expected mu to track up toward 0.75, got %v", ps.ewmaMu) + } + if ps.pReject() > 0.05 { + t.Fatalf("shedding should abate after baseline adapts, got p=%v", ps.pReject()) + } +} + +// TestPhysicsShedderMetrics verifies every metric the filter emits has the +// right name and value. Downstream dashboards depend on these exact keys. +func TestPhysicsShedderMetrics(t *testing.T) { + t.Run("active mode increments reject counter", func(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"m", "active", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + m := &metricstest.MockMetrics{} + ps.metrics = m + defer ps.Close() + + ps.pRejectBits.Store(math.Float64bits(1.0)) + ctx := newFakeFilterContext() + ps.Request(ctx) + + m.WithCounters(func(c map[string]int64) { + if got := c["shedder.physics.total.m"]; got != 1 { + t.Errorf("total: got %d, want 1", got) + } + if got := c["shedder.physics.reject.m"]; got != 1 { + t.Errorf("reject: got %d, want 1", got) + } + if got := c["shedder.physics.would_reject.m"]; got != 0 { + t.Errorf("would_reject in active mode: got %d, want 0", got) + } + }) + }) + + t.Run("inactive mode increments would_reject counter", func(t *testing.T) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"m", "inactive", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + m := &metricstest.MockMetrics{} + ps.metrics = m + defer ps.Close() + + ps.pRejectBits.Store(math.Float64bits(1.0)) + ctx := newFakeFilterContext() + ps.Request(ctx) + + m.WithCounters(func(c map[string]int64) 
{ + if got := c["shedder.physics.would_reject.m"]; got != 1 { + t.Errorf("would_reject: got %d, want 1", got) + } + if got := c["shedder.physics.reject.m"]; got != 0 { + t.Errorf("reject in inactive mode: got %d, want 0", got) + } + }) + }) + + t.Run("gauges emit resistance baseline threshold", func(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + m := ps.metrics.(*metricstest.MockMetrics) + + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 100 * time.Millisecond)) + ps.runOneTick() + + m.WithGauges(func(g map[string]float64) { + if _, ok := g["shedder.physics.resistance.test"]; !ok { + t.Error("resistance gauge missing") + } + if _, ok := g["shedder.physics.baseline.test"]; !ok { + t.Error("baseline gauge missing") + } + if _, ok := g["shedder.physics.threshold.test"]; !ok { + t.Error("threshold gauge missing") + } + // After one tick with R = 100ms/200ms = 0.5, mu primes to 0.5. + if got := g["shedder.physics.resistance.test"]; math.Abs(got-0.5) > 1e-9 { + t.Errorf("resistance: got %v, want 0.5", got) + } + if got := g["shedder.physics.baseline.test"]; math.Abs(got-0.5) > 1e-9 { + t.Errorf("baseline: got %v, want 0.5", got) + } + }) + }) +} + +// TestPhysicsShedderWindowRotation verifies that the ring buffer actually +// evicts old data when it wraps. A stale spike stuck in the buffer would +// be silent in every other test but disastrous in production. +func TestPhysicsShedderWindowRotation(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms", "500ms") // 5-bucket window + defer ps.Close() + + // Fill the whole window with 1s-latency spike. 
+ for range ps.windowSize { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 1 * time.Second)) + ps.runOneTick() + } + ps.mu.Lock() + r1 := ps.computeResistance(sum(ps.totals), sum(ps.errors), sum(ps.latencySumNs)) + ps.mu.Unlock() + if math.Abs(r1-5.0) > 1e-9 { + t.Fatalf("window full of spike: want R=5.0, got %v", r1) + } + + // Fill the window with healthy 50ms traffic — old spike must be gone. + for range ps.windowSize { + ps.counter.Store(10) + ps.latencyNs.Store(int64(10 * 50 * time.Millisecond)) + ps.runOneTick() + } + ps.mu.Lock() + r2 := ps.computeResistance(sum(ps.totals), sum(ps.errors), sum(ps.latencySumNs)) + ps.mu.Unlock() + if math.Abs(r2-0.25) > 1e-9 { + t.Fatalf("window refilled healthy: want R=0.25 (old spike evicted), got %v", r2) + } +} + +// TestPhysicsShedderResponseCountsErrors checks that only 5xx responses +// feed the error arm of R. 2xx/3xx/4xx increment the total counter but +// not the error counter. +func TestPhysicsShedderResponseCountsErrors(t *testing.T) { + cases := []struct { + status int + wantErrors int64 + }{ + {200, 0}, + {302, 0}, + {404, 0}, + {499, 0}, + {500, 1}, + {502, 1}, + {503, 1}, + {504, 1}, + } + for _, tc := range cases { + t.Run(http.StatusText(tc.status), func(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + ctx := newFakeFilterContext() + ctx.FResponse = &http.Response{StatusCode: tc.status, Header: http.Header{}} + ps.Response(ctx) + if got := ps.counter.Load(); got != 1 { + t.Fatalf("status %d: total got %d, want 1", tc.status, got) + } + if got := ps.errorCounter.Load(); got != tc.wantErrors { + t.Fatalf("status %d: errors got %d, want %d", tc.status, got, tc.wantErrors) + } + }) + } +} + +// FuzzPhysicsShedderMath drives computeResistance, updateBaseline, and +// computeRejectProbability with arbitrary non-negative inputs. Asserts +// outputs stay finite and pReject stays in [0, physicsMaxRejectProb]. 
+func FuzzPhysicsShedderMath(f *testing.F) { + f.Add(int64(100), int64(10), int64(20_000_000_000), 100) + f.Add(int64(0), int64(0), int64(0), 0) + f.Add(int64(1), int64(1), int64(1), physicsWarmupTicks) + f.Add(int64(1_000_000), int64(0), int64(math.MaxInt32), 1000) + f.Fuzz(func(t *testing.T, sumReqs, sumErrs, sumLatNs int64, ticksSeen int) { + // Production inputs are all non-negative; skip invalid shapes. + if sumReqs < 0 || sumErrs < 0 || sumLatNs < 0 || ticksSeen < 0 { + t.Skip() + } + ps := &physicsShedder{latencyTarget: 200 * time.Millisecond} + r := ps.computeResistance(sumReqs, sumErrs, sumLatNs) + if math.IsNaN(r) || math.IsInf(r, 0) { + t.Fatalf("R non-finite: reqs=%d errs=%d lat=%d → %v", sumReqs, sumErrs, sumLatNs, r) + } + threshold := ps.updateBaseline(r) + if math.IsNaN(threshold) || math.IsInf(threshold, 0) { + t.Fatalf("threshold non-finite: R=%v → %v", r, threshold) + } + p := ps.computeRejectProbability(r, threshold, ticksSeen) + if math.IsNaN(p) { + t.Fatalf("pReject NaN: R=%v threshold=%v", r, threshold) + } + if p < 0 || p > physicsMaxRejectProb { + t.Fatalf("pReject %v out of range [0, %v]", p, physicsMaxRejectProb) + } + }) +} + +// TestPhysicsShedderInvariants runs randomized traffic for many ticks and +// asserts pipeline invariants always hold: pReject is in [0, max] and +// shedding never activates during warmup. Complements the fuzz test by +// exercising sequential state (EWMA + ring buffer). 
+func TestPhysicsShedderInvariants(t *testing.T) { + ps := makeTestPhysicsShedder(t, "200ms") + defer ps.Close() + + rng := mrand.New(mrand.NewPCG(1, 2)) + for range 2000 { + reqs := int64(rng.IntN(100)) + var errs int64 + if reqs > 0 { + errs = int64(rng.IntN(int(reqs) + 1)) + } + latPerReq := time.Duration(rng.Int64N(int64(2 * time.Second))) + ps.counter.Store(reqs) + ps.errorCounter.Store(errs) + ps.latencyNs.Store(reqs * int64(latPerReq)) + ps.runOneTick() + + p := ps.pReject() + if math.IsNaN(p) { + t.Fatalf("pReject NaN at tick %d", ps.ticksSeen) + } + if p < 0 || p > physicsMaxRejectProb { + t.Fatalf("pReject %v out of range at tick %d", p, ps.ticksSeen) + } + if ps.ticksSeen < physicsWarmupTicks && p != 0 { + t.Fatalf("shed during warmup at tick %d: p=%v", ps.ticksSeen, p) + } + } +} + +// TestPhysicsShedderTracingOnReject verifies that rejecting a request +// emits a physics_shedder span with the expected tags (including error). +func TestPhysicsShedderTracingOnReject(t *testing.T) { + tracer := tracingtest.NewTracer() + spec := NewPhysicsShedder(PhysicsShedderOptions{Tracer: tracer, testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"trace", "active", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + ps.metrics = &metricstest.MockMetrics{} + defer ps.Close() + + ps.pRejectBits.Store(math.Float64bits(1.0)) + + parent := tracer.StartSpan("parent") + req, _ := http.NewRequest("GET", "http://example/", nil) + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), parent)) + ctx := &filtertest.Context{FRequest: req, FStateBag: map[string]interface{}{}} + + ps.Request(ctx) + parent.Finish() + + span := tracer.FindSpan(physicsSpanName) + if span == nil { + t.Fatal("physics_shedder span not created") + } + tags := span.Tags() + wantTags := map[string]interface{}{ + "component": "skipper", + "mode": "active", + "physicsShedder.group": "trace", + "physicsShedder.mode": 
"active", + "physicsShedder.latencyTarget": "200ms", + "physicsShedder.pReject": 1.0, + "error": true, + } + for k, want := range wantTags { + got, ok := tags[k] + if !ok { + t.Errorf("tag %q missing", k) + continue + } + if got != want { + t.Errorf("tag %q: got %v, want %v", k, got, want) + } + } +} + +// TestPhysicsShedderTracingNoShedNoErrorTag verifies that when pReject is +// zero, the span is created but the error tag is NOT set. +func TestPhysicsShedderTracingNoShedNoErrorTag(t *testing.T) { + tracer := tracingtest.NewTracer() + spec := NewPhysicsShedder(PhysicsShedderOptions{Tracer: tracer, testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"trace", "active", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + ps.metrics = &metricstest.MockMetrics{} + defer ps.Close() + + parent := tracer.StartSpan("parent") + req, _ := http.NewRequest("GET", "http://example/", nil) + req = req.WithContext(opentracing.ContextWithSpan(req.Context(), parent)) + ctx := &filtertest.Context{FRequest: req, FStateBag: map[string]interface{}{}} + + ps.Request(ctx) + parent.Finish() + + span := tracer.FindSpan(physicsSpanName) + if span == nil { + t.Fatal("physics_shedder span not created") + } + if _, hasErr := span.Tags()["error"]; hasErr { + t.Error("error tag should not be set when not rejecting") + } + if got := span.Tags()["physicsShedder.pReject"]; got != 0.0 { + t.Errorf("pReject tag: got %v, want 0", got) + } +} + +// TestPhysicsShedderTracingNoParentNoSpan verifies that without a parent +// span in the request context, physicsShedder creates no span (avoiding +// orphaned spans and noise in traces). 
+func TestPhysicsShedderTracingNoParentNoSpan(t *testing.T) { + tracer := tracingtest.NewTracer() + spec := NewPhysicsShedder(PhysicsShedderOptions{Tracer: tracer, testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"trace", "inactive", "200ms"}) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + ps.metrics = &metricstest.MockMetrics{} + defer ps.Close() + + ctx := newFakeFilterContext() + ps.Request(ctx) + + finished := tracer.FinishedSpans() + if len(finished) != 0 { + t.Fatalf("expected no spans without parent, got %d", len(finished)) + } +} + +// TestPhysicsShedderCoexistsWithAdmissionControl verifies that both +// filters can share a route, wired with their own pre/post processors, +// without breaking traffic. Healthy traffic flows through and each +// filter's counters advance independently. +func TestPhysicsShedderCoexistsWithAdmissionControl(t *testing.T) { + prev := metrics.Default + m := &metricstest.MockMetrics{} + metrics.Default = m + t.Cleanup(func() { metrics.Default = prev; m.Close() }) + + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer backend.Close() + + physSpec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + acSpec := NewAdmissionControl(Options{testRand: true}).(*AdmissionControlSpec) + + route := &eskip.Route{ + Id: "r1", + Filters: []*eskip.Filter{ + {Name: filters.AdmissionControlName, Args: []interface{}{ + "ac", "inactive", "100ms", 5, 10, 0.9, 0.95, 1.0, + }}, + {Name: filters.PhysicsShedderName, Args: []interface{}{ + "ph", "inactive", "200ms", + }}, + }, + Backend: backend.URL, + } + + fr := make(filters.Registry) + fr.Register(physSpec) + fr.Register(acSpec) + fr.Register(builtin.NewSetRequestHeader()) + + dc := testdataclient.New([]*eskip.Route{route}) + defer dc.Close() + + proxy := proxytest.WithRoutingOptions(fr, routing.Options{ + 
DataClients: []routing.DataClient{dc}, + PreProcessors: []routing.PreProcessor{ + physSpec.PreProcessor(), + acSpec.PreProcessor(), + }, + PostProcessors: []routing.PostProcessor{ + physSpec.PostProcessor(), + acSpec.PostProcessor(), + }, + }) + defer proxy.Close() + + client := proxy.Client() + req, err := http.NewRequest("GET", proxy.URL, nil) + if err != nil { + t.Fatalf("NewRequest: %v", err) + } + const N = 20 + for range N { + rsp, err := client.Do(req) + if err != nil { + t.Fatalf("roundtrip: %v", err) + } + rsp.Body.Close() + if rsp.StatusCode != http.StatusOK { + t.Fatalf("want 200, got %d", rsp.StatusCode) + } + } + + var physTotal, acTotal int64 + m.WithCounters(func(c map[string]int64) { + physTotal = c["shedder.physics.total.ph"] + acTotal = c["shedder.admission_control.total.ac"] + }) + if physTotal != N { + t.Errorf("physics total: got %d, want %d", physTotal, N) + } + if acTotal == 0 { + t.Errorf("admissionControl total: got 0, expected > 0 (filters may not both have run)") + } +} + +// --- helpers -------------------------------------------------------------- + +// makeTestPhysicsShedder builds a filter without starting its tick goroutine +// and attaches a mock metrics sink. Tests drive ticks manually via runOneTick +// for determinism. An optional window argument overrides the default (5s). +func makeTestPhysicsShedder(t *testing.T, latencyTarget string, window ...string) *physicsShedder { + t.Helper() + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + args := []interface{}{"test", "inactive", latencyTarget} + if len(window) == 1 { + args = append(args, window[0]) + } + f, err := spec.CreateFilter(args) + if err != nil { + t.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + ps.metrics = &metricstest.MockMetrics{} + // Stop the auto tick goroutine so we can drive it deterministically. 
+ // Close() now blocks until the goroutine actually exits, so it's safe + // to replace quit/done here without racing. + ps.Close() + ps.closed = false + ps.quit = make(chan struct{}) + ps.done = make(chan struct{}) + close(ps.done) // no goroutine running; make Close() a no-op wait. + ps.once = sync.Once{} + return ps +} + +// runOneTick performs exactly one tick cycle — swapping counters into the +// ring buffer and recomputing R, baseline, and reject probability. +func (ps *physicsShedder) runOneTick() { + reqs := ps.counter.Swap(0) + errs := ps.errorCounter.Swap(0) + lat := ps.latencyNs.Swap(0) + + ps.mu.Lock() + ps.totals[ps.bucketIdx] = reqs + ps.errors[ps.bucketIdx] = errs + ps.latencySumNs[ps.bucketIdx] = lat + ps.bucketIdx = (ps.bucketIdx + 1) % ps.windowSize + if ps.ticksSeen < math.MaxInt32 { + ps.ticksSeen++ + } + sumReqs := sum(ps.totals) + sumErrs := sum(ps.errors) + sumLat := sum(ps.latencySumNs) + ticksSeen := ps.ticksSeen + ps.mu.Unlock() + + r := ps.computeResistance(sumReqs, sumErrs, sumLat) + threshold := ps.updateBaseline(r) + rejectP := ps.computeRejectProbability(r, threshold, ticksSeen) + ps.pRejectBits.Store(math.Float64bits(rejectP)) + ps.publishGauges(r, ps.ewmaMu, threshold) +} + +func newFakeFilterContext() *filtertest.Context { + req, _ := http.NewRequest("GET", "http://example/", nil) + return &filtertest.Context{ + FRequest: req, + FStateBag: map[string]interface{}{}, + } +} + +// --- benchmarks ----------------------------------------------------------- + +// BenchmarkPhysicsShedderHotPath measures Request+Response under concurrent +// load. The filter is configured in inactive mode so no 503s are served, +// isolating the counter/atomic cost. 
+func BenchmarkPhysicsShedderHotPath(b *testing.B) { + spec := NewPhysicsShedder(PhysicsShedderOptions{testRand: true}).(*PhysicsShedderSpec) + f, err := spec.CreateFilter([]interface{}{"bench", "inactive", "200ms"}) + if err != nil { + b.Fatalf("CreateFilter: %v", err) + } + ps := f.(*physicsShedder) + defer ps.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + ctx := newFakeFilterContext() + ctx.FResponse = &http.Response{StatusCode: 200, Header: http.Header{}} + for pb.Next() { + ps.Request(ctx) + ps.Response(ctx) + } + }) +} + +// BenchmarkPhysicsShedderTickLoop measures the cost of one tick-cycle +// (swap + ring buffer + R + EWMA + probability publish). +func BenchmarkPhysicsShedderTickLoop(b *testing.B) { + ps := &physicsShedder{ + windowSize: 50, + latencyTarget: 200 * time.Millisecond, + totals: make([]int64, 50), + errors: make([]int64, 50), + latencySumNs: make([]int64, 50), + } + // Pre-fill counters so each tick has work to do. + ps.counter.Store(1000) + ps.latencyNs.Store(int64(1000 * 100 * time.Millisecond)) + + b.ResetTimer() + for range b.N { + ps.counter.Store(1000) + ps.latencyNs.Store(int64(1000 * 100 * time.Millisecond)) + ps.runOneTick() + } +} diff --git a/skipper.go b/skipper.go index 3acf96055a..09f440caaf 100644 --- a/skipper.go +++ b/skipper.go @@ -1868,6 +1868,14 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { log.Fatal("Failed to cast admission control filter to spec") } + physicsShedderFilter := shedder.NewPhysicsShedder(shedder.PhysicsShedderOptions{ + Tracer: tracer, + }) + physicsShedderSpec, ok := physicsShedderFilter.(*shedder.PhysicsShedderSpec) + if !ok { + log.Fatal("Failed to cast physics shedder filter to spec") + } + o.CustomFilters = append(o.CustomFilters, logfilter.NewAuditLog(o.MaxAuditBody), block.NewBlock(o.MaxMatcherBufferSize), @@ -1894,6 +1902,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { o.ApiUsageMonitoringRealmsTrackingPattern, ), 
admissionControlFilter, + physicsShedderFilter, ) if o.OIDCSecretsFile != "" { @@ -2296,6 +2305,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { builtin.NewRouteCreationMetrics(mtr), fadein.NewPostProcessor(fadein.PostProcessorOptions{EndpointRegistry: endpointRegistry}), admissionControlSpec.PostProcessor(), + physicsShedderSpec.PostProcessor(), builtin.CommentPostProcessor{}, }, SignalFirstLoad: o.WaitFirstRouteLoad, @@ -2340,6 +2350,7 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { } ro.PreProcessors = append(ro.PreProcessors, admissionControlSpec.PreProcessor()) + ro.PreProcessors = append(ro.PreProcessors, physicsShedderSpec.PreProcessor()) ro.PreProcessors = append(ro.PreProcessors, eskip.ForwardPreProcessor(o.ForwardBackendURL)) diff --git a/skptesting/benchmark-shedder.sh b/skptesting/benchmark-shedder.sh new file mode 100755 index 0000000000..13d4021e61 --- /dev/null +++ b/skptesting/benchmark-shedder.sh @@ -0,0 +1,70 @@ +#! /usr/bin/env bash +# Local load test for physicsShedder. Runs two scenarios back-to-back: +# 1) healthy — fast inline backend, latency ~0ms +# 2) slow — latency("100ms") injected, 2x the shedder's 50ms latencyTarget +# +# Both run with physicsShedder in "logInactive" mode so you can observe R, +# mu, sigma, threshold, pReject in the proxy log without 503s being served. +# +# Usage: +# ./benchmark-shedder.sh [duration_seconds] [concurrency] [skipper-binary] +# Requires: ab (apachebench). + +set -euo pipefail + +duration=${1:-20} +concurrency=${2:-32} +bin=${3:-} + +cwd=$(cd "$(dirname "$0")" && pwd) +if [ -z "$bin" ]; then + bin=$(mktemp) + echo "building skipper -> $bin" + (cd "$cwd/.." 
&& go build -o "$bin" ./cmd/skipper) +fi + +which ab >/dev/null 2>&1 || { echo "ab (apachebench) required"; exit 2; } + +log_dir=$(mktemp -d) +echo "logs: $log_dir" + +cleanup() { + [ -n "${BPID:-}" ] && kill "$BPID" 2>/dev/null || true + [ -n "${PPID_SK:-}" ] && kill "$PPID_SK" 2>/dev/null || true +} +trap cleanup EXIT INT + +run_scenario() { + local name=$1 + local proxy_eskip=$2 + + echo + echo "=== scenario: $name ===" + "$bin" -access-log-disabled -address :9980 \ + -routes-file "$cwd/physics-shedder-backend.eskip" -support-listener :0 \ + > "$log_dir/backend-$name.log" 2>&1 & + BPID=$! + "$bin" -access-log-disabled -address :9090 \ + -routes-file "$cwd/$proxy_eskip" -support-listener :0 \ + > "$log_dir/proxy-$name.log" 2>&1 & + PPID_SK=$! + sleep 1 + + ab -t "$duration" -c "$concurrency" -q http://127.0.0.1:9090/ 2>&1 \ + | grep -E "Requests per|Time per|Failed|Complete|Non-2xx" || true + + sleep 1 + echo "--- last 5 physicsShedder ticks ---" + grep -i "physicsshedder\[local\]" "$log_dir/proxy-$name.log" | tail -5 || true + + kill "$BPID" "$PPID_SK" 2>/dev/null || true + wait "$BPID" "$PPID_SK" 2>/dev/null || true + BPID=""; PPID_SK="" + sleep 1 +} + +run_scenario healthy physics-shedder.eskip +run_scenario slow physics-shedder-slow.eskip + +echo +echo "full logs at $log_dir" diff --git a/skptesting/physics-shedder-backend.eskip b/skptesting/physics-shedder-backend.eskip new file mode 100644 index 0000000000..2b1f0550b9 --- /dev/null +++ b/skptesting/physics-shedder-backend.eskip @@ -0,0 +1 @@ +ok: * -> status(200) -> inlineContent("ok\n") -> ; diff --git a/skptesting/physics-shedder-slow.eskip b/skptesting/physics-shedder-slow.eskip new file mode 100644 index 0000000000..054219377e --- /dev/null +++ b/skptesting/physics-shedder-slow.eskip @@ -0,0 +1 @@ +proxy: * -> physicsShedder("local", "logInactive", "50ms") -> latency("100ms") -> "http://127.0.0.1:9980"; diff --git a/skptesting/physics-shedder.eskip b/skptesting/physics-shedder.eskip new file mode 100644 
index 0000000000..8c2bfed54e --- /dev/null +++ b/skptesting/physics-shedder.eskip @@ -0,0 +1 @@ +proxy: * -> physicsShedder("local", "logInactive", "50ms") -> "http://127.0.0.1:9980"; From 868b283498e78380c3c6807923d07a10687d9c25 Mon Sep 17 00:00:00 2001 From: ivan-digital Date: Sat, 18 Apr 2026 13:30:26 +0200 Subject: [PATCH 2/4] Remove unused physicsShedder.shouldReject helper Signed-off-by: ivan-digital --- filters/shedder/physics.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/filters/shedder/physics.go b/filters/shedder/physics.go index 690810bc5d..a1fd083800 100644 --- a/filters/shedder/physics.go +++ b/filters/shedder/physics.go @@ -368,17 +368,6 @@ func (ps *physicsShedder) pReject() float64 { return math.Float64frombits(b) } -func (ps *physicsShedder) shouldReject() bool { - p := ps.pReject() - if p <= 0 { - return false - } - ps.muRand.Lock() - r := ps.rand() - ps.muRand.Unlock() - return p > r -} - func (ps *physicsShedder) setCommonTags(span opentracing.Span, r, threshold, p float64) { span.SetTag("physicsShedder.group", ps.metricSuffix) span.SetTag("physicsShedder.mode", ps.mode.String()) From 8b63d033a6f1efb701eb959114e62db3aff71f59 Mon Sep 17 00:00:00 2001 From: ivan-digital Date: Sat, 18 Apr 2026 18:32:04 +0200 Subject: [PATCH 3/4] Tighten physicsShedder docs and drop forward-looking comment Signed-off-by: ivan-digital --- docs/reference/filters.md | 91 +++++++++++++++----------------------- filters/shedder/physics.go | 3 +- 2 files changed, 37 insertions(+), 57 deletions(-) diff --git a/docs/reference/filters.md b/docs/reference/filters.md index 8c4792c9c3..c3725dc99b 100644 --- a/docs/reference/filters.md +++ b/docs/reference/filters.md @@ -2795,27 +2795,19 @@ probability you have to use values lower than 1: ### physicsShedder -Implements a self-tuning, route-level load shedder that uses latency as -a first-class signal. 
Unlike `admissionControl`, which only reacts to -error rate, `physicsShedder` also catches gray failures: backends that -keep returning HTTP 200 while getting slower. +Route-level load shedder that uses latency as a first-class signal. It +catches gray failures (backends returning 200 but getting slower) that +`admissionControl` misses because it only watches error rate. -Each route carries a "resistance" score built from observed latency and -error rate. A per-route EWMA baseline of that score is learned -automatically, so there is no manual threshold to tune and the filter -adapts to daily traffic patterns on its own. When the current resistance -exceeds the learned baseline by a configurable number of standard -deviations, the filter rejects a share of traffic with status 503, using -the same probabilistic style as `admissionControl`. - -The resistance is computed per observation window as: +Per route, the filter computes a resistance score from observed latency +and errors: $$ R = { avgLatency \over latencyTarget } + errorWeight \cdot errorRate $$ -The shed threshold is $\mu + k \cdot \sigma$, where $\mu$ and $\sigma$ -come from an exponentially weighted moving average of past resistance -values. The reject probability is $(R - threshold) / R$, clamped to a -maximum. +It learns a baseline ($\mu$) and deviation ($\sigma$) of $R$ over time +via EWMA, and starts rejecting some traffic when $R$ exceeds +$\mu + k\sigma$, with reject probability $(R - threshold) / R$ clamped to +a maximum. 
Examples: @@ -2826,50 +2818,39 @@ Examples: Parameters: -* metric suffix (string) -* mode (enum) -* latency target (time.Duration) -* window (time.Duration, optional, default `"5s"`) +* metric suffix (string) — unique per filter instance, used in metric keys +* mode (enum) — `active`, `inactive`, or `logInactive` +* latency target (time.Duration) — the expected per-request latency for + this route; the only knob operators tune +* window (time.Duration, optional, default `"5s"`, range `200ms`–`60s`) — + observation window for the resistance calculation -Metric suffix is the chosen suffix key to expose reject and resistance -metrics, should be unique by filter instance. +Modes: -Mode has 3 different possible values: +* `active` — reject traffic with 503 +* `inactive` — never reject, but expose `would_reject` counter for dry-run +* `logInactive` — same as `inactive`, plus per-tick logs of $R$, $\mu$, + $\sigma$, threshold and reject probability -* "active" will reject traffic -* "inactive" will never reject traffic, but collect metrics -* "logInactive" will not reject traffic, but log the filter's decisions - for tuning and dry-run deployments - -Latency target is the expected per-request latency for this route. It is -the single knob operators tune: the filter considers the route healthy -when average latency sits at or below this value. The baseline and -variance are learned relative to it. - -Window is the observation window, i.e. how much history contributes to -the current resistance calculation. It must be between `200ms` and `60s`. - -During filter startup the shedder does not reject any traffic until its -baseline has primed, so newly loaded routes behave safely even under -immediate load. 
-
-Exposed metrics per metric suffix:
-
-* `shedder.physics.total.` — counter of requests observed
-* `shedder.physics.reject.` — counter of requests rejected with
-  503 in active mode
-* `shedder.physics.would_reject.` — counter of requests that
-  would have been rejected in `inactive`/`logInactive` modes
-* `shedder.physics.resistance.` — current resistance gauge
-* `shedder.physics.baseline.` — EWMA baseline gauge
+The filter does not reject any traffic during startup until its baseline
+has primed.
+
+Metrics (per metric suffix):
+
+* `shedder.physics.total.` — requests observed
+* `shedder.physics.reject.` — requests rejected (active mode only)
+* `shedder.physics.would_reject.` — requests that would have been
+  rejected (inactive/logInactive)
+* `shedder.physics.resistance.` — current $R$ gauge
+* `shedder.physics.baseline.` — current $\mu$ gauge
 * `shedder.physics.threshold.` — current shed threshold gauge

-`physicsShedder` composes cleanly with `admissionControl`: both filters
-honor the `Admission-Control` response header so a filter upstream in
-the chain does not double-count 503s produced by a filter downstream.
+`physicsShedder` composes with `admissionControl` on the same route:
+both honor the `Admission-Control` response header so an upstream filter
+does not double-count 503s served by a downstream one.

-During load tests of the backend, run this filter in `inactive` or
-`logInactive` mode so the deliberately induced latency does not trigger
+When load testing the backend, run this filter in `inactive` or
+`logInactive` mode so deliberately induced latency does not trigger
 shedding.

 ## lua
diff --git a/filters/shedder/physics.go b/filters/shedder/physics.go
index a1fd083800..a5fd2aefea 100644
--- a/filters/shedder/physics.go
+++ b/filters/shedder/physics.go
@@ -305,8 +305,7 @@ func (ps *physicsShedder) tickWindows() {
 }

 // computeResistance collapses window-aggregated metrics into a single R
-// value.
Pressure is the only component in v1; momentum/scar may be added
-// behind flags if scenario tests motivate them.
+// value combining latency-vs-target and error rate.
 func (ps *physicsShedder) computeResistance(sumReqs, sumErrs, sumLatNs int64) float64 {
 	if sumReqs <= 0 {
 		return 0
From 5ab1f301657f4e900a1fc34ce47572a639d249a5 Mon Sep 17 00:00:00 2001
From: ivan-digital
Date: Sun, 19 Apr 2026 19:24:31 +0200
Subject: [PATCH 4/4] ci: retrigger after flaky TestValkeyClient

Signed-off-by: ivan-digital