diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 9b6e6aa5dac37..1312d57016a5b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -24,7 +24,7 @@ use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; use crate::parquet_config::DFParquetWriterVersion; -use crate::parsers::CompressionTypeVariant; +use crate::parsers::{CompressionTypeVariant, CsvQuoteStyle}; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; #[cfg(feature = "parquet_encryption")] @@ -1855,6 +1855,17 @@ impl ConfigField for CompressionTypeVariant { } } +impl ConfigField for CsvQuoteStyle { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = CsvQuoteStyle::from_str(value)?; + Ok(()) + } +} + /// An implementation trait used to recursively walk configuration pub trait Visit { fn some(&mut self, key: &str, value: V, description: &'static str); @@ -2927,6 +2938,15 @@ config_namespace! { pub terminator: Option, default = None pub escape: Option, default = None pub double_quote: Option, default = None + /// Quote style for CSV writing. + /// One of: "Always", "Necessary", "NonNumeric", "Never" + pub quote_style: CsvQuoteStyle, default = CsvQuoteStyle::Necessary + /// Whether to ignore leading whitespace in string values when writing CSV. + /// Defaults to `false` when `None`. + pub ignore_leading_whitespace: Option, default = None + /// Whether to ignore trailing whitespace in string values when writing CSV. + /// Defaults to `false` when `None`. + pub ignore_trailing_whitespace: Option, default = None /// Specifies whether newlines in (quoted) values are supported. /// /// Parsing newlines in quoted values may be affected by execution behaviour such as @@ -3035,6 +3055,30 @@ impl CsvOptions { self } + /// Set the quote style for CSV writing. + pub fn with_quote_style(mut self, quote_style: CsvQuoteStyle) -> Self { + self.quote_style = quote_style; + self + } + + /// Set whether to ignore leading whitespace in string values when writing CSV. + pub fn with_ignore_leading_whitespace( + mut self, + ignore_leading_whitespace: bool, + ) -> Self { + self.ignore_leading_whitespace = Some(ignore_leading_whitespace); + self + } + + /// Set whether to ignore trailing whitespace in string values when writing CSV. + pub fn with_ignore_trailing_whitespace( + mut self, + ignore_trailing_whitespace: bool, + ) -> Self { + self.ignore_trailing_whitespace = Some(ignore_trailing_whitespace); + self + } + /// Specifies whether newlines in (quoted) values are supported. /// /// Parsing newlines in quoted values may be affected by execution behaviour such as diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs index 4e6f74a4448af..fa116d17277cc 100644 --- a/datafusion/common/src/file_options/csv_writer.rs +++ b/datafusion/common/src/file_options/csv_writer.rs @@ -94,6 +94,13 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions { if let Some(v) = &value.double_quote { builder = builder.with_double_quote(*v) } + builder = builder.with_quote_style(value.quote_style.into()); + if let Some(v) = &value.ignore_leading_whitespace { + builder = builder.with_ignore_leading_whitespace(*v) + } + if let Some(v) = &value.ignore_trailing_whitespace { + builder = builder.with_ignore_trailing_whitespace(*v) + } Ok(CsvWriterOptions { writer_options: builder, compression: value.compression, diff --git a/datafusion/common/src/parsers.rs b/datafusion/common/src/parsers.rs index cd3d607dacd88..6b930d110f47b 100644 --- a/datafusion/common/src/parsers.rs +++ b/datafusion/common/src/parsers.rs @@ -73,3 +73,59 @@ impl CompressionTypeVariant { !matches!(self, &Self::UNCOMPRESSED) } } + +/// CSV quote style +/// +/// Controls when fields are quoted when writing CSV files. +/// Corresponds to [`arrow::csv::QuoteStyle`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum CsvQuoteStyle { + /// Quote all fields + Always, + /// Only quote fields when necessary (default) + #[default] + Necessary, + /// Quote all non-numeric fields + NonNumeric, + /// Never quote fields + Never, +} + +impl FromStr for CsvQuoteStyle { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "always" => Ok(Self::Always), + "necessary" => Ok(Self::Necessary), + "non_numeric" | "nonnumeric" => Ok(Self::NonNumeric), + "never" => Ok(Self::Never), + _ => Err(DataFusionError::NotImplemented(format!( + "Unsupported CSV quote style {s}" + ))), + } + } +} + +impl From for arrow::csv::QuoteStyle { + fn from(style: CsvQuoteStyle) -> Self { + match style { + CsvQuoteStyle::Always => Self::Always, + CsvQuoteStyle::NonNumeric => Self::NonNumeric, + CsvQuoteStyle::Never => Self::Never, + CsvQuoteStyle::Necessary => Self::Necessary, + } + } +} + +impl Display for CsvQuoteStyle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + Self::Always => "Always", + Self::Necessary => "Necessary", + Self::NonNumeric => "NonNumeric", + Self::Never => "Never", + }; + write!(f, "{str}") + } +} diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..2ddfe156a9b12 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -430,6 +430,13 @@ message JsonWriterOptions { } +enum CsvQuoteStyle { + NECESSARY = 0; + ALWAYS = 1; + NON_NUMERIC = 2; + NEVER = 3; +} + message CsvWriterOptions { // Compression type CompressionTypeVariant compression = 1; @@ -453,6 +460,12 @@ message CsvWriterOptions { string escape = 10; // Optional flag whether to double quotes, instead of escaping. Defaults to `true` bool double_quote = 11; + // Quote style for CSV writing + CsvQuoteStyle quote_style = 12; + // Whether to ignore leading whitespace in string values + bool ignore_leading_whitespace = 13; + // Whether to ignore trailing whitespace in string values + bool ignore_trailing_whitespace = 14; } // Options controlling CSV format @@ -476,6 +489,12 @@ message CsvOptions { bytes terminator = 17; // Optional terminator character as a byte bytes truncated_rows = 18; // Indicates if truncated rows are allowed optional uint32 compression_level = 19; // Optional compression level + // Quote style for CSV writing + CsvQuoteStyle quote_style = 20; + // Whether to ignore leading whitespace in string values + bytes ignore_leading_whitespace = 21; + // Whether to ignore trailing whitespace in string values + bytes ignore_trailing_whitespace = 22; } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..08ed7eeecbaf9 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -23,7 +23,7 @@ use crate::common::proto_error; use crate::protobuf_common as protobuf; use arrow::array::{ArrayRef, AsArray}; use arrow::buffer::Buffer; -use arrow::csv::WriterBuilder; +use arrow::csv::{QuoteStyle, WriterBuilder}; use arrow::datatypes::{ DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, TimeUnit, UnionFields, UnionMode, i256, @@ -947,6 +947,17 @@ impl From for protobuf::CompressionTypeVariant { } } +impl From for datafusion_common::parsers::CsvQuoteStyle { + fn from(value: protobuf::CsvQuoteStyle) -> Self { + match value { + protobuf::CsvQuoteStyle::Necessary => Self::Necessary, + protobuf::CsvQuoteStyle::Always => Self::Always, + protobuf::CsvQuoteStyle::NonNumeric => Self::NonNumeric, + protobuf::CsvQuoteStyle::Never => Self::Never, + } + } +} + impl TryFrom<&protobuf::CsvWriterOptions> for CsvWriterOptions { type Error = DataFusionError; @@ -1003,6 +1014,15 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0), + quote_style: proto_opts.quote_style().into(), + ignore_leading_whitespace: proto_opts + .ignore_leading_whitespace + .first() + .map(|h| *h != 0), + ignore_trailing_whitespace: proto_opts + .ignore_trailing_whitespace + .first() + .map(|h| *h != 0), }) } } @@ -1253,6 +1273,16 @@ pub(crate) fn csv_writer_options_from_proto( return Err(proto_error("Error parsing CSV Escape")); } } + let quote_style = match protobuf::CsvQuoteStyle::try_from(writer_options.quote_style) + { + Ok(protobuf::CsvQuoteStyle::Always) => QuoteStyle::Always, + Ok(protobuf::CsvQuoteStyle::NonNumeric) => QuoteStyle::NonNumeric, + Ok(protobuf::CsvQuoteStyle::Never) => QuoteStyle::Never, + Ok(protobuf::CsvQuoteStyle::Necessary) => QuoteStyle::Necessary, + _ => Err(proto_error( + "Unknown quote style, must be one of: 'Always', 'NonNumeric', 'Never', 'Necessary'", + ))?, + }; Ok(builder .with_header(writer_options.has_header) .with_date_format(writer_options.date_format.clone()) @@ -1260,5 +1290,8 @@ pub(crate) fn csv_writer_options_from_proto( .with_timestamp_format(writer_options.timestamp_format.clone()) .with_time_format(writer_options.time_format.clone()) .with_null(writer_options.null_value.clone()) - .with_double_quote(writer_options.double_quote)) + .with_double_quote(writer_options.double_quote) + .with_quote_style(quote_style) + .with_ignore_leading_whitespace(writer_options.ignore_leading_whitespace) + .with_ignore_trailing_whitespace(writer_options.ignore_trailing_whitespace)) } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..33f679edafcd6 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1701,6 +1701,15 @@ impl serde::Serialize for CsvOptions { if self.compression_level.is_some() { len += 1; } + if self.quote_style != 0 { + len += 1; + } + if !self.ignore_leading_whitespace.is_empty() { + len += 1; + } + if !self.ignore_trailing_whitespace.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1781,6 +1790,21 @@ impl serde::Serialize for CsvOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if self.quote_style != 0 { + let v = CsvQuoteStyle::try_from(self.quote_style) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.quote_style)))?; + struct_ser.serialize_field("quoteStyle", &v)?; + } + if !self.ignore_leading_whitespace.is_empty() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("ignoreLeadingWhitespace", pbjson::private::base64::encode(&self.ignore_leading_whitespace).as_str())?; + } + if !self.ignore_trailing_whitespace.is_empty() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("ignoreTrailingWhitespace", pbjson::private::base64::encode(&self.ignore_trailing_whitespace).as_str())?; + } struct_ser.end() } } @@ -1823,6 +1847,12 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "truncatedRows", "compression_level", "compressionLevel", + "quote_style", + "quoteStyle", + "ignore_leading_whitespace", + "ignoreLeadingWhitespace", + "ignore_trailing_whitespace", + "ignoreTrailingWhitespace", ]; #[allow(clippy::enum_variant_names)] @@ -1846,6 +1876,9 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Terminator, TruncatedRows, CompressionLevel, + QuoteStyle, + IgnoreLeadingWhitespace, + IgnoreTrailingWhitespace, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1886,6 +1919,9 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "terminator" => Ok(GeneratedField::Terminator), "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "quoteStyle" | "quote_style" => Ok(GeneratedField::QuoteStyle), + "ignoreLeadingWhitespace" | "ignore_leading_whitespace" => Ok(GeneratedField::IgnoreLeadingWhitespace), + "ignoreTrailingWhitespace" | "ignore_trailing_whitespace" => Ok(GeneratedField::IgnoreTrailingWhitespace), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1924,6 +1960,9 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut terminator__ = None; let mut truncated_rows__ = None; let mut compression_level__ = None; + let mut quote_style__ = None; + let mut ignore_leading_whitespace__ = None; + let mut ignore_trailing_whitespace__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -2062,6 +2101,28 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::QuoteStyle => { + if quote_style__.is_some() { + return Err(serde::de::Error::duplicate_field("quoteStyle")); + } + quote_style__ = Some(map_.next_value::()? as i32); + } + GeneratedField::IgnoreLeadingWhitespace => { + if ignore_leading_whitespace__.is_some() { + return Err(serde::de::Error::duplicate_field("ignoreLeadingWhitespace")); + } + ignore_leading_whitespace__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } + GeneratedField::IgnoreTrailingWhitespace => { + if ignore_trailing_whitespace__.is_some() { + return Err(serde::de::Error::duplicate_field("ignoreTrailingWhitespace")); + } + ignore_trailing_whitespace__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -2084,12 +2145,92 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { terminator: terminator__.unwrap_or_default(), truncated_rows: truncated_rows__.unwrap_or_default(), compression_level: compression_level__, + quote_style: quote_style__.unwrap_or_default(), + ignore_leading_whitespace: ignore_leading_whitespace__.unwrap_or_default(), + ignore_trailing_whitespace: ignore_trailing_whitespace__.unwrap_or_default(), }) } } deserializer.deserialize_struct("datafusion_common.CsvOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CsvQuoteStyle { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + let variant = match self { + Self::Necessary => "NECESSARY", + Self::Always => "ALWAYS", + Self::NonNumeric => "NON_NUMERIC", + Self::Never => "NEVER", + }; + serializer.serialize_str(variant) + } +} +impl<'de> serde::Deserialize<'de> for CsvQuoteStyle { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "NECESSARY", + "ALWAYS", + "NON_NUMERIC", + "NEVER", + ]; + + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = CsvQuoteStyle; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + fn visit_i64(self, v: i64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self) + }) + } + + fn visit_u64(self, v: u64) -> std::result::Result + where + E: serde::de::Error, + { + i32::try_from(v) + .ok() + .and_then(|x| x.try_into().ok()) + .ok_or_else(|| { + serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self) + }) + } + + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "NECESSARY" => Ok(CsvQuoteStyle::Necessary), + "ALWAYS" => Ok(CsvQuoteStyle::Always), + "NON_NUMERIC" => Ok(CsvQuoteStyle::NonNumeric), + "NEVER" => Ok(CsvQuoteStyle::Never), + _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), + } + } + } + deserializer.deserialize_any(GeneratedVisitor) + } +} impl serde::Serialize for CsvWriterOptions { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -2131,6 +2272,15 @@ impl serde::Serialize for CsvWriterOptions { if self.double_quote { len += 1; } + if self.quote_style != 0 { + len += 1; + } + if self.ignore_leading_whitespace { + len += 1; + } + if self.ignore_trailing_whitespace { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvWriterOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -2167,6 +2317,17 @@ impl serde::Serialize for CsvWriterOptions { if self.double_quote { struct_ser.serialize_field("doubleQuote", &self.double_quote)?; } + if self.quote_style != 0 { + let v = CsvQuoteStyle::try_from(self.quote_style) + .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.quote_style)))?; + struct_ser.serialize_field("quoteStyle", &v)?; + } + if self.ignore_leading_whitespace { + struct_ser.serialize_field("ignoreLeadingWhitespace", &self.ignore_leading_whitespace)?; + } + if self.ignore_trailing_whitespace { + struct_ser.serialize_field("ignoreTrailingWhitespace", &self.ignore_trailing_whitespace)?; + } struct_ser.end() } } @@ -2195,6 +2356,12 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { "escape", "double_quote", "doubleQuote", + "quote_style", + "quoteStyle", + "ignore_leading_whitespace", + "ignoreLeadingWhitespace", + "ignore_trailing_whitespace", + "ignoreTrailingWhitespace", ]; #[allow(clippy::enum_variant_names)] @@ -2210,6 +2377,9 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { Quote, Escape, DoubleQuote, + QuoteStyle, + IgnoreLeadingWhitespace, + IgnoreTrailingWhitespace, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -2242,6 +2412,9 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { "quote" => Ok(GeneratedField::Quote), "escape" => Ok(GeneratedField::Escape), "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), + "quoteStyle" | "quote_style" => Ok(GeneratedField::QuoteStyle), + "ignoreLeadingWhitespace" | "ignore_leading_whitespace" => Ok(GeneratedField::IgnoreLeadingWhitespace), + "ignoreTrailingWhitespace" | "ignore_trailing_whitespace" => Ok(GeneratedField::IgnoreTrailingWhitespace), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -2272,6 +2445,9 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { let mut quote__ = None; let mut escape__ = None; let mut double_quote__ = None; + let mut quote_style__ = None; + let mut ignore_leading_whitespace__ = None; + let mut ignore_trailing_whitespace__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Compression => { @@ -2340,6 +2516,24 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { } double_quote__ = Some(map_.next_value()?); } + GeneratedField::QuoteStyle => { + if quote_style__.is_some() { + return Err(serde::de::Error::duplicate_field("quoteStyle")); + } + quote_style__ = Some(map_.next_value::()? as i32); + } + GeneratedField::IgnoreLeadingWhitespace => { + if ignore_leading_whitespace__.is_some() { + return Err(serde::de::Error::duplicate_field("ignoreLeadingWhitespace")); + } + ignore_leading_whitespace__ = Some(map_.next_value()?); + } + GeneratedField::IgnoreTrailingWhitespace => { + if ignore_trailing_whitespace__.is_some() { + return Err(serde::de::Error::duplicate_field("ignoreTrailingWhitespace")); + } + ignore_trailing_whitespace__ = Some(map_.next_value()?); + } } } Ok(CsvWriterOptions { @@ -2354,6 +2548,9 @@ impl<'de> serde::Deserialize<'de> for CsvWriterOptions { quote: quote__.unwrap_or_default(), escape: escape__.unwrap_or_default(), double_quote: double_quote__.unwrap_or_default(), + quote_style: quote_style__.unwrap_or_default(), + ignore_leading_whitespace: ignore_leading_whitespace__.unwrap_or_default(), + ignore_trailing_whitespace: ignore_trailing_whitespace__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1da9a4238ba3c 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -611,6 +611,15 @@ pub struct CsvWriterOptions { /// Optional flag whether to double quotes, instead of escaping. Defaults to `true` #[prost(bool, tag = "11")] pub double_quote: bool, + /// Quote style for CSV writing + #[prost(enumeration = "CsvQuoteStyle", tag = "12")] + pub quote_style: i32, + /// Whether to ignore leading whitespace in string values + #[prost(bool, tag = "13")] + pub ignore_leading_whitespace: bool, + /// Whether to ignore trailing whitespace in string values + #[prost(bool, tag = "14")] + pub ignore_trailing_whitespace: bool, } /// Options controlling CSV format #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -672,6 +681,15 @@ pub struct CsvOptions { /// Optional compression level #[prost(uint32, optional, tag = "19")] pub compression_level: ::core::option::Option, + /// Quote style for CSV writing + #[prost(enumeration = "CsvQuoteStyle", tag = "20")] + pub quote_style: i32, + /// Whether to ignore leading whitespace in string values + #[prost(bytes = "vec", tag = "21")] + pub ignore_leading_whitespace: ::prost::alloc::vec::Vec, + /// Whether to ignore trailing whitespace in string values + #[prost(bytes = "vec", tag = "22")] + pub ignore_trailing_whitespace: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -1188,6 +1206,38 @@ impl CompressionTypeVariant { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum CsvQuoteStyle { + Necessary = 0, + Always = 1, + NonNumeric = 2, + Never = 3, +} +impl CsvQuoteStyle { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Necessary => "NECESSARY", + Self::Always => "ALWAYS", + Self::NonNumeric => "NON_NUMERIC", + Self::Never => "NEVER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "NECESSARY" => Some(Self::Necessary), + "ALWAYS" => Some(Self::Always), + "NON_NUMERIC" => Some(Self::NonNumeric), + "NEVER" => Some(Self::Never), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum JoinSide { LeftSide = 0, RightSide = 1, diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..3e78d28ee1727 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -23,7 +23,7 @@ use crate::protobuf_common::{ EmptyMessage, arrow_type::ArrowTypeEnum, scalar_value::Value, }; use arrow::array::{ArrayRef, RecordBatch}; -use arrow::csv::WriterBuilder; +use arrow::csv::{QuoteStyle, WriterBuilder}; use arrow::datatypes::{ DataType, Field, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode, @@ -31,6 +31,7 @@ use arrow::datatypes::{ use arrow::ipc::writer::{ CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, }; +use datafusion_common::parsers::CsvQuoteStyle; use datafusion_common::{ Column, ColumnStatistics, Constraint, Constraints, DFSchema, DFSchemaRef, DataFusionError, JoinSide, ScalarValue, Statistics, @@ -843,6 +844,29 @@ impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant { } } +impl From for protobuf::CsvQuoteStyle { + fn from(value: CsvQuoteStyle) -> Self { + match value { + CsvQuoteStyle::Necessary => Self::Necessary, + CsvQuoteStyle::Always => Self::Always, + CsvQuoteStyle::NonNumeric => Self::NonNumeric, + CsvQuoteStyle::Never => Self::Never, + } + } +} + +impl From for protobuf::CsvQuoteStyle { + fn from(value: QuoteStyle) -> Self { + match value { + QuoteStyle::Necessary => Self::Necessary, + QuoteStyle::Always => Self::Always, + QuoteStyle::NonNumeric => Self::NonNumeric, + QuoteStyle::Never => Self::Never, + _ => Self::Necessary, + } + } +} + impl TryFrom<&CsvWriterOptions> for protobuf::CsvWriterOptions { type Error = DataFusionError; @@ -976,6 +1000,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { fn try_from(opts: &CsvOptions) -> datafusion_common::Result { let compression: protobuf::CompressionTypeVariant = opts.compression.into(); + let quote_style: protobuf::CsvQuoteStyle = opts.quote_style.into(); Ok(protobuf::CsvOptions { has_header: opts.has_header.map_or_else(Vec::new, |h| vec![h as u8]), delimiter: vec![opts.delimiter], @@ -998,6 +1023,13 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]), compression_level: opts.compression_level, + quote_style: quote_style.into(), + ignore_leading_whitespace: opts + .ignore_leading_whitespace + .map_or_else(Vec::new, |h| vec![h as u8]), + ignore_trailing_whitespace: opts + .ignore_trailing_whitespace + .map_or_else(Vec::new, |h| vec![h as u8]), }) } } @@ -1125,6 +1157,7 @@ pub(crate) fn csv_writer_options_to_proto( compression: &CompressionTypeVariant, ) -> protobuf::CsvWriterOptions { let compression: protobuf::CompressionTypeVariant = compression.into(); + let quote_style: protobuf::CsvQuoteStyle = csv_options.quote_style().into(); protobuf::CsvWriterOptions { compression: compression.into(), delimiter: (csv_options.delimiter() as char).to_string(), @@ -1137,5 +1170,8 @@ pub(crate) fn csv_writer_options_to_proto( quote: (csv_options.quote() as char).to_string(), escape: (csv_options.escape() as char).to_string(), double_quote: csv_options.double_quote(), + quote_style: quote_style.into(), + ignore_leading_whitespace: csv_options.ignore_leading_whitespace(), + ignore_trailing_whitespace: csv_options.ignore_trailing_whitespace(), } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..1da9a4238ba3c 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -611,6 +611,15 @@ pub struct CsvWriterOptions { /// Optional flag whether to double quotes, instead of escaping. Defaults to `true` #[prost(bool, tag = "11")] pub double_quote: bool, + /// Quote style for CSV writing + #[prost(enumeration = "CsvQuoteStyle", tag = "12")] + pub quote_style: i32, + /// Whether to ignore leading whitespace in string values + #[prost(bool, tag = "13")] + pub ignore_leading_whitespace: bool, + /// Whether to ignore trailing whitespace in string values + #[prost(bool, tag = "14")] + pub ignore_trailing_whitespace: bool, } /// Options controlling CSV format #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -672,6 +681,15 @@ pub struct CsvOptions { /// Optional compression level #[prost(uint32, optional, tag = "19")] pub compression_level: ::core::option::Option, + /// Quote style for CSV writing + #[prost(enumeration = "CsvQuoteStyle", tag = "20")] + pub quote_style: i32, + /// Whether to ignore leading whitespace in string values + #[prost(bytes = "vec", tag = "21")] + pub ignore_leading_whitespace: ::prost::alloc::vec::Vec, + /// Whether to ignore trailing whitespace in string values + #[prost(bytes = "vec", tag = "22")] + pub ignore_trailing_whitespace: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] @@ -1188,6 +1206,38 @@ impl CompressionTypeVariant { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] +pub enum CsvQuoteStyle { + Necessary = 0, + Always = 1, + NonNumeric = 2, + Never = 3, +} +impl CsvQuoteStyle { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Necessary => "NECESSARY", + Self::Always => "ALWAYS", + Self::NonNumeric => "NON_NUMERIC", + Self::Never => "NEVER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "NECESSARY" => Some(Self::Necessary), + "ALWAYS" => Some(Self::Always), + "NON_NUMERIC" => Some(Self::NonNumeric), + "NEVER" => Some(Self::Never), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] pub enum JoinSide { LeftSide = 0, RightSide = 1, diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..e79bafe645b09 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -17,11 +17,15 @@ use std::sync::Arc; -use crate::protobuf::{CsvOptions as CsvOptionsProto, JsonOptions as JsonOptionsProto}; +use super::LogicalExtensionCodec; +use crate::protobuf::{ + CsvOptions as CsvOptionsProto, CsvQuoteStyle as CsvQuoteStyleProto, + JsonOptions as JsonOptionsProto, +}; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ TableReference, exec_datafusion_err, exec_err, not_impl_err, - parsers::CompressionTypeVariant, + parsers::{CompressionTypeVariant, CsvQuoteStyle}, }; use datafusion_datasource::file_format::FileFormatFactory; use datafusion_datasource_arrow::file_format::ArrowFormatFactory; @@ -30,8 +34,6 @@ use datafusion_datasource_json::file_format::JsonFormatFactory; use datafusion_execution::TaskContext; use prost::Message; -use super::LogicalExtensionCodec; - #[derive(Debug)] pub struct CsvLogicalExtensionCodec; @@ -63,6 +65,13 @@ impl CsvOptionsProto { .map_or(vec![], |v| vec![v as u8]), truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]), compression_level: options.compression_level, + quote_style: options.quote_style as i32, + ignore_leading_whitespace: options + .ignore_leading_whitespace + .map_or(vec![], |v| vec![v as u8]), + ignore_trailing_whitespace: options + .ignore_trailing_whitespace + .map_or(vec![], |v| vec![v as u8]), } } else { CsvOptionsProto::default() @@ -154,6 +163,23 @@ impl From<&CsvOptionsProto> for CsvOptions { Some(proto.truncated_rows[0] != 0) }, compression_level: proto.compression_level, + quote_style: match CsvQuoteStyleProto::try_from(proto.quote_style) { + Ok(CsvQuoteStyleProto::Always) => CsvQuoteStyle::Always, + Ok(CsvQuoteStyleProto::NonNumeric) => CsvQuoteStyle::NonNumeric, + Ok(CsvQuoteStyleProto::Never) => CsvQuoteStyle::Never, + Ok(CsvQuoteStyleProto::Necessary) => CsvQuoteStyle::Necessary, + _ => CsvQuoteStyle::Necessary, + }, + ignore_leading_whitespace: if proto.ignore_leading_whitespace.is_empty() { + None + } else { + Some(proto.ignore_leading_whitespace[0] != 0) + }, + ignore_trailing_whitespace: if proto.ignore_trailing_whitespace.is_empty() { + None + } else { + Some(proto.ignore_trailing_whitespace[0] != 0) + }, } } } diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 5a7fa309dbfab..d980e802c83cb 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -380,3 +380,194 @@ SET datafusion.optimizer.repartition_file_min_size = 10485760; statement ok drop table stored_table_with_cr_terminator; + +# Test quote_style option + +statement ok +CREATE TABLE quote_style_source ( + int_col INT, + string_col TEXT, + float_col DOUBLE +) AS VALUES +(1, 'hello', 1.1), +(2, 'world', 2.2), +(3, 'comma,value', 3.3); + +# QuoteStyle::Always - all fields are quoted +query I +COPY quote_style_source TO 'test_files/scratch/csv_files/quote_style_always.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', 'format.quote_style' 'Always'); +---- +3 + +# Read the file back using a delimiter that doesn't appear in the data, +# so each line is a single column and we can verify the actual quoting +statement ok +CREATE EXTERNAL TABLE stored_quote_style_always ( + whole_file TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/quote_style_always.csv' +OPTIONS ('format.has_header' 'true', 'format.delimiter' '@', 'format.quote' '~'); + +query T +select * from stored_quote_style_always; +---- +"1","hello","1.1" +"2","world","2.2" +"3","comma,value","3.3" + +statement ok +DROP TABLE stored_quote_style_always; + +# QuoteStyle::NonNumeric - only string fields are quoted +query I +COPY quote_style_source TO 'test_files/scratch/csv_files/quote_style_nonnumeric.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', 'format.quote_style' 'NonNumeric'); +---- +3 + +# Read as a single column with a non-matching delimiter/quote to see raw output +statement ok +CREATE EXTERNAL TABLE stored_quote_style_nonnumeric ( + whole_file TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/quote_style_nonnumeric.csv' +OPTIONS ('format.has_header' 'true', 'format.delimiter' '@', 'format.quote' '~'); + +query T +select * from stored_quote_style_nonnumeric; +---- +1,"hello",1.1 +2,"world",2.2 +3,"comma,value",3.3 + +statement ok +DROP TABLE stored_quote_style_nonnumeric; + +# QuoteStyle::Never - no fields are quoted (can produce invalid CSV) +# Note: the comma in 'comma,value' will NOT be quoted, so reading back +# will see an extra column +query I +COPY quote_style_source TO 'test_files/scratch/csv_files/quote_style_never.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', 'format.quote_style' 'Never'); +---- +3 + +# Read as a single column with a non-matching delimiter/quote to see raw output +statement ok +CREATE EXTERNAL TABLE stored_quote_style_never ( + whole_file TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/quote_style_never.csv' +OPTIONS ('format.has_header' 'true', 'format.delimiter' '@', 'format.quote' '~'); + +# No fields are quoted - note the comma in 'comma,value' is unescaped +query T +select * from stored_quote_style_never; +---- +1,hello,1.1 +2,world,2.2 +3,comma,value,3.3 + +statement ok +DROP TABLE stored_quote_style_never; + +statement ok +DROP TABLE quote_style_source; + +# Test ignore_leading_whitespace and ignore_trailing_whitespace options + +statement ok +CREATE TABLE whitespace_source ( + id INT, + value TEXT +) AS VALUES +(1, ' hello '), +(2, ' world '), +(3, 'no_space'); + +# Write with ignore_leading_whitespace to trim leading spaces +query I +COPY whitespace_source TO 'test_files/scratch/csv_files/trim_leading.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', 'format.ignore_leading_whitespace' 'true'); +---- +3 + +statement ok +CREATE EXTERNAL TABLE stored_trim_leading ( + id INT, + value TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/trim_leading.csv' +OPTIONS ('format.has_header' 'true'); + +query ITT +select id, value, 'end' from stored_trim_leading order by id; +---- +1 hello end +2 world end +3 no_space end + +statement ok +DROP TABLE stored_trim_leading; + +# Write with ignore_trailing_whitespace to trim trailing spaces +query I +COPY whitespace_source TO 'test_files/scratch/csv_files/trim_trailing.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', 'format.ignore_trailing_whitespace' 'true'); +---- +3 + +statement ok +CREATE EXTERNAL TABLE stored_trim_trailing ( + id INT, + value TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/trim_trailing.csv' +OPTIONS ('format.has_header' 'true'); + +query ITT +select id, value, 'end' from stored_trim_trailing order by id; +---- +1 hello end +2 world end +3 no_space end + +statement ok +DROP TABLE stored_trim_trailing; + +# Write with both ignore_leading and ignore_trailing whitespace +query I +COPY whitespace_source TO 'test_files/scratch/csv_files/trim_both.csv' +STORED AS csv +OPTIONS ('format.has_header' 'true', + 'format.ignore_leading_whitespace' 'true', + 'format.ignore_trailing_whitespace' 'true'); +---- +3 + +statement ok +CREATE EXTERNAL TABLE stored_trim_both ( + id INT, + value TEXT +) STORED AS CSV +LOCATION 'test_files/scratch/csv_files/trim_both.csv' +OPTIONS ('format.has_header' 'true'); + +query ITT +select id, value, 'end' from stored_trim_both order by id; +---- +1 hello end +2 world end +3 no_space end + +statement ok +DROP TABLE stored_trim_both; + +statement ok +DROP TABLE whitespace_source;