From 4e8b3090f9a94ea9b45e0dfeffe95dfbaed74250 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 20 Mar 2025 13:52:26 -0400 Subject: [PATCH 01/16] Stash. --- datafusion/common/src/config.rs | 2 + .../common/src/file_options/parquet_writer.rs | 1 + .../datasource-parquet/src/file_format.rs | 62 +++++++++++++++++++ datafusion/datasource-parquet/src/opener.rs | 19 +++++- datafusion/proto-common/src/from_proto/mod.rs | 1 + 5 files changed, 82 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b0f17630c910c..c53fd4972af79 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -451,6 +451,8 @@ config_namespace! { /// BLOB instead. pub binary_as_string: bool, default = false + pub coerce_int96: bool, default = false + // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 939cb5e1a3578..cd5faaee8a9cc 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -239,6 +239,7 @@ impl ParquetOptions { bloom_filter_on_read: _, // reads not used for writer props schema_force_view_types: _, binary_as_string: _, // not used for writer props + coerce_int96: _, // not used for writer props skip_arrow_metadata: _, } = self; diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 1d9a67fd2eb6d..20eefaad9d634 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -24,6 +24,16 @@ use std::ops::Range; use std::sync::Arc; use arrow::array::RecordBatch; +use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit}; +use datafusion_datasource::file_compression_type::FileCompressionType; +use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; +use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer}; + +use datafusion_datasource::file_format::{ + FileFormat, FileFormatFactory, FilePushdownSupport, +}; +use datafusion_datasource::write::demux::DemuxedStreamReceiver; + use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; use arrow::datatypes::{Fields, Schema, SchemaRef}; @@ -76,11 +86,13 @@ use parquet::arrow::arrow_writer::{ }; use parquet::arrow::async_reader::MetadataFetch; use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter}; +use parquet::basic::Type; use parquet::errors::ParquetError; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; +use parquet::schema::types::SchemaDescriptor; use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -569,6 +581,56 @@ pub fn apply_file_schema_type_coercions( )) } +/// Coerces the file schema if the table schema uses a view type. 
+pub fn coerce_int96_to_resolution(
+    parquet_schema: &SchemaDescriptor,
+    file_schema: &Schema,
+    time_unit: &TimeUnit,
+) -> Option<Schema> {
+    let mut transform = false;
+    let parquet_fields: HashMap<_, _> = parquet_schema
+        .columns()
+        .iter()
+        .map(|f| {
+            let dt = f.physical_type();
+            if dt.eq(&Type::INT96) {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
+        .collect();
+
+    if !transform {
+        return None;
+    }
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(|field| match parquet_fields.get(field.name().as_str()) {
+            Some(Type::INT96) => {
+                field_with_new_type(field, DataType::Timestamp(*time_unit, None))
+
+                // match field.data_type() {
+                //     DataType::Timestamp(TimeUnit::Nanosecond, None) => {
+                //         field_with_new_type(field, DataType::Timestamp(*time_unit,None))
+                //     }
+                //     DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
+                //         field_with_new_type(field, DataType::Timestamp(*time_unit,Some(tz.clone())))
+                //     }
+                //     _ => unreachable!()
+                // }
+            }
+            _ => Arc::clone(field),
+        })
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
 /// Coerces the file schema if the table schema uses a view type.
 #[deprecated(
     since = "47.0.0",
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 732fef47d5a75..355402291bf76 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -22,14 +22,15 @@ use std::sync::Arc;
 use crate::page_filter::PagePruningAccessPlanFilter;
 use crate::row_group_filter::RowGroupAccessPlanFilter;
 use crate::{
-    apply_file_schema_type_coercions, row_filter, should_enable_page_index,
-    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
+    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
+    should_enable_page_index, ParquetAccessPlan, ParquetFileMetrics,
+    ParquetFileReaderFactory,
 };
 use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::{SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use datafusion_common::{exec_err, Result};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
@@ -80,6 +81,8 @@ pub(super) struct ParquetOpener {
     pub enable_bloom_filter: bool,
     /// Schema adapter factory
     pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    /// Coerce INT96 timestamps to specific TimeUnit
+    pub coerce_int96: Option<TimeUnit>,
 }

 impl FileOpener for ParquetOpener {
@@ -135,6 +138,16 @@ impl FileOpener for ParquetOpener {
             schema = Arc::new(merged);
         }

+        if self.coerce_int96.is_some() {
+            if let Some(merged) = coerce_int96_to_resolution(
+                metadata.parquet_schema(),
+                &schema,
+                &(self.coerce_int96.unwrap()),
+            ) {
+                schema = Arc::new(merged);
+            }
+        }
+
         let options = ArrowReaderOptions::new()
             .with_page_index(enable_page_index)
             .with_schema(Arc::clone(&schema));
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index da43a97899565..e803416c658f1 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -984,6 +984,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
             maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize,
             schema_force_view_types: value.schema_force_view_types,
             binary_as_string: value.binary_as_string,
+
coerce_int96: value.coerce_int96, skip_arrow_metadata: value.skip_arrow_metadata, }) } From 958050c915e0519c401df137ddc19e715fa9767f Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 25 Mar 2025 14:23:31 -0400 Subject: [PATCH 02/16] Stash. --- datafusion/common/src/config.rs | 2 +- .../common/src/file_options/parquet_writer.rs | 2 ++ .../datasource-parquet/src/file_format.rs | 9 ++++++++ datafusion/datasource-parquet/src/opener.rs | 5 +++-- datafusion/datasource-parquet/src/source.rs | 15 ++++++++++++- .../proto/datafusion_common.proto | 4 ++++ datafusion/proto-common/src/from_proto/mod.rs | 4 +++- .../proto-common/src/generated/pbjson.rs | 22 +++++++++++++++++++ .../proto-common/src/generated/prost.rs | 7 ++++++ datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 7 ++++++ .../proto/src/logical_plan/file_formats.rs | 6 +++++ 12 files changed, 79 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index c53fd4972af79..a5c9d7a94e934 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -451,7 +451,7 @@ config_namespace! { /// BLOB instead. pub binary_as_string: bool, default = false - pub coerce_int96: bool, default = false + pub coerce_int96: Option, transform = str::to_lowercase, default = None // The following options affect writing to parquet files // and map to parquet::file::properties::WriterProperties diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index cd5faaee8a9cc..3e33466edf505 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -517,6 +517,7 @@ mod tests { schema_force_view_types: defaults.schema_force_view_types, binary_as_string: defaults.binary_as_string, skip_arrow_metadata: defaults.skip_arrow_metadata, + coerce_int96: None, } } @@ -623,6 +624,7 @@ mod tests { schema_force_view_types: global_options_defaults.schema_force_view_types, binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, + coerce_int96: None, }, column_specific_options, key_value_metadata, diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 20eefaad9d634..9fb2ef722ec65 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -280,6 +280,15 @@ impl ParquetFormat { self.options.global.binary_as_string = binary_as_string; self } + + pub fn coerce_int96(&self) -> Option { + self.options.global.coerce_int96.clone() + } + + pub fn with_coerce_int96(mut self, time_unit: Option) -> Self { + self.options.global.coerce_int96 = time_unit; + self + } } /// Clears all metadata (Schema level and field level) on an iterator diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 355402291bf76..b354fd8631cad 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -117,6 +117,7 @@ impl FileOpener for ParquetOpener { let table_schema = Arc::clone(&self.table_schema); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; + let coerce_int96 = self.coerce_int96; let enable_page_index = should_enable_page_index( self.enable_page_index, &self.page_pruning_predicate, @@ -138,11 +139,11 @@ impl FileOpener 
for ParquetOpener { schema = Arc::new(merged); } - if self.coerce_int96.is_some() { + if coerce_int96.is_some() { if let Some(merged) = coerce_int96_to_resolution( metadata.parquet_schema(), &schema, - &(self.coerce_int96.unwrap()), + &(coerce_int96.unwrap()), ) { schema = Arc::new(merged); } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 66d4d313d5a61..43db03b398d3f 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -29,7 +29,7 @@ use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, }; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; use datafusion_common::config::TableParquetOptions; use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; @@ -480,6 +480,18 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); + let coerce_int96: Option = + match &self.table_parquet_options.global.coerce_int96 { + None => None, + Some(time_unit) => match time_unit.as_str() { + "ns" => Some(TimeUnit::Nanosecond), + "us" => Some(TimeUnit::Microsecond), + "ms" => Some(TimeUnit::Millisecond), + "s" => Some(TimeUnit::Second), + _ => None, + }, + }; + Arc::new(ParquetOpener { partition_index: partition, projection: Arc::from(projection), @@ -499,6 +511,7 @@ impl FileSource for ParquetSource { enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), schema_adapter_factory, + coerce_int96, }) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index bbeea5e1ec237..82f1e91d9c9b4 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -545,6 +545,10 @@ message ParquetOptions { uint64 max_row_group_size = 15; string created_by = 16; + + oneof coerce_int96_opt { + string coerce_int96 = 32; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index e803416c658f1..bd969db316872 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -984,7 +984,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, - coerce_int96: value.coerce_int96, + coerce_int96: value.coerce_int96_opt.clone().map(|opt| match opt { + protobuf::parquet_options::CoerceInt96Opt::CoerceInt96(v) => Some(v), + }).unwrap_or(None), skip_arrow_metadata: value.skip_arrow_metadata, }) } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b0241fd47a26f..b44b05e9ca296 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4981,6 +4981,9 @@ impl serde::Serialize for ParquetOptions { if self.bloom_filter_ndv_opt.is_some() { len += 1; } + if self.coerce_int96_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5136,6 +5139,13 @@ impl serde::Serialize for 
ParquetOptions { } } } + if let Some(v) = self.coerce_int96_opt.as_ref() { + match v { + parquet_options::CoerceInt96Opt::CoerceInt96(v) => { + struct_ser.serialize_field("coerceInt96", v)?; + } + } + } struct_ser.end() } } @@ -5203,6 +5213,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterFpp", "bloom_filter_ndv", "bloomFilterNdv", + "coerce_int96", + "coerceInt96", ]; #[allow(clippy::enum_variant_names)] @@ -5237,6 +5249,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { Encoding, BloomFilterFpp, BloomFilterNdv, + CoerceInt96, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5288,6 +5301,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "encoding" => Ok(GeneratedField::Encoding), "bloomFilterFpp" | "bloom_filter_fpp" => Ok(GeneratedField::BloomFilterFpp), "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), + "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5337,6 +5351,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut encoding_opt__ = None; let mut bloom_filter_fpp_opt__ = None; let mut bloom_filter_ndv_opt__ = None; + let mut coerce_int96_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -5533,6 +5548,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } bloom_filter_ndv_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::BloomFilterNdvOpt::BloomFilterNdv(x.0)); } + GeneratedField::CoerceInt96 => { + if coerce_int96_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("coerceInt96")); + } + coerce_int96_opt__ = map_.next_value::<::std::option::Option<_>>()?.map(parquet_options::CoerceInt96Opt::CoerceInt96); + } } } Ok(ParquetOptions { @@ -5566,6 +5587,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { encoding_opt: encoding_opt__, bloom_filter_fpp_opt: bloom_filter_fpp_opt__, bloom_filter_ndv_opt: bloom_filter_ndv_opt__, + coerce_int96_opt: coerce_int96_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index b6e9bc1379832..e029327d481d1 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -804,6 +804,8 @@ pub struct ParquetOptions { pub bloom_filter_fpp_opt: ::core::option::Option, #[prost(oneof = "parquet_options::BloomFilterNdvOpt", tags = "22")] pub bloom_filter_ndv_opt: ::core::option::Option, + #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")] + pub coerce_int96_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -857,6 +859,11 @@ pub mod parquet_options { #[prost(uint64, tag = "22")] BloomFilterNdv(u64), } + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum CoerceInt96Opt { + #[prost(string, tag = "32")] + CoerceInt96(::prost::alloc::string::String), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index decd0cf630388..28927cad03b4c 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -836,6 +836,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, skip_arrow_metadata: value.skip_arrow_metadata, + coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index b6e9bc1379832..e029327d481d1 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -804,6 +804,8 @@ pub struct ParquetOptions { pub bloom_filter_fpp_opt: ::core::option::Option, #[prost(oneof = "parquet_options::BloomFilterNdvOpt", tags = "22")] pub bloom_filter_ndv_opt: ::core::option::Option, + #[prost(oneof = "parquet_options::CoerceInt96Opt", tags = "32")] + pub coerce_int96_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. pub mod parquet_options { @@ -857,6 +859,11 @@ pub mod parquet_options { #[prost(uint64, tag = "22")] BloomFilterNdv(u64), } + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum CoerceInt96Opt { + #[prost(string, tag = "32")] + CoerceInt96(::prost::alloc::string::String), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index e22738973284e..5c33277dc9f74 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -415,6 +415,9 @@ impl TableParquetOptionsProto { schema_force_view_types: global_options.global.schema_force_view_types, binary_as_string: global_options.global.binary_as_string, skip_arrow_metadata: global_options.global.skip_arrow_metadata, + coerce_int96_opt: global_options.global.coerce_int96.map(|compression| { + parquet_options::CoerceInt96Opt::CoerceInt96(compression) + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -511,6 +514,9 @@ impl From<&ParquetOptionsProto> for ParquetOptions { schema_force_view_types: proto.schema_force_view_types, binary_as_string: proto.binary_as_string, skip_arrow_metadata: proto.skip_arrow_metadata, + coerce_int96: proto.coerce_int96_opt.as_ref().map(|opt| match opt { + parquet_options::CoerceInt96Opt::CoerceInt96(coerce_int96) => coerce_int96.clone(), + }), } } } From 2dbcfbf6d82cc8c37b24379d6e05707cc4db3df2 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 25 Mar 2025 14:49:00 -0400 Subject: [PATCH 03/16] Checkpoint. 
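
Move the inline string -> TimeUnit match out of `create_file_opener` and into
a fallible `parse_coerce_int96_string` helper, so an unsupported value is now
rejected with a `DataFusionError::Configuration` instead of being silently
ignored (the call site still `unwrap`s it for now). A rough sketch of the
intended behavior, as if called from within the module:

```rust
use arrow::datatypes::TimeUnit;

// The four lowercase unit strings map to their TimeUnit...
assert_eq!(parse_coerce_int96_string("us").unwrap(), TimeUnit::Microsecond);
assert_eq!(parse_coerce_int96_string("s").unwrap(), TimeUnit::Second);
// ...anything else is a configuration error.
assert!(parse_coerce_int96_string("minutes").is_err());
```

End to end, this is the knob a user would set, e.g.
`SET datafusion.execution.parquet.coerce_int96 = 'us';` in SQL.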
--- datafusion/datasource-parquet/src/source.rs | 35 ++++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 43db03b398d3f..21d8f7c5bacb1 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -31,7 +31,7 @@ use datafusion_datasource::schema_adapter::{ use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; use datafusion_common::config::TableParquetOptions; -use datafusion_common::Statistics; +use datafusion_common::{DataFusionError, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr_common::physical_expr::fmt_sql; @@ -460,6 +460,22 @@ impl ParquetSource { } } +/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to a arrow_schema.datatype.TimeUnit +fn parse_coerce_int96_string(str_setting: &str) -> datafusion_common::Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + + match str_setting_lower { + "ns" => Ok(TimeUnit::Nanosecond), + "us" => Ok(TimeUnit::Microsecond), + "ms" => Ok(TimeUnit::Millisecond), + "s" => Ok(TimeUnit::Second), + _ => Err(DataFusionError::Configuration(format!( + "Unknown or unsupported parquet coerce_int96: \ + {str_setting}. Valid values are: ns, us, ms, and s." + ))), + } +} + impl FileSource for ParquetSource { fn create_file_opener( &self, @@ -480,17 +496,12 @@ impl FileSource for ParquetSource { Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _ }); - let coerce_int96: Option = - match &self.table_parquet_options.global.coerce_int96 { - None => None, - Some(time_unit) => match time_unit.as_str() { - "ns" => Some(TimeUnit::Nanosecond), - "us" => Some(TimeUnit::Microsecond), - "ms" => Some(TimeUnit::Millisecond), - "s" => Some(TimeUnit::Second), - _ => None, - }, - }; + let coerce_int96 = self + .table_parquet_options + .global + .coerce_int96 + .as_ref() + .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap()); Arc::new(ParquetOpener { partition_index: partition, From 7dd593d6251e817a8df999438aa38e2d51a64984 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 25 Mar 2025 15:37:33 -0400 Subject: [PATCH 04/16] update arrow --- Cargo.lock | 95 ++++++++++++++++++++++++------------------------------ Cargo.toml | 16 ++++----- 2 files changed, 51 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa4ffeb17cbc5..6d14edb521ec9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -246,9 +246,8 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arrow" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc208515aa0151028e464cc94a692156e945ce5126abd3537bb7fd6ba2143ed1" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-arith", "arrow-array", @@ -270,9 +269,8 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e07e726e2b3f7816a85c6a45b6ec118eeeabf0b2a8c208122ad949437181f49a" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -284,9 +282,8 @@ 
dependencies = [ [[package]] name = "arrow-array" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2262eba4f16c78496adfd559a29fe4b24df6088efc9985a873d58e92be022d5" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "ahash 0.8.11", "arrow-buffer", @@ -301,9 +298,8 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e899dade2c3b7f5642eb8366cfd898958bcca099cde6dfea543c7e8d3ad88d4" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "bytes", "half", @@ -312,9 +308,8 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4103d88c5b441525ed4ac23153be7458494c2b0c9a11115848fdb9b81f6f886a" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -333,9 +328,8 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d3cb0914486a3cae19a5cad2598e44e225d53157926d0ada03c20521191a65" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-cast", @@ -349,9 +343,8 @@ dependencies = [ [[package]] name = "arrow-data" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a329fb064477c9ec5f0870d2f5130966f91055c7c5bce2b3a084f116bc28c3b" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-buffer", "arrow-schema", @@ -361,9 +354,8 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7408f2bf3b978eddda272c7699f439760ebc4ac70feca25fefa82c5b8ce808d" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-arith", "arrow-array", @@ -388,9 +380,8 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddecdeab02491b1ce88885986e25002a3da34dd349f682c7cfe67bab7cc17b86" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -402,9 +393,8 @@ dependencies = [ [[package]] name = "arrow-json" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d03b9340013413eb84868682ace00a1098c81a5ebc96d279f7ebf9a4cac3c0fd" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -415,16 +405,17 @@ dependencies = [ "half", "indexmap 2.8.0", 
"lexical-core", + "memchr", "num", "serde", "serde_json", + "simdutf8", ] [[package]] name = "arrow-ord" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f841bfcc1997ef6ac48ee0305c4dfceb1f7c786fe31e67c1186edf775e1f1160" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -435,9 +426,8 @@ dependencies = [ [[package]] name = "arrow-row" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eeb55b0a0a83851aa01f2ca5ee5648f607e8506ba6802577afdda9d75cdedcd" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -448,9 +438,8 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85934a9d0261e0fa5d4e2a5295107d743b543a6e0484a835d4b8db2da15306f9" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "bitflags 2.8.0", "serde", @@ -458,9 +447,8 @@ dependencies = [ [[package]] name = "arrow-select" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e2932aece2d0c869dd2125feb9bd1709ef5c445daa3838ac4112dcfa0fda52c" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -472,9 +460,8 @@ dependencies = [ [[package]] name = "arrow-string" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "912e38bd6a7a7714c1d9b61df80315685553b7455e8a6045c27531d8ecd5b458" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "arrow-array", "arrow-buffer", @@ -1361,9 +1348,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1371,7 +1358,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-link", ] [[package]] @@ -4362,9 +4349,8 @@ dependencies = [ [[package]] name = "parquet" -version = "54.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f88838dca3b84d41444a0341b19f347e8098a3898b0f21536654b8b799e11abd" +version = "54.3.0" +source = "git+https://github.com/apache/arrow-rs?rev=660a3ac22a8ef8601acf4548d65146bc623f653a#660a3ac22a8ef8601acf4548d65146bc623f653a" dependencies = [ "ahash 0.8.11", "arrow-array", @@ -4394,7 +4380,6 @@ dependencies = [ "tokio", "twox-hash", "zstd", - "zstd-sys", ] [[package]] @@ -7124,6 +7109,12 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + [[package]] name = "windows-registry" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index d26446c111675..b747f16c83307 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,23 +87,23 @@ ahash = { version = "0.8", default-features = false, features = [ "runtime-rng", ] } apache-avro = { version = "0.17", default-features = false } -arrow = { version = "54.2.1", features = [ +arrow = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", features = [ "prettyprint", "chrono-tz", ] } -arrow-buffer = { version = "54.1.0", default-features = false } -arrow-flight = { version = "54.2.1", features = [ +arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", default-features = false } +arrow-flight = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", features = [ "flight-sql-experimental", ] } -arrow-ipc = { version = "54.2.0", default-features = false, features = [ +arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", default-features = false, features = [ "lz4", ] } -arrow-ord = { version = "54.1.0", default-features = false } -arrow-schema = { version = "54.1.0", default-features = false } +arrow-ord = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", default-features = false } +arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", default-features = false } async-trait = "0.1.88" bigdecimal = "0.4.7" bytes = "1.10" -chrono = { version = "0.4.38", default-features = false } +chrono = { version = "0.4.40", default-features = false } criterion = "0.5.1" ctor = "0.2.9" dashmap = "6.0.1" @@ -149,7 +149,7 @@ itertools = "0.14" log = "^0.4" object_store = { version = "0.11.0", default-features = false } parking_lot = "0.12" -parquet = { version = "54.2.1", default-features = false, features = [ +parquet = { git = "https://github.com/apache/arrow-rs", rev = "660a3ac22a8ef8601acf4548d65146bc623f653a", default-features = false, features = [ "arrow", "async", "object_store", From ad32c9fc3062280db80ba33f77f6d7e6db627b83 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 31 Mar 2025 12:21:30 -0400 Subject: [PATCH 05/16] Fix after merging main. 
--- datafusion/datasource-parquet/src/file_format.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index 9fb2ef722ec65..ae49ce532150e 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -36,7 +36,6 @@ use datafusion_datasource::write::demux::DemuxedStreamReceiver; use arrow::compute::sum; use arrow::datatypes::{DataType, Field, FieldRef}; -use arrow::datatypes::{Fields, Schema, SchemaRef}; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; @@ -48,15 +47,8 @@ use datafusion_common::{HashMap, Statistics}; use datafusion_common_runtime::{JoinSet, SpawnedTask}; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; -use datafusion_datasource::file_compression_type::FileCompressionType; -use datafusion_datasource::file_format::{ - FileFormat, FileFormatFactory, FilePushdownSupport, -}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; -use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; -use datafusion_datasource::write::demux::DemuxedStreamReceiver; -use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; From 4bfdc92943858b7e3632224d39c9decbffc64efe Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 1 Apr 2025 11:50:13 -0400 Subject: [PATCH 06/16] Add test for int96_from_spark. 
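
The new fixture exercises the INT96 path end to end. For context, an INT96
timestamp is a 12-byte pair: a little-endian 64-bit nanoseconds-of-day
followed by a 32-bit Julian day. A minimal sketch of the microsecond
conversion the coerced reader performs (constants follow the usual
Impala/Spark convention; this is illustrative, not code from this patch):

```rust
/// Julian day number of the Unix epoch, 1970-01-01.
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Convert an INT96 (nanos-of-day, julian-day) pair to microseconds since
/// the epoch. Truncating nanos to micros *before* scaling by days is what
/// keeps far-future dates (e.g. year 9999) inside i64 range.
fn int96_to_micros(nanos_of_day: u64, julian_day: u32) -> i64 {
    const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;
    let days = julian_day as i64 - JULIAN_DAY_OF_EPOCH;
    days * MICROS_PER_DAY + (nanos_of_day / 1_000) as i64
}
```

The expected column below (e.g. `1704141296123456` for
2024-01-01T20:34:56.123456 UTC) follows from exactly this arithmetic; the
test compares raw Int64 values to stay independent of chrono's string
formatting.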
--- .../core/src/datasource/file_format/csv.rs   |  4 +-
 .../core/src/datasource/file_format/json.rs  |  2 +-
 .../core/src/datasource/file_format/mod.rs   | 15 +++--
 .../src/datasource/file_format/parquet.rs    |  5 +-
 .../src/datasource/physical_plan/parquet.rs  | 59 ++++++++++++++++++-
 5 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 309458975ab6c..e723c90bb70a5 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -249,6 +249,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -299,6 +300,7 @@ mod tests {
         let exec = scan_format(
             &state,
             &format,
+            None,
             root,
             "aggregate_test_100_with_nulls.csv",
             projection,
@@ -581,7 +583,7 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let root = format!("{}/csv", arrow_test_data());
         let format = CsvFormat::default().with_has_header(has_header);
-        scan_format(state, &format, &root, file_name, projection, limit).await
+        scan_format(state, &format, None, &root, file_name, projection, limit).await
     }

     #[tokio::test]
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index d533dcf7646da..a70a0f51d3307 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -149,7 +149,7 @@ mod tests {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let filename = "tests/data/2.json";
         let format = JsonFormat::default();
-        scan_format(state, &format, ".", filename, projection, limit).await
+        scan_format(state, &format, None, ".", filename, projection, limit).await
     }

     #[tokio::test]
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index e921f0158e540..0a568987ac415 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,19 +36,20 @@ pub use datafusion_datasource::write;

 #[cfg(test)]
 pub(crate) mod test_util {
-    use std::sync::Arc;
-
+    use arrow_schema::SchemaRef;
     use datafusion_catalog::Session;
     use datafusion_common::Result;
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
     use datafusion_execution::object_store::ObjectStoreUrl;
+    use std::sync::Arc;

     use crate::test::object_store::local_unpartitioned_file;

     pub async fn scan_format(
         state: &dyn Session,
         format: &dyn FileFormat,
+        schema: Option<SchemaRef>,
         store_root: &str,
         file_name: &str,
         projection: Option<Vec<usize>>,
@@ -57,9 +58,13 @@ pub(crate) mod test_util {
         let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

-        let file_schema = format
-            .infer_schema(state, &store, std::slice::from_ref(&meta))
-            .await?;
+        let file_schema = if schema.is_some() {
+            schema.unwrap()
+        } else {
+            format
+                .infer_schema(state, &store, std::slice::from_ref(&meta))
+                .await?
+ }; let statistics = format .infer_stats(state, &store, file_schema.clone(), &meta) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 27a7e7ae3c061..048374c0f7da4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1075,7 +1075,10 @@ mod tests { .map(|factory| factory.create(state, &Default::default()).unwrap()) .unwrap_or(Arc::new(ParquetFormat::new())); - scan_format(state, &*format, &testdata, file_name, projection, limit).await + scan_format( + state, &*format, None, &testdata, file_name, projection, limit, + ) + .await } /// Test that 0-byte files don't break while reading diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9e1b2822e8540..a252a9c30fd9f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -38,7 +38,7 @@ mod tests { use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; use arrow::array::{ - ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, + ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray, StructArray, }; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder}; @@ -1069,6 +1069,7 @@ mod tests { let parquet_exec = scan_format( &state, &ParquetFormat::default(), + None, &testdata, filename, Some(vec![0, 1, 2]), @@ -1101,6 +1102,62 @@ mod tests { Ok(()) } + #[tokio::test] + async fn parquet_exec_with_int96_from_spark() -> Result<()> { + // arrow-rs relies on the chrono library to convert between timestamps and strings, so + // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64 + // anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter. + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])); + let testdata = datafusion_common::test_util::parquet_test_data(); + let filename = "int96_from_spark.parquet"; + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let parquet_exec = scan_format( + &state, + &ParquetFormat::default().with_coerce_int96(Some("us".to_string())), + Some(schema), + &testdata, + filename, + Some(vec![0]), + None, + ) + .await + .unwrap(); + assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); + + let mut results = parquet_exec.execute(0, task_ctx)?; + let batch = results.next().await.unwrap()?; + + assert_eq!(6, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + + assert_eq!(batch.num_columns(), 1); + let column = batch.column(0); + + let expected = Arc::new(Int64Array::from(vec![ + Some(1704141296123456), + Some(1704070800000000), + Some(253402225200000000), + Some(1735599600000000), + None, + Some(9089380393200000000), + ])); + + assert_eq!(column.len(), expected.len()); + + column + .as_primitive::() + .iter() + .zip(expected.iter()) + .for_each(|(lhs, rhs)| { + assert_eq!(lhs, rhs); + }); + + Ok(()) + } + #[tokio::test] async fn parquet_exec_with_range() -> Result<()> { fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile { From f080f83a7b13c3fe1d96b2541a9c3255ad11fe9d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 1 Apr 2025 12:00:32 -0400 Subject: [PATCH 07/16] Remove commented out code. 
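
The commented-out block tried to carry a timezone over from the existing
Arrow field, but a physical INT96 column carries no logical-type annotation
and therefore has no timezone to preserve, so the unconditional mapping to
`DataType::Timestamp(*time_unit, None)` covers every case.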
--- datafusion/datasource-parquet/src/file_format.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index ae49ce532150e..f68366a26fc2a 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -611,16 +611,6 @@ pub fn coerce_int96_to_resolution( .map(|field| match parquet_fields.get(field.name().as_str()) { Some(Type::INT96) => { field_with_new_type(field, DataType::Timestamp(*time_unit, None)) - - // match field.data_type() { - // DataType::Timestamp(TimeUnit::Nanosecond, None) => { - // field_with_new_type(field, DataType::Timestamp(*time_unit,None)) - // } - // DataType::Timestamp(TimeUnit::Nanosecond, Some(tz)) => { - // field_with_new_type(field, DataType::Timestamp(*time_unit,Some(tz.clone()))) - // } - // _ => unreachable!() - // } } _ => Arc::clone(field), }) From 69ed7d464cc9915ba979d97c4a45a3c78cab4f9b Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 3 Apr 2025 12:04:40 -0400 Subject: [PATCH 08/16] Update parquet-testing to include int96_from_spark.parquet. --- parquet-testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-testing b/parquet-testing index f4d7ed772a62a..6e851ddd768d6 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882 +Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff From b1a31ba06b69a04f3a8c3a3e60923ad3705716a9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 8 Apr 2025 15:35:46 -0400 Subject: [PATCH 09/16] Add more test scenarios. Need to understand why ms and s look wrong for last value. --- .../core/src/datasource/file_format/avro.rs | 11 +- .../src/datasource/physical_plan/parquet.rs | 120 +++++++++++++----- 2 files changed, 96 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index a9516aad9e22d..3428d08a6ae52 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -382,6 +382,15 @@ mod tests { let testdata = test_util::arrow_test_data(); let store_root = format!("{testdata}/avro"); let format = AvroFormat {}; - scan_format(state, &format, &store_root, file_name, projection, limit).await + scan_format( + state, + &format, + None, + &store_root, + file_name, + projection, + limit, + ) + .await } } diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index a252a9c30fd9f..e10bf54f3b93e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1114,46 +1114,98 @@ mod tests { let session_ctx = SessionContext::new(); let state = session_ctx.state(); let task_ctx = state.task_ctx(); - let parquet_exec = scan_format( - &state, - &ParquetFormat::default().with_coerce_int96(Some("us".to_string())), - Some(schema), - &testdata, - filename, - Some(vec![0]), - None, - ) - .await - .unwrap(); - assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - let mut results = parquet_exec.execute(0, task_ctx)?; - let batch = results.next().await.unwrap()?; + let time_units_and_expected = vec![ + ( + None, // Same as "ns" time_unit + Arc::new(Int64Array::from(vec![ + Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s) + 
Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s) + Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999) + Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s) + None, + Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000) + ])), + ), + ( + Some("ns".to_string()), + Arc::new(Int64Array::from(vec![ + Some(1704141296123456000), + Some(1704070800000000000), + Some(-4852191831933722624), + Some(1735599600000000000), + None, + Some(-4864435138808946688), + ])), + ), + ( + Some("us".to_string()), + Arc::new(Int64Array::from(vec![ + Some(1704141296123456), + Some(1704070800000000), + Some(253402225200000000), + Some(1735599600000000), + None, + Some(9089380393200000000), + ])), + ), + ( + Some("ms".to_string()), + Arc::new(Int64Array::from(vec![ + Some(1704141296123), + Some(1704070800000), + Some(253402225200000), + Some(1735599600000), + None, + Some(-9357363680509551), //TODO: This one looks wrong + ])), + ), + ( + Some("s".to_string()), + Arc::new(Int64Array::from(vec![ + Some(1704141296), + Some(1704070800), + Some(253402225200), + Some(1735599600), + None, + Some(-9357363680509), //TODO: This one looks wrong + ])), + ), + ]; + + for (time_unit, expected) in time_units_and_expected { + let parquet_exec = scan_format( + &state, + &ParquetFormat::default().with_coerce_int96(time_unit.clone()), + Some(schema.clone()), + &testdata, + filename, + Some(vec![0]), + None, + ) + .await + .unwrap(); + assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); - assert_eq!(6, batch.num_rows()); - assert_eq!(1, batch.num_columns()); + let mut results = parquet_exec.execute(0, task_ctx.clone())?; + let batch = results.next().await.unwrap()?; - assert_eq!(batch.num_columns(), 1); - let column = batch.column(0); + assert_eq!(6, batch.num_rows()); + assert_eq!(1, batch.num_columns()); - let expected = Arc::new(Int64Array::from(vec![ - Some(1704141296123456), - Some(1704070800000000), - Some(253402225200000000), - Some(1735599600000000), - None, - Some(9089380393200000000), - ])); + assert_eq!(batch.num_columns(), 1); + let column = batch.column(0); - assert_eq!(column.len(), expected.len()); + assert_eq!(column.len(), expected.len()); - column - .as_primitive::() - .iter() - .zip(expected.iter()) - .for_each(|(lhs, rhs)| { - assert_eq!(lhs, rhs); - }); + column + .as_primitive::() + .iter() + .zip(expected.iter()) + .for_each(|(lhs, rhs)| { + assert_eq!(lhs, rhs); + }); + } Ok(()) } From 8f372a195c5ee4206533084881bf60b3619c4287 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 8 Apr 2025 15:52:47 -0400 Subject: [PATCH 10/16] Adjust test scenarios. 
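
Keep the `None`/`ns`/`us` cases and drop `ms` and `s`. The negative `ns`
expectations above are genuine: i64 nanoseconds overflow for anything past
2262-04-11, which is exactly the limitation `coerce_int96` exists to work
around. A quick sanity check of the year-9999 row (plain Rust, independent
of this patch):

```rust
// 9999-12-31T03:00:00Z in microseconds fits comfortably in i64, but the
// same instant in nanoseconds needs ~2.5e20 and wraps when forced into i64.
let micros: i64 = 253_402_225_200_000_000;
assert_eq!(micros.wrapping_mul(1_000), -4_852_191_831_933_722_624);
```

The `ms` and `s` rows are removed below because the current conversion path
does not yet yield sensible values for the out-of-range rows at those
resolutions (see the TODOs being deleted).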
--- .../src/datasource/physical_plan/parquet.rs | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index e10bf54f3b93e..324b4b8cf8d84 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -1149,28 +1149,6 @@ mod tests { Some(9089380393200000000), ])), ), - ( - Some("ms".to_string()), - Arc::new(Int64Array::from(vec![ - Some(1704141296123), - Some(1704070800000), - Some(253402225200000), - Some(1735599600000), - None, - Some(-9357363680509551), //TODO: This one looks wrong - ])), - ), - ( - Some("s".to_string()), - Arc::new(Int64Array::from(vec![ - Some(1704141296), - Some(1704070800), - Some(253402225200), - Some(1735599600), - None, - Some(-9357363680509), //TODO: This one looks wrong - ])), - ), ]; for (time_unit, expected) in time_units_and_expected { From 796c5f18e4a21efea82900be4fc79b0b3a4f7248 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 14 Apr 2025 12:33:49 -0400 Subject: [PATCH 11/16] Fix sqlllogic tests. --- datafusion/common/src/config.rs | 6 ++++++ datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ 2 files changed, 8 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9e32817dda6a9..f64e60952f046 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -459,6 +459,12 @@ config_namespace! { /// BLOB instead. pub binary_as_string: bool, default = false + /// (reading) If true, parquet reader will read columns of + /// physical type int96 as originating from a different resolution + /// than nanosecond. This is useful for systems like Spark + /// which stores its 64-bit timestamps as microsecond resolution, + /// so it can write values with a larger date range than 64-bit + /// timestamps with nanosecond resolution. pub coerce_int96: Option, transform = str::to_lowercase, default = None // The following options affect writing to parquet files diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 496f24abf6ed7..4a145197a647b 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -197,6 +197,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false +datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) datafusion.execution.parquet.created_by datafusion @@ -296,6 +297,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (writing) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files +datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. 
This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property From f9227c9c7aae40077461c6b79d9a3b83744cbb87 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 14 Apr 2025 12:37:05 -0400 Subject: [PATCH 12/16] Update docs. --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 68e21183938b1..1b9148091f473 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -75,6 +75,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files | | datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files | +| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. 
| From e8e0324030e2bd178ef9db6053fd74cb03899245 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 14 Apr 2025 12:41:30 -0400 Subject: [PATCH 13/16] clippy --- datafusion/core/src/datasource/file_format/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index fceb2ab257a02..3a098301f14e3 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -58,8 +58,8 @@ pub(crate) mod test_util { let store = Arc::new(object_store::local::LocalFileSystem::new()) as _; let meta = local_unpartitioned_file(format!("{store_root}/{file_name}")); - let file_schema = if schema.is_some() { - schema.unwrap() + let file_schema = if let Some(file_schema) = schema { + file_schema } else { format .infer_schema(state, &store, std::slice::from_ref(&meta)) From 4a48ff9d75d3f27f9adc1903499b267bbc1c54a3 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Mon, 14 Apr 2025 13:16:08 -0400 Subject: [PATCH 14/16] Fix docs. --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1b9148091f473..a9916583ace64 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -58,6 +58,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | +| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | @@ -75,7 +76,6 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.encoding | NULL | (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. 
 | datafusion.execution.parquet.bloom_filter_on_read | true | (writing) Use any available bloom filters when reading parquet files |
 | datafusion.execution.parquet.bloom_filter_on_write | false | (writing) Write bloom filters for all columns when creating parquet files |
-| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.bloom_filter_fpp | NULL | (writing) Sets bloom filter false positive probability. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.bloom_filter_ndv | NULL | (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting |
 | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. |

From b75a96094e113538fb638a5e96c5556266287b93 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 14 Apr 2025 15:07:07 -0400
Subject: [PATCH 15/16] Adjust config wording based on PR feedback.

---
 datafusion/common/src/config.rs                           | 8 ++++----
 datafusion/sqllogictest/test_files/information_schema.slt | 2 +-
 docs/source/user-guide/configs.md                         | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index f64e60952f046..1c746a4e98405 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -461,10 +461,10 @@ config_namespace! {

     /// (reading) If true, parquet reader will read columns of
     /// physical type int96 as originating from a different resolution
-    /// than nanosecond. This is useful for systems like Spark
-    /// which stores its 64-bit timestamps as microsecond resolution,
-    /// so it can write values with a larger date range than 64-bit
-    /// timestamps with nanosecond resolution.
+    /// than nanosecond. This is useful for reading data from systems like Spark
+    /// which stores microsecond resolution timestamps in an int96 allowing it
+    /// to write values with a larger date range than 64-bit timestamps with
+    /// nanosecond resolution.
     pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

     // The following options affect writing to parquet files
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 4a145197a647b..efbafe369467a 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -297,7 +297,7 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f
 datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting
 datafusion.execution.parquet.bloom_filter_on_read true (writing) Use any available bloom filters when reading parquet files
 datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files
-datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution.
+datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution.
 datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length
 datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting.
 datafusion.execution.parquet.created_by datafusion (writing) Sets "created by" property
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index a9916583ace64..a90da66e4b0b7 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -58,7 +58,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
 | datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. |
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
-| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for systems like Spark which stores its 64-bit timestamps as microsecond resolution, so it can write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
+| datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes |
 | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes |
 | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" |

From 83a139752d26f962dd51efcda04cc76c01d8e9e3 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 14 Apr 2025 15:52:19 -0400
Subject: [PATCH 16/16] Update comment.

---
 datafusion/datasource-parquet/src/file_format.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index 0d8af556c366d..2ef4f236f2787 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -582,7 +582,7 @@ pub fn apply_file_schema_type_coercions(
     ))
 }

-/// Coerces the file schema if the table schema uses a view type.
+/// Coerces the file schema's Timestamps to the provided TimeUnit if Parquet schema contains INT96.
 pub fn coerce_int96_to_resolution(
     parquet_schema: &SchemaDescriptor,
     file_schema: &Schema,
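The corrected doc comment states what `coerce_int96_to_resolution` actually does: rewrite INT96-backed timestamp fields in the Arrow file schema to the requested TimeUnit. A test-style sketch of the end-to-end behavior a caller should observe, under the assumption that `spark_int96.parquet` holds a Spark-written INT96 column named `ts` (both names are illustrative, not from the patch):

    use datafusion::arrow::datatypes::{DataType, TimeUnit};
    use datafusion::prelude::*;

    #[tokio::test]
    async fn int96_column_reads_as_microseconds() -> datafusion::error::Result<()> {
        // Enable the coercion through the config API rather than SQL SET.
        let config = SessionConfig::new()
            .set_str("datafusion.execution.parquet.coerce_int96", "us");
        let ctx = SessionContext::new_with_config(config);

        let df = ctx
            .read_parquet("spark_int96.parquet", ParquetReadOptions::default())
            .await?;

        // With coercion enabled the INT96 column resolves to microseconds;
        // without it, the reader reports the nanosecond default.
        let field = df.schema().field_with_unqualified_name("ts")?;
        assert_eq!(
            field.data_type(),
            &DataType::Timestamp(TimeUnit::Microsecond, None)
        );
        Ok(())
    }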