// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

// 430 lines
// 13 KiB

import 'dart:async';
import '../local/local_storage_service.dart';
import '../immich/immich_service.dart';
import '../nostr/nostr_service.dart';
import '../nostr/models/nostr_keypair.dart';
import 'models/sync_status.dart';
import 'models/sync_operation.dart';
/// Signals that a synchronization operation could not be queued or executed.
class SyncException implements Exception {
  /// Creates a [SyncException] carrying the given [message].
  SyncException(this.message);

  /// Human-readable description of what went wrong.
  final String message;

  /// Returns [message] prefixed with the exception's type name.
  @override
  String toString() {
    return 'SyncException: $message';
  }
}
/// Coordinates data synchronization between local storage, Immich, and Nostr.
///
/// Operations are placed on an in-memory queue and executed one at a time,
/// highest priority first and oldest first within the same priority.
/// Progress is reported through [statusStream].
///
/// This service provides:
/// - Upload of local items to Immich and Nostr, and download from Immich
///   (bidirectional `sync` operations are not implemented yet and fail with
///   a [SyncException])
/// - Conflict resolution strategies via [resolveConflict]
/// - A bounded queue of operations (see [maxQueueSize])
/// - Automatic retry with exponential backoff (see [retryBaseDelay])
///
/// The service is modular and UI-independent.
class SyncEngine {
  /// Upper bound on the backoff exponent, so the retry delay never exceeds
  /// `retryBaseDelay * 64` and the shift can never overflow.
  static const int _maxBackoffExponent = 6;

  /// Local storage service.
  final LocalStorageService _localStorage;

  /// Immich service (optional).
  final ImmichService? _immichService;

  /// Nostr service (optional).
  final NostrService? _nostrService;

  /// Nostr keypair for signing events (required if using Nostr).
  NostrKeyPair? _nostrKeyPair;

  /// All known operations, regardless of status.
  ///
  /// Finished operations stay here until [clearCompleted] or [clearFailed]
  /// is called, and therefore count toward [maxQueueSize].
  final List<SyncOperation> _operationQueue = [];

  /// Currently executing operation (null if idle).
  SyncOperation? _currentOperation;

  /// Stream controller for sync status updates.
  final StreamController<SyncOperation> _statusController =
      StreamController<SyncOperation>.broadcast();

  /// Whether the engine has been disposed.
  bool _isDisposed = false;

  /// Conflict resolution strategy.
  ConflictResolution _conflictResolution;

  /// Monotonic counter appended to generated operation IDs so that two
  /// operations created within the same millisecond never share an ID.
  int _operationSequence = 0;

  /// Maximum queue size.
  final int maxQueueSize;

  /// Delay before the first retry of a failed operation.
  ///
  /// Each further retry doubles the delay, up to 64 times this value.
  final Duration retryBaseDelay;

  /// Creates a [SyncEngine] instance.
  ///
  /// [localStorage] - Local storage service (required).
  /// [immichService] - Immich service (optional).
  /// [nostrService] - Nostr service (optional).
  /// [nostrKeyPair] - Nostr keypair for signing events (optional).
  /// [conflictResolution] - Conflict resolution strategy (default: useLatest).
  /// [maxQueueSize] - Maximum number of queued operations (default: 100).
  /// [retryBaseDelay] - Base delay for exponential retry backoff
  /// (default: 1 second).
  SyncEngine({
    required LocalStorageService localStorage,
    ImmichService? immichService,
    NostrService? nostrService,
    NostrKeyPair? nostrKeyPair,
    ConflictResolution conflictResolution = ConflictResolution.useLatest,
    this.maxQueueSize = 100,
    this.retryBaseDelay = const Duration(seconds: 1),
  })  : _localStorage = localStorage,
        _immichService = immichService,
        _nostrService = nostrService,
        _nostrKeyPair = nostrKeyPair,
        _conflictResolution = conflictResolution;

  /// Sets the Nostr keypair for signing events.
  void setNostrKeyPair(NostrKeyPair keypair) {
    _nostrKeyPair = keypair;
  }

  /// Sets the conflict resolution strategy.
  void setConflictResolution(ConflictResolution strategy) {
    _conflictResolution = strategy;
  }

