From 39ab3fc0f5c89d1043edf144447e6e85dff07118 Mon Sep 17 00:00:00 2001 From: khushjain Date: Wed, 15 Apr 2026 15:11:17 -0400 Subject: [PATCH 1/3] add support for missing stats count in rollup for streaming expressions --- ...-support-missing-stats-count-in-rollup.yml | 8 ++ .../org/apache/solr/client/solrj/io/Lang.java | 2 + .../io/stream/ParallelMetricsRollup.java | 5 ++ .../io/stream/metrics/MissingMetric.java | 87 +++++++++++++++++++ .../client/solrj/io/stream/StreamingTest.java | 40 ++++++--- 5 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml create mode 100644 solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java diff --git a/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml new file mode 100644 index 000000000000..ec9f20bde065 --- /dev/null +++ b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml @@ -0,0 +1,8 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: "SOLR-18198: Support 'missing' stats count in rollup function for streaming expressions" +type: added +authors: + - name: khushjain +links: + - name: SOLR-18198 + url: https://issues.apache.org/jira/browse/SOLR-18198 diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java index 927fb1eef5d8..2c0e3a4de7a9 100644 --- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/Lang.java @@ -325,6 +325,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.MaxMetric; import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric; import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric; import org.apache.solr.client.solrj.io.stream.metrics.StdMetric; import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; @@ -406,6 +407,7 @@ public static void register(StreamFactory streamFactory) { .withFunctionName("std", StdMetric.class) .withFunctionName("count", CountMetric.class) .withFunctionName("countDist", CountDistinctMetric.class) + .withFunctionName("missing", MissingMetric.class) // tuple manipulation operations .withFunctionName("replace", ReplaceOperation.class) diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java index 752581b2f8fc..584932c1e016 100644 --- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/ParallelMetricsRollup.java @@ -26,6 +26,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; import org.apache.solr.client.solrj.io.stream.metrics.Metric; import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric; import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; import org.apache.solr.client.solrj.io.stream.metrics.WeightedSumMetric; @@ -133,6 +134,10 @@ default Optional getRollupMetrics(Metric[] metrics) { // can't properly rollup mean metrics w/o a count (reqd by WeightedSumMetric) return Optional.empty(); } + } else if (next instanceof MissingMetric) { + // sum of missing counts + nextRollup = new SumMetric(next.getIdentifier()); + nextRollup.outputLong = next.outputLong; } else if (next instanceof CountDistinctMetric) { // rollup of count distinct is the max across the tiers nextRollup = new MaxMetric(next.getIdentifier()); diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java new file mode 100644 index 000000000000..8f60dfc5f3b6 --- /dev/null +++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/MissingMetric.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.client.solrj.io.stream.metrics; + +import java.io.IOException; +import java.util.Locale; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; + +public class MissingMetric extends Metric { + private String columnName; + private long count; + + public MissingMetric(String columnName) { + init("missing", columnName); + } + + public MissingMetric(StreamExpression expression, StreamFactory factory) throws IOException { + String functionName = expression.getFunctionName(); + String columnName = factory.getValueOperand(expression, 0); + + if (null == columnName) { + throw new IOException( + String.format( + Locale.ROOT, + "Invalid expression %s - expected %s(columnName)", + expression, + functionName)); + } + if (1 != expression.getParameters().size()) { + throw new IOException( + String.format(Locale.ROOT, "Invalid expression %s - unknown operands found", expression)); + } + + init(functionName, columnName); + } + + private void init(String functionName, String columnName) { + this.columnName = columnName; + this.outputLong = true; + setFunctionName(functionName); + setIdentifier(functionName, "(", columnName, ")"); + } + + @Override + public String[] getColumns() { + return new String[] {columnName}; + } + + @Override + public void update(Tuple tuple) { + if (tuple.get(columnName) == null) { + ++count; + } + } + + @Override + public Long getValue() { + return count; + } + + @Override + public Metric newInstance() { + return new MissingMetric(columnName); + } + + @Override + public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException { + return new StreamExpression(getFunctionName()).withParameter(columnName); + } +} diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index a0b72d8d2a10..d2748c2fcf4d 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -46,6 +46,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric; import org.apache.solr.client.solrj.io.stream.metrics.Metric; import org.apache.solr.client.solrj.io.stream.metrics.MinMetric; +import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric; import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; @@ -145,16 +146,16 @@ public static void configureCluster() throws Exception { // Update request shared by many of the tests private final UpdateRequest helloDocsUpdateRequest = new UpdateRequest() - .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1") + .add(id, "0", "a_s", "hello0", "a_i", "0", "a_f", "1", "b_f", "1.5") .add(id, "2", "a_s", "hello0", "a_i", "2", "a_f", "2") .add(id, "3", "a_s", "hello3", "a_i", "3", "a_f", "3") - .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4") + .add(id, "4", "a_s", "hello4", "a_i", "4", "a_f", "4", "b_f", "4.5") .add(id, "1", "a_s", "hello0", "a_i", "1", "a_f", "5") - .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6") - .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7") + .add(id, "5", "a_s", "hello3", "a_i", "10", "a_f", "6", "b_f", "6.5") + .add(id, "6", "a_s", "hello4", "a_i", "11", "a_f", "7", "b_f", "7.5") .add(id, "7", "a_s", "hello3", "a_i", "12", "a_f", "8") .add(id, "8", "a_s", "hello3", "a_i", "13", "a_f", "9") - .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10"); + .add(id, "9", "a_s", "hello0", "a_i", "14", "a_f", "10", "b_f", "10.5"); @Before public void clearCollection() throws Exception { @@ -1646,7 +1647,7 @@ public void testRollupStream() throws Exception { streamContext.setSolrClientCache(solrClientCache); try { - SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc"); + SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,b_f", "sort", "a_s asc"); CloudSolrStream stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA); Bucket[] buckets = {new Bucket("a_s")}; @@ -1660,7 +1661,8 @@ public void testRollupStream() throws Exception { new MaxMetric("a_f"), new MeanMetric("a_i"), new MeanMetric("a_f"), - new CountMetric() + new CountMetric(), + new MissingMetric("b_f") }; RollupStream rollupStream = new RollupStream(stream, buckets, metrics); @@ -1682,6 +1684,7 @@ public void testRollupStream() throws Exception { Double avgi = tuple.getDouble("avg(a_i)"); Double avgf = tuple.getDouble("avg(a_f)"); Double count = tuple.getDouble("count(*)"); + Double missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello0", bucket); assertEquals(17, sumi, 0.001); @@ -1693,6 +1696,7 @@ public void testRollupStream() throws Exception { assertEquals(4.25, avgi, 0.001); assertEquals(4.5, avgf, 0.001); assertEquals(4, count, 0.001); + assertEquals(2, missingBf, 0.001); tuple = tuples.get(1); bucket = tuple.getString("a_s"); @@ -1705,6 +1709,7 @@ public void testRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello3", bucket); assertEquals(38, sumi, 0.001); @@ -1716,6 +1721,7 @@ public void testRollupStream() throws Exception { assertEquals(9.5, avgi, 0.001); assertEquals(6.5, avgf, 0.001); assertEquals(4, count, 0.001); + assertEquals(3, missingBf, 0.001); tuple = tuples.get(2); bucket = tuple.getString("a_s"); @@ -1728,6 +1734,7 @@ public void testRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello4", bucket); assertEquals(15, sumi.longValue()); @@ -1739,6 +1746,7 @@ public void testRollupStream() throws Exception { assertEquals(7.5, avgi, 0.01); assertEquals(5.5, avgf, 0.01); assertEquals(2, count, 0.01); + assertEquals(0, missingBf, 0.01); // Test will null metrics rollupStream = new RollupStream(stream, buckets, metrics); @@ -1763,7 +1771,7 @@ public void testRollupStream() throws Exception { .add(id, "12", "a_s", null, "a_i", "14", "a_f", "10") .commit(cluster.getSolrClient(), COLLECTIONORALIAS); - sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc", "qt", "/export"); + sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f,b_f", "sort", "a_s asc", "qt", "/export"); stream = new CloudSolrStream(zkHost, COLLECTIONORALIAS, sParamsA); Bucket[] buckets1 = {new Bucket("a_s")}; @@ -1776,7 +1784,8 @@ public void testRollupStream() throws Exception { new MaxMetric("a_f"), new MeanMetric("a_i"), new MeanMetric("a_f"), - new CountMetric() + new CountMetric(), + new MissingMetric("b_f") }; rollupStream = new RollupStream(stream, buckets1, metrics1); @@ -1796,6 +1805,7 @@ public void testRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + missingBf = tuple.getDouble("missing(b_f)"); assertEquals(14, sumi, 0.01); assertEquals(10, sumf, 0.01); @@ -1806,6 +1816,7 @@ public void testRollupStream() throws Exception { assertEquals(14, avgi, 0.01); assertEquals(10, avgf, 0.01); assertEquals(1, count, 0.01); + assertEquals(1, missingBf, 0.01); } finally { solrClientCache.close(); } @@ -1982,7 +1993,7 @@ public void testParallelRollupStream() throws Exception { "q", "*:*", "fl", - "a_s,a_i,a_f", + "a_s,a_i,a_f,b_f", "sort", "a_s asc", "partitionKeys", @@ -2002,7 +2013,8 @@ public void testParallelRollupStream() throws Exception { new MaxMetric("a_f"), new MeanMetric("a_i"), new MeanMetric("a_f"), - new CountMetric() + new CountMetric(), + new MissingMetric("b_f") }; RollupStream rollupStream = new RollupStream(stream, buckets, metrics); @@ -2027,6 +2039,7 @@ public void testParallelRollupStream() throws Exception { Double avgi = tuple.getDouble("avg(a_i)"); Double avgf = tuple.getDouble("avg(a_f)"); Double count = tuple.getDouble("count(*)"); + Double missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello0", bucket); assertEquals(17, sumi, 0.001); @@ -2038,6 +2051,7 @@ public void testParallelRollupStream() throws Exception { assertEquals(4.25, avgi, 0.001); assertEquals(4.5, avgf, 0.001); assertEquals(4, count, 0.001); + assertEquals(2, missingBf, 0.001); tuple = tuples.get(1); bucket = tuple.getString("a_s"); @@ -2050,6 +2064,7 @@ public void testParallelRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello3", bucket); assertEquals(38, sumi, 0.001); @@ -2061,6 +2076,7 @@ public void testParallelRollupStream() throws Exception { assertEquals(9.5, avgi, 0.001); assertEquals(6.5, avgf, 0.001); assertEquals(4, count, 0.001); + assertEquals(3, missingBf, 0.001); tuple = tuples.get(2); bucket = tuple.getString("a_s"); @@ -2073,6 +2089,7 @@ public void testParallelRollupStream() throws Exception { avgi = tuple.getDouble("avg(a_i)"); avgf = tuple.getDouble("avg(a_f)"); count = tuple.getDouble("count(*)"); + missingBf = tuple.getDouble("missing(b_f)"); assertEquals("hello4", bucket); assertEquals(15, sumi.longValue()); @@ -2084,6 +2101,7 @@ public void testParallelRollupStream() throws Exception { assertEquals(7.5, avgi, 0.001); assertEquals(5.5, avgf, 0.001); assertEquals(2, count, 0.001); + assertEquals(0, missingBf, 0.001); } finally { solrClientCache.close(); } From 91725460de38ed8a5c27a178c2fbb6eae4b5ce44 Mon Sep 17 00:00:00 2001 From: khushjain Date: Thu, 16 Apr 2026 10:54:09 -0400 Subject: [PATCH 2/3] Update changelog and add test for all rollup metrics --- ...-support-missing-stats-count-in-rollup.yml | 2 +- .../client/solrj/io/stream/StreamingTest.java | 41 ++++++++++++++++++- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml index ec9f20bde065..f614517e3689 100644 --- a/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml +++ b/changelog/unreleased/SOLR-18198-support-missing-stats-count-in-rollup.yml @@ -1,5 +1,5 @@ # See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc -title: "SOLR-18198: Support 'missing' stats count in rollup function for streaming expressions" +title: "Support 'missing' stats count in rollup function for streaming expressions" type: added authors: - name: khushjain diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index d2748c2fcf4d..601a4fe6f676 100644 --- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -1967,12 +1967,51 @@ public void testRollupWithNoParallel() throws Exception { "expr", "rollup(search(" + COLLECTIONORALIAS - + ",q=\"*:*\",fl=\"a_s,a_i,a_f\",sort=\"a_s desc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\")\n"); + + ",q=\"*:*\",fl=\"a_s,a_i,a_f,b_f\",sort=\"a_s asc\",partitionKeys=\"a_s\", qt=\"/export\"),over=\"a_s\",sum(a_i),sum(a_f),min(a_i),min(a_f),max(a_i),max(a_f),avg(a_i),avg(a_f),count(*),missing(b_f))\n"); SolrStream solrStream = new SolrStream(shardUrls.get(0), solrParams); streamContext = new StreamContext(); solrStream.setStreamContext(streamContext); tuples = getTuples(solrStream); assertEquals(3, tuples.size()); + + Tuple exprTuple = tuples.get(0); + assertEquals("hello0", exprTuple.getString("a_s")); + assertEquals(17, exprTuple.getDouble("sum(a_i)"), 0.001); + assertEquals(18, exprTuple.getDouble("sum(a_f)"), 0.001); + assertEquals(0, exprTuple.getDouble("min(a_i)"), 0.001); + assertEquals(1, exprTuple.getDouble("min(a_f)"), 0.001); + assertEquals(14, exprTuple.getDouble("max(a_i)"), 0.001); + assertEquals(10, exprTuple.getDouble("max(a_f)"), 0.001); + assertEquals(4.25, exprTuple.getDouble("avg(a_i)"), 0.001); + assertEquals(4.5, exprTuple.getDouble("avg(a_f)"), 0.001); + assertEquals(4, exprTuple.getDouble("count(*)"), 0.001); + assertEquals(2, exprTuple.getDouble("missing(b_f)"), 0.001); + + exprTuple = tuples.get(1); + assertEquals("hello3", exprTuple.getString("a_s")); + assertEquals(38, exprTuple.getDouble("sum(a_i)"), 0.001); + assertEquals(26, exprTuple.getDouble("sum(a_f)"), 0.001); + assertEquals(3, exprTuple.getDouble("min(a_i)"), 0.001); + assertEquals(3, exprTuple.getDouble("min(a_f)"), 0.001); + assertEquals(13, exprTuple.getDouble("max(a_i)"), 0.001); + assertEquals(9, exprTuple.getDouble("max(a_f)"), 0.001); + assertEquals(9.5, exprTuple.getDouble("avg(a_i)"), 0.001); + assertEquals(6.5, exprTuple.getDouble("avg(a_f)"), 0.001); + assertEquals(4, exprTuple.getDouble("count(*)"), 0.001); + assertEquals(3, exprTuple.getDouble("missing(b_f)"), 0.001); + + exprTuple = tuples.get(2); + assertEquals("hello4", exprTuple.getString("a_s")); + assertEquals(15, exprTuple.getDouble("sum(a_i)"), 0.001); + assertEquals(11, exprTuple.getDouble("sum(a_f)"), 0.001); + assertEquals(4, exprTuple.getDouble("min(a_i)"), 0.001); + assertEquals(4, exprTuple.getDouble("min(a_f)"), 0.001); + assertEquals(11, exprTuple.getDouble("max(a_i)"), 0.001); + assertEquals(7, exprTuple.getDouble("max(a_f)"), 0.001); + assertEquals(7.5, exprTuple.getDouble("avg(a_i)"), 0.001); + assertEquals(5.5, exprTuple.getDouble("avg(a_f)"), 0.001); + assertEquals(2, exprTuple.getDouble("count(*)"), 0.001); + assertEquals(0, exprTuple.getDouble("missing(b_f)"), 0.001); } finally { solrClientCache.close(); } From fbde8d1e3e6b94209746a0884875a189bfca7b4d Mon Sep 17 00:00:00 2001 From: khushjain Date: Thu, 16 Apr 2026 12:14:04 -0400 Subject: [PATCH 3/3] Update reference docs --- .../query-guide/pages/stream-decorator-reference.adoc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc index 03740f8f1d9f..7f14117ef1dd 100644 --- a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc +++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc @@ -1448,7 +1448,7 @@ For faster aggregation over low to moderate cardinality fields, the `facet` func * `StreamExpression` (Mandatory) * `over`: (Mandatory) A list of fields to group by. * `metrics`: (Mandatory) The list of metrics to compute. -Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`. +Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`. === rollup Syntax @@ -1465,7 +1465,8 @@ rollup( max(a_f), avg(a_i), avg(a_f), - count(*) + count(*), + missing(a_i) ) ----