diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index add4508c87bd..db8c75b4a578 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -177,9 +177,11 @@ use crate::spill::spill_manager::SpillManager; /// output partition independently re-executes the left child and manages /// its own spill state. /// -/// Currently supports INNER, LEFT, LEFT SEMI, LEFT ANTI, and LEFT MARK -/// join types. RIGHT/FULL joins require a global right-side bitmap across -/// all left chunks, which is not yet implemented. +/// All join types are supported. For RIGHT/FULL/RIGHT SEMI/RIGHT ANTI/ +/// RIGHT MARK joins, a global right-side bitmap (indexed by right batch +/// sequence number) accumulates matches across all left chunks. After the +/// last left chunk is processed, the right side is replayed one more time +/// to emit unmatched right rows using the accumulated bitmap. /// /// Tracking issue: /// @@ -655,11 +657,23 @@ impl ExecutionPlan for NestedLoopJoinExec { // Determine if OOM fallback to memory-limited mode is possible. // Conditions: - // 1. Disk manager supports temp files (needed for right-side spilling) - // 2. Join type does not require tracking right-side matched state - // across multiple left chunks (RIGHT/FULL joins not yet supported) + // 1. Disk manager supports temp files (needed for spilling). + // 2. FULL join with multiple right partitions is not yet supported + // in the fallback path. FULL join needs to track BOTH left-side + // matches (for unmatched left rows) AND right-side matches (for + // unmatched right rows). The fallback path builds a per-partition + // `JoinLeftData` with `probe_threads_counter == 1`, so each + // partition emits unmatched left rows based only on its own + // right-side matches, producing incorrect duplicate output for + // left rows that match in another partition. Other join types + // that need only one-sided final emission (LEFT, LEFT SEMI, + // LEFT ANTI, LEFT MARK) have a similar latent issue in the + // fallback path which predates this change; tracking is out of + // scope for this PR. + let full_join_multi_partition = + matches!(self.join_type, JoinType::Full) && right_partition_count > 1; let spill_state = if context.runtime_env().disk_manager.tmp_files_enabled() - && !need_produce_right_in_final(self.join_type) + && !full_join_multi_partition { SpillState::Pending { left_plan: Arc::clone(&self.left), @@ -871,6 +885,11 @@ enum NLJState { ProbeRight, EmitRightUnmatched, EmitLeftUnmatched, + /// Emit unmatched right rows using the global bitmap accumulated across + /// all left chunks. Only used in memory-limited mode for join types that + /// require tracking right-side matches in the final output (RIGHT, FULL, + /// RIGHT SEMI, RIGHT ANTI, RIGHT MARK). + EmitGlobalRightUnmatched, Done, } /// Shared data for the left-side spill fallback. @@ -933,6 +952,47 @@ pub(crate) struct SpillStateActive { pending_batches: Vec, /// Right input that spills on the first pass and replays from spill later. right_input: ReplayableStreamSource, + /// Per-batch accumulated right bitmaps across all left chunks. + /// Index = right batch sequence number (0-based, non-empty batches only). + /// Only populated when `should_track_unmatched_right` is true. + global_right_bitmaps: Vec, + /// Separate reservation for `global_right_bitmaps`. These buffers live + /// for the full operator lifetime (not per-chunk), so they must be + /// tracked separately from `reservation`, which gets `resize(0)`-ed + /// between chunks. + global_right_bitmaps_reservation: MemoryReservation, + /// Current right batch sequence index within the current pass. + right_batch_index: usize, +} + +impl SpillStateActive { + /// Merge a per-pass right bitmap into the global accumulator at the + /// given batch index, growing the dedicated reservation when seeing + /// a batch index for the first time. + /// + /// On first encounter of `idx`, the bitmap is stored as-is and its + /// size is reserved. On subsequent encounters (later left chunk + /// passes over the same right batch), the existing entry is OR-merged + /// with `values`. Because `bitor` produces a buffer of the same bit + /// length, the reservation does not need to be adjusted on merge. + fn merge_current_right_bitmap(&mut self, idx: usize, values: BooleanBuffer) { + if idx >= self.global_right_bitmaps.len() { + // First encounter of this right batch — account memory and store. + // The bitmap has one bit per right row, so for very large right + // inputs the accumulated size can be non-negligible (e.g., + // 1M rows ≈ 125 KB per batch). + // Use infallible `grow` because we must accept the bitmap to + // preserve correctness — the fallback path has no other recourse. + let bytes = values.len().div_ceil(8); + self.global_right_bitmaps_reservation.grow(bytes); + self.global_right_bitmaps.push(values); + } else { + // Subsequent left chunk pass — OR merge. Same bit length, so + // no reservation adjustment is needed. + self.global_right_bitmaps[idx] = + self.global_right_bitmaps[idx].bitor(&values); + } + } } pub(crate) struct NestedLoopJoinStream { @@ -1210,6 +1270,25 @@ impl Stream for NestedLoopJoinStream { } } + // Replay all right batches from spill and emit unmatched + // right rows using the global bitmap accumulated across all + // left chunks. Only entered in memory-limited mode for join + // types where `should_track_unmatched_right` is true + // (RIGHT, FULL, RIGHT SEMI, RIGHT ANTI, RIGHT MARK). + NLJState::EmitGlobalRightUnmatched => { + debug!("[NLJState] Entering: {:?}", self.state); + + let join_metric = self.metrics.join_metrics.join_time.clone(); + let _join_timer = join_metric.timer(); + + match self.handle_emit_global_right_unmatched(cx) { + ControlFlow::Continue(()) => continue, + ControlFlow::Break(poll) => { + return self.metrics.join_metrics.baseline.record_poll(poll); + } + } + } + // The final state and the exit point NLJState::Done => { debug!("[NLJState] Entering: {:?}", self.state); @@ -1348,6 +1427,13 @@ impl NestedLoopJoinStream { .with_can_spill(true) .register(context.memory_pool()); + // Separate reservation for the global right bitmaps. These buffers + // persist across all left chunks, whereas `reservation` is reset + // between chunks via `resize(0)`. + let global_right_bitmaps_reservation = + MemoryConsumer::new("NestedLoopJoinGlobalRightBitmaps".to_string()) + .register(context.memory_pool()); + // Create SpillManager for right-side spilling let right_schema = self .right_data @@ -1376,6 +1462,9 @@ impl NestedLoopJoinStream { right_spill_manager, "NestedLoopJoin right spill", ), + global_right_bitmaps: Vec::new(), + global_right_bitmaps_reservation, + right_batch_index: 0, })); // State stays BufferingLeft — next poll will enter @@ -1572,6 +1661,7 @@ impl NestedLoopJoinStream { self.buffered_left_data = Some(Arc::new(left_data)); + active.right_batch_index = 0; match active.right_input.open_pass() { Ok(stream) => { self.right_data = Some(stream); @@ -1680,10 +1770,37 @@ impl NestedLoopJoinStream { } } - /// Handle EmitRightUnmatched state - emit unmatched right rows + /// Handle EmitRightUnmatched state - emit unmatched right rows. + /// + /// In memory-limited mode, instead of emitting unmatched right rows + /// per-batch (which would be incorrect since more left chunks may + /// match those rows), we merge the bitmap into the global accumulator + /// and defer emission to `EmitGlobalRightUnmatched`. fn handle_emit_right_unmatched( &mut self, ) -> ControlFlow>>> { + // In memory-limited mode, merge bitmap into global and move on + if self.is_memory_limited() { + debug_assert!( + self.current_right_batch_matched.is_some(), + "right bitmap must be present" + ); + let bitmap = std::mem::take(&mut self.current_right_batch_matched) + .expect("right bitmap should be available"); + let (values, _nulls) = bitmap.into_parts(); + + if let SpillState::Active(ref mut active) = self.spill_state { + let idx = active.right_batch_index; + active.merge_current_right_bitmap(idx, values); + active.right_batch_index += 1; + } + + self.current_right_batch = None; + self.state = NLJState::FetchingRight; + return ControlFlow::Continue(()); + } + + // Standard (single-pass) mode: emit unmatched right rows immediately // Return any completed batches first if let Some(poll) = self.maybe_flush_ready_batch() { return ControlFlow::Break(poll); @@ -1694,23 +1811,16 @@ impl NestedLoopJoinStream { && self.current_right_batch.is_some(), "This state is yielding output for unmatched rows in the current right batch, so both the right batch and the bitmap must be present" ); - // Construct the result batch for unmatched right rows using a utility function match self.process_right_unmatched() { - Ok(Some(batch)) => { - match self.output_buffer.push_batch(batch) { - Ok(()) => { - // Processed all in one pass - // cleared inside `process_right_unmatched` - debug_assert!(self.current_right_batch.is_none()); - self.state = NLJState::FetchingRight; - ControlFlow::Continue(()) - } - Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + Ok(Some(batch)) => match self.output_buffer.push_batch(batch) { + Ok(()) => { + debug_assert!(self.current_right_batch.is_none()); + self.state = NLJState::FetchingRight; + ControlFlow::Continue(()) } - } + Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + }, Ok(None) => { - // Processed all in one pass - // cleared inside `process_right_unmatched` debug_assert!(self.current_right_batch.is_none()); self.state = NLJState::FetchingRight; ControlFlow::Continue(()) @@ -1758,6 +1868,17 @@ impl NestedLoopJoinStream { self.left_probe_idx = 0; self.left_emit_idx = 0; self.state = NLJState::BufferingLeft; + } else if self.is_memory_limited() + && self.should_track_unmatched_right + { + // All left chunks done — emit global right unmatched. + // Drop the exhausted right stream so that + // EmitGlobalRightUnmatched opens a fresh replay pass + // from the spill file. (process_left_unmatched_range + // already ran with right_data still set, so its + // schema access is not affected.) + self.right_data = None; + self.state = NLJState::EmitGlobalRightUnmatched; } else { self.state = NLJState::Done; } @@ -1769,6 +1890,103 @@ impl NestedLoopJoinStream { } } + /// Handle EmitGlobalRightUnmatched state. + /// + /// Replays all right batches from the spill file and emits unmatched + /// right rows using the global bitmap accumulated across all left chunks. + fn handle_emit_global_right_unmatched( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> ControlFlow>>> { + // Flush any completed batches first + if let Some(poll) = self.maybe_flush_ready_batch() { + return ControlFlow::Break(poll); + } + + // On first entry, open a new replay pass on the right input + if self.right_data.is_none() { + let SpillState::Active(ref mut active) = self.spill_state else { + unreachable!("EmitGlobalRightUnmatched without Active spill state"); + }; + active.right_batch_index = 0; + match active.right_input.open_pass() { + Ok(stream) => { + self.right_data = Some(stream); + } + Err(e) => { + return ControlFlow::Break(Poll::Ready(Some(Err(e)))); + } + } + } + + // Poll the replay stream for the next right batch + match self + .right_data + .as_mut() + .expect("right_data must be present") + .poll_next_unpin(cx) + { + Poll::Ready(Some(Ok(right_batch))) => { + if right_batch.num_rows() == 0 { + return ControlFlow::Continue(()); + } + + let SpillState::Active(ref mut active) = self.spill_state else { + unreachable!(); + }; + let idx = active.right_batch_index; + active.right_batch_index += 1; + + // Build BooleanArray from the global bitmap + let bitmap = if idx < active.global_right_bitmaps.len() { + BooleanArray::new(active.global_right_bitmaps[idx].clone(), None) + } else { + // Batch never seen — treat all rows as unmatched + BooleanArray::new( + BooleanBuffer::new_unset(right_batch.num_rows()), + None, + ) + }; + + let left_schema = Arc::clone( + active + .left_schema + .as_ref() + .expect("left_schema must be set"), + ); + + match build_unmatched_batch( + &self.output_schema, + &right_batch, + bitmap, + &left_schema, + &self.column_indices, + self.join_type, + JoinSide::Right, + ) { + Ok(Some(batch)) => match self.output_buffer.push_batch(batch) { + Ok(()) => ControlFlow::Continue(()), + Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + }, + Ok(None) => ControlFlow::Continue(()), + Err(e) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + } + } + Poll::Ready(Some(Err(e))) => ControlFlow::Break(Poll::Ready(Some(Err(e)))), + Poll::Ready(None) => { + // All right batches replayed + match self.output_buffer.finish_buffered_batch() { + Ok(()) => { + self.state = NLJState::Done; + ControlFlow::Continue(()) + } + Err(e) => ControlFlow::Break(Poll::Ready(Some(arrow_err!(e)))), + } + } + Poll::Pending => ControlFlow::Break(Poll::Pending), + } + } + /// Handle Done state - final state processing fn handle_done(&mut self) -> Poll>> { // Return any remaining completed batches before final termination @@ -3310,6 +3528,10 @@ pub(crate) mod tests { JoinType::LeftSemi, JoinType::LeftAnti, JoinType::LeftMark, + JoinType::Right, + JoinType::RightSemi, + JoinType::RightAnti, + JoinType::RightMark, ]; for join_type in &fallback_join_types { @@ -3330,34 +3552,24 @@ pub(crate) mod tests { .await?; } - // Join types that do NOT support fallback should still OOM. - let no_fallback_join_types = vec![ - JoinType::Right, - JoinType::Full, - JoinType::RightSemi, - JoinType::RightAnti, - JoinType::RightMark, - ]; - - for join_type in &no_fallback_join_types { - let runtime = RuntimeEnvBuilder::new() - .with_memory_limit(100, 1.0) - .build_arc()?; - let task_ctx = TaskContext::default().with_runtime(runtime); - let task_ctx = Arc::new(task_ctx); - - let err = multi_partitioned_join_collect( - Arc::clone(&left), - Arc::clone(&right), - join_type, - Some(filter.clone()), - task_ctx, - ) - .await - .unwrap_err(); - - assert_contains!(err.to_string(), "Resources exhausted"); - } + // FULL JOIN with multiple right partitions is intentionally not + // supported in the fallback path yet (cross-partition left-bitmap + // coordination is missing). It should still OOM under tight memory. + let runtime = RuntimeEnvBuilder::new() + .with_memory_limit(100, 1.0) + .build_arc()?; + let task_ctx = TaskContext::default().with_runtime(runtime); + let task_ctx = Arc::new(task_ctx); + let err = multi_partitioned_join_collect( + Arc::clone(&left), + Arc::clone(&right), + &JoinType::Full, + Some(filter.clone()), + task_ctx, + ) + .await + .unwrap_err(); + assert_contains!(err.to_string(), "Resources exhausted"); Ok(()) } @@ -3571,19 +3783,156 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_nlj_right_join_no_fallback_on_oom() -> Result<()> { - // RIGHT JOIN does not support multi-pass fallback (needs global right - // bitmap). OOM should propagate as an error. + async fn test_nlj_memory_limited_right_join() -> Result<()> { let task_ctx = task_ctx_with_memory_limit(50, 16)?; let left = build_left_table(); let right = build_right_table(); let filter = prepare_join_filter(); - let err = join_collect(left, right, &JoinType::Right, Some(filter), task_ctx) - .await - .unwrap_err(); + let (columns, batches, metrics) = + join_collect(left, right, &JoinType::Right, Some(filter), task_ctx).await?; - assert_contains!(err.to_string(), "Resources exhausted"); + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + // Verify spill actually occurred + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling to occur under tight memory limit" + ); + + // Right join: all right rows appear. Unmatched right rows get NULLs on left. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+----+----+----+-----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+----+----+----+-----+ + | | | | 10 | 10 | 100 | + | | | | 12 | 10 | 40 | + | 5 | 5 | 50 | 2 | 2 | 80 | + +----+----+----+----+----+-----+ + ")); + Ok(()) + } + + #[tokio::test] + async fn test_nlj_memory_limited_full_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = + join_collect(left, right, &JoinType::Full, Some(filter), task_ctx).await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]); + + // Verify spill actually occurred + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling to occur under tight memory limit" + ); + + // Full join: unmatched from both sides appear with NULL padding. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+----+----+-----+ + | a1 | b1 | c1 | a2 | b2 | c2 | + +----+----+-----+----+----+-----+ + | | | | 10 | 10 | 100 | + | | | | 12 | 10 | 40 | + | 11 | 8 | 110 | | | | + | 5 | 5 | 50 | 2 | 2 | 80 | + | 9 | 8 | 90 | | | | + +----+----+-----+----+----+-----+ + ")); + Ok(()) + } + + #[tokio::test] + async fn test_nlj_memory_limited_right_semi_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = + join_collect(left, right, &JoinType::RightSemi, Some(filter), task_ctx) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling to occur under tight memory limit" + ); + + // Right semi: only right rows that matched at least one left row. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+----+ + | a2 | b2 | c2 | + +----+----+----+ + | 2 | 2 | 80 | + +----+----+----+ + ")); + Ok(()) + } + + #[tokio::test] + async fn test_nlj_memory_limited_right_anti_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = + join_collect(left, right, &JoinType::RightAnti, Some(filter), task_ctx) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling to occur under tight memory limit" + ); + + // Right anti: right rows that did NOT match any left row. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+ + | a2 | b2 | c2 | + +----+----+-----+ + | 10 | 10 | 100 | + | 12 | 10 | 40 | + +----+----+-----+ + ")); + Ok(()) + } + + #[tokio::test] + async fn test_nlj_memory_limited_right_mark_join() -> Result<()> { + let task_ctx = task_ctx_with_memory_limit(50, 16)?; + let left = build_left_table(); + let right = build_right_table(); + let filter = prepare_join_filter(); + + let (columns, batches, metrics) = + join_collect(left, right, &JoinType::RightMark, Some(filter), task_ctx) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2", "mark"]); + + assert!( + metrics.spill_count().unwrap_or(0) > 0, + "Expected spilling to occur under tight memory limit" + ); + + // Right mark: all right rows with a bool column indicating match. + allow_duplicates!(assert_snapshot!(batches_to_sort_string(&batches), @r" + +----+----+-----+-------+ + | a2 | b2 | c2 | mark | + +----+----+-----+-------+ + | 10 | 10 | 100 | false | + | 12 | 10 | 40 | false | + | 2 | 2 | 80 | true | + +----+----+-----+-------+ + ")); Ok(()) } } diff --git a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt index 7b5da1d4b8e0..b47fc5ac877c 100644 --- a/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt +++ b/datafusion/sqllogictest/test_files/nested_loop_join_spill.slt @@ -57,6 +57,77 @@ Plan with Metrics 06)------ProjectionExec: expr=[value@0 as v2], metrics=[] 07)--------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1, batch_size=8192], metrics=[] +# --- RIGHT JOIN with non-equijoin predicate --- +# Every (v1, v2=1) pair passes the predicate `v1 + v2 > 0`, so all +# 100000 left rows match the single right row. Output count = 100000. +query I nosort +SELECT count(*) +FROM generate_series(1, 100000) AS t1(v1) +RIGHT JOIN generate_series(1, 1) AS t2(v2) + ON (t1.v1 + t2.v2) > 0 +---- +100000 + +# --- RIGHT JOIN with partial matches under spill --- +# Left has 100000 rows (forces spill); right has 200 rows. +# The predicate `(t1.v1 + t2.v2) = 2 AND t2.v2 <= 100` is non-equi from +# the optimizer's perspective (no equi-join key), so the plan uses NLJ. +# Behavior: +# - v2 = 1: needs v1 = 1 → 1 match +# - v2 in [2..100]: needs v1 = 2 - v2 ≤ 0, no match → NULL-padded left +# - v2 in [101..200]: condition false → NULL-padded left +# Output = 1 matched pair + 199 unmatched right rows = 200 total. +# This exercises the per-batch global right bitmap: each right batch is +# replayed across many left chunks, and within each batch a few bits flip +# on (only when v2=1 sees v1=1) while the rest stay off across all passes. +query III nosort +SELECT count(*) as cnt, + sum(case when t1.v1 IS NULL then 1 else 0 end) as unmatched_right, + sum(case when t1.v1 IS NOT NULL then 1 else 0 end) as matched +FROM generate_series(1, 100000) AS t1(v1) +RIGHT JOIN generate_series(1, 200) AS t2(v2) + ON (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100 +---- +200 199 1 + +# Verify spill metrics for the partial-match RIGHT JOIN above. +# The optimizer pushes `t2.v2 <= 100` into a projection on the right side +# (rewriting it as a `join_proj_push_down_*` boolean column), so the join +# filter becomes `v1@0 + v2@1 = 2 AND join_proj_push_down_1@2` and an +# extra ProjectionExec is inserted between AggregateExec and NLJ. +query TT +EXPLAIN ANALYZE +SELECT count(*) +FROM generate_series(1, 100000) AS t1(v1) +RIGHT JOIN generate_series(1, 200) AS t2(v2) + ON (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100 +---- +Plan with Metrics +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)], metrics=[] +02)--AggregateExec: mode=Single, gby=[], aggr=[count(Int64(1))], metrics=[] +03)----ProjectionExec: expr=[], metrics=[] +04)------NestedLoopJoinExec: join_type=Right, filter=v1@0 + v2@1 = 2 AND join_proj_push_down_1@2, projection=[v1@0, v2@1], metrics=[output_rows=200, spill_count=2, ] +05)--------ProjectionExec: expr=[value@0 as v1], metrics=[] +06)----------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=100000, batch_size=8192], metrics=[] +07)--------ProjectionExec: expr=[value@0 as v2, value@0 <= 100 as join_proj_push_down_1], metrics=[] +08)----------LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=200, batch_size=8192], metrics=[] + +# --- FULL JOIN with partial matches under spill --- +# Same predicate as above. Output: +# - 1 matched pair (v1=1, v2=1) +# - 199 unmatched right rows (NULL-padded left) +# - 99999 unmatched left rows (v1 != 1, NULL-padded right) +# Total = 1 + 199 + 99999 = 100199 +query III nosort +SELECT count(*) as cnt, + sum(case when t1.v1 IS NULL then 1 else 0 end) as unmatched_right, + sum(case when t2.v2 IS NULL then 1 else 0 end) as unmatched_left +FROM generate_series(1, 100000) AS t1(v1) +FULL JOIN generate_series(1, 200) AS t2(v2) + ON (t1.v1 + t2.v2) = 2 AND t2.v2 <= 100 +---- +100199 199 99999 + # Restore settings to slt runner defaults statement ok RESET datafusion.runtime.memory_limit