You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
425 lines
13 KiB
425 lines
13 KiB
import 'dart:async';
|
|
import '../local/local_storage_service.dart';
|
|
import '../local/models/item.dart';
|
|
import '../immich/immich_service.dart';
|
|
import '../immich/models/immich_asset.dart';
|
|
import '../nostr/nostr_service.dart';
|
|
import '../nostr/models/nostr_event.dart';
|
|
import '../nostr/models/nostr_keypair.dart';
|
|
import 'models/sync_status.dart';
|
|
import 'models/sync_operation.dart';
|
|
|
|
/// Signals that a synchronization operation could not be carried out.
///
/// The textual form produced by [toString] is `SyncException: <message>`.
class SyncException implements Exception {
  /// Builds a [SyncException] that carries [message] as its description.
  SyncException(this.message);

  /// Human-readable description of what went wrong.
  final String message;

  /// Returns the exception name followed by [message].
  @override
  String toString() {
    return 'SyncException: $message';
  }
}
|
|
|
|
/// Coordinates data synchronization between local storage, Immich, and Nostr.
///
/// The engine keeps an in-memory queue of [SyncOperation]s and executes them
/// one at a time:
/// - [syncToImmich], [syncFromImmich], [syncToNostr] and [syncAll] build and
///   queue operations; [queueOperation] accepts pre-built ones.
/// - Pending operations run in priority order (higher
///   `priority.index` first, then oldest `createdAt` first).
/// - A failed operation is retried for as long as its `canRetry()` allows,
///   waiting `retryCount` seconds before each retry.
/// - Every state change is published on [statusStream].
/// - [resolveConflict] applies the configured [ConflictResolution] strategy.
///
/// The service is modular and UI-independent.
class SyncEngine {
  /// Creates a [SyncEngine] instance.
  ///
  /// [localStorage] - Local storage service (required).
  /// [immichService] - Immich service (optional).
  /// [nostrService] - Nostr service (optional).
  /// [nostrKeyPair] - Nostr keypair for signing events (optional).
  /// [conflictResolution] - Conflict resolution strategy (default: useLatest).
  /// [maxQueueSize] - Maximum number of queued operations (default: 100).
  SyncEngine({
    required LocalStorageService localStorage,
    ImmichService? immichService,
    NostrService? nostrService,
    NostrKeyPair? nostrKeyPair,
    ConflictResolution conflictResolution = ConflictResolution.useLatest,
    this.maxQueueSize = 100,
  })  : _localStorage = localStorage,
        _immichService = immichService,
        _nostrService = nostrService,
        _nostrKeyPair = nostrKeyPair,
        _conflictResolution = conflictResolution;

  /// Local storage service.
  final LocalStorageService _localStorage;

  /// Immich service (optional).
  final ImmichService? _immichService;

  /// Nostr service (optional).
  final NostrService? _nostrService;

  /// Nostr keypair for signing events (required if using Nostr).
  NostrKeyPair? _nostrKeyPair;

  /// Every operation known to the engine, in any state.
  ///
  /// Entries are only removed by [clearCompleted], [clearFailed] and
  /// [dispose].
  final List<SyncOperation> _operationQueue = [];

  /// Currently executing operation (null if idle).
  SyncOperation? _currentOperation;

  /// Broadcast controller backing [statusStream].
  final StreamController<SyncOperation> _statusController =
      StreamController<SyncOperation>.broadcast();

  /// Whether the engine has been disposed.
  bool _isDisposed = false;

  /// Conflict resolution strategy used by [resolveConflict].
  ConflictResolution _conflictResolution;

  /// Monotonic counter that keeps generated operation IDs unique even when
  /// several operations are created within the same millisecond.
  int _operationSequence = 0;

  /// Maximum number of operations the queue may hold.
  ///
  /// The limit is checked against the whole queue, so finished operations
  /// count toward it until they are removed with [clearCompleted] or
  /// [clearFailed].
  final int maxQueueSize;

  /// Sets the Nostr keypair for signing events.
  void setNostrKeyPair(NostrKeyPair keypair) {
    _nostrKeyPair = keypair;
  }

  /// Sets the conflict resolution strategy.
  void setConflictResolution(ConflictResolution strategy) {
    _conflictResolution = strategy;
  }

