Skip to content

Parquet 56: error "item_reader def levels are None" when reading a nested field with a row filter #8657

@lewiszlw

Description

@lewiszlw

Describe the bug

After upgrading the `parquet` crate to version 56, we get the error `Parquet error: item_reader def levels are None.` when reading a nested (list) field with a row filter applied.

To Reproduce

[dependencies]
arrow = "56"
parquet = { version = "56", features = ["async"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"

use arrow::array::{BooleanArray, Float32Builder, Int32Array, ListBuilder};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::stream::StreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use std::sync::Arc;
use tokio::fs::File;

/// Entry point for the reproduction.
///
/// Writes the fixture file, then reads it back through a row filter.
/// Either step panics via `unwrap` if it returns an error.
#[tokio::main]
async fn main() {
    // Produce the fixture first; the read step opens the file this creates.
    let write_outcome = write_parquet_file().await;
    write_outcome.unwrap();

    // Read the file back with a row filter; per the reported panic, this is
    // the step that fails on parquet 56.
    let read_outcome = read_parquet_file().await;
    read_outcome.unwrap();
}

/// Reads `output.parquet` through an async stream with a row filter attached,
/// printing every record batch it yields.
///
/// The filter's predicate projects all columns and keeps every row. It exists
/// only to route the read through the row-filter code path.
///
/// # Errors
/// Propagates any error from opening the file, building the stream, or
/// decoding a batch.
async fn read_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
    let input = File::open("output.parquet").await?;
    let stream_builder = ParquetRecordBatchStreamBuilder::new(input).await?;

    // Pass-through predicate: a `true` for each row of the evaluated batch.
    let keep_all = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        let row_count = batch.num_rows();
        Ok(BooleanArray::from(vec![true; row_count]))
    });
    let row_filter = RowFilter::new(vec![Box::new(keep_all)]);

    let mut batches = stream_builder
        .with_row_filter(row_filter)
        .with_projection(ProjectionMask::all())
        .build()?;

    while let Some(next) = batches.next().await {
        let record_batch = next?;
        println!("Read batch {record_batch:?}");
    }

    Ok(())
}

/// Writes `output.parquet` with two columns.
///
/// The columns are a non-nullable `Int32` column `id` (values 1 to 4) and a
/// nullable `List<Float32>` column `vector`. Each of the four list entries holds
/// three copies of 10.0, 20.0, 30.0 or 40.0 respectively.
///
/// # Errors
/// Propagates any error from building the record batch, creating the file, or
/// writing and closing the Parquet writer.
async fn write_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
    // Nullable Float32 elements inside a nullable list column.
    let item_field = Arc::new(Field::new("item", DataType::Float32, true));
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("vector", DataType::List(Arc::clone(&item_field)), true),
    ]));

    // Four non-null list entries, each repeating a single value three times.
    let mut vectors =
        ListBuilder::new(Float32Builder::new()).with_field(Arc::clone(&item_field));
    for value in [10.0_f32, 20.0, 30.0, 40.0] {
        vectors.values().append_slice(&[value; 3]);
        vectors.append(true);
    }

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
            Arc::new(vectors.finish()),
        ],
    )?;

    let output = File::create("output.parquet").await?;
    let mut writer = AsyncArrowWriter::try_new(output, schema, None)?;
    writer.write(&batch).await?;
    writer.close().await?;

    println!("Parquet file written successfully!");

    Ok(())
}

thread 'main' panicked at src\main.rs:13:31:
called `Result::unwrap()` on an `Err` value: External(ParquetError("Parquet error: item_reader def levels are None."))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

The code above works with parquet 55.

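Expected behavior

Reading should succeed and yield all four rows, since the predicate keeps every row, just as it does with parquet 55.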

Additional context

Metadata

Assignees

Labels

bug, parquet (Changes to the parquet crate)

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions