From 1ad724f8f60b6a1bdfa3160be456947d28f61638 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 2 Jan 2026 15:17:30 +0200 Subject: [PATCH 1/6] feat: add benchmark for JSON reader performance and improve field indexing in StructArrayDecoder --- arrow-json/benches/reader.rs | 113 ++++++++++++++++ arrow-json/src/reader/struct_array.rs | 181 ++++++++++++++++---------- 2 files changed, 223 insertions(+), 71 deletions(-) create mode 100644 arrow-json/benches/reader.rs diff --git a/arrow-json/benches/reader.rs b/arrow-json/benches/reader.rs new file mode 100644 index 000000000000..5c4696483db2 --- /dev/null +++ b/arrow-json/benches/reader.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_json::ReaderBuilder; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{Criterion, criterion_group, criterion_main}; +use serde_json::{Map, Number, Value}; +use std::fmt::Write; +use std::sync::Arc; + +fn build_schema(field_count: usize) -> Arc { + let fields: Vec = (0..field_count) + .map(|i| Field::new(format!("f{i}"), DataType::Int64, false)) + .collect(); + Arc::new(Schema::new(fields)) +} + +fn build_wide_json(rows: usize, fields: usize) -> Vec { + let mut out = String::with_capacity(rows * fields * 12); + for row in 0..rows { + out.push('{'); + for field in 0..fields { + if field > 0 { + out.push(','); + } + let value = row as i64 + field as i64; + write!(&mut out, "\"f{field}\":{value}").unwrap(); + } + out.push('}'); + out.push('\n'); + } + out.into_bytes() +} + +fn build_wide_values(rows: usize, fields: usize) -> Vec { + let mut out = Vec::with_capacity(rows); + for row in 0..rows { + let mut map = Map::with_capacity(fields); + for field in 0..fields { + let key = format!("f{field}"); + let value = Number::from((row + field) as i64); + map.insert(key, Value::Number(value)); + } + out.push(Value::Object(map)); + } + out +} + +fn bench_decode_wide_object(c: &mut Criterion) { + let rows = 4096; + let fields = 64; + let data = build_wide_json(rows, fields); + let schema = build_schema(fields); + + c.bench_function("decode_wide_object_i64_json", |b| { + b.iter(|| { + let mut decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(1024) + .build_decoder() + .unwrap(); + + let mut offset = 0; + while offset < data.len() { + let read = decoder.decode(&data[offset..]).unwrap(); + if read == 0 { + break; + } + offset += read; + while let Some(_batch) = decoder.flush().unwrap() {} + } + }) + }); +} + +fn bench_serialize_wide_object(c: &mut Criterion) { + let rows = 4096; + let fields = 64; + let values = build_wide_values(rows, fields); + let schema = build_schema(fields); + + c.bench_function("decode_wide_object_i64_serialize", |b| { + b.iter(|| { + let mut decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(1024) + .build_decoder() + .unwrap(); + + decoder.serialize(&values).unwrap(); + while let Some(_batch) = decoder.flush().unwrap() {} + }) + }); +} + +criterion_group!( + benches, + bench_decode_wide_object, + bench_serialize_wide_object +); +criterion_main!(benches); diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 262097ace396..64d89b2868bb 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -21,6 +21,7 @@ use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::buffer::NullBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Fields}; +use std::collections::HashMap; pub struct StructArrayDecoder { data_type: DataType, @@ -28,6 +29,8 @@ pub struct StructArrayDecoder { strict_mode: bool, is_nullable: bool, struct_mode: StructMode, + field_name_to_index: Option>, + child_pos: Vec, } impl StructArrayDecoder { @@ -38,7 +41,9 @@ impl StructArrayDecoder { is_nullable: bool, struct_mode: StructMode, ) -> Result { - let decoders = struct_fields(&data_type) + let binding = data_type.clone(); + let fields = struct_fields(&binding); + let decoders = fields .iter() .map(|f| { // If this struct nullable, need to permit nullability in child array @@ -61,6 +66,8 @@ impl StructArrayDecoder { strict_mode, is_nullable, struct_mode, + field_name_to_index: build_field_index(fields), + child_pos: Vec::new(), }) } } @@ -68,101 +75,117 @@ impl StructArrayDecoder { impl ArrayDecoder for StructArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let fields = struct_fields(&self.data_type); - let mut child_pos: Vec<_> = (0..fields.len()).map(|_| vec![0; pos.len()]).collect(); - + let row_count = pos.len(); + let field_count = fields.len(); + let total_len = field_count * row_count; + self.child_pos.resize(total_len, 0); + self.child_pos.fill(0); let mut nulls = self .is_nullable .then(|| BooleanBufferBuilder::new(pos.len())); - // We avoid having the match on self.struct_mode inside the hot loop for performance - // TODO: Investigate how to extract duplicated logic. - match self.struct_mode { - StructMode::ObjectOnly => { - for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartObject(end_idx), None) => end_idx, - (TapeElement::StartObject(end_idx), Some(nulls)) => { - nulls.append(true); - end_idx - } - (TapeElement::Null, Some(nulls)) => { - nulls.append(false); - continue; - } - (_, _) => return Err(tape.error(*p, "{")), - }; - - let mut cur_idx = *p + 1; - while cur_idx < end_idx { - // Read field name - let field_name = match tape.get(cur_idx) { - TapeElement::String(s) => tape.get_string(s), - _ => return Err(tape.error(cur_idx, "field name")), + { + let child_pos = self.child_pos.as_mut_slice(); + // We avoid having the match on self.struct_mode inside the hot loop for performance + // TODO: Investigate how to extract duplicated logic. + match self.struct_mode { + StructMode::ObjectOnly => { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartObject(end_idx), None) => end_idx, + (TapeElement::StartObject(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + nulls.append(false); + continue; + } + (_, _) => return Err(tape.error(*p, "{")), }; - // Update child pos if match found - match fields.iter().position(|x| x.name() == field_name) { - Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1, - None => { - if self.strict_mode { - return Err(ArrowError::JsonError(format!( - "column '{field_name}' missing from schema", - ))); + let mut cur_idx = *p + 1; + while cur_idx < end_idx { + // Read field name + let field_name = match tape.get(cur_idx) { + TapeElement::String(s) => tape.get_string(s), + _ => return Err(tape.error(cur_idx, "field name")), + }; + + // Update child pos if match found + let field_idx = match &self.field_name_to_index { + Some(map) => map.get(field_name).copied(), + None => fields.iter().position(|x| x.name() == field_name), + }; + match field_idx { + Some(field_idx) => { + child_pos[field_idx * row_count + row] = cur_idx + 1; + } + None => { + if self.strict_mode { + return Err(ArrowError::JsonError(format!( + "column '{field_name}' missing from schema", + ))); + } } } + // Advance to next field + cur_idx = tape.next(cur_idx + 1, "field value")?; } - // Advance to next field - cur_idx = tape.next(cur_idx + 1, "field value")?; } } - } - StructMode::ListOnly => { - for (row, p) in pos.iter().enumerate() { - let end_idx = match (tape.get(*p), nulls.as_mut()) { - (TapeElement::StartList(end_idx), None) => end_idx, - (TapeElement::StartList(end_idx), Some(nulls)) => { - nulls.append(true); - end_idx - } - (TapeElement::Null, Some(nulls)) => { - nulls.append(false); - continue; - } - (_, _) => return Err(tape.error(*p, "[")), - }; + StructMode::ListOnly => { + for (row, p) in pos.iter().enumerate() { + let end_idx = match (tape.get(*p), nulls.as_mut()) { + (TapeElement::StartList(end_idx), None) => end_idx, + (TapeElement::StartList(end_idx), Some(nulls)) => { + nulls.append(true); + end_idx + } + (TapeElement::Null, Some(nulls)) => { + nulls.append(false); + continue; + } + (_, _) => return Err(tape.error(*p, "[")), + }; - let mut cur_idx = *p + 1; - let mut entry_idx = 0; - while cur_idx < end_idx { - if entry_idx >= fields.len() { + let mut cur_idx = *p + 1; + let mut entry_idx = 0; + while cur_idx < end_idx { + if entry_idx >= fields.len() { + return Err(ArrowError::JsonError(format!( + "found extra columns for {} fields", + fields.len() + ))); + } + child_pos[entry_idx * row_count + row] = cur_idx; + entry_idx += 1; + // Advance to next field + cur_idx = tape.next(cur_idx, "field value")?; + } + if entry_idx != fields.len() { return Err(ArrowError::JsonError(format!( - "found extra columns for {} fields", + "found {} columns for {} fields", + entry_idx, fields.len() ))); } - child_pos[entry_idx][row] = cur_idx; - entry_idx += 1; - // Advance to next field - cur_idx = tape.next(cur_idx, "field value")?; - } - if entry_idx != fields.len() { - return Err(ArrowError::JsonError(format!( - "found {} columns for {} fields", - entry_idx, - fields.len() - ))); } } } } + let child_pos = self.child_pos.as_slice(); let child_data = self .decoders .iter_mut() - .zip(child_pos) + .enumerate() .zip(fields) - .map(|((d, pos), f)| { - d.decode(tape, &pos).map_err(|e| match e { + .map(|((field_idx, d), f)| { + let start = field_idx * row_count; + let end = start + row_count; + let pos = &child_pos[start..end]; + d.decode(tape, pos).map_err(|e| match e { ArrowError::JsonError(s) => { ArrowError::JsonError(format!("whilst decoding field '{}': {s}", f.name())) } @@ -205,3 +228,19 @@ fn struct_fields(data_type: &DataType) -> &Fields { _ => unreachable!(), } } + +fn build_field_index(fields: &Fields) -> Option> { + const FIELD_INDEX_LINEAR_THRESHOLD: usize = 16; + if fields.len() < FIELD_INDEX_LINEAR_THRESHOLD { + return None; + } + + let mut map = HashMap::with_capacity(fields.len()); + for (idx, field) in fields.iter().enumerate() { + let name = field.name(); + if !map.contains_key(name) { + map.insert(name.to_string(), idx); + } + } + Some(map) +} From f25902d81170589ba04baa20d2021de95ed0a5d0 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 2 Jan 2026 15:20:20 +0200 Subject: [PATCH 2/6] refactor: streamline StructArrayDecoder initialization by combining decoders and field index creation --- arrow-json/src/reader/struct_array.rs | 41 ++++++++++++++------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 64d89b2868bb..63e34fd46817 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -41,24 +41,27 @@ impl StructArrayDecoder { is_nullable: bool, struct_mode: StructMode, ) -> Result { - let binding = data_type.clone(); - let fields = struct_fields(&binding); - let decoders = fields - .iter() - .map(|f| { - // If this struct nullable, need to permit nullability in child array - // StructArrayDecoder::decode verifies that if the child is not nullable - // it doesn't contain any nulls not masked by its parent - let nullable = f.is_nullable() || is_nullable; - make_decoder( - f.data_type().clone(), - coerce_primitive, - strict_mode, - nullable, - struct_mode, - ) - }) - .collect::, ArrowError>>()?; + let (decoders, field_name_to_index) = { + let fields = struct_fields(&data_type); + let decoders = fields + .iter() + .map(|f| { + // If this struct nullable, need to permit nullability in child array + // StructArrayDecoder::decode verifies that if the child is not nullable + // it doesn't contain any nulls not masked by its parent + let nullable = f.is_nullable() || is_nullable; + make_decoder( + f.data_type().clone(), + coerce_primitive, + strict_mode, + nullable, + struct_mode, + ) + }) + .collect::, ArrowError>>()?; + let field_name_to_index = build_field_index(fields); + (decoders, field_name_to_index) + }; Ok(Self { data_type, @@ -66,7 +69,7 @@ impl StructArrayDecoder { strict_mode, is_nullable, struct_mode, - field_name_to_index: build_field_index(fields), + field_name_to_index, child_pos: Vec::new(), }) } From 47bd73b2cd3905206d36c25c8e71ff5af0dc8dd1 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Fri, 2 Jan 2026 15:32:02 +0200 Subject: [PATCH 3/6] chore --- arrow-json/src/reader/struct_array.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 63e34fd46817..2dbc7dff4593 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -59,7 +59,11 @@ impl StructArrayDecoder { ) }) .collect::, ArrowError>>()?; - let field_name_to_index = build_field_index(fields); + let field_name_to_index = if struct_mode == StructMode::ObjectOnly { + build_field_index(fields) + } else { + None + }; (decoders, field_name_to_index) }; @@ -80,7 +84,20 @@ impl ArrayDecoder for StructArrayDecoder { let fields = struct_fields(&self.data_type); let row_count = pos.len(); let field_count = fields.len(); - let total_len = field_count * row_count; + let total_len = field_count.checked_mul(row_count).ok_or_else(|| { + ArrowError::JsonError(format!( + "StructArrayDecoder child position buffer size overflow for rows={row_count} fields={field_count}" + )) + })?; + if total_len > self.child_pos.len() { + self.child_pos + .try_reserve(total_len - self.child_pos.len()) + .map_err(|_| { + ArrowError::JsonError(format!( + "StructArrayDecoder child position buffer allocation failed for rows={row_count} fields={field_count}" + )) + })?; + } self.child_pos.resize(total_len, 0); self.child_pos.fill(0); let mut nulls = self From 1195aa2e9d2110a68d4cf71706dd39071ce1fc6f Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 10 Jan 2026 14:54:24 +0200 Subject: [PATCH 4/6] chore --- arrow-json/benches/reader.rs | 113 ----------------------------------- 1 file changed, 113 deletions(-) delete mode 100644 arrow-json/benches/reader.rs diff --git a/arrow-json/benches/reader.rs b/arrow-json/benches/reader.rs deleted file mode 100644 index 5c4696483db2..000000000000 --- a/arrow-json/benches/reader.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow_json::ReaderBuilder; -use arrow_schema::{DataType, Field, Schema}; -use criterion::{Criterion, criterion_group, criterion_main}; -use serde_json::{Map, Number, Value}; -use std::fmt::Write; -use std::sync::Arc; - -fn build_schema(field_count: usize) -> Arc { - let fields: Vec = (0..field_count) - .map(|i| Field::new(format!("f{i}"), DataType::Int64, false)) - .collect(); - Arc::new(Schema::new(fields)) -} - -fn build_wide_json(rows: usize, fields: usize) -> Vec { - let mut out = String::with_capacity(rows * fields * 12); - for row in 0..rows { - out.push('{'); - for field in 0..fields { - if field > 0 { - out.push(','); - } - let value = row as i64 + field as i64; - write!(&mut out, "\"f{field}\":{value}").unwrap(); - } - out.push('}'); - out.push('\n'); - } - out.into_bytes() -} - -fn build_wide_values(rows: usize, fields: usize) -> Vec { - let mut out = Vec::with_capacity(rows); - for row in 0..rows { - let mut map = Map::with_capacity(fields); - for field in 0..fields { - let key = format!("f{field}"); - let value = Number::from((row + field) as i64); - map.insert(key, Value::Number(value)); - } - out.push(Value::Object(map)); - } - out -} - -fn bench_decode_wide_object(c: &mut Criterion) { - let rows = 4096; - let fields = 64; - let data = build_wide_json(rows, fields); - let schema = build_schema(fields); - - c.bench_function("decode_wide_object_i64_json", |b| { - b.iter(|| { - let mut decoder = ReaderBuilder::new(schema.clone()) - .with_batch_size(1024) - .build_decoder() - .unwrap(); - - let mut offset = 0; - while offset < data.len() { - let read = decoder.decode(&data[offset..]).unwrap(); - if read == 0 { - break; - } - offset += read; - while let Some(_batch) = decoder.flush().unwrap() {} - } - }) - }); -} - -fn bench_serialize_wide_object(c: &mut Criterion) { - let rows = 4096; - let fields = 64; - let values = build_wide_values(rows, fields); - let schema = build_schema(fields); - - c.bench_function("decode_wide_object_i64_serialize", |b| { - b.iter(|| { - let mut decoder = ReaderBuilder::new(schema.clone()) - .with_batch_size(1024) - .build_decoder() - .unwrap(); - - decoder.serialize(&values).unwrap(); - while let Some(_batch) = decoder.flush().unwrap() {} - }) - }); -} - -criterion_group!( - benches, - bench_decode_wide_object, - bench_serialize_wide_object -); -criterion_main!(benches); From 04bea37d55077a342d7fe5f6435d9a2a7cfd84f0 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Sat, 10 Jan 2026 22:19:53 +0200 Subject: [PATCH 5/6] refactor: replace child_pos with field_tape_positions in StructArrayDecoder for improved clarity and performance --- arrow-json/src/reader/struct_array.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 2dbc7dff4593..7f8ad62726a0 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -30,7 +30,9 @@ pub struct StructArrayDecoder { is_nullable: bool, struct_mode: StructMode, field_name_to_index: Option>, - child_pos: Vec, + /// Reusable buffer of tape positions indexed as `[field_idx * row_count + row_idx]`. + /// A value of 0 indicates the field is absent for that row. + field_tape_positions: Vec, } impl StructArrayDecoder { @@ -74,7 +76,7 @@ impl StructArrayDecoder { is_nullable, struct_mode, field_name_to_index, - child_pos: Vec::new(), + field_tape_positions: Vec::new(), }) } } @@ -89,23 +91,14 @@ impl ArrayDecoder for StructArrayDecoder { "StructArrayDecoder child position buffer size overflow for rows={row_count} fields={field_count}" )) })?; - if total_len > self.child_pos.len() { - self.child_pos - .try_reserve(total_len - self.child_pos.len()) - .map_err(|_| { - ArrowError::JsonError(format!( - "StructArrayDecoder child position buffer allocation failed for rows={row_count} fields={field_count}" - )) - })?; - } - self.child_pos.resize(total_len, 0); - self.child_pos.fill(0); + self.field_tape_positions.clear(); + self.field_tape_positions.resize(total_len, 0); let mut nulls = self .is_nullable .then(|| BooleanBufferBuilder::new(pos.len())); { - let child_pos = self.child_pos.as_mut_slice(); + let child_pos = self.field_tape_positions.as_mut_slice(); // We avoid having the match on self.struct_mode inside the hot loop for performance // TODO: Investigate how to extract duplicated logic. match self.struct_mode { @@ -195,7 +188,7 @@ impl ArrayDecoder for StructArrayDecoder { } } - let child_pos = self.child_pos.as_slice(); + let child_pos = self.field_tape_positions.as_slice(); let child_data = self .decoders .iter_mut() From ad44ec27d0f130dff2ab1fd55939581b1346cc59 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 13 Jan 2026 15:37:16 +0200 Subject: [PATCH 6/6] refactor: replace Vec with FieldTapePositions in StructArrayDecoder for improved clarity and performance --- arrow-json/src/reader/struct_array.rs | 83 +++++++++++++++++++-------- 1 file changed, 59 insertions(+), 24 deletions(-) diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 7f8ad62726a0..df0d5b8a5b83 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -23,6 +23,51 @@ use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, Fields}; use std::collections::HashMap; +/// Reusable buffer for tape positions, indexed by (field_idx, row_idx). +/// A value of 0 indicates the field is absent for that row. +struct FieldTapePositions { + data: Vec, + row_count: usize, +} + +impl FieldTapePositions { + fn new() -> Self { + Self { + data: Vec::new(), + row_count: 0, + } + } + + fn resize(&mut self, field_count: usize, row_count: usize) -> Result<(), ArrowError> { + let total_len = field_count.checked_mul(row_count).ok_or_else(|| { + ArrowError::JsonError(format!( + "FieldTapePositions buffer size overflow for rows={row_count} fields={field_count}" + )) + })?; + self.data.clear(); + self.data.resize(total_len, 0); + self.row_count = row_count; + Ok(()) + } + + fn try_set(&mut self, field_idx: usize, row_idx: usize, pos: u32) -> Option<()> { + let idx = field_idx + .checked_mul(self.row_count)? + .checked_add(row_idx)?; + *self.data.get_mut(idx)? = pos; + Some(()) + } + + fn set(&mut self, field_idx: usize, row_idx: usize, pos: u32) { + self.data[field_idx * self.row_count + row_idx] = pos; + } + + fn field_positions(&self, field_idx: usize) -> &[u32] { + let start = field_idx * self.row_count; + &self.data[start..start + self.row_count] + } +} + pub struct StructArrayDecoder { data_type: DataType, decoders: Vec>, @@ -30,9 +75,7 @@ pub struct StructArrayDecoder { is_nullable: bool, struct_mode: StructMode, field_name_to_index: Option>, - /// Reusable buffer of tape positions indexed as `[field_idx * row_count + row_idx]`. - /// A value of 0 indicates the field is absent for that row. - field_tape_positions: Vec, + field_tape_positions: FieldTapePositions, } impl StructArrayDecoder { @@ -76,7 +119,7 @@ impl StructArrayDecoder { is_nullable, struct_mode, field_name_to_index, - field_tape_positions: Vec::new(), + field_tape_positions: FieldTapePositions::new(), }) } } @@ -86,19 +129,12 @@ impl ArrayDecoder for StructArrayDecoder { let fields = struct_fields(&self.data_type); let row_count = pos.len(); let field_count = fields.len(); - let total_len = field_count.checked_mul(row_count).ok_or_else(|| { - ArrowError::JsonError(format!( - "StructArrayDecoder child position buffer size overflow for rows={row_count} fields={field_count}" - )) - })?; - self.field_tape_positions.clear(); - self.field_tape_positions.resize(total_len, 0); + self.field_tape_positions.resize(field_count, row_count)?; let mut nulls = self .is_nullable .then(|| BooleanBufferBuilder::new(pos.len())); { - let child_pos = self.field_tape_positions.as_mut_slice(); // We avoid having the match on self.struct_mode inside the hot loop for performance // TODO: Investigate how to extract duplicated logic. match self.struct_mode { @@ -132,7 +168,7 @@ impl ArrayDecoder for StructArrayDecoder { }; match field_idx { Some(field_idx) => { - child_pos[field_idx * row_count + row] = cur_idx + 1; + self.field_tape_positions.set(field_idx, row, cur_idx + 1); } None => { if self.strict_mode { @@ -165,13 +201,14 @@ impl ArrayDecoder for StructArrayDecoder { let mut cur_idx = *p + 1; let mut entry_idx = 0; while cur_idx < end_idx { - if entry_idx >= fields.len() { - return Err(ArrowError::JsonError(format!( - "found extra columns for {} fields", - fields.len() - ))); - } - child_pos[entry_idx * row_count + row] = cur_idx; + self.field_tape_positions + .try_set(entry_idx, row, cur_idx) + .ok_or_else(|| { + ArrowError::JsonError(format!( + "found extra columns for {} fields", + fields.len() + )) + })?; entry_idx += 1; // Advance to next field cur_idx = tape.next(cur_idx, "field value")?; @@ -188,16 +225,13 @@ impl ArrayDecoder for StructArrayDecoder { } } - let child_pos = self.field_tape_positions.as_slice(); let child_data = self .decoders .iter_mut() .enumerate() .zip(fields) .map(|((field_idx, d), f)| { - let start = field_idx * row_count; - let end = start + row_count; - let pos = &child_pos[start..end]; + let pos = self.field_tape_positions.field_positions(field_idx); d.decode(tape, pos).map_err(|e| match e { ArrowError::JsonError(s) => { ArrowError::JsonError(format!("whilst decoding field '{}': {s}", f.name())) @@ -243,6 +277,7 @@ fn struct_fields(data_type: &DataType) -> &Fields { } fn build_field_index(fields: &Fields) -> Option> { + // Heuristic threshold: for small field counts, linear scan avoids HashMap overhead. const FIELD_INDEX_LINEAR_THRESHOLD: usize = 16; if fields.len() < FIELD_INDEX_LINEAR_THRESHOLD { return None;