import 'dart:async';
import '../../core/exceptions/sync_exception.dart';
import '../local/local_storage_service.dart';
import '../immich/immich_service.dart';
import '../nostr/nostr_service.dart';
import '../nostr/models/nostr_keypair.dart';
import 'models/sync_status.dart';
import 'models/sync_operation.dart';

/// Engine for coordinating data synchronization between local storage,
/// Immich, and Nostr.
///
/// This service provides:
/// - Queued upload/download operations between local storage, Immich, and
///   Nostr (the bidirectional [SyncOperationType.sync] type is not implemented
///   yet and fails with a [SyncException])
/// - Conflict resolution strategies (see [resolveConflict])
/// - An in-memory queue of operations, processed one at a time in priority
///   order
/// - Automatic retry of failed operations for as long as
///   [SyncOperation.canRetry] allows it, waiting `retryCount` seconds before
///   each retry
///
/// The service is modular and UI-independent, designed for offline-first
/// behavior.
class SyncEngine {
  /// Local storage service.
  final LocalStorageService _localStorage;

  /// Immich service (optional).
  final ImmichService? _immichService;

  /// Nostr service (optional).
  final NostrService? _nostrService;

  /// Nostr keypair for signing events (required if using Nostr).
  NostrKeyPair? _nostrKeyPair;

  /// Every operation known to the engine: pending, in progress, and finished
  /// ones that have not been cleared yet.
  final List<SyncOperation> _operationQueue = [];

  /// Currently executing operation (null if idle).
  SyncOperation? _currentOperation;

  /// Stream controller for sync status updates.
  final StreamController<SyncOperation> _statusController =
      StreamController<SyncOperation>.broadcast();

  /// Whether the engine has been disposed.
  bool _isDisposed = false;

  /// Conflict resolution strategy.
  ConflictResolution _conflictResolution;

  /// Counter appended to generated operation IDs so that two operations
  /// created within the same millisecond still receive distinct IDs.
  int _operationSequence = 0;

  /// Maximum queue size.
  ///
  /// The limit is compared against every operation still held by the engine,
  /// including finished ones, until [clearCompleted] or [clearFailed] removes
  /// them.
  final int maxQueueSize;

  /// Creates a [SyncEngine] instance.
  ///
  /// [localStorage] - Local storage service (required).
  /// [immichService] - Immich service (optional).
  /// [nostrService] - Nostr service (optional).
  /// [nostrKeyPair] - Nostr keypair for signing events (optional).
  /// [conflictResolution] - Conflict resolution strategy (default: useLatest).
  /// [maxQueueSize] - Maximum number of queued operations (default: 100).
  SyncEngine({
    required LocalStorageService localStorage,
    ImmichService? immichService,
    NostrService? nostrService,
    NostrKeyPair? nostrKeyPair,
    ConflictResolution conflictResolution = ConflictResolution.useLatest,
    this.maxQueueSize = 100,
  })  : _localStorage = localStorage,
        _immichService = immichService,
        _nostrService = nostrService,
        _nostrKeyPair = nostrKeyPair,
        _conflictResolution = conflictResolution;

  /// Sets the Nostr keypair for signing events.
  void setNostrKeyPair(NostrKeyPair keypair) {
    _nostrKeyPair = keypair;
  }

  /// Sets the conflict resolution strategy.
  void setConflictResolution(ConflictResolution strategy) {
    _conflictResolution = strategy;
  }

  /// Stream of sync operation status updates.
  ///
  /// This is a broadcast stream: it can have several listeners, and a listener
  /// only receives the updates emitted after it subscribed.
  Stream<SyncOperation> get statusStream => _statusController.stream;

  /// Gets the current queue of pending operations.
  ///
  /// Returns a new list containing the operations whose status is
  /// [SyncStatus.pending].
  List<SyncOperation> getPendingOperations() {
    return _operationQueue
        .where((op) => op.status == SyncStatus.pending)
        .toList();
  }

  /// Gets all operations (pending, in-progress, completed, failed).
  ///
  /// Returns an unmodifiable copy of the queue taken at call time.
  List<SyncOperation> getAllOperations() {
    return List<SyncOperation>.unmodifiable(_operationQueue);
  }

  /// Queues a sync operation.
  ///
  /// [operation] - The sync operation to queue.
  ///
  /// Emits the operation on [statusStream] and starts processing the queue if
  /// the engine is currently idle.
  ///
  /// Throws [SyncException] if the engine has been disposed or the queue is
  /// full.
  void queueOperation(SyncOperation operation) {
    if (_isDisposed) {
      throw SyncException('SyncEngine has been disposed');
    }
    if (_operationQueue.length >= maxQueueSize) {
      throw SyncException('Sync queue is full (max: $maxQueueSize)');
    }

    _operationQueue.add(operation);
    _emitStatus(operation);

    // Auto-start processing if idle. Deliberately not awaited: callers observe
    // progress through [statusStream].
    if (_currentOperation == null) {
      _processQueue();
    }
  }

