Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions be/src/exprs/aggregate/aggregate_function_window_funnel_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,14 @@ struct WindowFunnelStateV2 {
int event_idx = get_event_idx(evt.event_idx) - 1;

if (event_idx == 0) {
// Duplicate of event 0: terminate current chain first
// Duplicate of event 0: terminate current chain first.
// However, if this E0 is from the same row as an event already
// in the chain, it's a multi-match row — not a true duplicate.
// V1 doesn't break chains on same-row events, so skip it.
if (events_timestamp[0].has_value()) {
if (_is_same_row_as_chain(i, curr_level, events_timestamp)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This skip is broader than the bug description and still leaves a correctness hole in deduplication mode. If a row contributes E1 to the current chain and also matches E0, skipping that E0 throws away a valid restart candidate.

Concrete case with window = 15:

  • r0: E0 @ 00
  • r1: E0 + E1 @ 10
  • r2: E1 @ 11
  • r3: E2 @ 12

V1 can return 3 by starting a new chain from r1 (E0@r1 -> E1@r2 -> E2@r3). With this code, E0@r1 is discarded because it is on the same row as the already-used E1, then E1@r2 is treated as a duplicate and the result stays 2. So the patch still does not preserve the V1 deduplication semantics it is trying to recover here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个应该不算问题,只是语义上的diff

continue;
}
if (curr_level > max_level) {
max_level = curr_level;
}
Expand All @@ -434,7 +440,12 @@ struct WindowFunnelStateV2 {
events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
curr_level = 0;
} else if (events_timestamp[event_idx].has_value()) {
// Duplicate event detected: this level was already matched
// Duplicate event detected: this level was already matched.
// But if this duplicate is from the same row as a chain event,
// it's a multi-match row — not a true duplicate. Skip it.
if (_is_same_row_as_chain(i, curr_level, events_timestamp)) {
continue;
}
if (curr_level > max_level) {
max_level = curr_level;
}
Expand All @@ -456,10 +467,18 @@ struct WindowFunnelStateV2 {
return max_level + 1;
}

/// FIXED mode (StarRocks-style semantics): if a matched event appears whose
/// FIXED mode: if a matched event appears whose
/// predecessor level has NOT been matched, the chain is broken (event level jumped).
/// Note: V2 semantics differ from V1. V1 checks physical row adjacency;
/// V2 checks event level continuity (unmatched rows don't break the chain).
///
/// Same-row multi-condition handling: when a row matches multiple conditions,
/// its events are stored in descending event_idx order. A higher-index event
/// (e.g., cond4) might trigger a level jump before a lower-index event (e.g.,
/// cond2) gets a chance to advance the chain. To fix this, before triggering
/// a level jump, we look ahead through same-row continuation events to check
/// if any of them could advance the chain. If so, we skip the level jump and
/// let the advancing event be processed in a subsequent iteration.
int _get_fixed() const {
std::vector<TimestampPair> events_timestamp(event_count);
int max_level = -1;
Expand All @@ -473,21 +492,36 @@ struct WindowFunnelStateV2 {
if (event_idx == 0) {
// Save current chain before starting a new one
if (events_timestamp[0].has_value()) {
if (curr_level > max_level) {
max_level = curr_level;
}
max_level = std::max(curr_level, max_level);
_eliminate_chain(curr_level, events_timestamp);
}
events_timestamp[0] = {evt.timestamp, evt.timestamp, i};
curr_level = 0;
first_event = true;
} else if (first_event && !events_timestamp[event_idx - 1].has_value()) {
// Event level jumped: predecessor was not matched
if (curr_level >= 0) {
if (curr_level > max_level) {
max_level = curr_level;
// Event level jumped: predecessor was not matched.
// Before breaking the chain, check if any same-row continuation event
// could advance the chain instead. Same-row events follow consecutively
// with continuation flags set.
bool same_row_can_advance = false;
for (size_t j = i + 1;
j < events_list.size() && is_continuation(events_list[j].event_idx); ++j) {
int sr_idx = get_event_idx(events_list[j].event_idx) - 1;
// The continuation event must actually advance the chain beyond
// curr_level; a duplicate of an already-matched level is not an
// advancement and should not suppress the level-jump break.
if (sr_idx > curr_level && sr_idx > 0 &&
events_timestamp[sr_idx - 1].has_value() &&
!_is_same_row(events_timestamp[sr_idx - 1].last_list_idx, j)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lookahead is too permissive for FIXED mode. It treats any same-row continuation whose predecessor level exists as proof that the current jump should be ignored, but that also matches same-row duplicates that would not advance the chain.

Concrete case: E0@r0, E1@r1, (E3,E1 same row)@r2, E2@r3, E3@r4. When processing E3@r2, events_timestamp[1] is set from r1, so the loop marks same_row_can_advance = true because E1@r2 sees an existing predecessor (E0@r0) from another row. But E1@r2 is only a duplicate of an already-matched level, not an advancement, so FIXED mode should still break on the E3 jump. With the current code the chain survives and can later reach level 4 incorrectly.

The lookahead needs to check that the same-row event would actually promote the chain, e.g. sr_idx > curr_level (or an equivalent advancement test), rather than only checking that its predecessor exists.

same_row_can_advance = true;
break;
}
}
if (!same_row_can_advance) {
if (curr_level >= 0) {
max_level = std::max(curr_level, max_level);
_eliminate_chain(curr_level, events_timestamp);
}
_eliminate_chain(curr_level, events_timestamp);
}
} else if (events_timestamp[event_idx - 1].has_value() &&
!_is_same_row(events_timestamp[event_idx - 1].last_list_idx, i)) {
Expand All @@ -512,6 +546,21 @@ struct WindowFunnelStateV2 {
}
}

/// Check if the event at position `curr_i` in events_list is from the same original
/// row as any event currently recorded in the chain (levels 0..curr_level).
/// This is used by deduplication mode to avoid breaking a chain when the "duplicate"
/// or "new E0" is actually part of a multi-match row that already contributed to the chain.
bool _is_same_row_as_chain(size_t curr_i, int curr_level,
const std::vector<TimestampPair>& events_timestamp) const {
for (int k = 0; k <= curr_level; ++k) {
if (events_timestamp[k].has_value() &&
_is_same_row(events_timestamp[k].last_list_idx, curr_i)) {
return true;
}
}
return false;
}

/// Check if the event at `current_idx` in events_list is from the same original row
/// as the event at `prev_idx`. Same-row events are consecutive in the sorted list
/// with continuation flags set. We walk backwards from current_idx checking if every
Expand Down
Loading
Loading