  /// Broadcast stream of sync operation status updates.
  Stream<SyncOperation> get statusStream => _statusController.stream;

  /// Returns a new list with the operations whose status is
  /// [SyncStatus.pending].
  List<SyncOperation> getPendingOperations() {
    return _operationQueue
        .where((op) => op.status == SyncStatus.pending)
        .toList();
  }

  /// Returns an unmodifiable snapshot of all operations in the queue,
  /// whatever their status.
  List<SyncOperation> getAllOperations() {
    return List.unmodifiable(_operationQueue);
  }

  /// Queues a sync operation and starts processing if the engine is idle.
  ///
  /// [operation] - The sync operation to queue.
  ///
  /// Throws [SyncException] if the engine has been disposed or the queue
  /// already holds [maxQueueSize] operations.
  void queueOperation(SyncOperation operation) {
    if (_isDisposed) {
      throw SyncException('SyncEngine has been disposed');
    }

    if (_operationQueue.length >= maxQueueSize) {
      throw SyncException('Sync queue is full (max: $maxQueueSize)');
    }

    _operationQueue.add(operation);
    _emitStatus(operation);

    // Auto-start processing if idle. The run is deliberately not awaited:
    // callers observe progress through [statusStream].
    if (_currentOperation == null) {
      unawaited(_processQueue());
    }
  }

