diff --git a/.github/workflows/windows.yaml b/.github/workflows/windows.yaml index 71b7013d83bb..272cf888fbc8 100644 --- a/.github/workflows/windows.yaml +++ b/.github/workflows/windows.yaml @@ -63,7 +63,6 @@ jobs: - name: Start Firebase Emulator and run tests run: cd ./.github/workflows/scripts && firebase emulators:exec --project flutterfire-e2e-tests "cd ../../../tests && flutter test .\integration_test\e2e_test.dart -d windows --verbose" -# We cannot run the tests but we can still try to build the app because of https://github.com/flutter/flutter/issues/79213 windows-firestore: runs-on: windows-latest if: ${{ !inputs.nightly_test_mode }} @@ -94,4 +93,13 @@ jobs: run: | npm install -g firebase-tools - name: Start Firebase Emulator and run tests - run: cd ./.github/workflows/scripts && firebase emulators:exec --project flutterfire-e2e-tests "cd ../../../packages/cloud_firestore/cloud_firestore/example && flutter build windows" + run: | + cd ./.github/workflows/scripts + firebase emulators:exec --project flutterfire-e2e-tests "cd ../../../packages/cloud_firestore/cloud_firestore/example && flutter drive --target=.\integration_test\e2e_test.dart --driver=.\test_driver\integration_test.dart -d windows --verbose" 2>&1 | Tee-Object -FilePath output.log + $exitCode = $LASTEXITCODE + $output = Get-Content output.log -Raw + if ($output -match '\[E\]' -or $output -match 'Some tests failed') { + Write-Error "All tests did not pass. Please check the logs for more information." + exit 1 + } + exit $exitCode diff --git a/packages/cloud_firestore/cloud_firestore/example/integration_test/document_reference_e2e.dart b/packages/cloud_firestore/cloud_firestore/example/integration_test/document_reference_e2e.dart index 5e570f983331..9063b00f15b6 100644 --- a/packages/cloud_firestore/cloud_firestore/example/integration_test/document_reference_e2e.dart +++ b/packages/cloud_firestore/cloud_firestore/example/integration_test/document_reference_e2e.dart @@ -88,51 +88,62 @@ void runDocumentReferenceTests() { }); }); - test('listens to a single response from cache', () async { - DocumentReference> document = - await initializeTest('document-snapshot'); - Stream>> stream = - document.snapshots(source: ListenSource.cache); - StreamSubscription>>? - subscription; - - subscription = stream.listen( - expectAsync1( - (DocumentSnapshot> snapshot) { - expect(snapshot.exists, isFalse); - }, - reason: 'Stream should only have been called once.', - ), - ); - - addTearDown(() async { - await subscription?.cancel(); - }); - }); + test( + 'listens to a single response from cache', + () async { + DocumentReference> document = + await initializeTest('document-snapshot'); + Stream>> stream = + document.snapshots(source: ListenSource.cache); + StreamSubscription>>? + subscription; + + subscription = stream.listen( + expectAsync1( + (DocumentSnapshot> snapshot) { + expect(snapshot.exists, isFalse); + }, + reason: 'Stream should only have been called once.', + ), + ); - test('listens to a document from cache', () async { - DocumentReference> document = - await initializeTest('document-snapshot-cache'); - await document.set({'foo': 'bar'}); - Stream>> stream = - document.snapshots(source: ListenSource.cache); - StreamSubscription>>? - subscription; + addTearDown(() async { + await subscription?.cancel(); + }); + }, + // Listening from cache is not supported on Windows (see + // DocumentReference.snapshots in cloud_firestore). + skip: defaultTargetPlatform == TargetPlatform.windows, + ); - subscription = stream.listen( - expectAsync1( - (DocumentSnapshot> snapshot) { - expect(snapshot.exists, isTrue); - expect(snapshot.data(), equals({'foo': 'bar'})); - }, - reason: 'Stream should only have been called once.', - ), - ); + test( + 'listens to a document from cache', + () async { + DocumentReference> document = + await initializeTest('document-snapshot-cache'); + await document.set({'foo': 'bar'}); + Stream>> stream = + document.snapshots(source: ListenSource.cache); + StreamSubscription>>? + subscription; + + subscription = stream.listen( + expectAsync1( + (DocumentSnapshot> snapshot) { + expect(snapshot.exists, isTrue); + expect(snapshot.data(), equals({'foo': 'bar'})); + }, + reason: 'Stream should only have been called once.', + ), + ); - addTearDown(() async { - await subscription?.cancel(); - }); - }); + addTearDown(() async { + await subscription?.cancel(); + }); + }, + // Listening from cache is not supported on Windows. + skip: defaultTargetPlatform == TargetPlatform.windows, + ); test('listens to multiple documents', () async { DocumentReference> doc1 = @@ -408,7 +419,8 @@ void runDocumentReferenceTests() { 'null': null, 'timestamp': Timestamp.now(), 'geopoint': const GeoPoint(1, 2), - 'vectorValue': const VectorValue([1, 2, 3]), + if (defaultTargetPlatform != TargetPlatform.windows) + 'vectorValue': const VectorValue([1, 2, 3]), 'reference': firestore.doc('foo/bar'), 'nan': double.nan, 'infinity': double.infinity, @@ -445,11 +457,13 @@ void runDocumentReferenceTests() { expect(data['geopoint'], isA()); expect((data['geopoint'] as GeoPoint).latitude, equals(1)); expect((data['geopoint'] as GeoPoint).longitude, equals(2)); - expect(data['vectorValue'], isA()); - expect( - (data['vectorValue'] as VectorValue).toArray(), - equals([1, 2, 3]), - ); + if (defaultTargetPlatform != TargetPlatform.windows) { + expect(data['vectorValue'], isA()); + expect( + (data['vectorValue'] as VectorValue).toArray(), + equals([1, 2, 3]), + ); + } expect(data['reference'], isA()); expect((data['reference'] as DocumentReference).id, equals('bar')); expect(data['nan'].isNaN, equals(true)); diff --git a/packages/cloud_firestore/cloud_firestore/example/integration_test/query_e2e.dart b/packages/cloud_firestore/cloud_firestore/example/integration_test/query_e2e.dart index 49b7bb0c932a..4c2df96bb3fc 100644 --- a/packages/cloud_firestore/cloud_firestore/example/integration_test/query_e2e.dart +++ b/packages/cloud_firestore/cloud_firestore/example/integration_test/query_e2e.dart @@ -295,8 +295,9 @@ void runQueryTests() { await subscription?.cancel(); }); }, - // Failing on CI but works locally - skip: kIsWeb, + // Failing on CI but works locally. Listening from cache is not + // supported on Windows. + skip: kIsWeb || defaultTargetPlatform == TargetPlatform.windows, ); test('listens to multiple queries', () async { @@ -1052,14 +1053,17 @@ void runQueryTests() { isA().having( (e) => e.message, 'message', - contains( - 'Client specified an invalid argument', + anyOf( + contains('Client specified an invalid argument'), + contains('order by clause cannot contain more fields ' + 'after the key'), ), ), ), ); }, - // firebase-js-sdk does not require an orderBy() field to be set for this to work + // firebase-js-sdk does not require an orderBy() field to be set for + // this to work skip: kIsWeb, ); @@ -1106,8 +1110,10 @@ void runQueryTests() { isA().having( (e) => e.message, 'message', - contains( - 'Client specified an invalid argument', + anyOf( + contains('Client specified an invalid argument'), + contains('order by clause cannot contain more fields ' + 'after the key'), ), ), ), @@ -3798,6 +3804,7 @@ void runQueryTests() { 3, ); }, + skip: defaultTargetPlatform == TargetPlatform.windows, ); test( @@ -3820,6 +3827,7 @@ void runQueryTests() { 1, ); }, + skip: defaultTargetPlatform == TargetPlatform.windows, ); test( @@ -3841,6 +3849,7 @@ void runQueryTests() { 1.5, ); }, + skip: defaultTargetPlatform == TargetPlatform.windows, ); test( @@ -3863,6 +3872,7 @@ void runQueryTests() { 1, ); }, + skip: defaultTargetPlatform == TargetPlatform.windows, ); test( @@ -3894,37 +3904,42 @@ void runQueryTests() { 1.5, ); }, + skip: defaultTargetPlatform == TargetPlatform.windows, ); - test('chaining multiples aggregate queries', () async { - final collection = await initializeTest('chaining'); + test( + 'chaining multiples aggregate queries', + () async { + final collection = await initializeTest('chaining'); - await Future.wait([ - collection.add({'foo': 1}), - collection.add({'foo': 2}), - ]); + await Future.wait([ + collection.add({'foo': 1}), + collection.add({'foo': 2}), + ]); - AggregateQuery query = collection - .where('foo', isEqualTo: 1) - .aggregate(count(), sum('foo'), average('foo')); + AggregateQuery query = collection + .where('foo', isEqualTo: 1) + .aggregate(count(), sum('foo'), average('foo')); - AggregateQuerySnapshot snapshot = await query.get(); + AggregateQuerySnapshot snapshot = await query.get(); - expect( - snapshot.count, - 1, - ); + expect( + snapshot.count, + 1, + ); - expect( - snapshot.getSum('foo'), - 1, - ); + expect( + snapshot.getSum('foo'), + 1, + ); - expect( - snapshot.getAverage('foo'), - 1, - ); - }); + expect( + snapshot.getAverage('foo'), + 1, + ); + }, + skip: defaultTargetPlatform == TargetPlatform.windows, + ); test( 'count() with collectionGroup', @@ -3966,16 +3981,20 @@ void runQueryTests() { }, ); - test('count(), average() & sum() on empty collection', () async { - final collection = await initializeTest('empty-collection'); + test( + 'count(), average() & sum() on empty collection', + () async { + final collection = await initializeTest('empty-collection'); - final snapshot = await collection - .aggregate(count(), sum('foo'), average('foo')) - .get(); - expect(snapshot.count, 0); - expect(snapshot.getSum('foo'), 0); - expect(snapshot.getAverage('foo'), null); - }); + final snapshot = await collection + .aggregate(count(), sum('foo'), average('foo')) + .get(); + expect(snapshot.count, 0); + expect(snapshot.getSum('foo'), 0); + expect(snapshot.getAverage('foo'), null); + }, + skip: defaultTargetPlatform == TargetPlatform.windows, + ); }); group('startAfterDocument', () { diff --git a/packages/cloud_firestore/cloud_firestore/example/integration_test/transaction_e2e.dart b/packages/cloud_firestore/cloud_firestore/example/integration_test/transaction_e2e.dart index 3b5017077dc7..8a55da126ad6 100644 --- a/packages/cloud_firestore/cloud_firestore/example/integration_test/transaction_e2e.dart +++ b/packages/cloud_firestore/cloud_firestore/example/integration_test/transaction_e2e.dart @@ -126,39 +126,43 @@ void runTransactionTests() { retry: 2, ); - test('should collide if number of maxAttempts is too low', () async { - DocumentReference> doc1 = - await initializeTest('transaction-maxAttempts-2'); + test( + 'should collide if number of maxAttempts is too low', + () async { + DocumentReference> doc1 = + await initializeTest('transaction-maxAttempts-2'); - await doc1.set({'test': 0}); + await doc1.set({'test': 0}); - await expectLater( - Future.wait([ - firestore.runTransaction( - (Transaction transaction) async { - final value = await transaction.get(doc1); - transaction.set(doc1, { - 'test': value['test'] + 1, - }); - }, - maxAttempts: 1, - ), - firestore.runTransaction( - (Transaction transaction) async { - final value = await transaction.get(doc1); - transaction.set(doc1, { - 'test': value['test'] + 1, - }); - }, - maxAttempts: 1, + await expectLater( + Future.wait([ + firestore.runTransaction( + (Transaction transaction) async { + final value = await transaction.get(doc1); + transaction.set(doc1, { + 'test': value['test'] + 1, + }); + }, + maxAttempts: 1, + ), + firestore.runTransaction( + (Transaction transaction) async { + final value = await transaction.get(doc1); + transaction.set(doc1, { + 'test': value['test'] + 1, + }); + }, + maxAttempts: 1, + ), + ]), + throwsA( + isA() + .having((e) => e.code, 'code', 'failed-precondition'), ), - ]), - throwsA( - isA() - .having((e) => e.code, 'code', 'failed-precondition'), - ), - ); - }); + ); + }, + skip: kIsWeb || defaultTargetPlatform == TargetPlatform.windows, + ); test('runs multiple transactions in parallel', () async { DocumentReference> doc1 = @@ -188,19 +192,23 @@ void runTransactionTests() { expect(snapshot2.data()!['test'], equals('value4')); }); - test('should abort if timeout is exceeded', () async { - await expectLater( - firestore.runTransaction( - (Transaction transaction) => - Future.delayed(const Duration(seconds: 2)), - timeout: const Duration(seconds: 1), - ), - throwsA( - isA() - .having((e) => e.code, 'code', 'deadline-exceeded'), - ), - ); - }); + test( + 'should abort if timeout is exceeded', + () async { + await expectLater( + firestore.runTransaction( + (Transaction transaction) => + Future.delayed(const Duration(seconds: 2)), + timeout: const Duration(seconds: 1), + ), + throwsA( + isA() + .having((e) => e.code, 'code', 'deadline-exceeded'), + ), + ); + }, + skip: kIsWeb || defaultTargetPlatform == TargetPlatform.windows, + ); test('should throw with exception', () async { try { @@ -217,23 +225,26 @@ void runTransactionTests() { } }); - test('should throw a native error, and convert to a [FirebaseException]', - () async { - DocumentReference> documentReference = - firestore.doc('not-allowed/document'); + test( + 'should throw a native error, and convert to a [FirebaseException]', + () async { + DocumentReference> documentReference = + firestore.doc('not-allowed/document'); - try { - await firestore.runTransaction((Transaction transaction) async { - transaction.set(documentReference, {'foo': 'bar'}); - }); - fail('Transaction should not have resolved'); - } on FirebaseException catch (e) { - expect(e.code, equals('permission-denied')); - return; - } catch (e) { - fail('Transaction threw invalid exception'); - } - }); + try { + await firestore.runTransaction((Transaction transaction) async { + transaction.set(documentReference, {'foo': 'bar'}); + }); + fail('Transaction should not have resolved'); + } on FirebaseException catch (e) { + expect(e.code, equals('permission-denied')); + return; + } catch (e) { + fail('Transaction threw invalid exception'); + } + }, + skip: kIsWeb || defaultTargetPlatform == TargetPlatform.windows, + ); group('Transaction.get()', () { test('should throw if get is called after a command', () async { @@ -251,23 +262,25 @@ void runTransactionTests() { }); test( - 'should throw a native error, and convert to a [FirebaseException]', - () async { - DocumentReference> documentReference = - firestore.doc('not-allowed/document'); + 'should throw a native error, and convert to a [FirebaseException]', + () async { + DocumentReference> documentReference = + firestore.doc('not-allowed/document'); - try { - await firestore.runTransaction((Transaction transaction) async { - await transaction.get(documentReference); - }); - fail('Transaction should not have resolved'); - } on FirebaseException catch (e) { - expect(e.code, equals('permission-denied')); - return; - } catch (e) { - fail('Transaction threw invalid exception'); - } - }); + try { + await firestore.runTransaction((Transaction transaction) async { + await transaction.get(documentReference); + }); + fail('Transaction should not have resolved'); + } on FirebaseException catch (e) { + expect(e.code, equals('permission-denied')); + return; + } catch (e) { + fail('Transaction threw invalid exception'); + } + }, + skip: kIsWeb || defaultTargetPlatform == TargetPlatform.windows, + ); // ignore: todo // TODO(Salakar): Test seems to fail sometimes. Will look at in a future PR. diff --git a/packages/cloud_firestore/cloud_firestore/example/integration_test/vector_value_e2e.dart b/packages/cloud_firestore/cloud_firestore/example/integration_test/vector_value_e2e.dart index 3163a65b36a4..e10b4689e642 100644 --- a/packages/cloud_firestore/cloud_firestore/example/integration_test/vector_value_e2e.dart +++ b/packages/cloud_firestore/cloud_firestore/example/integration_test/vector_value_e2e.dart @@ -3,9 +3,21 @@ // BSD-style license that can be found in the LICENSE file. import 'package:cloud_firestore/cloud_firestore.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter_test/flutter_test.dart'; void runVectorValueTests() { + if (defaultTargetPlatform == TargetPlatform.windows) { + group('$VectorValue', () { + test( + 'is not supported on Windows', + () {}, + skip: 'The Firebase C++ SDK does not expose Firestore vector values.', + ); + }); + return; + } + group('$VectorValue', () { late FirebaseFirestore firestore; diff --git a/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.cpp b/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.cpp index 597a60adbae1..46d7f52dd372 100644 --- a/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.cpp +++ b/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.cpp @@ -15,9 +15,11 @@ #include #include +#include #include #include #include +#include #include #include "cloud_firestore/plugin_version.h" @@ -40,9 +42,160 @@ using flutter::EncodableValue; namespace cloud_firestore_windows { static std::string kLibraryName = "flutter-fire-fst"; + +namespace { + +constexpr wchar_t kTaskRunnerWindowClassName[] = + L"CloudFirestoreWindowsTaskRunnerWindow"; +constexpr UINT kTaskRunnerWindowMessage = WM_APP + 0x4673; + +class PlatformThreadDispatcher { + public: + static PlatformThreadDispatcher& GetInstance() { + static PlatformThreadDispatcher instance; + return instance; + } + + void Initialize() { + std::lock_guard lock(mutex_); + if (window_ != nullptr) { + return; + } + + platform_thread_id_ = GetCurrentThreadId(); + + WNDCLASSW window_class = {}; + window_class.lpfnWndProc = PlatformThreadDispatcher::WindowProc; + window_class.hInstance = GetModuleHandle(nullptr); + window_class.lpszClassName = kTaskRunnerWindowClassName; + + RegisterClassW(&window_class); + window_ = + CreateWindowExW(0, kTaskRunnerWindowClassName, L"", 0, 0, 0, 0, 0, + HWND_MESSAGE, nullptr, window_class.hInstance, this); + } + + void Post(std::function task) { + if (GetCurrentThreadId() == platform_thread_id_) { + task(); + return; + } + + { + std::lock_guard lock(mutex_); + tasks_.push(std::move(task)); + } + PostMessageW(window_, kTaskRunnerWindowMessage, 0, 0); + } + + private: + static LRESULT CALLBACK WindowProc(HWND window, UINT message, WPARAM wparam, + LPARAM lparam) { + if (message == WM_NCCREATE) { + auto create_struct = reinterpret_cast(lparam); + SetWindowLongPtr( + window, GWLP_USERDATA, + reinterpret_cast(create_struct->lpCreateParams)); + return TRUE; + } + + auto dispatcher = reinterpret_cast( + GetWindowLongPtr(window, GWLP_USERDATA)); + if (dispatcher != nullptr && message == kTaskRunnerWindowMessage) { + dispatcher->ProcessTasks(); + return 0; + } + + return DefWindowProc(window, message, wparam, lparam); + } + + void ProcessTasks() { + std::queue> tasks; + { + std::lock_guard lock(mutex_); + tasks.swap(tasks_); + } + + while (!tasks.empty()) { + tasks.front()(); + tasks.pop(); + } + } + + PlatformThreadDispatcher() = default; + + HWND window_ = nullptr; + DWORD platform_thread_id_ = 0; + std::mutex mutex_; + std::queue> tasks_; +}; + +struct EventSinkState { + std::mutex mutex; + std::unique_ptr> events; + bool active = true; +}; + +void SendSuccessOnPlatformThread(std::shared_ptr state, + flutter::EncodableValue value) { + if (!state) { + return; + } + + PlatformThreadDispatcher::GetInstance().Post( + [state, value = std::move(value)]() mutable { + std::lock_guard lock(state->mutex); + if (state->active && state->events) { + state->events->Success(value); + } + }); +} + +void SendErrorOnPlatformThread(std::shared_ptr state, + const std::string& code, + const std::string& message, + flutter::EncodableValue details, + bool end_stream = false) { + if (!state) { + return; + } + + PlatformThreadDispatcher::GetInstance().Post( + [state, code, message, details = std::move(details), end_stream]() { + std::lock_guard lock(state->mutex); + if (!state->active || !state->events) { + return; + } + + state->events->Error(code, message, details); + if (end_stream) { + state->events->EndOfStream(); + state->active = false; + } + }); +} + +void EndStreamOnPlatformThread(std::shared_ptr state) { + if (!state) { + return; + } + + PlatformThreadDispatcher::GetInstance().Post([state]() { + std::lock_guard lock(state->mutex); + if (state->active && state->events) { + state->events->EndOfStream(); + state->active = false; + } + }); +} + +} // namespace + // static void CloudFirestorePlugin::RegisterWithRegistrar( flutter::PluginRegistrarWindows* registrar) { + PlatformThreadDispatcher::GetInstance().Initialize(); + auto channel = std::make_unique>( registrar->messenger(), "cloud_firestore", @@ -130,8 +283,7 @@ std::map>> stream_handlers_; -std::map>> +std::map*> cloud_firestore_windows::CloudFirestorePlugin::transaction_handlers_; std::map> cloud_firestore_windows::CloudFirestorePlugin::transactions_; @@ -542,10 +694,12 @@ class LoadBundleStreamHandler const flutter::EncodableValue* arguments, std::unique_ptr>&& events) override { - events_ = std::move(events); + events_state_ = std::make_shared(); + events_state_->events = std::move(events); events.reset(); firestore_->LoadBundle( - bundle_, [this](const LoadBundleTaskProgress& progress) { + bundle_, + [events_state = events_state_](const LoadBundleTaskProgress& progress) { flutter::EncodableMap map; map[flutter::EncodableValue("bytesLoaded")] = flutter::EncodableValue(progress.bytes_loaded()); @@ -563,9 +717,9 @@ class LoadBundleStreamHandler details[EncodableValue("message")] = EncodableValue("Error loading the bundle"); - events_->Error("firebase_firestore", "Error loading the bundle", - details); - events_->EndOfStream(); + SendErrorOnPlatformThread(events_state, "firebase_firestore", + "Error loading the bundle", + EncodableValue(details), true); return; } case LoadBundleTaskProgress::State::kInProgress: { @@ -574,7 +728,7 @@ class LoadBundleStreamHandler map[flutter::EncodableValue("taskState")] = flutter::EncodableValue("running"); - events_->Success(map); + SendSuccessOnPlatformThread(events_state, EncodableValue(map)); break; } case LoadBundleTaskProgress::State::kSuccess: { @@ -582,8 +736,8 @@ class LoadBundleStreamHandler map[flutter::EncodableValue("taskState")] = flutter::EncodableValue("success"); - events_->Success(map); - events_->EndOfStream(); + SendSuccessOnPlatformThread(events_state, EncodableValue(map)); + EndStreamOnPlatformThread(events_state); break; } } @@ -593,13 +747,13 @@ class LoadBundleStreamHandler std::unique_ptr> OnCancelInternal(const flutter::EncodableValue* arguments) override { - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); return nullptr; } private: Firestore* firestore_; - std::unique_ptr> events_; + std::shared_ptr events_state_; std::string bundle_; }; @@ -762,7 +916,8 @@ class SnapshotInSyncStreamHandler const flutter::EncodableValue* arguments, std::unique_ptr>&& events) override { - events_ = std::move(events); + events_state_ = std::make_shared(); + events_state_->events = std::move(events); events.reset(); // We do this to bind the event to the main channel auto boundSendEvent = @@ -778,7 +933,7 @@ class SnapshotInSyncStreamHandler std::unique_ptr> OnCancelInternal(const flutter::EncodableValue* arguments) override { listener_.Remove(); - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); return nullptr; } @@ -786,12 +941,14 @@ class SnapshotInSyncStreamHandler sendEventFunc_ = func; } - void SendEvent() { events_->Success(flutter::EncodableValue()); } + void SendEvent() { + SendSuccessOnPlatformThread(events_state_, flutter::EncodableValue()); + } private: Firestore* firestore_; ListenerRegistration listener_; - std::unique_ptr> events_; + std::shared_ptr events_state_; std::function sendEventFunc_; }; @@ -838,7 +995,8 @@ class TransactionStreamHandler const flutter::EncodableValue* arguments, std::unique_ptr>&& events) override { - events_ = std::move(events); + events_state_ = std::make_shared(); + events_state_->events = std::move(events); events.reset(); TransactionOptions options; options.set_max_attempts(maxAttempts_); @@ -854,17 +1012,22 @@ class TransactionStreamHandler flutter::EncodableMap map; map.emplace("appName", firestore_->app()->name()); - events_->Success(flutter::EncodableValue(map)); + SendSuccessOnPlatformThread(events_state_, + flutter::EncodableValue(map)); std::unique_lock lock(mtx_); if (cv_.wait_for(lock, std::chrono::milliseconds(timeout_)) == std::cv_status::timeout) { - events_->Error("Timeout", "Transaction timed out."); - events_->EndOfStream(); + SendErrorOnPlatformThread(events_state_, "Timeout", + "Transaction timed out.", + flutter::EncodableValue(), true); return Error::kErrorDeadlineExceeded; } std::lock_guard command_lock(commands_mutex_); + if (resultType_ == InternalTransactionResult::kFailure) { + return Error::kErrorAborted; + } if (commands_.empty()) return Error::kErrorOk; for (InternalTransactionCommand& command : commands_) { @@ -924,12 +1087,14 @@ class TransactionStreamHandler if (completed_future.error() == firebase::firestore::kErrorOk) { result.insert(std::make_pair(flutter::EncodableValue("complete"), flutter::EncodableValue(true))); - events_->Success(result); + SendSuccessOnPlatformThread(events_state_, + flutter::EncodableValue(result)); } else { - events_->Error("transaction_error", - completed_future.error_message()); + SendErrorOnPlatformThread(events_state_, "transaction_error", + completed_future.error_message(), + flutter::EncodableValue()); } - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); }); return nullptr; @@ -939,7 +1104,7 @@ class TransactionStreamHandler OnCancelInternal(const flutter::EncodableValue* arguments) override { std::unique_lock lock(mtx_); cv_.notify_one(); - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); return nullptr; } @@ -949,11 +1114,11 @@ class TransactionStreamHandler int maxAttempts_; std::string transactionId_; std::vector commands_; - InternalTransactionResult resultType_; + InternalTransactionResult resultType_ = InternalTransactionResult::kSuccess; std::mutex mtx_; std::mutex commands_mutex_; std::condition_variable cv_; - std::unique_ptr> events_; + std::shared_ptr events_state_; }; void CloudFirestorePlugin::TransactionCreate( @@ -971,16 +1136,13 @@ void CloudFirestorePlugin::TransactionCreate( auto handler = std::make_unique( firestore, static_cast(timeout), static_cast(max_attempts), transactionId); - - // Temporarily release the ownership. - TransactionStreamHandler* raw_handler = handler.release(); - CloudFirestorePlugin::transaction_handlers_[transactionId] = - std::unique_ptr(raw_handler); + TransactionStreamHandler* raw_handler = handler.get(); + CloudFirestorePlugin::transaction_handlers_[transactionId] = raw_handler; // Register the event channel. std::string channelName = RegisterEventChannelWithUUID( "plugins.flutter.io/firebase_firestore/transaction/", transactionId, - std::unique_ptr(raw_handler)); + std::move(handler)); // Return the result (assumed to be transaction ID in this example). result(transactionId); @@ -993,15 +1155,20 @@ void CloudFirestorePlugin::TransactionStoreResult( const InternalTransactionResult& result_type, const flutter::EncodableList* commands, std::function reply)> result) { - if (CloudFirestorePlugin::transaction_handlers_[transaction_id]) { - TransactionStreamHandler& handler = *static_cast( - CloudFirestorePlugin::transaction_handlers_[transaction_id].get()); + auto handler_it = + CloudFirestorePlugin::transaction_handlers_.find(transaction_id); + if (handler_it != CloudFirestorePlugin::transaction_handlers_.end() && + handler_it->second) { + TransactionStreamHandler& handler = + *static_cast(handler_it->second); std::vector commandVector; - for (const auto& element : *commands) { - const InternalTransactionCommand& command = - std::any_cast( - std::get(element)); - commandVector.push_back(command); + if (commands) { + for (const auto& element : *commands) { + const InternalTransactionCommand& command = + std::any_cast( + std::get(element)); + commandVector.push_back(command); + } } handler.ReceiveTransactionResponse(result_type, commandVector); result(std::nullopt); @@ -1557,49 +1724,41 @@ class QuerySnapshotStreamHandler ? MetadataChanges::kInclude : MetadataChanges::kExclude; - events_ = std::move(events); + events_state_ = std::make_shared(); + events_state_->events = std::move(events); events.reset(); listener_ = query_->AddSnapshotListener( metadataChanges, - [this, serverTimestampBehavior = serverTimestampBehavior_, + [events_state = events_state_, + serverTimestampBehavior = serverTimestampBehavior_, metadataChanges](const firebase::firestore::QuerySnapshot& snapshot, firebase::firestore::Error error, const std::string& errorMessage) mutable { if (error == firebase::firestore::kErrorOk) { - flutter::EncodableList toListResult(3); - std::vector documents; - std::vector documentChanges; - - for (const auto& documentSnapshot : snapshot.documents()) { - documents.push_back(ParseDocumentSnapshot(documentSnapshot, - serverTimestampBehavior) - .ToEncodableList()); - } - - // Assuming querySnapshot.getDocumentChanges() returns an iterable - // collection - for (const auto& documentChange : - snapshot.DocumentChanges(metadataChanges)) { - documentChanges.push_back( - ParseDocumentChange(documentChange, serverTimestampBehavior) - .ToEncodableList()); - } - - toListResult[0] = documents; - toListResult[1] = documentChanges; - toListResult[2] = - ParseSnapshotMetadata(snapshot.metadata()).ToEncodableList(); - - events_->Success(toListResult); + // Emit the Pigeon object directly so the Pigeon-aware codec on + // the EventChannel serializes it end-to-end. Pigeon 26 no longer + // flattens nested types, so sending a raw list here would cause + // the Dart side to receive a List it can no longer + // decode into InternalQuerySnapshot. + SendSuccessOnPlatformThread( + events_state, + CustomEncodableValue(InternalQuerySnapshot( + ParseDocumentSnapshots(snapshot.documents(), + serverTimestampBehavior), + ParseDocumentChanges( + snapshot.DocumentChanges(metadataChanges), + serverTimestampBehavior), + ParseSnapshotMetadata(snapshot.metadata())))); } else { EncodableMap details; details[EncodableValue("code")] = EncodableValue(CloudFirestorePlugin::GetErrorCode(error)); details[EncodableValue("message")] = EncodableValue(errorMessage); - events_->Error("firebase_firestore", errorMessage, details); - events_->EndOfStream(); + SendErrorOnPlatformThread(events_state, "firebase_firestore", + errorMessage, EncodableValue(details), + true); } }); return nullptr; @@ -1608,14 +1767,14 @@ class QuerySnapshotStreamHandler std::unique_ptr> OnCancelInternal(const flutter::EncodableValue* arguments) override { listener_.Remove(); - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); return nullptr; } private: ListenerRegistration listener_; std::unique_ptr query_; - std::unique_ptr> events_; + std::shared_ptr events_state_; bool includeMetadataChanges_; firebase::firestore::DocumentSnapshot::ServerTimestampBehavior serverTimestampBehavior_; @@ -1669,27 +1828,35 @@ class DocumentSnapshotStreamHandler ? MetadataChanges::kInclude : MetadataChanges::kExclude; - events_ = std::move(events); + events_state_ = std::make_shared(); + events_state_->events = std::move(events); events.reset(); listener_ = reference_->AddSnapshotListener( metadataChanges, - [this, serverTimestampBehavior = serverTimestampBehavior_, - metadataChanges](const firebase::firestore::DocumentSnapshot& snapshot, - firebase::firestore::Error error, - const std::string& errorMessage) mutable { + [events_state = events_state_, + serverTimestampBehavior = serverTimestampBehavior_]( + const firebase::firestore::DocumentSnapshot& snapshot, + firebase::firestore::Error error, + const std::string& errorMessage) mutable { if (error == firebase::firestore::kErrorOk) { - events_->Success( - ParseDocumentSnapshot(snapshot, serverTimestampBehavior) - .ToEncodableList()); + // Emit the Pigeon object directly so the Pigeon-aware codec on + // the EventChannel serializes it end-to-end. Pigeon 26 no longer + // flattens nested types, so sending a raw list here would cause + // the Dart side to receive a List it can no longer + // decode into InternalDocumentSnapshot. + SendSuccessOnPlatformThread( + events_state, CustomEncodableValue(ParseDocumentSnapshot( + snapshot, serverTimestampBehavior))); } else { EncodableMap details; details[EncodableValue("code")] = EncodableValue(CloudFirestorePlugin::GetErrorCode(error)); details[EncodableValue("message")] = EncodableValue(errorMessage); - events_->Error("firebase_firestore", errorMessage, details); - events_->EndOfStream(); + SendErrorOnPlatformThread(events_state, "firebase_firestore", + errorMessage, EncodableValue(details), + true); } }); return nullptr; @@ -1698,14 +1865,14 @@ class DocumentSnapshotStreamHandler std::unique_ptr> OnCancelInternal(const flutter::EncodableValue* arguments) override { listener_.Remove(); - events_->EndOfStream(); + EndStreamOnPlatformThread(events_state_); return nullptr; } private: firebase::firestore::ListenerRegistration listener_; std::unique_ptr reference_; - std::unique_ptr> events_; + std::shared_ptr events_state_; bool includeMetadataChanges_; firebase::firestore::DocumentSnapshot::ServerTimestampBehavior serverTimestampBehavior_; diff --git a/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.h b/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.h index e99dac003c16..84081f5ffd21 100644 --- a/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.h +++ b/packages/cloud_firestore/cloud_firestore/windows/cloud_firestore_plugin.h @@ -152,8 +152,7 @@ class CloudFirestorePlugin : public flutter::Plugin, event_channels_; static std::map>> stream_handlers_; - static std::map>> - transaction_handlers_; + static std::map*> transaction_handlers_; static std::map> transactions_; diff --git a/packages/cloud_firestore/cloud_firestore/windows/firestore_codec.cpp b/packages/cloud_firestore/cloud_firestore/windows/firestore_codec.cpp index 14be5a1776b0..238479a4776b 100644 --- a/packages/cloud_firestore/cloud_firestore/windows/firestore_codec.cpp +++ b/packages/cloud_firestore/cloud_firestore/windows/firestore_codec.cpp @@ -194,7 +194,7 @@ cloud_firestore_windows::FirestoreCodec::ReadValueOfType( } case DATA_TYPE_INCREMENT_INTEGER: { - int incrementValue = std::get(FirestoreCodec::ReadValue(stream)); + int64_t incrementValue = FirestoreCodec::ReadValue(stream).LongValue(); return CustomEncodableValue(FieldValue::Increment(incrementValue)); }