  /// Syncs an item from local storage to Immich.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Throws [SyncException] if no Immich service is configured or the
  /// operation cannot be queued.
  Future<String> syncToImmich(
    String itemId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'immich',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata from Immich to local storage.
  ///
  /// [assetId] - The Immich asset ID to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Throws [SyncException] if no Immich service is configured or the
  /// operation cannot be queued.
  Future<String> syncFromImmich(
    String assetId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.download,
      itemId: assetId,
      source: 'immich',
      target: 'local',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata to Nostr.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Throws [SyncException] if no Nostr service is configured, no keypair has
  /// been set, or the operation cannot be queued.
  Future<String> syncToNostr(
    String itemId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_nostrService == null) {
      throw SyncException('Nostr service not configured');
    }
    if (_nostrKeyPair == null) {
      throw SyncException('Nostr keypair not set');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-nostr'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'nostr',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Performs a full sync: syncs all items between configured services.
  ///
  /// [priority] - Priority of sync operations.
  ///
  /// Queues an Immich upload for every local item whose `data['type']` is not
  /// `'immich_asset'` (when an Immich service is configured), then a Nostr
  /// upload for every local item (when a Nostr service and keypair are
  /// configured).
  ///
  /// Returns a list of operation IDs.
  ///
  /// If queueing fails part-way (for example because the queue is full) the
  /// returned future completes with that [SyncException]; operations queued
  /// before the failure stay queued.
  Future<List<String>> syncAll({
    SyncPriority priority = SyncPriority.normal,
  }) async {
    final operationIds = <String>[];

    // Sync local items to Immich
    if (_immichService != null) {
      final localItems = await _localStorage.getAllItems();
      for (final item in localItems) {
        if (item.data['type'] == 'immich_asset') {
          // Already synced, skip
          continue;
        }
        final id = await syncToImmich(item.id, priority: priority);
        operationIds.add(id);
      }
    }

    // Sync local items to Nostr
    if (_nostrService != null && _nostrKeyPair != null) {
      final localItems = await _localStorage.getAllItems();
      for (final item in localItems) {
        final id = await syncToNostr(item.id, priority: priority);
        operationIds.add(id);
      }
    }

    return operationIds;
  }

  /// Builds an operation ID of the form `<prefix>-<epoch millis>-<sequence>`.
  ///
  /// The sequence number keeps IDs unique when several operations are created
  /// within the same millisecond, as happens in [syncAll].
  String _nextOperationId(String prefix) {
    final timestamp = DateTime.now().millisecondsSinceEpoch;
    final sequence = _operationSequence++;
    return '$prefix-$timestamp-$sequence';
  }

  /// Orders operations by priority (higher [Enum.index] first), then by
  /// creation time (oldest first).
  int _compareOperations(SyncOperation a, SyncOperation b) {
    if (a.priority != b.priority) {
      return b.priority.index.compareTo(a.priority.index);
    }
    return a.createdAt.compareTo(b.createdAt);
  }

  /// Re-sorts the queue and returns the first pending operation, or null if
  /// nothing is pending.
  ///
  /// Sorting on every pick means operations queued while another one was
  /// running are still taken in priority order.
  SyncOperation? _nextPendingOperation() {
    _operationQueue.sort(_compareOperations);
    for (final candidate in _operationQueue) {
      if (candidate.status == SyncStatus.pending) {
        return candidate;
      }
    }
    return null;
  }

  /// Processes the sync queue.
  ///
  /// Runs pending operations one at a time until none is left or the engine
  /// is disposed. Does nothing if a run is already in progress.
  Future<void> _processQueue() async {
    if (_currentOperation != null || _isDisposed) {
      return; // Already processing or disposed
    }

    while (!_isDisposed) {
      final operation = _nextPendingOperation();
      if (operation == null) {
        break;
      }

      _currentOperation = operation;
      operation.status = SyncStatus.syncing;
      _emitStatus(operation);

      try {
        await _executeOperation(operation);
        operation.markSuccess();
      } catch (e) {
        operation.markFailed(e.toString());

        // Retry if possible, waiting `retryCount` seconds first.
        if (operation.canRetry() && !_isDisposed) {
          await Future<void>.delayed(Duration(seconds: operation.retryCount));
          if (!_isDisposed) {
            // NOTE(review): the operation is only picked up again if
            // incrementRetry() puts it back into SyncStatus.pending - confirm
            // against SyncOperation.
            operation.incrementRetry();
            _emitStatus(operation);
            continue; // Retry this operation
          }
        }
      } finally {
        // Also runs on the `continue` above, so the engine is never left
        // marked as busy between loop iterations.
        if (!_isDisposed) {
          _emitStatus(operation);
        }
        _currentOperation = null;
      }
    }
  }

  /// Emits a status update if not disposed.
  void _emitStatus(SyncOperation operation) {
    if (_isDisposed || _statusController.isClosed) {
      return;
    }
    try {
      _statusController.add(operation);
    } on StateError {
      // The controller rejected the event (for example because it was closed
      // concurrently); status updates are best-effort.
    }
  }

  /// Executes a sync operation.
  ///
  /// Dispatches on the operation type and its target/source.
  ///
  /// Throws [SyncException] for routes that are not supported, so that such an
  /// operation is reported as failed instead of being marked successful
  /// without any work having been done.
  Future<void> _executeOperation(SyncOperation operation) async {
    switch (operation.type) {
      case SyncOperationType.upload:
        if (operation.target == 'immich') {
          await _uploadToImmich(operation);
        } else if (operation.target == 'nostr') {
          await _uploadToNostr(operation);
        } else {
          throw SyncException(
            'Unsupported upload target: ${operation.target}',
          );
        }
        break;
      case SyncOperationType.download:
        if (operation.source == 'immich') {
          await _downloadFromImmich(operation);
        } else {
          throw SyncException(
            'Unsupported download source: ${operation.source}',
          );
        }
        break;
      case SyncOperationType.sync:
        // Bidirectional sync - would need more complex logic
        throw SyncException('Bidirectional sync not yet implemented');
    }
  }

  /// Uploads item metadata to Immich.
  ///
  /// Throws [SyncException] if the item does not exist locally or no Immich
  /// service is configured.
  Future<void> _uploadToImmich(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }

    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }

    // Check if already synced
    final cachedAsset = await immich.getCachedAsset(operation.itemId);
    if (cachedAsset != null) {
      // Already synced, skip
      return;
    }

    // For real upload, we'd need the actual image file
    // For now, we just mark as synced by storing metadata
    // In a real implementation, this would upload the image file
  }

  /// Downloads asset metadata from Immich.
  ///
  /// Succeeds when the asset is already cached, or when it appears among the
  /// first 100 assets returned by the Immich service.
  ///
  /// Throws [SyncException] if no Immich service is configured or the asset
  /// cannot be found.
  Future<void> _downloadFromImmich(SyncOperation operation) async {
    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }

    final asset = await immich.getCachedAsset(operation.itemId);
    if (asset == null) {
      // Try to fetch from Immich
      final assets = await immich.fetchAssets(limit: 100);
      final found = assets.any((a) => a.id == operation.itemId);
      if (!found) {
        throw SyncException('Asset not found: ${operation.itemId}');
      }
    }
    // Metadata is automatically stored by ImmichService
  }

