Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 22 additions & 33 deletions pkg/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ func genericPartition[T types.FixedSizeT](sels []int64, diffs []bool, partitions
diffs[0] = true
diffs = diffs[:len(sels)]

if vec.IsConst() {
if vec.IsConstNull() {
for i := range sels {
diffs[i] = false
}
}
partitions = append(partitions, 0)

} else {
// Note: diffs is accumulated across multiple Partition calls (one per
// sort key in pkg/sql/colexec/order/order.go and per partition spec in
// pkg/sql/colexec/window/window.go). We must only OR new boundaries in
// and never overwrite existing ones with false.
if !vec.IsConst() {
var n bool
var v T

Expand All @@ -51,11 +47,10 @@ func genericPartition[T types.FixedSizeT](sels []int64, diffs []bool, partitions
isNull := nulls.Contains(nsp, uint64(sel))
if n != isNull {
diffs[i] = true
} else if n && isNull {
diffs[i] = false
} else {
diffs[i] = diffs[i] || (v != vs[sel])
} else if !isNull {
diffs[i] = diffs[i] || (v != w)
}
// else: both NULL → equal, preserve diffs[i]
n = isNull
v = w
}
Expand All @@ -66,11 +61,12 @@ func genericPartition[T types.FixedSizeT](sels []int64, diffs []bool, partitions
v = w
}
}
}
// vec.IsConst(): all rows equal under this key, no new boundaries.

for i, j := int64(0), int64(len(diffs)); i < j; i++ {
if diffs[i] {
partitions = append(partitions, i)
}
for i, j := int64(0), int64(len(diffs)); i < j; i++ {
if diffs[i] {
partitions = append(partitions, i)
}
}

Expand All @@ -85,15 +81,8 @@ func bytesPartition(sels []int64, diffs []bool, partitions []int64, vec *vector.
diffs[0] = true
diffs = diffs[:len(sels)]

if vec.IsConst() {
if vec.IsConstNull() {
for i := range sels {
diffs[i] = false
}
}
partitions = append(partitions, 0)

} else {
// See genericPartition: diffs is accumulated; never overwrite to false.
if !vec.IsConst() {
var n bool
var v []byte

Expand All @@ -105,11 +94,10 @@ func bytesPartition(sels []int64, diffs []bool, partitions []int64, vec *vector.
isNull := nulls.Contains(nsp, uint64(sel))
if n != isNull {
diffs[i] = true
} else if n && isNull {
diffs[i] = false
} else {
} else if !isNull {
diffs[i] = diffs[i] || !(bytes.Equal(v, w))
}
// else: both NULL → equal, preserve diffs[i]
n = isNull
v = w
}
Expand All @@ -120,10 +108,11 @@ func bytesPartition(sels []int64, diffs []bool, partitions []int64, vec *vector.
v = w
}
}
for i, j := int64(0), int64(len(diffs)); i < j; i++ {
if diffs[i] {
partitions = append(partitions, i)
}
}

for i, j := int64(0), int64(len(diffs)); i < j; i++ {
if diffs[i] {
partitions = append(partitions, i)
}
}

Expand Down
125 changes: 125 additions & 0 deletions pkg/partition/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,128 @@ func TestPartition(t *testing.T) {
require.Equal(t, []int64{0, 1}, partitions)

}

// TestPartitionAccumulatesDiffs verifies that successive Partition calls on
// the same diffs slice OR new boundaries on top of existing ones rather than
// overwriting them. This is the contract relied on by ORDER BY multi-key sort
// in pkg/sql/colexec/order/order.go::sortAndSend (and similarly by window).
//
// Regression test for issue #24248: ORDER BY a, b on rows where the primary
// key has adjacent NULLs would lose the secondary boundary because the
// "both NULL" case used to overwrite diffs[i] = false.
func TestPartitionAccumulatesDiffs(t *testing.T) {
mp := mpool.MustNewZero()

t.Run("fixed_width_null_null_preserves_prior_boundary", func(t *testing.T) {
// Simulates the multi-key path in sortAndSend.
// First key: int32 [1, 1, 2, 2] → boundary at indices 0 and 2.
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))

// Second key: int32 with all NULLs → on its own would be one
// partition, but it must NOT erase k1's boundaries.
k2 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k2, []int32{0, 0, 0, 0},
[]bool{true, true, true, true}, mp))

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 2}, ps,
"both-NULL second key must not erase first key's boundary")
})

t.Run("fixed_width_const_preserves_prior_boundary", func(t *testing.T) {
// First key: int32 [1, 1, 2, 2].
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))

// Second key: const non-null. Must preserve k1's boundaries.
k2, err := vector.NewConstFixed(types.T_int32.ToType(), int32(7), 4, mp)
require.NoError(t, err)

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 2}, ps, "const second key must not collapse partitions")
})

