max / goingson
3 files changed,
+31 insertions,
-19 deletions
| @@ -4,7 +4,9 @@ | |||
| 4 | 4 | //! for restoring data from backups. The command layer handles file I/O; | |
| 5 | 5 | //! this module handles the restore logic using repository trait objects. | |
| 6 | 6 | ||
| 7 | - | use crate::id_types::UserId; | |
| 7 | + | use std::collections::HashMap; | |
| 8 | + | ||
| 9 | + | use crate::id_types::{ProjectId, UserId}; | |
| 8 | 10 | ||
| 9 | 11 | use crate::error::CoreError; | |
| 10 | 12 | use crate::models::{ | |
| @@ -47,7 +49,8 @@ pub async fn restore_from_backup( | |||
| 47 | 49 | ) -> Result<RestoreResult, CoreError> { | |
| 48 | 50 | let mut result = RestoreResult::default(); | |
| 49 | 51 | ||
| 50 | - | // Import projects | |
| 52 | + | // Import projects, tracking old-to-new ID mapping | |
| 53 | + | let mut project_id_map: HashMap<ProjectId, ProjectId> = HashMap::new(); | |
| 51 | 54 | for project in &input.projects { | |
| 52 | 55 | if projects.get_by_id(project.id, user_id).await?.is_none() { | |
| 53 | 56 | let new_project = NewProject { | |
| @@ -56,12 +59,13 @@ pub async fn restore_from_backup( | |||
| 56 | 59 | project_type: project.project_type.clone(), | |
| 57 | 60 | status: project.status.clone(), | |
| 58 | 61 | }; | |
| 59 | - | projects.create(user_id, new_project).await?; | |
| 62 | + | let created = projects.create(user_id, new_project).await?; | |
| 63 | + | project_id_map.insert(project.id, created.id); | |
| 60 | 64 | result.projects_restored += 1; | |
| 61 | 65 | } | |
| 62 | 66 | } | |
| 63 | 67 | ||
| 64 | - | // Import tasks | |
| 68 | + | // Import tasks, remapping project_id references | |
| 65 | 69 | for task in &input.tasks { | |
| 66 | 70 | if tasks.get_by_id(task.id, user_id).await?.is_none() { | |
| 67 | 71 | let new_task = crate::models::NewTask::builder(&task.description) | |
| @@ -76,7 +80,9 @@ pub async fn restore_from_backup( | |||
| 76 | 80 | new_task | |
| 77 | 81 | }; | |
| 78 | 82 | ||
| 79 | - | let new_task = if let Some(pid) = task.project_id { | |
| 83 | + | let remapped_pid = task.project_id | |
| 84 | + | .and_then(|pid| project_id_map.get(&pid).copied().or(Some(pid))); | |
| 85 | + | let new_task = if let Some(pid) = remapped_pid { | |
| 80 | 86 | new_task.project_id(pid) | |
| 81 | 87 | } else { | |
| 82 | 88 | new_task | |
| @@ -87,7 +93,7 @@ pub async fn restore_from_backup( | |||
| 87 | 93 | } | |
| 88 | 94 | } | |
| 89 | 95 | ||
| 90 | - | // Import events | |
| 96 | + | // Import events, remapping project_id references | |
| 91 | 97 | for event in &input.events { | |
| 92 | 98 | if events.get_by_id(event.id, user_id).await?.is_none() { | |
| 93 | 99 | let new_event = NewEvent::builder(&event.title, event.start_time) | |
| @@ -106,7 +112,9 @@ pub async fn restore_from_backup( | |||
| 106 | 112 | new_event | |
| 107 | 113 | }; | |
| 108 | 114 | ||
| 109 | - | let new_event = if let Some(pid) = event.project_id { | |
| 115 | + | let remapped_pid = event.project_id | |
| 116 | + | .and_then(|pid| project_id_map.get(&pid).copied().or(Some(pid))); | |
| 117 | + | let new_event = if let Some(pid) = remapped_pid { | |
| 110 | 118 | new_event.project_id(pid) | |
| 111 | 119 | } else { | |
| 112 | 120 | new_event |
| @@ -37,12 +37,11 @@ pub async fn pull_changes( | |||
| 37 | 37 | break; | |
| 38 | 38 | } | |
| 39 | 39 | ||
| 40 | - | let batch_count = changes.len() as i64; | |
| 41 | - | ||
| 42 | 40 | // Detect conflicts between remote changes and local pending | |
| 43 | 41 | let (clean, conflicts) = detect_conflicts(changes, &local_pending, device_id); | |
| 44 | 42 | ||
| 45 | 43 | // Apply non-conflicting changes directly | |
| 44 | + | let clean_count = clean.len() as i64; | |
| 46 | 45 | if !clean.is_empty() { | |
| 47 | 46 | let clean_entries: Vec<ChangeEntry> = | |
| 48 | 47 | clean.into_iter().map(|p| p.entry).collect(); | |
| @@ -50,6 +49,7 @@ pub async fn pull_changes( | |||
| 50 | 49 | } | |
| 51 | 50 | ||
| 52 | 51 | // Resolve conflicts with LWW | |
| 52 | + | let mut resolved_count: i64 = 0; | |
| 53 | 53 | if !conflicts.is_empty() { | |
| 54 | 54 | let conflict_count = conflicts.len(); | |
| 55 | 55 | let mut resolved_entries: Vec<ChangeEntry> = Vec::new(); | |
| @@ -74,6 +74,7 @@ pub async fn pull_changes( | |||
| 74 | 74 | } | |
| 75 | 75 | } | |
| 76 | 76 | ||
| 77 | + | resolved_count = resolved_entries.len() as i64; | |
| 77 | 78 | if !resolved_entries.is_empty() { | |
| 78 | 79 | apply_remote_changes(pool, resolved_entries).await?; | |
| 79 | 80 | } | |
| @@ -87,7 +88,7 @@ pub async fn pull_changes( | |||
| 87 | 88 | ); | |
| 88 | 89 | } | |
| 89 | 90 | ||
| 90 | - | total_applied += batch_count; | |
| 91 | + | total_applied += clean_count + resolved_count; | |
| 91 | 92 | ||
| 92 | 93 | // Save cursor after each batch (crash-safe) | |
| 93 | 94 | set_sync_state(pool, "pull_cursor", &new_cursor.to_string()).await?; |
| @@ -26,11 +26,10 @@ pub async fn push_changes( | |||
| 26 | 26 | return Ok(0); | |
| 27 | 27 | } | |
| 28 | 28 | ||
| 29 | - | let max_id = rows.last().map(|r| r.0).unwrap_or(0); | |
| 30 | - | ||
| 29 | + | let mut pushed_ids: Vec<i64> = Vec::new(); | |
| 31 | 30 | let changes: Vec<ChangeEntry> = rows | |
| 32 | 31 | .into_iter() | |
| 33 | - | .filter_map(|(_, table, op, row_id, timestamp, data)| { | |
| 32 | + | .filter_map(|(id, table, op, row_id, timestamp, data)| { | |
| 34 | 33 | let ts = chrono::DateTime::parse_from_rfc3339(×tamp) | |
| 35 | 34 | .map(|dt| dt.with_timezone(&Utc)) | |
| 36 | 35 | .unwrap_or_else(|_| Utc::now()); | |
| @@ -45,6 +44,8 @@ pub async fn push_changes( | |||
| 45 | 44 | } | |
| 46 | 45 | }; | |
| 47 | 46 | ||
| 47 | + | pushed_ids.push(id); | |
| 48 | + | ||
| 48 | 49 | Some(ChangeEntry { | |
| 49 | 50 | table, | |
| 50 | 51 | op: change_op, | |
| @@ -62,12 +63,14 @@ pub async fn push_changes( | |||
| 62 | 63 | .await | |
| 63 | 64 | .map_err(|e| CoreError::sync(format!("push failed: {}", e)))?; | |
| 64 | 65 | ||
| 65 | - | // Mark as pushed | |
| 66 | - | sqlx::query("UPDATE sync_changelog SET pushed = 1 WHERE id <= ? AND pushed = 0") | |
| 67 | - | .bind(max_id) | |
| 68 | - | .execute(pool) | |
| 69 | - | .await | |
| 70 | - | .map_err(CoreError::database)?; | |
| 66 | + | // Mark only the entries that were actually pushed | |
| 67 | + | for id in &pushed_ids { | |
| 68 | + | sqlx::query("UPDATE sync_changelog SET pushed = 1 WHERE id = ?") | |
| 69 | + | .bind(id) | |
| 70 | + | .execute(pool) | |
| 71 | + | .await | |
| 72 | + | .map_err(CoreError::database)?; | |
| 73 | + | } | |
| 71 | 74 | ||
| 72 | 75 | debug!("Pushed {} changes", count); | |
| 73 | 76 | Ok(count) |