  /// Stream of sync operation status updates.
  Stream<SyncOperation> get statusStream => _statusController.stream;

  /// Returns a new list of the operations whose status is still pending.
  List<SyncOperation> getPendingOperations() {
    return _operationQueue
        .where((op) => op.status == SyncStatus.pending)
        .toList();
  }

  /// Returns an unmodifiable snapshot of all operations, whatever their
  /// status.
  List<SyncOperation> getAllOperations() {
    return List.unmodifiable(_operationQueue);
  }

  /// Queues a sync operation and starts processing if the engine is idle.
  ///
  /// [operation] - The sync operation to queue.
  ///
  /// Throws [SyncException] if the engine has been disposed or the queue is
  /// full. Finished operations that have not been cleared count toward the
  /// limit.
  void queueOperation(SyncOperation operation) {
    if (_isDisposed) {
      throw SyncException('SyncEngine has been disposed');
    }
    if (_operationQueue.length >= maxQueueSize) {
      throw SyncException('Sync queue is full (max: $maxQueueSize)');
    }
    _operationQueue.add(operation);
    _emitStatus(operation);
    // Auto-start processing if idle. Deliberately not awaited: the worker
    // loop records failures on the operations themselves.
    if (_currentOperation == null) {
      _processQueue();
    }
  }