  /// Syncs an item from local storage to Immich.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID. The returned future completes once the
  /// operation has been queued, not once it has been executed.
  ///
  /// Throws [SyncException] if no Immich service is configured or the
  /// operation cannot be queued (see [queueOperation]).
  Future<String> syncToImmich(
    String itemId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'immich',
      priority: priority,
    );

    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata from Immich to local storage.
  ///
  /// [assetId] - The Immich asset ID to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID. The returned future completes once the
  /// operation has been queued, not once it has been executed.
  ///
  /// Throws [SyncException] if no Immich service is configured or the
  /// operation cannot be queued (see [queueOperation]).
  Future<String> syncFromImmich(
    String assetId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.download,
      itemId: assetId,
      source: 'immich',
      target: 'local',
      priority: priority,
    );

    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata to Nostr.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID. The returned future completes once the
  /// operation has been queued, not once it has been executed.
  ///
  /// Throws [SyncException] if no Nostr service is configured, no keypair
  /// has been set, or the operation cannot be queued (see [queueOperation]).
  Future<String> syncToNostr(
    String itemId, {
    SyncPriority priority = SyncPriority.normal,
  }) async {
    if (_nostrService == null) {
      throw SyncException('Nostr service not configured');
    }

    if (_nostrKeyPair == null) {
      throw SyncException('Nostr keypair not set');
    }

    final operation = SyncOperation(
      id: _nextOperationId('sync-nostr'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'nostr',
      priority: priority,
    );

    queueOperation(operation);
    return operation.id;
  }

  /// Performs a full sync: queues uploads of all local items to every
  /// configured service.
  ///
  /// Items whose `data['type']` is `'immich_asset'` are not queued for
  /// Immich. Nostr uploads are only queued when both a Nostr service and a
  /// keypair are available.
  ///
  /// [priority] - Priority of sync operations.
  ///
  /// Returns the IDs of the queued operations (Immich first, then Nostr).
  ///
  /// Throws [SyncException] if an operation cannot be queued, e.g. because
  /// the queue is full; operations queued before the failure stay queued.
  Future<List<String>> syncAll({
    SyncPriority priority = SyncPriority.normal,
  }) async {
    final operationIds = <String>[];

    final toImmich = _immichService != null;
    final toNostr = _nostrService != null && _nostrKeyPair != null;
    if (!toImmich && !toNostr) {
      return operationIds;
    }

    // Read local storage once and reuse the result for both targets.
    final localItems = await _localStorage.getAllItems();

    if (toImmich) {
      for (final item in localItems) {
        if (item.data['type'] == 'immich_asset') {
          // Already synced, skip
          continue;
        }
        operationIds.add(await syncToImmich(item.id, priority: priority));
      }
    }

    if (toNostr) {
      for (final item in localItems) {
        operationIds.add(await syncToNostr(item.id, priority: priority));
      }
    }

    return operationIds;
  }

  /// Builds an operation ID of the form `<prefix>-<epoch millis>-<sequence>`.
  ///
  /// The sequence suffix keeps IDs distinct when several operations are
  /// created within the same millisecond, as happens in [syncAll].
  String _nextOperationId(String prefix) {
    final timestamp = DateTime.now().millisecondsSinceEpoch;
    final sequence = _operationSequence++;
    return '$prefix-$timestamp-$sequence';
  }

  /// Orders the queue by priority (higher `priority.index` first) and, for
  /// equal priorities, by ascending `createdAt`.
  void _sortQueueByPriority() {
    _operationQueue.sort((a, b) {
      final byPriority = b.priority.index.compareTo(a.priority.index);
      if (byPriority != 0) {
        return byPriority;
      }
      return a.createdAt.compareTo(b.createdAt);
    });
  }

  /// Processes pending operations one at a time until none are left or the
  /// engine is disposed.
  ///
  /// Returns immediately when another run is already in progress.
  Future<void> _processQueue() async {
    // Already processing or disposed
    if (_currentOperation != null || _isDisposed) return;

    while (!_isDisposed &&
        _operationQueue.any((op) => op.status == SyncStatus.pending)) {
      // Re-sort on every pass: operations queued while a previous one was
      // executing are appended to the end and must still honor priority.
      _sortQueueByPriority();

      final operation = _operationQueue.firstWhere(
        (op) => op.status == SyncStatus.pending,
      );

      _currentOperation = operation;
      operation.status = SyncStatus.syncing;
      _emitStatus(operation);

      try {
        await _executeOperation(operation);
        operation.markSuccess();
      } catch (e) {
        // Any failure, not only SyncException, marks the operation as failed.
        operation.markFailed(e.toString());

        // Retry if possible. The delay is `retryCount` seconds, so it grows
        // linearly and the first retry is immediate when retryCount is 0.
        // NOTE(review): the operation is only picked up again if
        // incrementRetry() puts it back into SyncStatus.pending - confirm
        // in SyncOperation.
        if (operation.canRetry() && !_isDisposed) {
          await Future.delayed(Duration(seconds: operation.retryCount));
          if (!_isDisposed) {
            operation.incrementRetry();
            _emitStatus(operation);
            continue; // Retry this operation
          }
        }
      } finally {
        // Runs on success, on failure, and on the retry `continue` above.
        if (!_isDisposed) {
          _emitStatus(operation);
        }
        _currentOperation = null;
      }
    }
  }

  /// Publishes [operation] on [statusStream] unless the engine is disposed
  /// or the stream is closed.
  void _emitStatus(SyncOperation operation) {
    if (_isDisposed || _statusController.isClosed) {
      return;
    }
    try {
      _statusController.add(operation);
    } on StateError {
      // Status updates are best-effort: the controller rejected the event
      // (for example because it was closed), so there is nobody to notify.
    }
  }

  /// Executes a sync operation by dispatching on its type and endpoint.
  ///
  /// Throws [SyncException] for [SyncOperationType.sync], which is not
  /// implemented.
  Future<void> _executeOperation(SyncOperation operation) async {
    // NOTE(review): an upload whose target is neither 'immich' nor 'nostr',
    // or a download whose source is not 'immich', falls through without doing
    // anything and is then marked successful by the caller - confirm this is
    // intended.
    switch (operation.type) {
      case SyncOperationType.upload:
        if (operation.target == 'immich') {
          await _uploadToImmich(operation);
        } else if (operation.target == 'nostr') {
          await _uploadToNostr(operation);
        }
        break;

      case SyncOperationType.download:
        if (operation.source == 'immich') {
          await _downloadFromImmich(operation);
        }
        break;

      case SyncOperationType.sync:
        // Bidirectional sync - would need more complex logic
        throw SyncException('Bidirectional sync not yet implemented');
    }
  }

  /// Uploads item metadata to Immich.
  ///
  /// Currently this only validates that the item exists locally and checks
  /// the Immich cache; no data is uploaded yet.
  ///
  /// Throws [SyncException] if the item does not exist or no Immich service
  /// is configured.
  Future<void> _uploadToImmich(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }

    // Operations can be queued directly through queueOperation, so the
    // service may be missing even though syncToImmich checks for it.
    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }

    // Check if already synced
    final cachedAsset = await immich.getCachedAsset(operation.itemId);
    if (cachedAsset != null) {
      // Already synced, skip
      return;
    }

