Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 67 additions & 2 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,14 @@ impl BatchCoalescer {

let (_schema, arrays, mut num_rows) = batch.into_parts();

// setup input rows
assert_eq!(arrays.len(), self.in_progress_arrays.len());
// Validate column count matches the expected schema
if arrays.len() != self.in_progress_arrays.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Batch has {} columns but BatchCoalescer expects {}",
arrays.len(),
self.in_progress_arrays.len()
)));
}
self.in_progress_arrays
.iter_mut()
.zip(arrays)
Expand Down Expand Up @@ -2178,4 +2184,63 @@ mod tests {

assert_eq!(expected, actual);
}

#[test]
fn test_push_batch_schema_mismatch_fewer_columns() {
// Coalescer expects 0 columns, batch has 1
let empty_schema = Arc::new(Schema::empty());
let mut coalescer = BatchCoalescer::new(empty_schema, 100);
let batch = uint32_batch(0..5);
let result = coalescer.push_batch(batch);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("Batch has 1 columns but BatchCoalescer expects 0"),
"unexpected error: {err}"
);
}

#[test]
fn test_push_batch_schema_mismatch_more_columns() {
// Coalescer expects 2 columns, batch has 1
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::UInt32, false),
Field::new("c1", DataType::UInt32, false),
]));
let mut coalescer = BatchCoalescer::new(schema, 100);
let batch = uint32_batch(0..5);
let result = coalescer.push_batch(batch);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("Batch has 1 columns but BatchCoalescer expects 2"),
"unexpected error: {err}"
);
}

#[test]
fn test_push_batch_schema_mismatch_two_vs_zero() {
// Coalescer expects 0 columns, batch has 2
let empty_schema = Arc::new(Schema::empty());
let mut coalescer = BatchCoalescer::new(empty_schema, 100);
let schema = Arc::new(Schema::new(vec![
Field::new("c0", DataType::UInt32, false),
Field::new("c1", DataType::UInt32, false),
]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(UInt32Array::from(vec![1, 2, 3])),
Arc::new(UInt32Array::from(vec![4, 5, 6])),
],
)
.unwrap();
let result = coalescer.push_batch(batch);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(
err.contains("Batch has 2 columns but BatchCoalescer expects 0"),
"unexpected error: {err}"
);
}
}
Loading