From 31702ce1a07decd1cb008e5ee862cd92f7ccb7c3 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 2 Apr 2026 15:24:41 +0800 Subject: [PATCH 1/6] fix wrong result of window funnel v2 + deduplication mode --- .../aggregate_function_window_funnel_v2.h | 30 +++- .../aggregate/vec_window_funnel_v2_test.cpp | 166 +++++++++++++++++- .../query_p0/aggregate/window_funnel_v2.out | 6 + .../aggregate/window_funnel_v2.groovy | 61 +++++++ 4 files changed, 260 insertions(+), 3 deletions(-) diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h index 8038c0bef471ed..3c3199743a32a3 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h @@ -423,8 +423,14 @@ struct WindowFunnelStateV2 { int event_idx = get_event_idx(evt.event_idx) - 1; if (event_idx == 0) { - // Duplicate of event 0: terminate current chain first + // Duplicate of event 0: terminate current chain first. + // However, if this E0 is from the same row as an event already + // in the chain, it's a multi-match row — not a true duplicate. + // V1 doesn't break chains on same-row events, so skip it. if (events_timestamp[0].has_value()) { + if (_is_same_row_as_chain(i, curr_level, events_timestamp)) { + continue; + } if (curr_level > max_level) { max_level = curr_level; } @@ -434,7 +440,12 @@ struct WindowFunnelStateV2 { events_timestamp[0] = {evt.timestamp, evt.timestamp, i}; curr_level = 0; } else if (events_timestamp[event_idx].has_value()) { - // Duplicate event detected: this level was already matched + // Duplicate event detected: this level was already matched. + // But if this duplicate is from the same row as a chain event, + // it's a multi-match row — not a true duplicate. Skip it. 
+ if (_is_same_row_as_chain(i, curr_level, events_timestamp)) { + continue; + } if (curr_level > max_level) { max_level = curr_level; } @@ -512,6 +523,21 @@ struct WindowFunnelStateV2 { } } + /// Check if the event at position `curr_i` in events_list is from the same original + /// row as any event currently recorded in the chain (levels 0..curr_level). + /// This is used by deduplication mode to avoid breaking a chain when the "duplicate" + /// or "new E0" is actually part of a multi-match row that already contributed to the chain. + bool _is_same_row_as_chain(size_t curr_i, int curr_level, + const std::vector& events_timestamp) const { + for (int k = 0; k <= curr_level; ++k) { + if (events_timestamp[k].has_value() && + _is_same_row(events_timestamp[k].last_list_idx, curr_i)) { + return true; + } + } + return false; + } + /// Check if the event at `current_idx` in events_list is from the same original row /// as the event at `prev_idx`. Same-row events are consecutive in the sorted list /// with continuation flags set. We walk backwards from current_idx checking if every diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index 6e82d430513822..0ab0821463a6c0 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1164,4 +1164,168 @@ TEST_F(VWindowFunnelV2Test, testIncreaseModeOldChainBetter) { } } -} // namespace doris +// Test DEDUPLICATION mode: same-row multi-event matching should NOT break the chain. +// This is the exact scenario from the bug report where V1 returns 3 but V2 returned 2. 
+// +// 3 conditions, window=1 day (86400s), DEDUPLICATION mode +// Row 0 (t=10:00): event1=T, event2=T, event3=F → matches E0+E1 on same row +// Row 1 (t=11:00): event1=T, event2=T, event3=F → matches E0+E1 on same row +// Row 2 (t=12:00): event1=T, event2=F, event3=T → matches E0+E2 on same row +// +// V2 stores events in reverse order per row: [E1, E0] for each multi-match row. +// After sort, events_list for row0: (t=10:00, E1-cont), (t=10:00, E0) +// Bug: when processing E0 from row0 after E1 was already used to advance the chain, +// old V2 would break the chain. With the fix, E0 is recognized as same-row and skipped. +// +// Expected chain: E0@row0 → E1@row1 → E2@row2 = 3 +TEST_F(VWindowFunnelV2Test, testDeduplicationSameRowMultiEvent) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types_3 = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared()}; + auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false, + BeExecVersionManager::get_newest_version()); + ASSERT_NE(agg_func_3, nullptr); + + const int NUM_ROWS = 3; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("deduplication")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv0, tv1, tv2; + tv0.unchecked_set_time(2022, 3, 12, 10, 0, 0); + tv1.unchecked_set_time(2022, 3, 12, 11, 0, 0); + tv2.unchecked_set_time(2022, 3, 12, 12, 0, 0); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + auto dtv2_2 = tv2.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); + column_timestamp->insert_data((char*)&dtv2_1, 0); + column_timestamp->insert_data((char*)&dtv2_2, 0); + + // Row 0: event1=T, event2=T, event3=F (matches E0 and E1) + // Row 1: event1=T, event2=T, event3=F (matches E0 and E1) + // Row 2: event1=T, event2=F, 
event3=T (matches E0 and E2) + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(1)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(86400)); + } + + std::unique_ptr memory(new char[agg_func_3->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_func_3->create(place); + const IColumn* column[6] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_func_3->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_func_3->insert_result_into(place, column_result); + // Chain: E0@row0 -> E1@row1 -> E2@row2 = 3 + // Before fix: V2 returned 2 because same-row E0 broke the chain after E1 was used. + EXPECT_EQ(column_result.get_data()[0], 3); + agg_func_3->destroy(place); +} + +// Test DEDUPLICATION mode: a true duplicate on a DIFFERENT row should still break the chain. +// This ensures the fix doesn't over-suppress chain breaks. 
+// +// 3 conditions, window=86400s, DEDUPLICATION mode +// Row 0 (t=10:00): event1=T only +// Row 1 (t=11:00): event2=T only +// Row 2 (t=12:00): event1=T only ← true duplicate E0 from different row → breaks chain +// Row 3 (t=13:00): event3=T only +// +// Expected: 2 (chain E0@row0 → E1@row1, then E0@row2 breaks it; new chain E0@row2 alone = 1) +TEST_F(VWindowFunnelV2Test, testDeduplicationTrueDuplicateStillBreaks) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types_3 = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared()}; + auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false, + BeExecVersionManager::get_newest_version()); + ASSERT_NE(agg_func_3, nullptr); + + const int NUM_ROWS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("deduplication")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv0, tv1, tv2, tv3; + tv0.unchecked_set_time(2022, 3, 12, 10, 0, 0); + tv1.unchecked_set_time(2022, 3, 12, 11, 0, 0); + tv2.unchecked_set_time(2022, 3, 12, 12, 0, 0); + tv3.unchecked_set_time(2022, 3, 12, 13, 0, 0); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + auto dtv2_2 = tv2.to_datetime_v2(); + auto dtv2_3 = tv3.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); + column_timestamp->insert_data((char*)&dtv2_1, 0); + column_timestamp->insert_data((char*)&dtv2_2, 0); + column_timestamp->insert_data((char*)&dtv2_3, 0); + + // Row 0: event1=T only + // Row 1: event2=T only + // Row 2: event1=T only (true duplicate from different row) + // Row 3: event3=T only + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + column_event1->insert(Field::create_field(1)); + 
column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(0)); + column_event2->insert(Field::create_field(0)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(0)); + column_event3->insert(Field::create_field(1)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(86400)); + } + + std::unique_ptr memory(new char[agg_func_3->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_func_3->create(place); + const IColumn* column[6] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_func_3->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_func_3->insert_result_into(place, column_result); + // Chain: E0@row0 -> E1@row1 (level=2), then E0@row2 is a TRUE duplicate from a + // different row → breaks chain. New chain: E0@row2, no E1 → level=1. 
+ // max(2, 1) = 2 + EXPECT_EQ(column_result.get_data()[0], 2); + agg_func_3->destroy(place); +} diff --git a/regression-test/data/query_p0/aggregate/window_funnel_v2.out b/regression-test/data/query_p0/aggregate/window_funnel_v2.out index 5d91239ad73523..c8318ab8c79f0d 100644 --- a/regression-test/data/query_p0/aggregate/window_funnel_v2.out +++ b/regression-test/data/query_p0/aggregate/window_funnel_v2.out @@ -86,3 +86,9 @@ -- !v2_increase_old_chain_better -- 2 +-- !v2_dedup_same_row_multi_event -- +3 + +-- !v2_dedup_true_dup_breaks -- +2 + diff --git a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy index e8157c23543e8e..5668024af6c1ea 100644 --- a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy +++ b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy @@ -486,5 +486,66 @@ suite("window_funnel_v2") { from windowfunnel_v2_test t; """ + // ==================== DEDUPLICATION mode: same-row multi-event bug fix ==================== + // Regression test for the bug where deduplication mode incorrectly breaks the chain + // when a single row matches multiple conditions including E0. + // V1 returns 3, V2 was returning 2 before the fix. + sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """ + sql """ + CREATE TABLE IF NOT EXISTS windowfunnel_v2_test ( + xwho varchar(50) NULL COMMENT 'xwho', + xwhen datetimev2(3) COMMENT 'xwhen', + xwhat int NULL COMMENT 'xwhat' + ) + DUPLICATE KEY(xwho) + DISTRIBUTED BY HASH(xwho) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + // Case 1: Rows matching both E0+E1 or E0+E2 simultaneously. 
+ // Row 0 (t=10:00): xwhat=1 → matches E0 (xwhat=1) and E1 (xwhat!=2) + // Row 1 (t=11:00): xwhat=1 → matches E0 (xwhat=1) and E1 (xwhat!=2) + // Row 2 (t=12:00): xwhat=3 → does not match E0 (xwhat=1); matches E1 (xwhat!=2) and E2 (xwhat=3) + // Using conditions: xwhat=1, xwhat!=2, xwhat=3 + // Row 0: E0=T(1=1), E1=T(1!=2), E2=F(1!=3) → same-row E0+E1 + // Row 1: E0=T(1=1), E1=T(1!=2), E2=F(1!=3) → same-row E0+E1 + // Row 2: E0=F(3!=1), E1=T(3!=2), E2=T(3=3) → same-row E1+E2 + // Expected chain: E0@row0 → E1@row1 → E2@row2 = 3 + sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:00:00.000', 1)" + sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 11:00:00.000', 1)" + sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 12:00:00.000', 3)" + order_qt_v2_dedup_same_row_multi_event """ + select + window_funnel( + 86400, + 'deduplication', + t.xwhen, + t.xwhat = 1, + t.xwhat != 2, + t.xwhat = 3 + ) AS level + from windowfunnel_v2_test t; + """ + + // Case 2: True duplicate from a different row should still break the chain. 
+ sql """ truncate table windowfunnel_v2_test; """ + sql "INSERT into windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 10:00:00.000', 1)" + sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 11:00:00.000', 2)" + sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 12:00:00.000', 1)" + sql "INSERT INTO windowfunnel_v2_test (xwho, xwhen, xwhat) VALUES('1', '2022-03-12 13:00:00.000', 3)" + order_qt_v2_dedup_true_dup_breaks """ + select + window_funnel( + 86400, + 'deduplication', + t.xwhen, + t.xwhat = 1, + t.xwhat = 2, + t.xwhat = 3 + ) AS level + from windowfunnel_v2_test t; + """ + sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """ } From 4282f0655b931d5edf40e85bd1a85660f11ee113 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Thu, 2 Apr 2026 15:46:39 +0800 Subject: [PATCH 2/6] fix --- be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index 0ab0821463a6c0..d90a05da06bcda 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1329,3 +1329,5 @@ TEST_F(VWindowFunnelV2Test, testDeduplicationTrueDuplicateStillBreaks) { EXPECT_EQ(column_result.get_data()[0], 2); agg_func_3->destroy(place); } + +} // namespace doris From ade82efd1f60a47abfe444ff4e2fd056fa9dc0a7 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 3 Apr 2026 12:47:55 +0800 Subject: [PATCH 3/6] update --- .../aggregate_function_window_funnel_v2.h | 37 ++++++--- .../aggregate/vec_window_funnel_v2_test.cpp | 65 +++++++++++++++ .../query_p0/aggregate/window_funnel_v2.out | 6 ++ .../aggregate/window_funnel_v2.groovy | 80 +++++++++++++++++++ 4 files changed, 179 insertions(+), 9 deletions(-) diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h 
b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h index 3c3199743a32a3..3bb3f9036c4ab1 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h @@ -467,10 +467,18 @@ struct WindowFunnelStateV2 { return max_level + 1; } - /// FIXED mode (StarRocks-style semantics): if a matched event appears whose + /// FIXED mode: if a matched event appears whose /// predecessor level has NOT been matched, the chain is broken (event level jumped). /// Note: V2 semantics differ from V1. V1 checks physical row adjacency; /// V2 checks event level continuity (unmatched rows don't break the chain). + /// + /// Same-row multi-condition handling: when a row matches multiple conditions, + /// its events are stored in descending event_idx order. A higher-index event + /// (e.g., cond4) might trigger a level jump before a lower-index event (e.g., + /// cond2) gets a chance to advance the chain. To fix this, before triggering + /// a level jump, we look ahead through same-row continuation events to check + /// if any of them could advance the chain. If so, we skip the level jump and + /// let the advancing event be processed in a subsequent iteration. 
int _get_fixed() const { std::vector events_timestamp(event_count); int max_level = -1; @@ -484,21 +492,32 @@ struct WindowFunnelStateV2 { if (event_idx == 0) { // Save current chain before starting a new one if (events_timestamp[0].has_value()) { - if (curr_level > max_level) { - max_level = curr_level; - } + max_level = std::max(curr_level, max_level); _eliminate_chain(curr_level, events_timestamp); } events_timestamp[0] = {evt.timestamp, evt.timestamp, i}; curr_level = 0; first_event = true; } else if (first_event && !events_timestamp[event_idx - 1].has_value()) { - // Event level jumped: predecessor was not matched - if (curr_level >= 0) { - if (curr_level > max_level) { - max_level = curr_level; + // Event level jumped: predecessor was not matched. + // Before breaking the chain, check if any same-row continuation event + // could advance the chain instead. Same-row events follow consecutively + // with continuation flags set. + bool same_row_can_advance = false; + for (size_t j = i + 1; + j < events_list.size() && is_continuation(events_list[j].event_idx); ++j) { + int sr_idx = get_event_idx(events_list[j].event_idx) - 1; + if (sr_idx > 0 && events_timestamp[sr_idx - 1].has_value() && + !_is_same_row(events_timestamp[sr_idx - 1].last_list_idx, j)) { + same_row_can_advance = true; + break; + } + } + if (!same_row_can_advance) { + if (curr_level >= 0) { + max_level = std::max(curr_level, max_level); + _eliminate_chain(curr_level, events_timestamp); } - _eliminate_chain(curr_level, events_timestamp); } } else if (events_timestamp[event_idx - 1].has_value() && !_is_same_row(events_timestamp[event_idx - 1].last_list_idx, i)) { diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index d90a05da06bcda..ae21d106d816c9 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1330,4 +1330,69 @@ TEST_F(VWindowFunnelV2Test, 
testDeduplicationTrueDuplicateStillBreaks) { agg_func_3->destroy(place); } +// Fixed mode: same-row multi-condition level jump fix. +// When a row matches both a high-index condition (c3) and a low-index condition (c2), +// the high-index event was processed first and triggered a spurious level jump before +// the low-index event could advance the chain. +TEST_F(VWindowFunnelV2Test, testFixedSameRowMultiCondLevelJump) { + // Use 3 conditions: c1=event1, c2=event2, c3=event3 + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types_3 = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared()}; + auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false, + BeExecVersionManager::get_newest_version()); + ASSERT_NE(agg_func_3, nullptr); + + const int NUM_ROWS = 2; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("fixed")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv0, tv1; + tv0.unchecked_set_time(2020, 1, 1, 0, 0, 0); + tv1.unchecked_set_time(2020, 1, 2, 0, 0, 0); + auto dtv2_0 = tv0.to_datetime_v2(); + auto dtv2_1 = tv1.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2_0, 0); + column_timestamp->insert_data((char*)&dtv2_1, 0); + + // Row 0: c1=T, c2=T, c3=T (all match — starts chain via c1) + // Row 1: c1=F, c2=T, c3=T (c3 would trigger level jump before c2 can advance) + auto column_event1 = ColumnUInt8::create(); + column_event1->insert(Field::create_field(1)); + column_event1->insert(Field::create_field(0)); + + auto column_event2 = ColumnUInt8::create(); + column_event2->insert(Field::create_field(1)); + column_event2->insert(Field::create_field(1)); + + auto column_event3 = ColumnUInt8::create(); + column_event3->insert(Field::create_field(1)); + column_event3->insert(Field::create_field(1)); + + 
auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(86400)); + } + + std::unique_ptr memory(new char[agg_func_3->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_func_3->create(place); + const IColumn* column[6] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_event1.get(), column_event2.get(), column_event3.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_func_3->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_func_3->insert_result_into(place, column_result); + // Before fix: c3 from row 1 triggered level jump (c2 predecessor not matched) → result 1. + // After fix: same-row look-ahead finds c2 can advance → skip level jump → result 2. + EXPECT_EQ(column_result.get_data()[0], 2); + agg_func_3->destroy(place); +} + } // namespace doris diff --git a/regression-test/data/query_p0/aggregate/window_funnel_v2.out b/regression-test/data/query_p0/aggregate/window_funnel_v2.out index c8318ab8c79f0d..68a492774de894 100644 --- a/regression-test/data/query_p0/aggregate/window_funnel_v2.out +++ b/regression-test/data/query_p0/aggregate/window_funnel_v2.out @@ -92,3 +92,9 @@ -- !v2_dedup_true_dup_breaks -- 2 +-- !v2_fixed_same_row_level_jump -- +2 + +-- !v2_fixed_multi_cond_complex -- +2 + diff --git a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy index 5668024af6c1ea..f78cfb84036c83 100644 --- a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy +++ b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy @@ -548,4 +548,84 @@ suite("window_funnel_v2") { """ sql """ DROP TABLE IF EXISTS windowfunnel_v2_test """ + + // ==================== FIXED mode: same-row multi-condition level jump fix ==================== + // When a row matches both a high-index condition (e.g., c3) and a low-index condition (e.g., c2), + // the 
high-index event was processed first (descending order) and triggered a spurious level jump + // before the low-index event could advance the chain. This test verifies the fix. + + // Minimal reproduction: 3 conditions, 2 rows + sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_multi_cond """ + sql """ + CREATE TABLE windowfunnel_v2_fixed_multi_cond ( + ts datetime, + val int + ) DUPLICATE KEY(ts) + DISTRIBUTED BY HASH(ts) BUCKETS 3 + PROPERTIES ("replication_num" = "1"); + """ + sql """ + INSERT INTO windowfunnel_v2_fixed_multi_cond VALUES + ('2020-01-01 00:00:00', 3), + ('2020-01-02 00:00:00', 1); + """ + // Row 1: val=3 -> c1(>2)=T, c2(<5)=T, c3(>0)=T (starts chain) + // Row 2: val=1 -> c1=F, c2(<5)=T, c3(>0)=T + // Bug: E2(c3) from row 2 triggered level jump before E1(c2) could advance. + // Fix: same-row look-ahead detects E1 can advance, skips level jump. Result=2. + order_qt_v2_fixed_same_row_level_jump """ + SELECT window_funnel(86400, 'fixed', ts, val > 2, val < 5, val > 0) AS level + FROM windowfunnel_v2_fixed_multi_cond; + """ + + // 4 conditions with overlapping column-based predicates + sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_multi_cond """ + sql """ + CREATE TABLE windowfunnel_v2_fixed_multi_cond ( + col_date date NULL, + col_int int NULL, + col_datetime datetime NULL, + pk int + ) DUPLICATE KEY(col_date, col_int) + DISTRIBUTED BY HASH(pk) BUCKETS 10 + PROPERTIES ("replication_num" = "1"); + """ + sql """ + INSERT INTO windowfunnel_v2_fixed_multi_cond (pk, col_int, col_date, col_datetime) VALUES + (0,2,'2011-03-24','2009-09-17 17:25:39'),(1,null,'2023-12-10','2001-09-19 20:24:22'), + (2,1,null,'2003-04-04 02:59:52'),(3,8,'2023-12-16','2004-02-13 03:59:09'), + (4,5,'2023-12-18','2015-09-09 23:02:51'),(5,1,'2023-12-14','2001-10-21 07:04:03'), + (6,3,'2023-12-10','2009-04-07 10:55:27'),(7,null,'2023-12-14','2018-11-02 09:18:56'), + (8,1,'2023-12-11','2015-05-03 17:34:12'),(9,1,'2023-12-12','2010-12-25 06:21:52'), + (10,3,null,'2008-06-02 
22:49:15'),(11,0,'2023-12-12','2010-11-13 22:32:47'), + (12,1,'2023-12-16','2019-04-28 20:36:03'),(13,null,null,'2019-01-01 21:24:32'), + (14,3,'2023-12-15','2006-08-17 21:47:52'),(15,3,null,'2015-12-23 05:58:29'), + (16,5,'2013-09-15','2018-08-20 22:15:56'),(17,5,'2023-12-11','2017-01-17 11:34:56'), + (18,7,'2023-12-18','2016-01-16 13:00:40'),(19,null,'2023-12-13','2003-03-16 01:28:28'), + (20,0,'2023-12-16','2006-08-28 22:48:19'),(21,8,'2013-10-28','2007-01-19 07:52:53'), + (22,3,'2023-12-17','2001-06-05 18:32:05'),(23,2,'2023-12-17','2001-05-15 22:39:45'), + (24,null,'2023-12-18','2015-02-06 18:44:55'),(25,null,'2023-12-11','2004-02-22 19:46:22'), + (26,0,'2023-12-17','2001-05-20 05:25:26'),(27,7,'2023-12-15','2015-05-07 15:56:20'), + (28,0,'2023-12-16','2016-02-10 04:26:23'),(29,0,'2023-12-18','2019-09-04 05:15:31'), + (30,1,'2023-12-11','2000-12-08 10:19:58'),(31,9,'2023-12-14','2004-07-11 03:51:22'), + (32,5,'2023-12-16','2013-11-18 16:32:18'),(33,6,null,'2000-11-13 05:31:42'), + (34,7,'2023-12-18','2013-01-02 22:46:46'),(35,4,'2023-12-12','2008-10-23 09:22:02'), + (36,2,'2023-12-16','2006-01-18 13:41:57'),(37,5,'2023-12-14','2008-05-27 21:09:03'), + (38,2,'2023-12-15','2007-01-15 01:30:56'),(39,1,'2023-12-18','2003-04-10 16:23:13'), + (40,null,'2023-12-18','2018-03-25 20:34:39'),(41,4,'2023-12-15','2007-10-06 00:49:06'), + (42,7,'2023-12-11','2015-11-11 23:53:16'),(43,null,'2023-12-10','2002-05-26 11:17:20'), + (44,6,'2023-12-14','2003-04-11 05:05:55'),(45,null,'2014-08-07','2000-10-02 00:55:06'), + (46,2,'2023-12-16','2018-08-26 22:48:02'),(47,2,'2023-12-11','2004-12-15 08:38:08'), + (48,5,'2019-01-19','2000-11-09 19:21:35'),(49,6,'2023-12-12','2010-11-21 05:54:00'); + """ + // c1: col_int>2, c2: col_date<'2023-12-12', c3: col_int=5, c4: col_date>'2009-10-14' + // Many rows match both c2 and c4 (e.g., col_date='2023-12-11'). In the buggy code, + // c4's event triggered a level jump before c2 could advance. Expected: 2. 
+ order_qt_v2_fixed_multi_cond_complex """ + SELECT WINDOW_FUNNEL(1736123071, 'fixed', col_datetime, + col_int > 2, col_date < '2023-12-12', col_int = 5, col_date > '2009-10-14') + FROM windowfunnel_v2_fixed_multi_cond; + """ + + sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_multi_cond """ } From 2730a64083471cc58b803218cf63d7040afa29a1 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 3 Apr 2026 13:11:18 +0800 Subject: [PATCH 4/6] update --- .../aggregate_function_window_funnel_v2.h | 6 +- .../aggregate/vec_window_funnel_v2_test.cpp | 71 +++++++++++++++++++ 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h index 3bb3f9036c4ab1..3a269583fa591c 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h +++ b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h @@ -507,7 +507,11 @@ struct WindowFunnelStateV2 { for (size_t j = i + 1; j < events_list.size() && is_continuation(events_list[j].event_idx); ++j) { int sr_idx = get_event_idx(events_list[j].event_idx) - 1; - if (sr_idx > 0 && events_timestamp[sr_idx - 1].has_value() && + // The continuation event must actually advance the chain beyond + // curr_level; a duplicate of an already-matched level is not an + // advancement and should not suppress the level-jump break. 
+ if (sr_idx > curr_level && sr_idx > 0 && + events_timestamp[sr_idx - 1].has_value() && !_is_same_row(events_timestamp[sr_idx - 1].last_list_idx, j)) { same_row_can_advance = true; break; diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index ae21d106d816c9..ecd615509d8600 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1395,4 +1395,75 @@ TEST_F(VWindowFunnelV2Test, testFixedSameRowMultiCondLevelJump) { agg_func_3->destroy(place); } +// Fixed mode: same-row duplicate should NOT suppress a level-jump break. +// Counterexample from review: E0@r0, E1@r1, (E3,E1)@r2, E2@r3, E3@r4. +// E1@r2 is a duplicate of an already-matched level (curr_level=2, sr_idx=1 ≤ curr_level), +// so it must not be treated as an advancement. The chain should break at E3@r2. +TEST_F(VWindowFunnelV2Test, testFixedSameRowDuplicateDoesNotSuppressJump) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + // 4 conditions: window, mode, timestamp, c1, c2, c3, c4 + DataTypes data_types_4 = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared()}; + auto agg_func = factory.get("window_funnel_v2", data_types_4, nullptr, false, + BeExecVersionManager::get_newest_version()); + ASSERT_NE(agg_func, nullptr); + + const int NUM_ROWS = 5; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("fixed")); + } + + // Timestamps: r0=t0, r1=t1, r2=t2, r3=t3, r4=t4 (all within window) + auto column_timestamp = ColumnDateTimeV2::create(); + for (int i = 0; i < NUM_ROWS; i++) { + VecDateTimeValue tv; + tv.unchecked_set_time(2020, 1, 1 + i, 0, 0, 0); + auto dtv2 = tv.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + + // r0: c1=T, c2=F, 
c3=F, c4=F → E0 + // r1: c1=F, c2=T, c3=F, c4=F → E1 + // r2: c1=F, c2=T, c3=F, c4=T → E1(dup)+E3(jump) + // r3: c1=F, c2=F, c3=T, c4=F → E2 + // r4: c1=F, c2=F, c3=F, c4=T → E3 + auto column_c1 = ColumnUInt8::create(); + for (int v : {1, 0, 0, 0, 0}) column_c1->insert(Field::create_field(v)); + + auto column_c2 = ColumnUInt8::create(); + for (int v : {0, 1, 1, 0, 0}) column_c2->insert(Field::create_field(v)); + + auto column_c3 = ColumnUInt8::create(); + for (int v : {0, 0, 0, 1, 0}) column_c3->insert(Field::create_field(v)); + + auto column_c4 = ColumnUInt8::create(); + for (int v : {0, 0, 1, 0, 1}) column_c4->insert(Field::create_field(v)); + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(86400 * 10)); + } + + std::unique_ptr memory(new char[agg_func->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_func->create(place); + const IColumn* column[7] = {column_window.get(), column_mode.get(), + column_timestamp.get(), column_c1.get(), + column_c2.get(), column_c3.get(), column_c4.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_func->add(place, column, i, arena); + } + + ColumnInt32 column_result; + agg_func->insert_result_into(place, column_result); + // Chain: E0@r0 → E1@r1 → E3@r2 triggers level jump. E1@r2 is a duplicate + // (sr_idx=1 ≤ curr_level=2), so it must NOT suppress the break. Result = 2. 
+ EXPECT_EQ(column_result.get_data()[0], 2); + agg_func->destroy(place); +} + } // namespace doris From 86833f43807e16556d3bd0f7ae2745482148328d Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 3 Apr 2026 13:14:18 +0800 Subject: [PATCH 5/6] update --- be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index ecd615509d8600..1bedf385ed4730 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1451,9 +1451,9 @@ TEST_F(VWindowFunnelV2Test, testFixedSameRowDuplicateDoesNotSuppressJump) { std::unique_ptr memory(new char[agg_func->size_of_data()]); AggregateDataPtr place = memory.get(); agg_func->create(place); - const IColumn* column[7] = {column_window.get(), column_mode.get(), - column_timestamp.get(), column_c1.get(), - column_c2.get(), column_c3.get(), column_c4.get()}; + const IColumn* column[7] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_c1.get(), column_c2.get(), column_c3.get(), + column_c4.get()}; for (int i = 0; i < NUM_ROWS; i++) { agg_func->add(place, column, i, arena); } From 7d97a04564ecdce1d4d7aae8c480f2d46288c9a6 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 3 Apr 2026 16:32:48 +0800 Subject: [PATCH 6/6] update --- .../aggregate_function_window_funnel_v2.h | 26 +++++-- .../aggregate/vec_window_funnel_v2_test.cpp | 75 +++++++++++++++++++ .../query_p0/aggregate/window_funnel_v2.out | 3 + .../aggregate/window_funnel_v2.groovy | 33 ++++++++ 4 files changed, 130 insertions(+), 7 deletions(-) diff --git a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h index 3a269583fa591c..8ff3eb1be8f483 100644 --- a/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h +++ 
b/be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h @@ -490,14 +490,26 @@ struct WindowFunnelStateV2 { int event_idx = get_event_idx(evt.event_idx) - 1; if (event_idx == 0) { - // Save current chain before starting a new one - if (events_timestamp[0].has_value()) { - max_level = std::max(curr_level, max_level); - _eliminate_chain(curr_level, events_timestamp); + // In FIXED mode, when a row matches multiple conditions, its events + // appear in descending index order. If a higher-index event (e.g., c2) + // already advanced the chain, the lower-index E0 (c1) from the same row + // is a continuation that should NOT restart the chain. In V1, each row + // is tested against exactly one expected condition; the row's other + // condition matches are irrelevant. We emulate this by skipping + // continuation E0 events when the chain tip was set by a same-row event. + bool skip_e0 = is_continuation(evt.event_idx) && curr_level >= 0 && + events_timestamp[curr_level].has_value() && + _is_same_row(events_timestamp[curr_level].last_list_idx, i); + if (!skip_e0) { + // Save current chain before starting a new one + if (events_timestamp[0].has_value()) { + max_level = std::max(curr_level, max_level); + _eliminate_chain(curr_level, events_timestamp); + } + events_timestamp[0] = {evt.timestamp, evt.timestamp, i}; + curr_level = 0; + first_event = true; } - events_timestamp[0] = {evt.timestamp, evt.timestamp, i}; - curr_level = 0; - first_event = true; } else if (first_event && !events_timestamp[event_idx - 1].has_value()) { // Event level jumped: predecessor was not matched. 
// Before breaking the chain, check if any same-row continuation event diff --git a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp index 1bedf385ed4730..aff7e341a94c60 100644 --- a/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp +++ b/be/test/exprs/aggregate/vec_window_funnel_v2_test.cpp @@ -1466,4 +1466,79 @@ TEST_F(VWindowFunnelV2Test, testFixedSameRowDuplicateDoesNotSuppressJump) { agg_func->destroy(place); } +// Fixed mode: when a row matches multiple conditions including E0 (c1), +// the E0 continuation should NOT restart the chain if a same-row event +// already advanced it. This emulates V1's row-based semantics where each +// row is tested against exactly one expected condition. +TEST_F(VWindowFunnelV2Test, testFixedContinuationE0DoesNotRestartChain) { + AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); + DataTypes data_types_3 = { + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared()}; + auto agg_func_3 = factory.get("window_funnel_v2", data_types_3, nullptr, false, + BeExecVersionManager::get_newest_version()); + ASSERT_NE(agg_func_3, nullptr); + + // 4 rows, 3 conditions (c1, c2, c3): + // r0: c1=T, c2=F, c3=F -> E0 starts chain + // r1: c1=T, c2=T, c3=F -> c2 advances chain, c1(cont) should be skipped + // r2: c1=T, c2=T, c3=F -> c2 overwrites level 1, c1(cont) skipped + // r3: c1=F, c2=F, c3=T -> c3 advances chain to level 2 + // Expected result: 3 (c1@r0 -> c2@r2 -> c3@r3) + // Without E0-skip fix, c1@r1(cont) would restart chain, and result would be 2. 
+ const int NUM_ROWS = 4; + auto column_mode = ColumnString::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_mode->insert(Field::create_field("fixed")); + } + auto column_timestamp = ColumnDateTimeV2::create(); + VecDateTimeValue tv; + for (int i = 0; i < NUM_ROWS; i++) { + tv.unchecked_set_time(2020, 1, i + 1, 0, 0, 0); + auto dtv2 = tv.to_datetime_v2(); + column_timestamp->insert_data((char*)&dtv2, 0); + } + + auto column_window = ColumnInt64::create(); + for (int i = 0; i < NUM_ROWS; i++) { + column_window->insert(Field::create_field(86400 * 10)); + } + + // c1: r0=T, r1=T, r2=T, r3=F + auto column_c1 = ColumnUInt8::create(); + column_c1->insert(Field::create_field(1)); + column_c1->insert(Field::create_field(1)); + column_c1->insert(Field::create_field(1)); + column_c1->insert(Field::create_field(0)); + + // c2: r0=F, r1=T, r2=T, r3=F + auto column_c2 = ColumnUInt8::create(); + column_c2->insert(Field::create_field(0)); + column_c2->insert(Field::create_field(1)); + column_c2->insert(Field::create_field(1)); + column_c2->insert(Field::create_field(0)); + + // c3: r0=F, r1=F, r2=F, r3=T + auto column_c3 = ColumnUInt8::create(); + column_c3->insert(Field::create_field(0)); + column_c3->insert(Field::create_field(0)); + column_c3->insert(Field::create_field(0)); + column_c3->insert(Field::create_field(1)); + + std::unique_ptr memory(new char[agg_func_3->size_of_data()]); + AggregateDataPtr place = memory.get(); + agg_func_3->create(place); + const IColumn* columns[6] = {column_window.get(), column_mode.get(), column_timestamp.get(), + column_c1.get(), column_c2.get(), column_c3.get()}; + for (int i = 0; i < NUM_ROWS; i++) { + agg_func_3->add(place, columns, i, arena); + } + + ColumnInt32 column_result; + agg_func_3->insert_result_into(place, column_result); + EXPECT_EQ(column_result.get_data()[0], 3); + agg_func_3->destroy(place); +} + } // namespace doris diff --git a/regression-test/data/query_p0/aggregate/window_funnel_v2.out 
b/regression-test/data/query_p0/aggregate/window_funnel_v2.out index 68a492774de894..fde6dad4a4faa4 100644 --- a/regression-test/data/query_p0/aggregate/window_funnel_v2.out +++ b/regression-test/data/query_p0/aggregate/window_funnel_v2.out @@ -98,3 +98,6 @@ -- !v2_fixed_multi_cond_complex -- 2 +-- !v2_fixed_e0_skip -- +3 + diff --git a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy index f78cfb84036c83..bc767ff36f4e43 100644 --- a/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy +++ b/regression-test/suites/query_p0/aggregate/window_funnel_v2.groovy @@ -628,4 +628,37 @@ suite("window_funnel_v2") { """ sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_multi_cond """ + + // ==================== FIXED mode: continuation E0 should not restart chain ==================== + // When a row matches both the expected next condition (e.g., c2) and c1, the c2 event + // advances the chain while c1 (stored as continuation) should NOT restart a new chain. + // This matches V1's row-based semantics. + + sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_e0_skip """ + sql """ + CREATE TABLE windowfunnel_v2_fixed_e0_skip ( + ts datetime, + val int + ) DUPLICATE KEY(ts) + DISTRIBUTED BY HASH(ts) BUCKETS 3 + PROPERTIES ("replication_num" = "1"); + """ + sql """ + INSERT INTO windowfunnel_v2_fixed_e0_skip VALUES + ('2020-01-01 00:00:00', 1), + ('2020-01-02 00:00:00', 3), + ('2020-01-03 00:00:00', 3), + ('2020-01-04 00:00:00', -1); + """ + // r0: val=1 -> c1(val>0)=T, c2(val>2)=F, c3(val<0)=F. Starts chain. + // r1: val=3 -> c1=T, c2=T, c3=F. c2 advances chain; c1(cont) should NOT restart. + // r2: val=3 -> c1=T, c2=T, c3=F. c2 overwrites level 1; c1(cont) skipped. + // r3: val=-1 -> c1=F, c2=F, c3=T. c3 advances to level 2. 
+ // Expected: 3 (c1@r0 -> c2@r2 -> c3@r3) + order_qt_v2_fixed_e0_skip """ + SELECT window_funnel(864000, 'fixed', ts, val > 0, val > 2, val < 0) AS level + FROM windowfunnel_v2_fixed_e0_skip; + """ + + sql """ DROP TABLE IF EXISTS windowfunnel_v2_fixed_e0_skip """ }