t.Run("fixed_width_const_null_preserves_prior_boundary", func(t *testing.T) {
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))

k2 := vector.NewConstNull(types.T_int32.ToType(), 4, mp)

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 2}, ps,
"const-NULL second key must not collapse partitions")
})

t.Run("bytes_null_null_preserves_prior_boundary", func(t *testing.T) {
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))

k2 := vector.NewVec(types.T_varchar.ToType())
require.NoError(t, vector.AppendStringList(k2,
[]string{"", "", "", ""},
[]bool{true, true, true, true}, mp))

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 2}, ps,
"both-NULL bytes second key must not erase first key's boundary")
})

t.Run("bytes_const_preserves_prior_boundary", func(t *testing.T) {
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))

k2, err := vector.NewConstBytes(types.T_varchar.ToType(), []byte("x"), 4, mp)
require.NoError(t, err)

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 2}, ps, "const bytes second key must not collapse")
})

t.Run("fixed_width_or_with_existing_diff", func(t *testing.T) {
// Both keys contribute new boundaries; final partitioning is the union.
k1 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k1, []int32{1, 1, 2, 2}, nil, mp))
k2 := vector.NewVec(types.T_int32.ToType())
require.NoError(t, vector.AppendFixedList(k2, []int32{5, 6, 7, 7}, nil, mp))

sels := []int64{0, 1, 2, 3}
ds := make([]bool, len(sels))
ps := make([]int64, 0, 4)

ps = Partition(sels, ds, ps, k1)
require.Equal(t, []int64{0, 2}, ps)
ps = Partition(sels, ds, ps, k2)
require.Equal(t, []int64{0, 1, 2}, ps)
})
}
24 changes: 24 additions & 0 deletions test/distributed/cases/join/fullouterjoin.result
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,28 @@ select * from n1 natural full outer join n2 order by id, v;
3 ¦ C
drop table n1;
drop table n2;
create table fj_o1(s int, v varchar(5));
create table fj_o2(s int, v varchar(5));
insert into fj_o1 values (1,'a'),(5,'b'),(NULL,'x');
insert into fj_o2 values (13,'c'),(14,NULL);
select fj_o1.s, fj_o1.v, fj_o2.s, fj_o2.v
from fj_o1 full outer join fj_o2 on fj_o1.s = fj_o2.s
order by fj_o1.s, fj_o2.s, fj_o1.v, fj_o2.v;
➤ s[4,32,0] ¦ v[12,-1,0] ¦ s[4,32,0] ¦ v[12,-1,0] 𝄀
null ¦ x ¦ null ¦ null 𝄀
null ¦ null ¦ 13 ¦ c 𝄀
null ¦ null ¦ 14 ¦ null 𝄀
1 ¦ a ¦ null ¦ null 𝄀
5 ¦ b ¦ null ¦ null
select fj_o1.s, fj_o2.s
from fj_o1 full outer join fj_o2 on fj_o1.s = fj_o2.s
order by fj_o1.s, fj_o2.s desc;
➤ s[4,32,0] ¦ s[4,32,0] 𝄀
null ¦ 14 𝄀
null ¦ 13 𝄀
null ¦ null 𝄀
1 ¦ null 𝄀
5 ¦ null
drop table fj_o1;
drop table fj_o2;
drop database fojdb;
20 changes: 20 additions & 0 deletions test/distributed/cases/join/fullouterjoin.sql
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ select e.k as ek, l.k as lk
from e full outer join l on e.k > l.k
order by lk;


-- ---------------------------------------------------------------------
-- 10. FULL OUTER JOIN ... USING(col): merged column must coalesce both
-- sides (issue #24247). Null-padded rows must surface the value from
Expand Down Expand Up @@ -234,4 +235,23 @@ select * from n1 natural full outer join n2 order by id, v;
drop table n1;
drop table n2;

-- ---------------------------------------------------------------------
-- 12. ORDER BY multi-key over FULL OUTER JOIN with NULLs in keys.
-- Regression for #24248: secondary sort key must work within
-- partitions formed when the primary key has adjacent NULLs (from
-- null-padded rows).
-- ---------------------------------------------------------------------
create table fj_o1(s int, v varchar(5));
create table fj_o2(s int, v varchar(5));
insert into fj_o1 values (1,'a'),(5,'b'),(NULL,'x');
insert into fj_o2 values (13,'c'),(14,NULL);
select fj_o1.s, fj_o1.v, fj_o2.s, fj_o2.v
from fj_o1 full outer join fj_o2 on fj_o1.s = fj_o2.s
order by fj_o1.s, fj_o2.s, fj_o1.v, fj_o2.v;
select fj_o1.s, fj_o2.s
from fj_o1 full outer join fj_o2 on fj_o1.s = fj_o2.s
order by fj_o1.s, fj_o2.s desc;
drop table fj_o1;
drop table fj_o2;

drop database fojdb;
Loading