  /// Syncs an item from local storage to Immich.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Completes with a [SyncException] if Immich is not configured or the
  /// operation cannot be queued.
  Future<String> syncToImmich(String itemId,
      {SyncPriority priority = SyncPriority.normal}) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }
    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'immich',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata from Immich to local storage.
  ///
  /// [assetId] - The Immich asset ID to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Completes with a [SyncException] if Immich is not configured or the
  /// operation cannot be queued.
  Future<String> syncFromImmich(String assetId,
      {SyncPriority priority = SyncPriority.normal}) async {
    if (_immichService == null) {
      throw SyncException('Immich service not configured');
    }
    final operation = SyncOperation(
      id: _nextOperationId('sync-immich'),
      type: SyncOperationType.download,
      itemId: assetId,
      source: 'immich',
      target: 'local',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Syncs metadata to Nostr.
  ///
  /// [itemId] - The ID of the item to sync.
  /// [priority] - Priority of the sync operation.
  ///
  /// Returns the sync operation ID.
  ///
  /// Completes with a [SyncException] if the Nostr service or keypair is
  /// missing, or the operation cannot be queued.
  Future<String> syncToNostr(String itemId,
      {SyncPriority priority = SyncPriority.normal}) async {
    if (_nostrService == null) {
      throw SyncException('Nostr service not configured');
    }
    if (_nostrKeyPair == null) {
      throw SyncException('Nostr keypair not set');
    }
    final operation = SyncOperation(
      id: _nextOperationId('sync-nostr'),
      type: SyncOperationType.upload,
      itemId: itemId,
      source: 'local',
      target: 'nostr',
      priority: priority,
    );
    queueOperation(operation);
    return operation.id;
  }

  /// Performs a full sync: queues uploads of all local items to every
  /// configured service.
  ///
  /// Items whose `data['type']` is `'immich_asset'` are not uploaded to
  /// Immich. Nostr uploads are only queued when both the Nostr service and a
  /// keypair are available.
  ///
  /// [priority] - Priority of sync operations.
  ///
  /// Returns a list of operation IDs.
  ///
  /// Completes with a [SyncException] if the queue fills up part-way;
  /// operations queued before that point remain queued.
  Future<List<String>> syncAll(
      {SyncPriority priority = SyncPriority.normal}) async {
    final operationIds = <String>[];
    final immichConfigured = _immichService != null;
    final nostrConfigured = _nostrService != null && _nostrKeyPair != null;
    if (!immichConfigured && !nostrConfigured) {
      return operationIds;
    }

    // One read of local storage serves both targets.
    final localItems = await _localStorage.getAllItems();

    // Sync local items to Immich
    if (immichConfigured) {
      for (final item in localItems) {
        if (item.data['type'] == 'immich_asset') {
          // Already synced, skip
          continue;
        }
        operationIds.add(await syncToImmich(item.id, priority: priority));
      }
    }

    // Sync local items to Nostr
    if (nostrConfigured) {
      for (final item in localItems) {
        operationIds.add(await syncToNostr(item.id, priority: priority));
      }
    }
    return operationIds;
  }

  /// Builds an operation ID from [prefix], the current time and a
  /// per-engine sequence number.
  String _nextOperationId(String prefix) {
    final timestamp = DateTime.now().millisecondsSinceEpoch;
    return '$prefix-$timestamp-${_operationSequence++}';
  }

  /// Orders operations by descending priority index, then by ascending
  /// creation time.
  static int _compareByPriority(SyncOperation a, SyncOperation b) {
    if (a.priority != b.priority) {
      return b.priority.index.compareTo(a.priority.index);
    }
    return a.createdAt.compareTo(b.createdAt);
  }

  /// Re-sorts the queue and returns the first pending operation, or null
  /// when nothing is pending.
  ///
  /// Sorting on every call means operations queued while another one was
  /// running are still prioritised correctly.
  SyncOperation? _nextPendingOperation() {
    _operationQueue.sort(_compareByPriority);
    for (final candidate in _operationQueue) {
      if (candidate.status == SyncStatus.pending) {
        return candidate;
      }
    }
    return null;
  }

  /// Returns the backoff delay for an operation that has already been
  /// retried [retryCount] times: [retryBaseDelay] doubled per retry, capped.
  Duration _retryDelay(int retryCount) {
    final exponent = retryCount.clamp(0, _maxBackoffExponent).toInt();
    return retryBaseDelay * (1 << exponent);
  }

  /// Processes pending operations one at a time until none are left or the
  /// engine is disposed.
  Future<void> _processQueue() async {
    if (_currentOperation != null || _isDisposed) {
      return; // Already processing or disposed
    }
    while (!_isDisposed) {
      final operation = _nextPendingOperation();
      if (operation == null) {
        break;
      }
      _currentOperation = operation;
      operation.status = SyncStatus.syncing;
      _emitStatus(operation);
      try {
        await _executeOperation(operation);
        operation.markSuccess();
      } catch (e) {
        // Catch everything on purpose: one failing operation must not stop
        // the worker loop. The failure is recorded on the operation.
        operation.markFailed(e.toString());
        // Retry if possible
        if (operation.canRetry() && !_isDisposed) {
          await Future<void>.delayed(_retryDelay(operation.retryCount));
          if (!_isDisposed) {
            // NOTE(review): the retry is only picked up again if
            // incrementRetry() returns the operation to the pending state;
            // SyncOperation is defined elsewhere - confirm.
            operation.incrementRetry();
            _emitStatus(operation);
            continue; // Retry this operation
          }
        }
      } finally {
        if (!_isDisposed) {
          _emitStatus(operation);
        }
        _currentOperation = null;
      }
    }
  }

  /// Emits a status update unless the engine is disposed or the stream is
  /// closed.
  void _emitStatus(SyncOperation operation) {
    if (_isDisposed || _statusController.isClosed) {
      return;
    }
    try {
      _statusController.add(operation);
    } on StateError {
      // The controller was closed between the check and the add; status
      // updates are best-effort, so there is nothing to do.
    }
  }

  /// Executes a sync operation by dispatching on its type and endpoint.
  ///
  /// Throws [SyncException] for bidirectional sync and for upload targets or
  /// download sources this engine does not support, so that such operations
  /// are reported as failed instead of silently succeeding.
  Future<void> _executeOperation(SyncOperation operation) async {
    switch (operation.type) {
      case SyncOperationType.upload:
        if (operation.target == 'immich') {
          await _uploadToImmich(operation);
        } else if (operation.target == 'nostr') {
          await _uploadToNostr(operation);
        } else {
          throw SyncException(
              'Unsupported upload target: ${operation.target}');
        }
        break;
      case SyncOperationType.download:
        if (operation.source == 'immich') {
          await _downloadFromImmich(operation);
        } else {
          throw SyncException(
              'Unsupported download source: ${operation.source}');
        }
        break;
      case SyncOperationType.sync:
        // Bidirectional sync - would need more complex logic
        throw SyncException('Bidirectional sync not yet implemented');
    }
  }

  /// Uploads item metadata to Immich.
  ///
  /// Currently a placeholder: it verifies that the item exists locally and
  /// checks the Immich cache, but performs no upload.
  ///
  /// Throws [SyncException] if the item is missing or Immich is not
  /// configured.
  Future<void> _uploadToImmich(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }
    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }
    // Check if already synced
    final cachedAsset = await immich.getCachedAsset(operation.itemId);
    if (cachedAsset != null) {
      // Already synced, skip
      return;
    }
    // For real upload, we'd need the actual image file
    // For now, we just mark as synced by storing metadata
    // In a real implementation, this would upload the image file
  }

  /// Downloads asset metadata from Immich.
  ///
  /// Succeeds if the asset is cached or appears in a fetch of at most 100
  /// assets from Immich.
  ///
  /// Throws [SyncException] if Immich is not configured or the asset is not
  /// found.
  Future<void> _downloadFromImmich(SyncOperation operation) async {
    final immich = _immichService;
    if (immich == null) {
      throw SyncException('Immich service not configured');
    }
    final asset = await immich.getCachedAsset(operation.itemId);
    if (asset == null) {
      // Try to fetch from Immich
      final assets = await immich.fetchAssets(limit: 100);
      final found = assets.any((a) => a.id == operation.itemId);
      if (!found) {
        throw SyncException('Asset not found: ${operation.itemId}');
      }
    }
    // Metadata is automatically stored by ImmichService
  }

  /// Uploads item metadata to Nostr.
  ///
  /// Throws [SyncException] if the item is missing, or if the Nostr service
  /// or keypair is not available.
  Future<void> _uploadToNostr(SyncOperation operation) async {
    final item = await _localStorage.getItem(operation.itemId);
    if (item == null) {
      throw SyncException('Item not found: ${operation.itemId}');
    }
    final nostr = _nostrService;
    if (nostr == null) {
      throw SyncException('Nostr service not configured');
    }
    final keyPair = _nostrKeyPair;
    if (keyPair == null) {
      throw SyncException('Nostr keypair not set');
    }
    // Prepare metadata for Nostr
    final metadata = {
      'itemId': item.id,
      'data': item.data,
      'createdAt': item.createdAt,
      'updatedAt': item.updatedAt,
    };
    // Sync metadata to Nostr
    await nostr.syncMetadata(
      metadata: metadata,
      privateKey: keyPair.privateKey,
      kind: 30000, // Custom kind for app metadata
    );
  }

  /// Resolves a conflict between local and remote data.
  ///
  /// [localItem] - Local item data.
  /// [remoteItem] - Remote item data.
  ///
  /// With `useLatest`, the `updatedAt` entries are compared as integers
  /// (missing counts as 0) and the remote item wins ties; a non-integer
  /// `updatedAt` causes a cast error. With `merge`, the maps are combined
  /// shallowly and remote values win on key collisions.
  ///
  /// Returns the resolved item data.
  Map<String, dynamic> resolveConflict(
    Map<String, dynamic> localItem,
    Map<String, dynamic> remoteItem,
  ) {
    switch (_conflictResolution) {
      case ConflictResolution.useLocal:
        return localItem;
      case ConflictResolution.useRemote:
        return remoteItem;
      case ConflictResolution.useLatest:
        final localTime = localItem['updatedAt'] as int? ?? 0;
        final remoteTime = remoteItem['updatedAt'] as int? ?? 0;
        return localTime > remoteTime ? localItem : remoteItem;
      case ConflictResolution.merge:
        // Simple merge: combine data, prefer remote for conflicts
        return {
          ...localItem,
          ...remoteItem,
        };
    }
  }

  /// Clears all completed operations from the queue.
  void clearCompleted() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.success);
  }

  /// Clears all failed operations from the queue.
  void clearFailed() {
    _operationQueue.removeWhere((op) => op.status == SyncStatus.failed);
  }

  /// Disposes resources and closes streams.
  ///
  /// Drops every queued operation and stops the worker loop before it picks
  /// up another operation. After this call [queueOperation] throws.
  void dispose() {
    _isDisposed = true;
    _operationQueue.clear();
    _currentOperation = null;
    if (!_statusController.isClosed) {
      _statusController.close();
    }
  }
}

// Powered by TurnKey Linux.