  /// Uploads item metadata to Nostr.
  ///
  /// Throws [SyncException] if the item does not exist locally, no Nostr
  /// service is configured, or no keypair has been set.
  Future<void> _uploadToNostr(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }

    final nostr = _nostrService;
    if (nostr == null) {
      throw SyncException('Nostr service not configured');
    }
    final keyPair = _nostrKeyPair;
    if (keyPair == null) {
      throw SyncException('Nostr keypair not set');
    }

    // Prepare metadata for Nostr
    final metadata = {
      'itemId': item.id,
      'data': item.data,
      'createdAt': item.createdAt,
      'updatedAt': item.updatedAt,
    };

    // Sync metadata to Nostr
    await nostr.syncMetadata(
      metadata: metadata,
      privateKey: keyPair.privateKey,
      kind: 30000, // Custom kind for app metadata
    );
  }

  /// Resolves a conflict between local and remote data.
  ///
  /// [localItem] - Local item data.
  /// [remoteItem] - Remote item data.
  ///
  /// Returns the resolved item data according to the current strategy:
  /// - `useLocal` / `useRemote` return the respective argument unchanged.
  /// - `useLatest` compares the integer `updatedAt` entries (a missing entry
  ///   counts as 0) and returns the remote item on a tie.
  /// - `merge` returns a new map in which remote entries override local ones.
  Map<String, dynamic> resolveConflict(
    Map<String, dynamic> localItem,
    Map<String, dynamic> remoteItem,
  ) {
    switch (_conflictResolution) {
      case ConflictResolution.useLocal:
        return localItem;
      case ConflictResolution.useRemote:
        return remoteItem;
      case ConflictResolution.useLatest:
        final localTime = localItem['updatedAt'] as int? ?? 0;
        final remoteTime = remoteItem['updatedAt'] as int? ?? 0;
        return localTime > remoteTime ? localItem : remoteItem;
      case ConflictResolution.merge:
        // Simple merge: combine data, prefer remote for conflicts
        return {
          ...localItem,
          ...remoteItem,
        };
    }
  }

  /// Clears all completed operations from the queue.
  void clearCompleted() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.success);
  }

  /// Clears all failed operations from the queue.
  void clearFailed() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.failed);
  }

  /// Disposes resources and closes streams.
  ///
  /// After this call the queue is empty, no further status updates are
  /// emitted, and [queueOperation] throws.
  void dispose() {
    _isDisposed = true;
    _operationQueue.clear();
    _currentOperation = null;
    if (!_statusController.isClosed) {
      _statusController.close();
    }
  }
}