Skip to main content

max / goingson

Harden sync, email, plugin, and export subsystems Code fuzz remediation: sync push/pull accuracy, plugin registry hardening, backup FK integrity, email sync improvements, parser cleanup, and export safety improvements. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author: Max J. <87768334+MaxJMath@users.noreply.github.com> · 2026-05-02 23:12 UTC
Commit: 8e4136ed753986aa78a4be515b815a2d932712c3
Parent: df3de31
28 files changed, +476 insertions, -184 deletions
@@ -6,7 +6,7 @@
6 6
7 7 use std::collections::HashMap;
8 8
9 - use crate::id_types::{ProjectId, UserId};
9 + use crate::id_types::{ProjectId, TaskId, UserId};
10 10
11 11 use crate::error::CoreError;
12 12 use crate::models::{
@@ -25,6 +25,10 @@ pub struct RestoreResult {
25 25 pub events_restored: usize,
26 26 /// Number of emails restored.
27 27 pub emails_restored: usize,
28 + /// Number of subtasks restored.
29 + pub subtasks_restored: usize,
30 + /// Number of annotations restored.
31 + pub annotations_restored: usize,
28 32 }
29 33
30 34 /// Pre-parsed backup data for restoration.
@@ -65,7 +69,8 @@ pub async fn restore_from_backup(
65 69 }
66 70 }
67 71
68 - // Import tasks, remapping project_id references
72 + // Import tasks, remapping project_id references and restoring subtasks/annotations
73 + let mut task_id_map: HashMap<TaskId, TaskId> = HashMap::new();
69 74 for task in &input.tasks {
70 75 if tasks.get_by_id(task.id, user_id).await?.is_none() {
71 76 let new_task = crate::models::NewTask::builder(&task.description)
@@ -88,8 +93,38 @@ pub async fn restore_from_backup(
88 93 new_task
89 94 };
90 95
91 - tasks.create(user_id, new_task.build()).await?;
96 + let created = tasks.create(user_id, new_task.build()).await?;
97 + task_id_map.insert(task.id, created.id);
92 98 result.tasks_restored += 1;
99 +
100 + // Restore annotations
101 + for annotation in &task.annotations {
102 + if tasks.add_annotation(created.id, user_id, &annotation.note).await?.is_some() {
103 + result.annotations_restored += 1;
104 + }
105 + }
106 +
107 + // Restore subtasks (text-only; linked subtasks handled in second pass)
108 + for subtask in &task.subtasks {
109 + if subtask.linked_task_id.is_none() {
110 + if tasks.add_subtask(created.id, user_id, &subtask.text).await?.is_some() {
111 + result.subtasks_restored += 1;
112 + }
113 + }
114 + }
115 + }
116 + }
117 +
118 + // Second pass: restore subtasks with linked_task_id (requires all tasks to exist)
119 + for task in &input.tasks {
120 + let new_parent_id = task_id_map.get(&task.id).copied().unwrap_or(task.id);
121 + for subtask in &task.subtasks {
122 + if let Some(linked_id) = subtask.linked_task_id {
123 + let new_linked_id = task_id_map.get(&linked_id).copied().unwrap_or(linked_id);
124 + if tasks.add_subtask_link(new_parent_id, user_id, new_linked_id).await.ok().flatten().is_some() {
125 + result.subtasks_restored += 1;
126 + }
127 + }
93 128 }
94 129 }
95 130
@@ -65,6 +65,9 @@ pub const MAX_CONTACT_DISPLAY_NAME_LENGTH: usize = 255;
65 65 /// Maximum scheduled duration in minutes (24 hours).
66 66 pub const MAX_SCHEDULED_DURATION_MINUTES: i32 = 24 * 60;
67 67
68 + /// Maximum relative date offset in days (~10 years).
69 + pub const MAX_RELATIVE_DATE_DAYS: i64 = 3650;
70 +
68 71 #[cfg(test)]
69 72 mod tests {
70 73 use super::*;
@@ -19,6 +19,8 @@ use crate::repository::EmailRepository;
19 19 pub struct FetchedEmail {
20 20 pub message_id: Option<String>,
21 21 pub in_reply_to: Option<String>,
22 + /// First entry from the RFC 2822 References header (the thread root message-ID).
23 + pub references_root: Option<String>,
22 24 pub from: String,
23 25 pub to: String,
24 26 pub subject: String,
@@ -55,6 +57,21 @@ pub async fn process_fetched_emails(
55 57 ) -> Result<SyncProcessResult, CoreError> {
56 58 let mut result = SyncProcessResult::default();
57 59
60 + // Ensure every email has a message_id for dedup: real if present, else synthetic hash
61 + let mut emails = emails;
62 + for e in &mut emails {
63 + if e.message_id.is_none() {
64 + use std::collections::hash_map::DefaultHasher;
65 + use std::hash::{Hash, Hasher};
66 + let mut h = DefaultHasher::new();
67 + e.from.hash(&mut h);
68 + e.to.hash(&mut h);
69 + e.subject.hash(&mut h);
70 + e.date.hash(&mut h);
71 + e.message_id = Some(format!("synth-{:x}", h.finish()));
72 + }
73 + }
74 +
58 75 // Batch check existing message IDs
59 76 let msg_ids: Vec<&str> = emails.iter()
60 77 .filter_map(|e| e.message_id.as_deref())
@@ -78,7 +95,9 @@ pub async fn process_fetched_emails(
78 95 // thread_id groups conversations: use in_reply_to if this is a reply,
79 96 // otherwise fall back to message_id (starts a new thread). This means
80 97 // the first email in a thread has thread_id == message_id.
81 - let thread_id = email.in_reply_to.clone().or_else(|| email.message_id.clone());
98 + let thread_id = email.references_root.clone()
99 + .or_else(|| email.in_reply_to.clone())
100 + .or_else(|| email.message_id.clone());
82 101
83 102 if let Some(ref reply_to) = email.in_reply_to {
84 103 reply_to_ids.push(reply_to.clone());
@@ -137,6 +156,7 @@ mod tests {
137 156 let email = FetchedEmail {
138 157 message_id: Some("msg-1@example.com".to_string()),
139 158 in_reply_to: None,
159 + references_root: None,
140 160 from: "sender@example.com".to_string(),
141 161 to: "recipient@example.com".to_string(),
142 162 subject: "Test".to_string(),
@@ -45,6 +45,7 @@ pub mod plugin;
45 45 pub mod recurrence;
46 46 pub mod repository;
47 47 pub mod search_parser;
48 + pub mod text_utils;
48 49 pub mod monthly_review;
49 50 pub mod urgency;
50 51 pub mod validation;
@@ -1,7 +1,7 @@
1 1 //! Natural language parser for quick-add task input.
2 2
3 3 use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
4 - use crate::constants::{APPROXIMATE_DAYS_PER_MONTH, DEFAULT_PARSE_HOUR, DEFAULT_PARSE_MINUTE};
4 + use crate::constants::{APPROXIMATE_DAYS_PER_MONTH, DEFAULT_PARSE_HOUR, DEFAULT_PARSE_MINUTE, MAX_RELATIVE_DATE_DAYS};
5 5 use crate::models::{Priority, Recurrence};
6 6
7 7 /// Parsed task data from quick-add input.
@@ -114,21 +114,7 @@ pub fn parse_quick_add_with_warnings(input: &str) -> ParseResult {
114 114 ParseResult { task, warnings }
115 115 }
116 116
117 - /// Efficient case-insensitive prefix check (no allocation).
118 - ///
119 - /// Returns the remaining part of the string if `s` starts with `prefix`
120 - /// (case-insensitive), otherwise returns `None`.
121 - fn strip_prefix_ci<'a>(s: &'a str, prefix: &str) -> Option<&'a str> {
122 - if s.len() < prefix.len() {
123 - return None;
124 - }
125 - let s_prefix = &s[..prefix.len()];
126 - if s_prefix.eq_ignore_ascii_case(prefix) {
127 - Some(&s[prefix.len()..])
128 - } else {
129 - None
130 - }
131 - }
117 + use crate::text_utils::strip_prefix_ci;
132 118
133 119 /// Parse priority from string
134 120 fn parse_priority(s: &str) -> Option<Priority> {
@@ -209,7 +195,7 @@ fn parse_relative_date(s: &str, from: NaiveDate, time: NaiveTime) -> Option<Date
209 195
210 196 let (num_str, unit) = s.split_at(s.len() - 1);
211 197 let num: i64 = num_str.parse().ok()?;
212 - if num < 1 {
198 + if num < 1 || num > MAX_RELATIVE_DATE_DAYS {
213 199 return None;
214 200 }
215 201
@@ -38,7 +38,7 @@
38 38 use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
39 39 use serde::{Deserialize, Serialize};
40 40
41 - use crate::constants::{APPROXIMATE_DAYS_PER_MONTH, DEFAULT_PARSE_HOUR, DEFAULT_PARSE_MINUTE};
41 + use crate::constants::{APPROXIMATE_DAYS_PER_MONTH, DEFAULT_PARSE_HOUR, DEFAULT_PARSE_MINUTE, MAX_RELATIVE_DATE_DAYS};
42 42 use crate::models::Priority;
43 43 use crate::repository::SearchResultType;
44 44
@@ -200,18 +200,7 @@ pub fn parse_search_query(input: &str) -> ParsedSearchQuery {
200 200 result
201 201 }
202 202
203 - /// Case-insensitive prefix check without allocation.
204 - fn strip_prefix_ci<'a>(s: &'a str, prefix: &str) -> Option<&'a str> {
205 - if s.len() < prefix.len() {
206 - return None;
207 - }
208 - let s_prefix = &s[..prefix.len()];
209 - if s_prefix.eq_ignore_ascii_case(prefix) {
210 - Some(&s[prefix.len()..])
211 - } else {
212 - None
213 - }
214 - }
203 + use crate::text_utils::strip_prefix_ci;
215 204
216 205 /// Parse priority from string.
217 206 fn parse_priority(s: &str) -> Option<Priority> {
@@ -335,7 +324,11 @@ fn parse_relative_date(s: &str, from: NaiveDate, time: NaiveTime) -> Option<Date
335 324 }
336 325
337 326 let (num_str, unit) = rest.split_at(rest.len() - 1);
338 - let num: i64 = num_str.parse::<i64>().ok()? * sign;
327 + let raw: i64 = num_str.parse::<i64>().ok()?;
328 + if raw > MAX_RELATIVE_DATE_DAYS {
329 + return None;
330 + }
331 + let num: i64 = raw * sign;
339 332
340 333 let target = match unit {
341 334 "d" => from + Duration::days(num),
@@ -0,0 +1,41 @@
1 + //! Shared text utilities for parsing and matching.
2 +
3 + /// Case-insensitive prefix check without allocation.
4 + ///
5 + /// Returns the remaining part of the string if `s` starts with `prefix`
6 + /// (case-insensitive), otherwise returns `None`. Safe for multi-byte UTF-8.
7 + pub fn strip_prefix_ci<'a>(s: &'a str, prefix: &str) -> Option<&'a str> {
8 + if s.len() < prefix.len() || !s.is_char_boundary(prefix.len()) {
9 + return None;
10 + }
11 + let s_prefix = &s[..prefix.len()];
12 + if s_prefix.eq_ignore_ascii_case(prefix) {
13 + Some(&s[prefix.len()..])
14 + } else {
15 + None
16 + }
17 + }
18 +
19 + #[cfg(test)]
20 + mod tests {
21 + use super::*;
22 +
23 + #[test]
24 + fn test_basic_match() {
25 + assert_eq!(strip_prefix_ci("PROJECT:test", "project:"), Some("test"));
26 + assert_eq!(strip_prefix_ci("Project:test", "project:"), Some("test"));
27 + }
28 +
29 + #[test]
30 + fn test_no_match() {
31 + assert_eq!(strip_prefix_ci("proj:test", "project:"), None);
32 + assert_eq!(strip_prefix_ci("pr", "project:"), None);
33 + }
34 +
35 + #[test]
36 + fn test_multibyte_utf8_safe() {
37 + // Ensure no panic on multi-byte characters near prefix boundary
38 + assert_eq!(strip_prefix_ci("pr😀:H", "pri:"), None);
39 + assert_eq!(strip_prefix_ci("düe:tom", "due:"), None);
40 + }
41 + }
@@ -29,7 +29,8 @@ fn validate_required_string(field: &'static str, value: &str, max_len: usize) ->
29 29 if value.trim().is_empty() {
30 30 return Err(CoreError::validation(field, "cannot be empty"));
31 31 }
32 - if value.len() > max_len {
32 + // Fast path: byte length can't exceed char count, so if bytes fit, chars do too
33 + if value.len() > max_len && value.chars().count() > max_len {
33 34 return Err(CoreError::validation(
34 35 field,
35 36 format!("must be {} characters or less", max_len),
@@ -47,7 +47,7 @@ pub async fn run_migrations(pool: &SqlitePool) -> Result<(), sqlx::migrate::Migr
47 47 sqlx::query("PRAGMA foreign_keys = ON")
48 48 .execute(pool)
49 49 .await
50 - .expect("failed to re-enable foreign key constraints");
50 + .map_err(|e| sqlx::migrate::MigrateError::Execute(e))?;
51 51
52 52 Ok(())
53 53 }
@@ -1,7 +1,7 @@
1 1 //! Application-level data migrations that require Rust logic (not expressible in SQL).
2 2
3 3 use goingson_core::email_id::deterministic_email_id;
4 - use sqlx::SqlitePool;
4 + use sqlx::{Acquire, SqlitePool};
5 5 use tracing::{info, warn};
6 6
7 7 const MIGRATION_KEY: &str = "migration_deterministic_email_ids";
@@ -41,12 +41,18 @@ pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), St
41 41
42 42 let mut updated = 0u64;
43 43
44 - // Disable FK enforcement during migration (we're updating PKs and FKs together)
44 + // Use a dedicated connection with FK enforcement off, wrapped in a transaction
45 + let mut conn = pool.acquire().await
46 + .map_err(|e| format!("Failed to acquire connection: {e}"))?;
47 +
45 48 sqlx::query("PRAGMA foreign_keys = OFF")
46 - .execute(pool)
49 + .execute(&mut *conn)
47 50 .await
48 51 .map_err(|e| format!("Failed to disable FK: {e}"))?;
49 52
53 + let mut tx = conn.begin().await
54 + .map_err(|e| format!("Failed to begin transaction: {e}"))?;
55 +
50 56 for (old_id, message_id) in &rows {
51 57 let new_id = deterministic_email_id(Some(message_id));
52 58 let new_id_str = new_id.to_string();
@@ -59,7 +65,7 @@ pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), St
59 65 let result = sqlx::query("UPDATE emails SET id = ? WHERE id = ?")
60 66 .bind(&new_id_str)
61 67 .bind(old_id)
62 - .execute(pool)
68 + .execute(&mut *tx)
63 69 .await;
64 70
65 71 match result {
@@ -70,7 +76,7 @@ pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), St
70 76 )
71 77 .bind(&new_id_str)
72 78 .bind(old_id)
73 - .execute(pool)
79 + .execute(&mut *tx)
74 80 .await;
75 81
76 82 updated += 1;
@@ -83,9 +89,12 @@ pub async fn migrate_deterministic_email_ids(pool: &SqlitePool) -> Result<(), St
83 89 }
84 90 }
85 91
86 - // Re-enable FK enforcement
92 + tx.commit().await
93 + .map_err(|e| format!("Failed to commit migration: {e}"))?;
94 +
95 + // Re-enable FK enforcement on the same connection
87 96 sqlx::query("PRAGMA foreign_keys = ON")
88 - .execute(pool)
97 + .execute(&mut *conn)
89 98 .await
90 99 .map_err(|e| format!("Failed to re-enable FK: {e}"))?;
91 100
@@ -165,6 +165,9 @@ mod goingson_api {
165 165 #[rhai_fn(return_raw)]
166 166 #[tracing::instrument(skip_all)]
167 167 pub fn parse_csv(content: &str, options: Map) -> Result<rhai::Array, Box<EvalAltResult>> {
168 + // Excel-exported CSVs start with a BOM that breaks header detection
169 + let content = content.strip_prefix('\u{FEFF}').unwrap_or(content);
170 +
168 171 let has_header = options
169 172 .get("has_header")
170 173 .and_then(|v| v.as_bool().ok())
@@ -41,6 +41,10 @@ pub enum PluginError {
41 41 #[error("Permission denied: {plugin} lacks capability {capability}")]
42 42 PermissionDenied { plugin: String, capability: String },
43 43
44 + /// Plugin reload requested capabilities beyond what was previously approved.
45 + #[error("Capability escalation in {plugin}: new capabilities requested: {details}")]
46 + CapabilityEscalation { plugin: String, details: String },
47 +
44 48 /// Import parse error.
45 49 #[error("Import parse error: {0}")]
46 50 ImportError(String),
@@ -95,6 +99,14 @@ impl PluginError {
95 99 capability: capability.into(),
96 100 }
97 101 }
102 +
103 + /// Creates a capability escalation error.
104 + pub fn capability_escalation(plugin: impl Into<String>, details: impl Into<String>) -> Self {
105 + PluginError::CapabilityEscalation {
106 + plugin: plugin.into(),
107 + details: details.into(),
108 + }
109 + }
98 110 }
99 111
100 112 /// Result type for plugin operations.
@@ -112,8 +112,21 @@ impl PluginLoader {
112 112
113 113 // Follow symlinks to get the actual plugin directory
114 114 let plugin_path = if path.is_symlink() {
115 - std::fs::read_link(&path)
116 - .map_err(|e| PluginError::FileError(format!("Failed to read symlink: {}", e)))?
115 + let target = std::fs::read_link(&path)
116 + .map_err(|e| PluginError::FileError(format!("Failed to read symlink: {}", e)))?;
117 + // Resolve to absolute and verify target is within available/
118 + let canonical = std::fs::canonicalize(&target).or_else(|_| std::fs::canonicalize(&path))
119 + .map_err(|e| PluginError::FileError(format!("Failed to resolve symlink: {}", e)))?;
120 + let available_canonical = std::fs::canonicalize(self.available_dir())
121 + .unwrap_or_else(|_| self.available_dir());
122 + if !canonical.starts_with(&available_canonical) {
123 + tracing::warn!(
124 + "Skipping symlink '{}': target '{}' is outside available/",
125 + path.display(), canonical.display()
126 + );
127 + continue;
128 + }
129 + canonical
117 130 } else if path.is_dir() {
118 131 path.clone()
119 132 } else {
@@ -280,18 +293,37 @@ impl PluginLoader {
280 293 ///
281 294 /// Removes the cached `LoadedPlugin` first so `load_plugin` doesn't
282 295 /// short-circuit to the stale AST, then re-reads and recompiles from
283 - /// the same directory path.
296 + /// the same directory path. Rejects the reload if capabilities escalated.
284 297 #[tracing::instrument(skip_all)]
285 298 pub fn reload_plugin(&mut self, plugin_id: &str) -> Result<LoadedPlugin> {
286 - let path = self
299 + let old = self
287 300 .loaded
288 301 .get(plugin_id)
289 - .map(|p| p.path.clone())
290 302 .ok_or_else(|| PluginError::PluginNotFound(plugin_id.to_string()))?;
291 303
304 + let old_caps = old.meta.capabilities.clone();
305 + let path = old.path.clone();
306 +
292 307 self.loaded.remove(plugin_id);
293 308
294 - self.load_plugin(plugin_id, &path)
309 + let new_plugin = self.load_plugin(plugin_id, &path)?;
310 +
311 + // Detect escalation: any capability that was false and is now true
312 + let new_caps = &new_plugin.meta.capabilities;
313 + let mut escalated = Vec::new();
314 + if !old_caps.file_read && new_caps.file_read { escalated.push("file_read"); }
315 + if !old_caps.database_write && new_caps.database_write { escalated.push("database_write"); }
316 + if !old_caps.network && new_caps.network { escalated.push("network"); }
317 +
318 + if !escalated.is_empty() {
319 + self.loaded.remove(plugin_id);
320 + return Err(PluginError::capability_escalation(
321 + plugin_id,
322 + escalated.join(", "),
323 + ));
324 + }
325 +
326 + Ok(new_plugin)
295 327 }
296 328
297 329 /// Gets a loaded plugin by ID.
@@ -1052,14 +1052,11 @@ fn parse(file_path, options) {
1052 1052 // ============ Permission Escalation ============
1053 1053
1054 1054 /// A plugin that initially has no capabilities should not gain them on
1055 - /// reload even if the manifest changes, because the host checks the
1056 - /// PluginMeta.capabilities at execution time. This test verifies the
1057 - /// capabilities are faithfully read from the new manifest.
1055 + /// Reload rejects capability escalation (false → true).
1058 1056 #[test]
1059 - fn reload_reflects_changed_capabilities() {
1057 + fn reload_rejects_capability_escalation() {
1060 1058 let temp_dir = TempDir::new().unwrap();
1061 1059
1062 - // Create a plugin with minimal capabilities
1063 1060 let plugin_dir = temp_dir.path().join("available").join("sneaky");
1064 1061 std::fs::create_dir_all(&plugin_dir).unwrap();
1065 1062
@@ -1087,11 +1084,7 @@ fn execute(args) { 42 }
1087 1084 let mut registry = PluginRegistry::new(temp_dir.path()).unwrap();
1088 1085 registry.enable_plugin("sneaky").unwrap();
1089 1086
1090 - // Verify initial capabilities are restricted
1091 - let meta = registry.loader().get_plugin("sneaky").unwrap().meta.clone();
1092 - assert!(!meta.capabilities.file_read);
1093 - assert!(!meta.capabilities.database_write);
1094 - assert!(!meta.capabilities.network);
1087 + assert!(!registry.loader().get_plugin("sneaky").unwrap().meta.capabilities.file_read);
1095 1088
1096 1089 // Attacker modifies manifest to escalate permissions
1097 1090 let escalated_manifest = r#"
@@ -1110,15 +1103,86 @@ network = true
1110 1103 "#;
1111 1104 std::fs::write(plugin_dir.join("plugin.toml"), escalated_manifest).unwrap();
1112 1105
1113 - // Reload picks up the new manifest -- the host is responsible for
1114 - // checking whether the user approved the new capabilities. The
1115 - // important thing is that the capabilities field is accurate (not
1116 - // stale from the old manifest).
1117 - let reloaded = registry.reload_plugin("sneaky").unwrap();
1118 - assert!(reloaded.capabilities.file_read);
1119 - assert!(reloaded.capabilities.database_write);
1120 - assert!(reloaded.capabilities.network);
1121 - assert_eq!(reloaded.version, "1.0.1");
1106 + // Reload must reject the escalation
1107 + let err = registry.reload_plugin("sneaky").unwrap_err();
1108 + match err {
1109 + PluginError::CapabilityEscalation { plugin, details } => {
1110 + assert_eq!(plugin, "sneaky");
1111 + assert!(details.contains("file_read"));
1112 + assert!(details.contains("database_write"));
1113 + assert!(details.contains("network"));
1114 + }
1115 + other => panic!("Expected CapabilityEscalation, got {:?}", other),
1116 + }
1117 +
1118 + // Plugin should be evicted from cache after escalation rejection
1119 + assert!(registry.loader().get_plugin("sneaky").is_none());
1120 + }
1121 +
1122 + /// Reload allows de-escalation (removing capabilities).
1123 + #[test]
1124 + fn reload_allows_deescalation() {
1125 + let temp_dir = TempDir::new().unwrap();
1126 +
1127 + let plugin_dir = temp_dir.path().join("available").join("generous");
1128 + std::fs::create_dir_all(&plugin_dir).unwrap();
1129 +
1130 + let full_manifest = r#"
1131 + [plugin]
1132 + name = "Generous"
1133 + version = "1.0.0"
1134 + description = "Has all capabilities"
1135 +
1136 + [plugin.type]
1137 + kind = "command"
1138 +
1139 + [plugin.capabilities]
1140 + file_read = true
1141 + database_write = true
1142 + network = true
1143 + "#;
1144 + std::fs::write(plugin_dir.join("plugin.toml"), full_manifest).unwrap();
1145 + std::fs::write(plugin_dir.join("main.rhai"), "fn describe() { #{} }\nfn execute(a) { 0 }").unwrap();
1146 +
1147 + let mut registry = PluginRegistry::new(temp_dir.path()).unwrap();
1148 + registry.enable_plugin("generous").unwrap();
1149 +
1150 + // De-escalate: remove all capabilities
1151 + let reduced_manifest = r#"
1152 + [plugin]
1153 + name = "Generous"
1154 + version = "1.1.0"
1155 + description = "Reduced capabilities"
1156 +
1157 + [plugin.type]
1158 + kind = "command"
1159 +
1160 + [plugin.capabilities]
1161 + file_read = false
1162 + database_write = false
1163 + network = false
1164 + "#;
1165 + std::fs::write(plugin_dir.join("plugin.toml"), reduced_manifest).unwrap();
1166 +
1167 + let reloaded = registry.reload_plugin("generous").unwrap();
1168 + assert!(!reloaded.capabilities.file_read);
1169 + assert!(!reloaded.capabilities.database_write);
1170 + assert!(!reloaded.capabilities.network);
1171 + }
1172 +
1173 + /// Reload succeeds when capabilities are unchanged.
1174 + #[test]
1175 + fn reload_succeeds_unchanged_capabilities() {
1176 + let temp_dir = TempDir::new().unwrap();
1177 + create_csv_import_plugin(temp_dir.path());
1178 +
1179 + let mut registry = PluginRegistry::new(temp_dir.path()).unwrap();
1180 + registry.enable_plugin("csv-import").unwrap();
1181 +
1182 + let original_caps = registry.loader().get_plugin("csv-import").unwrap().meta.capabilities.clone();
1183 +
1184 + let reloaded = registry.reload_plugin("csv-import").unwrap();
1185 + assert_eq!(reloaded.capabilities, original_caps);
1122 1186 }
1123 1187
1124 1188 // --- with_engine constructor ---
@@ -55,8 +55,10 @@ pub(super) async fn get_valid_access_token(
55 55 .or_else(|| account.oauth2_access_token.clone())
56 56 .or_api_err(|| ApiError::auth("No access token available"))?;
57 57
58 - // Check if token needs refresh
58 + // Check if token needs refresh (serialized per-account to prevent thundering herd)
59 59 if account.needs_token_refresh() {
60 + let refresh_lock = state.token_refresh_lock(raw_id);
61 + let _guard = refresh_lock.lock().await;
60 62 let token_manager = TokenManager::from_env();
61 63 match token_manager.refresh_if_needed(account).await
62 64 .map_api_err("Token refresh failed", ApiError::auth)?
@@ -434,7 +436,8 @@ async fn test_jmap_account(account: &EmailAccount) -> Result<TestConnectionRespo
434 436 .or_else(|| account.oauth2_access_token.clone())
435 437 .or_api_err(|| ApiError::auth("No access token available"))?;
436 438
437 - let mut client = JmapClient::new(session_url, &access_token);
439 + let mut client = JmapClient::new(session_url, &access_token)
440 + .map_err(ApiError::external_service)?;
438 441
439 442 let session_result = client.session().await;
440 443 let username = match &session_result {
@@ -45,22 +45,32 @@ pub async fn sync_email_account_inner(
45 45 id: EmailAccountId,
46 46 full_sync: Option<bool>,
47 47 ) -> Result<SyncResponse, ApiError> {
48 - let account = state.email_accounts
49 - .get_by_id(id, DESKTOP_USER_ID)
50 - .await?
51 - .or_not_found("emailAccount", id)?;
52 -
53 - // Dispatch to protocol-specific sync path based on account configuration.
54 - // JMAP accounts have a session URL; everything else uses IMAP.
55 - let result = if uses_jmap(&account) {
56 - sync_jmap_account_inner(state, &account, id, full_sync).await?
57 - } else {
58 - sync_imap_account_inner(state, &account, id, full_sync).await?
59 - };
48 + // Acquire per-account lock to prevent concurrent syncs on the same account
49 + if !state.email_sync_locks.lock().insert(id) {
50 + return Err(ApiError::bad_request("Sync already in progress for this account"));
51 + }
52 +
53 + let result = async {
54 + let account = state.email_accounts
55 + .get_by_id(id, DESKTOP_USER_ID)
56 + .await?
57 + .or_not_found("emailAccount", id)?;
58 +
59 + let result = if uses_jmap(&account) {
60 + sync_jmap_account_inner(state, &account, id, full_sync).await?
61 + } else {
62 + sync_imap_account_inner(state, &account, id, full_sync).await?
63 + };
64 +
65 + state.email_accounts.update_last_sync(id, DESKTOP_USER_ID).await?;
66 +
67 + Ok(result)
68 + }.await;
60 69
61 - state.email_accounts.update_last_sync(id, DESKTOP_USER_ID).await?;
70 + // Always release the lock (parking_lot::Mutex — safe even if async block panicked)
71 + state.email_sync_locks.lock().remove(&id);
62 72
63 - Ok(result)
73 + result
64 74 }
65 75
66 76 /// Synchronizes emails for an account.
@@ -215,6 +225,7 @@ async fn process_folder_emails(
215 225 FetchedEmail {
216 226 message_id: p.message_id,
217 227 in_reply_to: p.in_reply_to,
228 + references_root: p.references_root,
218 229 from: p.from,
219 230 to: p.to,
220 231 subject: p.subject,
@@ -267,8 +278,11 @@ async fn sync_jmap_account_inner(
267 278 .or_else(|| account.oauth2_access_token.clone())
268 279 .or_api_err(|| ApiError::auth("No access token available"))?;
269 280
270 - // Check if token needs refresh
281 + // Check if token needs refresh (serialized per-account to prevent thundering herd)
271 282 if account.needs_token_refresh() {
283 + let refresh_lock = state.token_refresh_lock(raw_id);
284 + let _guard = refresh_lock.lock().await;
285 + // Re-check after acquiring lock — another task may have refreshed already
272 286 let token_manager = TokenManager::from_env();
273 287 if let Ok(Some((new_token, new_refresh, expires_at))) = token_manager.refresh_if_needed(account).await {
274 288 state.email_accounts
@@ -279,7 +293,8 @@ async fn sync_jmap_account_inner(
279 293 }
280 294 }
281 295
282 - let mut client = JmapClient::new(session_url, &access_token);
296 + let mut client = JmapClient::new(session_url, &access_token)
297 + .map_err(ApiError::external_service)?;
283 298 let mut debug_parts = Vec::new();
284 299
285 300 let since = if full_sync.unwrap_or(false) { None } else { account.last_sync_at };
@@ -304,6 +319,7 @@ async fn sync_jmap_account_inner(
304 319 let fetched_inbox: Vec<FetchedEmail> = inbox_emails.into_iter().map(|p| FetchedEmail {
305 320 message_id: p.message_id,
306 321 in_reply_to: p.in_reply_to,
322 + references_root: p.references_root,
307 323 from: p.from,
308 324 to: p.to,
309 325 subject: p.subject,
@@ -338,6 +354,7 @@ async fn sync_jmap_account_inner(
338 354 let fetched_archive: Vec<FetchedEmail> = archive_emails.into_iter().map(|p| FetchedEmail {
339 355 message_id: p.message_id,
340 356 in_reply_to: p.in_reply_to,
357 + references_root: p.references_root,
341 358 from: p.from,
342 359 to: p.to,
343 360 subject: p.subject,
@@ -316,6 +316,8 @@ pub async fn refresh_oauth_tokens(
316 316 return Err(ApiError::bad_request("Account does not use OAuth"));
317 317 }
318 318
319 + let refresh_lock = state.token_refresh_lock(account.id.into());
320 + let _guard = refresh_lock.lock().await;
319 321 let token_manager = TokenManager::from_env();
320 322 let result = token_manager.refresh_if_needed(&account).await
321 323 .map_api_err("Token refresh failed", ApiError::external_service)?;
@@ -219,10 +219,10 @@ pub struct JmapProvider {
219 219 }
220 220
221 221 impl JmapProvider {
222 - pub fn new(session_url: &str, access_token: &str) -> Self {
223 - Self {
224 - client: JmapClient::new(session_url, access_token),
225 - }
222 + pub fn new(session_url: &str, access_token: &str) -> Result<Self, String> {
223 + Ok(Self {
224 + client: JmapClient::new(session_url, access_token)?,
225 + })
226 226 }
227 227
228 228 /// Creates a JMAP provider from an email account.
@@ -236,7 +236,7 @@ impl JmapProvider {
236 236 .as_ref()
237 237 .ok_or_else(|| "No access token available".to_string())?;
238 238
239 - Ok(Self::new(session_url, access_token))
239 + Self::new(session_url, access_token)
240 240 }
241 241
242 242 /// Updates the access token (after refresh).
@@ -76,14 +76,20 @@ impl FullExport {
76 76 ///
77 77 /// The size of the compressed file in bytes.
78 78 pub fn write_backup<P: AsRef<Path>>(export: &FullExport, path: P) -> Result<u64, BackupError> {
79 - let file = File::create(path.as_ref())?;
79 + let dest = path.as_ref();
80 + let tmp_path = dest.with_extension("tmp");
81 +
82 + let file = File::create(&tmp_path)?;
80 83 let mut encoder = GzEncoder::new(file, Compression::default());
81 84
82 85 let json = serde_json::to_vec(export)?;
83 86 encoder.write_all(&json)?;
84 87 encoder.finish()?;
85 88
86 - let metadata = std::fs::metadata(path.as_ref())?;
89 + // Prevents corrupt backups if the process crashes mid-write
90 + std::fs::rename(&tmp_path, dest)?;
91 +
92 + let metadata = std::fs::metadata(dest)?;
87 93 Ok(metadata.len())
88 94 }
89 95
@@ -75,12 +75,12 @@ pub fn write_tasks_csv<W: Write>(
75 75
76 76 csv_writer.write_record([
77 77 task.id.to_string(),
78 - project_name.to_string(),
78 + sanitize_csv_field(project_name),
79 79 sanitize_csv_field(&task.description),
80 80 task.status.as_str().to_string(),
81 81 task.priority.as_str().to_string(),
82 82 due,
83 - tags,
83 + sanitize_csv_field(&tags),
84 84 subtasks_done.to_string(),
85 85 subtasks_total.to_string(),
86 86 recurrence.to_string(),