diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index 577d26fe0078..26c2ba5fe28e 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -30,6 +30,7 @@ #include "parquet/bloom_filter.h" #include "parquet/encryption/encryption_internal.h" #include "parquet/encryption/internal_file_decryptor.h" +#include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" #include "parquet/thrift_internal.h" #include "parquet/xxhasher.h" @@ -345,6 +346,48 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* sink) const { PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_)); } +void BlockSplitBloomFilter::WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor, + int16_t row_group_ordinal, + int16_t column_ordinal) const { + DCHECK(sink != nullptr); + if (encryptor == nullptr) { + throw ParquetException("Bloom filter encryptor must be provided"); + } + + format::BloomFilterHeader header; + if (ARROW_PREDICT_FALSE(algorithm_ != BloomFilter::Algorithm::BLOCK)) { + throw ParquetException("BloomFilter does not support Algorithm other than BLOCK"); + } + header.algorithm.__set_BLOCK(format::SplitBlockAlgorithm()); + if (ARROW_PREDICT_FALSE(hash_strategy_ != HashStrategy::XXHASH)) { + throw ParquetException("BloomFilter does not support Hash other than XXHASH"); + } + header.hash.__set_XXHASH(format::XxHash()); + if (ARROW_PREDICT_FALSE(compression_strategy_ != CompressionStrategy::UNCOMPRESSED)) { + throw ParquetException( + "BloomFilter does not support Compression other than UNCOMPRESSED"); + } + header.compression.__set_UNCOMPRESSED(format::Uncompressed()); + header.__set_numBytes(num_bytes_); + + // Bloom filter header and bitset are separate encrypted modules with different AADs. 
+ encryptor->UpdateAad( + encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterHeader, + row_group_ordinal, column_ordinal, -1)); + ThriftSerializer serializer; + serializer.Serialize(&header, sink, encryptor); + + encryptor->UpdateAad( + encryption::CreateModuleAad(encryptor->file_aad(), encryption::kBloomFilterBitset, + row_group_ordinal, column_ordinal, -1)); + auto cipher_buffer = + AllocateBuffer(encryptor->pool(), encryptor->CiphertextLength(num_bytes_)); + std::span<const uint8_t> bitset_span(data_->data(), num_bytes_); + int32_t cipher_buffer_len = + encryptor->Encrypt(bitset_span, cipher_buffer->mutable_span_as<uint8_t>()); + PARQUET_THROW_NOT_OK(sink->Write(cipher_buffer->data(), cipher_buffer_len)); +} + bool BlockSplitBloomFilter::FindHash(uint64_t hash) const { const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / kBytesPerFilterBlock)) >> 32); diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index e14e0558d31a..3ac6f4ffe41d 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -276,6 +276,20 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { void InsertHash(uint64_t hash) override; void InsertHashes(const uint64_t* hashes, int num_values) override; void WriteTo(ArrowOutputStream* sink) const override; + + /// Serialize this Bloom filter as two encrypted modules (header and bitset) + /// using the supplied metadata encryptor. + /// + /// The same encryptor is used for both modules, switching the AAD between + /// kBloomFilterHeader and kBloomFilterBitset before each encryption. + /// + /// @param sink The output stream to write to. + /// @param encryptor Metadata encryptor for this column. Must not be null. + /// @param row_group_ordinal Ordinal of the row group containing this Bloom filter. + /// @param column_ordinal Ordinal of the column containing this Bloom filter. 
+ void WriteEncrypted(ArrowOutputStream* sink, Encryptor* encryptor, + int16_t row_group_ordinal, int16_t column_ordinal) const; + uint32_t GetBitsetSize() const override { return num_bytes_; } uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); } diff --git a/cpp/src/parquet/bloom_filter_writer.cc b/cpp/src/parquet/bloom_filter_writer.cc index f06b866c30e2..42720c507d47 100644 --- a/cpp/src/parquet/bloom_filter_writer.cc +++ b/cpp/src/parquet/bloom_filter_writer.cc @@ -26,6 +26,7 @@ #include "arrow/util/bit_run_reader.h" #include "arrow/util/checked_cast.h" +#include "parquet/encryption/internal_file_encryptor.h" #include "parquet/exception.h" #include "parquet/metadata.h" #include "parquet/properties.h" @@ -151,12 +152,15 @@ namespace { /// \brief A concrete implementation of BloomFilterBuilder. /// -/// \note Column encryption for bloom filter is not implemented yet. +/// When `file_encryptor` is provided, bloom filters of encrypted columns are +/// serialized using the column's metadata encryptor; bloom filters of +/// unencrypted columns are serialized in plaintext. class BloomFilterBuilderImpl : public BloomFilterBuilder { public: BloomFilterBuilderImpl(const SchemaDescriptor* schema, - const WriterProperties* properties) - : schema_(schema), properties_(properties) {} + const WriterProperties* properties, + InternalFileEncryptor* file_encryptor) + : schema_(schema), properties_(properties), file_encryptor_(file_encryptor) {} void AppendRowGroup() override; @@ -183,6 +187,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder { const SchemaDescriptor* schema_; const WriterProperties* properties_; + InternalFileEncryptor* file_encryptor_; bool finished_ = false; using RowGroupBloomFilters = std::map<int32_t, std::unique_ptr<BloomFilter>>; @@ -225,6 +230,9 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) } finished_ = true; + // Bloom filter ordinals are encoded as int16 in the AAD when encryption is enabled. 
+ constexpr size_t kEncryptedOrdinalLimit = std::numeric_limits<int16_t>::max(); // 32767 + IndexLocations locations; for (size_t i = 0; i != bloom_filters_.size(); ++i) { @@ -232,7 +240,37 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) for (const auto& [column_id, filter] : row_group_bloom_filters) { // TODO(GH-43138): Determine the quality of bloom filter before writing it. PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell()); - filter->WriteTo(sink); + + const auto column_path = schema_->Column(column_id)->path()->ToDotString(); + std::shared_ptr<Encryptor> meta_encryptor = + file_encryptor_ != nullptr + ? file_encryptor_->GetColumnMetaEncryptor(column_path) + : nullptr; + if (meta_encryptor != nullptr) { + const auto& column_props = properties_->column_encryption_properties(column_path); + if (column_props != nullptr && column_props->is_encrypted() && + !column_props->is_encrypted_with_footer_key()) { + ParquetException::NYI("Bloom filter writing with a dedicated column key"); + } + if (ARROW_PREDICT_FALSE(i > kEncryptedOrdinalLimit)) { + throw ParquetException( + "Encrypted files cannot contain more than 32767 row groups"); + } + if (ARROW_PREDICT_FALSE(static_cast<size_t>(column_id) > + kEncryptedOrdinalLimit)) { + throw ParquetException( + "Encrypted files cannot contain more than 32767 columns"); + } + auto* block_filter = dynamic_cast<BlockSplitBloomFilter*>(filter.get()); + if (block_filter == nullptr) { + throw ParquetException( + "Only BlockSplitBloomFilter is supported for encrypted bloom filters"); + } + block_filter->WriteEncrypted(sink, meta_encryptor.get(), static_cast<int16_t>(i), + static_cast<int16_t>(column_id)); + } else { + filter->WriteTo(sink); + } PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell()); if (pos - offset > std::numeric_limits<int32_t>::max()) { @@ -253,8 +291,9 @@ IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink) } // namespace std::unique_ptr<BloomFilterBuilder> BloomFilterBuilder::Make( 
- return std::make_unique<BloomFilterBuilderImpl>(schema, properties); + const SchemaDescriptor* schema, const WriterProperties* properties, + InternalFileEncryptor* file_encryptor) { + return std::make_unique<BloomFilterBuilderImpl>(schema, properties, file_encryptor); } } // namespace parquet diff --git a/cpp/src/parquet/bloom_filter_writer.h b/cpp/src/parquet/bloom_filter_writer.h index 628e1c59b181..a20807230d0f 100644 --- a/cpp/src/parquet/bloom_filter_writer.h +++ b/cpp/src/parquet/bloom_filter_writer.h @@ -20,6 +20,7 @@ #include "arrow/type_fwd.h" #include "parquet/bloom_filter.h" +#include "parquet/encryption/type_fwd.h" #include "parquet/index_location.h" #include "parquet/type_fwd.h" @@ -71,8 +72,12 @@ class PARQUET_EXPORT BloomFilterBuilder { /// \param schema The schema of the file and it must outlive the created builder. /// \param properties Properties to get bloom filter options. It must outlive the /// created builder. - static std::unique_ptr<BloomFilterBuilder> Make(const SchemaDescriptor* schema, - const WriterProperties* properties); + /// \param file_encryptor File level encryptor used to encrypt bloom filters of + /// encrypted columns. May be null for unencrypted files. Must outlive the created + /// builder. + static std::unique_ptr<BloomFilterBuilder> Make( + const SchemaDescriptor* schema, const WriterProperties* properties, + InternalFileEncryptor* file_encryptor = NULLPTR); /// \brief Start a new row group to write bloom filters, meaning that next calls /// to `CreateBloomFilter` will create bloom filters for the new row group. 
diff --git a/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc b/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc index 9a49e7277b6d..c2ea226558d5 100644 --- a/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc +++ b/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc @@ -21,12 +21,15 @@ #include <memory> #include "arrow/io/file.h" +#include "arrow/io/memory.h" #include "parquet/bloom_filter.h" #include "parquet/bloom_filter_reader.h" #include "parquet/encryption/test_encryption_util.h" #include "parquet/file_reader.h" +#include "parquet/file_writer.h" #include "parquet/properties.h" +#include "parquet/schema.h" namespace parquet::encryption::test { namespace { @@ -91,4 +94,104 @@ TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) { } } +namespace { + +std::shared_ptr<schema::GroupNode> SingleInt64Schema(const std::string& field_name) { + auto field = schema::PrimitiveNode::Make(field_name, Repetition::REQUIRED, Type::INT64, + ConvertedType::NONE); + return std::static_pointer_cast<schema::GroupNode>( + schema::GroupNode::Make("schema", Repetition::REQUIRED, {field})); +} + +std::shared_ptr<FileEncryptionProperties> BuildEncryptionProperties( + const std::string& /*field_name*/) { + FileEncryptionProperties::Builder builder(kFooterEncryptionKey); + return builder.build(); +} + +std::shared_ptr<FileDecryptionProperties> BuildDecryptionPropertiesWithExplicitKeys( + const std::string& /*field_name*/) { + FileDecryptionProperties::Builder builder; + return builder.footer_key(kFooterEncryptionKey)->build(); +} + +} // namespace + +// Round trip, write a small encrypted file with a Bloom filter on the encrypted +// column, then read it back and verify the Bloom filter contains the inserted +// values and rejects values that were never inserted. 
+TEST(EncryptedBloomFilterWriter, RoundTripEncryptedBloomFilter) { + const std::string field_name = "id"; + constexpr int kNumValues = 64; + + auto schema = SingleInt64Schema(field_name); + + WriterProperties::Builder prop_builder; + prop_builder.compression(Compression::UNCOMPRESSED); + prop_builder.enable_bloom_filter(field_name, {}); + prop_builder.encryption(BuildEncryptionProperties(field_name)); + auto writer_properties = prop_builder.build(); + + PARQUET_ASSIGN_OR_THROW(auto sink, ::arrow::io::BufferOutputStream::Create()); + auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties); + auto* row_group_writer = file_writer->AppendRowGroup(); + auto* int64_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn()); + std::vector<int64_t> values(kNumValues); + for (int i = 0; i < kNumValues; ++i) { + values[i] = static_cast<int64_t>(i) * 7 + 13; + } + int64_writer->WriteBatch(static_cast<int64_t>(values.size()), nullptr, nullptr, + values.data()); + file_writer->Close(); + PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish()); + + ReaderProperties reader_properties = default_reader_properties(); + reader_properties.file_decryption_properties( + BuildDecryptionPropertiesWithExplicitKeys(field_name)); + + auto source = std::make_shared<::arrow::io::BufferReader>(buffer); + auto file_reader = ParquetFileReader::Open(source, reader_properties); + auto& bloom_filter_reader = file_reader->GetBloomFilterReader(); + auto row_group_0 = bloom_filter_reader.RowGroup(0); + ASSERT_NE(nullptr, row_group_0); + auto filter = row_group_0->GetColumnBloomFilter(0); + ASSERT_NE(nullptr, filter); + + for (int64_t value : values) { + EXPECT_TRUE(filter->FindHash(filter->Hash(value))) + << "missing inserted value " << value; + } + + for (int64_t miss : {int64_t{-1}, int64_t{1'000'000}, int64_t{1'000'001}}) { + EXPECT_FALSE(filter->FindHash(filter->Hash(miss))) + << "unexpected hit for non-inserted value " << miss; + } +} + +TEST(EncryptedBloomFilterWriter, 
ColumnKeyEncryptedBloomFilterIsNotYetImplemented) { + const std::string field_name = "id"; + auto schema = SingleInt64Schema(field_name); + + auto col_props = + ColumnEncryptionProperties::Builder().key(kColumnEncryptionKey1)->build(); + ColumnPathToEncryptionPropertiesMap encrypted_columns{{field_name, col_props}}; + FileEncryptionProperties::Builder enc_builder(kFooterEncryptionKey); + auto file_encryption_properties = + enc_builder.encrypted_columns(std::move(encrypted_columns))->build(); + + WriterProperties::Builder prop_builder; + prop_builder.compression(Compression::UNCOMPRESSED); + prop_builder.enable_bloom_filter(field_name, {}); + prop_builder.encryption(file_encryption_properties); + auto writer_properties = prop_builder.build(); + + PARQUET_ASSIGN_OR_THROW(auto sink, ::arrow::io::BufferOutputStream::Create()); + auto file_writer = ParquetFileWriter::Open(sink, schema, writer_properties); + auto* row_group_writer = file_writer->AppendRowGroup(); + auto* int64_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn()); + int64_t value = 42; + int64_writer->WriteBatch(1, nullptr, nullptr, &value); + EXPECT_THROW(file_writer->Close(), ParquetException); +} + } // namespace parquet::encryption::test diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc index ec303408f363..037b63adc346 100644 --- a/cpp/src/parquet/file_writer.cc +++ b/cpp/src/parquet/file_writer.cc @@ -503,11 +503,10 @@ class FileSerializer : public ParquetFileWriter::Contents { void WriteBloomFilter() { if (bloom_filter_builder_ != nullptr) { - if (properties_->file_encryption_properties()) { - ParquetException::NYI("Encryption is not currently supported with bloom filter"); - } // Serialize bloom filter after all row groups have been written and report - // location to the file metadata. 
Bloom filters of encrypted columns are + // encrypted using each column's metadata encryptor (the builder was + // constructed with the file-level encryptor when encryption was enabled). auto locations = bloom_filter_builder_->WriteTo(sink_.get()); metadata_->SetIndexLocations(IndexKind::kBloomFilter, locations); } @@ -575,7 +574,8 @@ class FileSerializer : public ParquetFileWriter::Contents { } } if (properties_->bloom_filter_enabled()) { - bloom_filter_builder_ = BloomFilterBuilder::Make(schema(), properties_.get()); + bloom_filter_builder_ = + BloomFilterBuilder::Make(schema(), properties_.get(), file_encryptor_.get()); } if (properties_->page_index_enabled()) { page_index_builder_ = PageIndexBuilder::Make(&schema_, file_encryptor_.get());