Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1394,11 +1394,6 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
return 0;
}

if (streamCompareID(&s->first_id, &s->max_deleted_entry_id) > 0) {
/* The latest tombstone is before the first entry. */
return 0;
}

if (start) {
start_id = *start;
} else {
Expand Down Expand Up @@ -1490,6 +1485,11 @@ long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
return s->entries_added;
}

if (!streamIDEqZero(id) && streamCompareID(id, &s->max_deleted_entry_id) < 0) {
/* The ID is before the last tombstone, so the counter is unknown. */
return SCG_INVALID_ENTRIES_READ;
}

int cmp_last = streamCompareID(id, &s->last_id);
if (cmp_last == 0) {
/* Return the exact counter of the last entry in the stream. */
Expand Down Expand Up @@ -1678,10 +1678,11 @@ size_t streamReplyWithRange(client *c,
while (streamIteratorGetID(&si, &id, &numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id, &group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s, &id, NULL)) {
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
if (group->entries_read != SCG_INVALID_ENTRIES_READ &&
streamCompareID(&group->last_id, &s->first_id) >= 0 &&
!streamRangeHasTombstones(s, &group->last_id, NULL)) {
/* A valid counter and no tombstones in the group's last-delivered-id and the stream's last-generated-id,
* we can increment the read counter to keep tracking the group's progress. */
group->entries_read++;
} else if (s->entries_added) {
/* The group's counter may be invalid, so we try to obtain it. */
Expand Down
145 changes: 145 additions & 0 deletions tests/unit/type/stream-cgroups.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,25 @@ start_server {
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1

r XADD x 7-0 data g
r XADD x 8-0 data h
r XADD x 9-0 data i
r XADD x 10-0 data j
r XREADGROUP GROUP g1 c11 COUNT 3 STREAMS x >
r XDEL x 9-0
# Now there is a tombstone in the stream after the consumer group last_id
# so the lag can't be calculated
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 8
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c12 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 10
assert_equal [dict get $group lag] 0
}

test {Consumer group lag with XDELs} {
Expand Down Expand Up @@ -1177,6 +1196,132 @@ start_server {
assert_equal [dict get $group lag] 2
}

test {Consumer group lag with XTRIM} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XADD x 5-0 data e
r XDEL x 3-0
r XGROUP CREATE x g1 0
r XGROUP CREATE x g2 0

set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 0

r XADD x 6-0 data f
r XADD x 7-0 data g
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 2

r XTRIM x MINID = 7-0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 2
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 1

r XREADGROUP GROUP g1 c11 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 7
assert_equal [dict get $group lag] 0
}

test {Consumer group lag with XADD trimming} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XADD x 5-0 data e
r XDEL x 3-0
r XGROUP CREATE x g1 0
r XGROUP CREATE x g2 0

set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}

r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 0

r XADD x 6-0 data f
r XADD x 7-0 data g
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 2

r XADD x MINID = 7-0 8-0 data h
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 3
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 2

r XREADGROUP GROUP g1 c11 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 8
assert_equal [dict get $group lag] 0
}

test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} {
# The payload was DUMPed from a v5 instance after:
# XADD x 1-0 data a
Expand Down
Loading