max / audiofiles
1 file changed,
+38 insertions,
-23 deletions
| @@ -168,7 +168,7 @@ async fn push_changes( | |||
| 168 | 168 | device_id: Uuid, | |
| 169 | 169 | ) -> Result<i64> { | |
| 170 | 170 | let p = db_path.to_path_buf(); | |
| 171 | - | let (rows, max_id) = tokio::task::spawn_blocking(move || -> Result<_> { | |
| 171 | + | let rows = tokio::task::spawn_blocking(move || -> Result<_> { | |
| 172 | 172 | let conn = open_conn(&p)?; | |
| 173 | 173 | let mut stmt = conn.prepare( | |
| 174 | 174 | "SELECT id, table_name, op, row_id, timestamp, data | |
| @@ -191,8 +191,7 @@ async fn push_changes( | |||
| 191 | 191 | })? | |
| 192 | 192 | .collect::<std::result::Result<Vec<_>, _>>()?; | |
| 193 | 193 | ||
| 194 | - | let max_id = rows.last().map(|r| r.0).unwrap_or(0); | |
| 195 | - | Ok((rows, max_id)) | |
| 194 | + | Ok(rows) | |
| 196 | 195 | }) | |
| 197 | 196 | .await | |
| 198 | 197 | .map_err(|e| SyncError::Other(e.to_string()))??; | |
| @@ -201,11 +200,12 @@ async fn push_changes( | |||
| 201 | 200 | return Ok(0); | |
| 202 | 201 | } | |
| 203 | 202 | ||
| 204 | - | let count = rows.len() as i64; | |
| 203 | + | let mut pushed_ids: Vec<i64> = Vec::with_capacity(rows.len()); | |
| 204 | + | let mut skipped = 0i64; | |
| 205 | 205 | ||
| 206 | 206 | let changes: Vec<ChangeEntry> = rows | |
| 207 | 207 | .into_iter() | |
| 208 | - | .filter_map(|(_, table, op, row_id, timestamp, data)| { | |
| 208 | + | .filter_map(|(id, table, op, row_id, timestamp, data)| { | |
| 209 | 209 | let ts = chrono::DateTime::parse_from_rfc3339(×tamp) | |
| 210 | 210 | .map(|dt| dt.with_timezone(&chrono::Utc)) | |
| 211 | 211 | .unwrap_or_else(|_| chrono::Utc::now()); | |
| @@ -213,7 +213,8 @@ async fn push_changes( | |||
| 213 | 213 | let change_op = match ChangeOp::from_str_opt(&op) { | |
| 214 | 214 | Some(o) => o, | |
| 215 | 215 | None => { | |
| 216 | - | tracing::warn!("Skipping changelog entry with unknown op: {}", op); | |
| 216 | + | tracing::warn!(id, "Skipping changelog entry with unknown op: {}", op); | |
| 217 | + | skipped += 1; | |
| 217 | 218 | return None; | |
| 218 | 219 | } | |
| 219 | 220 | }; | |
| @@ -226,6 +227,7 @@ async fn push_changes( | |||
| 226 | 227 | .ok() | |
| 227 | 228 | }); | |
| 228 | 229 | ||
| 230 | + | pushed_ids.push(id); | |
| 229 | 231 | Some(ChangeEntry { | |
| 230 | 232 | table, | |
| 231 | 233 | op: change_op, | |
| @@ -236,23 +238,36 @@ async fn push_changes( | |||
| 236 | 238 | }) | |
| 237 | 239 | .collect(); | |
| 238 | 240 | ||
| 239 | - | client | |
| 240 | - | .push(device_id, changes) | |
| 241 | - | .await | |
| 242 | - | .map_err(|e| SyncError::Client(e.to_string()))?; | |
| 241 | + | if skipped > 0 { | |
| 242 | + | tracing::warn!(skipped, "Some changelog entries could not be pushed — they will be retried next sync"); | |
| 243 | + | } | |
| 243 | 244 | ||
| 244 | - | // Mark as pushed | |
| 245 | - | let p = db_path.to_path_buf(); | |
| 246 | - | tokio::task::spawn_blocking(move || { | |
| 247 | - | let conn = open_conn(&p)?; | |
| 248 | - | conn.execute( | |
| 249 | - | "UPDATE sync_changelog SET pushed = 1 WHERE id <= ?1 AND pushed = 0", | |
| 250 | - | [max_id], | |
| 251 | - | )?; | |
| 252 | - | Ok::<_, SyncError>(()) | |
| 253 | - | }) | |
| 254 | - | .await | |
| 255 | - | .map_err(|e| SyncError::Other(e.to_string()))??; | |
| 245 | + | if !changes.is_empty() { | |
| 246 | + | client | |
| 247 | + | .push(device_id, changes) | |
| 248 | + | .await | |
| 249 | + | .map_err(|e| SyncError::Client(e.to_string()))?; | |
| 250 | + | } | |
| 251 | + | ||
| 252 | + | let pushed_count = pushed_ids.len() as i64; | |
| 253 | + | ||
| 254 | + | // Mark only successfully-pushed entries | |
| 255 | + | if !pushed_ids.is_empty() { | |
| 256 | + | let p = db_path.to_path_buf(); | |
| 257 | + | tokio::task::spawn_blocking(move || { | |
| 258 | + | let conn = open_conn(&p)?; | |
| 259 | + | let placeholders: String = pushed_ids.iter().map(|_| "?").collect::<Vec<_>>().join(","); | |
| 260 | + | let sql = format!("UPDATE sync_changelog SET pushed = 1 WHERE id IN ({})", placeholders); | |
| 261 | + | let params: Vec<&dyn rusqlite::types::ToSql> = pushed_ids | |
| 262 | + | .iter() | |
| 263 | + | .map(|id| id as &dyn rusqlite::types::ToSql) | |
| 264 | + | .collect(); | |
| 265 | + | conn.execute(&sql, params.as_slice())?; | |
| 266 | + | Ok::<_, SyncError>(()) | |
| 267 | + | }) | |
| 268 | + | .await | |
| 269 | + | .map_err(|e| SyncError::Other(e.to_string()))??; | |
| 270 | + | } | |
| 256 | 271 | ||
| 257 | - | Ok(count) | |
| 272 | + | Ok(pushed_count) | |
| 258 | 273 | } |