    // For real upload, we'd need the actual image file
    // For now, we just mark as synced by storing metadata
    // In a real implementation, this would upload the image file
  }

  /// Downloads asset metadata from Immich.
  ///
  /// Succeeds when the asset is already cached or is among the assets
  /// returned by `fetchAssets(limit: 100)`.
  ///
  /// Throws [SyncException] if no Immich service is configured or the asset
  /// cannot be found.
  Future<void> _downloadFromImmich(SyncOperation operation) async {
    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }

    final cached = await immich.getCachedAsset(operation.itemId);
    if (cached != null) {
      return;
    }

    // Try to fetch from Immich.
    // NOTE(review): only the first 100 fetched assets are searched, so an
    // asset outside that window is reported as not found.
    final assets = await immich.fetchAssets(limit: 100);
    final found = assets.any((a) => a.id == operation.itemId);
    if (!found) {
      throw SyncException('Asset not found: ${operation.itemId}');
    }

    // Nothing is written to local storage here; presumably ImmichService
    // stores the fetched metadata itself - TODO confirm.
  }

  /// Uploads item metadata to Nostr.
  ///
  /// Sends the item's id, data and timestamps through
  /// `NostrService.syncMetadata`, signed with the configured keypair.
  ///
  /// Throws [SyncException] if the item does not exist, no Nostr service is
  /// configured, or no keypair has been set.
  Future<void> _uploadToNostr(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }

    // Operations can be queued directly through queueOperation, so these
    // may be missing even though syncToNostr checks for them.
    final nostr = _nostrService;
    if (nostr == null) {
      throw SyncException('Nostr service not configured');
    }
    final keyPair = _nostrKeyPair;
    if (keyPair == null) {
      throw SyncException('Nostr keypair not set');
    }

    // Prepare metadata for Nostr
    final metadata = {
      'itemId': item.id,
      'data': item.data,
      'createdAt': item.createdAt,
      'updatedAt': item.updatedAt,
    };

    // Sync metadata to Nostr
    await nostr.syncMetadata(
      metadata: metadata,
      privateKey: keyPair.privateKey,
      kind: 30000, // Custom kind for app metadata
    );
  }

  /// Resolves a conflict between local and remote data using the configured
  /// [ConflictResolution] strategy.
  ///
  /// [localItem] - Local item data.
  /// [remoteItem] - Remote item data.
  ///
  /// Returns the resolved item data:
  /// - `useLocal` / `useRemote` return the respective map unchanged.
  /// - `useLatest` compares the integer `updatedAt` entries (a missing entry
  ///   counts as 0) and returns the remote item on a tie. A non-null
  ///   `updatedAt` that is not an `int` makes the cast throw.
  /// - `merge` returns a new map with all keys of both, remote values
  ///   winning for keys present in both.
  Map<String, dynamic> resolveConflict(
    Map<String, dynamic> localItem,
    Map<String, dynamic> remoteItem,
  ) {
    switch (_conflictResolution) {
      case ConflictResolution.useLocal:
        return localItem;
      case ConflictResolution.useRemote:
        return remoteItem;
      case ConflictResolution.useLatest:
        final localTime = localItem['updatedAt'] as int? ?? 0;
        final remoteTime = remoteItem['updatedAt'] as int? ?? 0;
        return localTime > remoteTime ? localItem : remoteItem;
      case ConflictResolution.merge:
        // Simple merge: combine data, prefer remote for conflicts
        return {
          ...localItem,
          ...remoteItem,
        };
    }
  }

  /// Removes all operations with status [SyncStatus.success] from the queue.
  void clearCompleted() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.success);
  }

  /// Removes all operations with status [SyncStatus.failed] from the queue.
  void clearFailed() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.failed);
  }

  /// Disposes resources and closes streams.
  ///
  /// Marks the engine as disposed, drops all queued operations and closes
  /// [statusStream]. Afterwards [queueOperation] throws [SyncException].
  void dispose() {
    _isDisposed = true;
    _operationQueue.clear();
    _currentOperation = null;
    if (!_statusController.isClosed) {
      // dispose() is synchronous, so the close future is intentionally not
      // awaited.
      unawaited(_statusController.close());
    }
  }
}
|
|
|