max / balanced_breakfast
4 files changed,
+177 insertions,
-121 deletions
| @@ -9,6 +9,7 @@ pub mod crypto; | |||
| 9 | 9 | pub mod orchestrator; | |
| 10 | 10 | pub mod plugin_manager; | |
| 11 | 11 | pub mod rhai_plugin; | |
| 12 | + | pub mod scheduler; | |
| 12 | 13 | pub mod url_cleaner; | |
| 13 | 14 | ||
| 14 | 15 | pub use orchestrator::*; |
| @@ -0,0 +1,170 @@ | |||
| 1 | + | //! Pure scheduling helpers. | |
| 2 | + | //! | |
| 3 | + | //! Timing and backoff logic used by both the auto-fetch loop and the sync | |
| 4 | + | //! scheduler. All functions are deterministic (time is passed in explicitly) | |
| 5 | + | //! so they can be unit-tested without async or database dependencies. | |
| 6 | + | ||
| 7 | + | use bb_db::parse_timestamp; | |
| 8 | + | use chrono::{DateTime, Utc}; | |
| 9 | + | ||
| 10 | + | /// Is a single feed overdue given its last-fetch timestamp? | |
| 11 | + | /// | |
| 12 | + | /// `last_fetch` is the optional timestamp string from the database. `None` | |
| 13 | + | /// (never fetched) is always considered due. `interval_secs` is the desired | |
| 14 | + | /// interval between fetches. `now` is the current time. | |
| 15 | + | pub fn is_single_feed_due( | |
| 16 | + | last_fetch: Option<&str>, | |
| 17 | + | interval_secs: u64, | |
| 18 | + | now: DateTime<Utc>, | |
| 19 | + | ) -> bool { | |
| 20 | + | let last = last_fetch | |
| 21 | + | .map(parse_timestamp) | |
| 22 | + | .unwrap_or(DateTime::UNIX_EPOCH); | |
| 23 | + | let elapsed = now.signed_duration_since(last); | |
| 24 | + | elapsed.num_seconds() >= interval_secs as i64 | |
| 25 | + | } | |
| 26 | + | ||
| 27 | + | /// Check whether ANY feed in a set is overdue for fetching. | |
| 28 | + | /// | |
| 29 | + | /// A busser's feeds are fetched together (one plugin fetch call returns items | |
| 30 | + | /// for all its feeds), so if any single feed is due we return `true`. | |
| 31 | + | pub fn any_feed_due( | |
| 32 | + | last_fetches: &[Option<&str>], | |
| 33 | + | interval_secs: u64, | |
| 34 | + | now: DateTime<Utc>, | |
| 35 | + | ) -> bool { | |
| 36 | + | last_fetches | |
| 37 | + | .iter() | |
| 38 | + | .any(|ts| is_single_feed_due(*ts, interval_secs, now)) | |
| 39 | + | } | |
| 40 | + | ||
| 41 | + | /// Exponential backoff: `2^consecutive_failures` seconds, capped at `max_secs`. | |
| 42 | + | /// | |
| 43 | + | /// Returns the number of seconds to wait before the next retry attempt. | |
| 44 | + | pub fn exponential_backoff_secs(consecutive_failures: u32, max_secs: u64) -> u64 { | |
| 45 | + | let raw = 2u64.saturating_pow(consecutive_failures); | |
| 46 | + | raw.min(max_secs) | |
| 47 | + | } | |
| 48 | + | ||
| 49 | + | #[cfg(test)] | |
| 50 | + | mod tests { | |
| 51 | + | use super::*; | |
| 52 | + | use chrono::{Duration, Utc}; | |
| 53 | + | ||
| 54 | + | // --- is_single_feed_due --- | |
| 55 | + | ||
| 56 | + | #[test] | |
| 57 | + | fn is_fetch_due_first_time() { | |
| 58 | + | // Never fetched (None) should always be due. | |
| 59 | + | let now = Utc::now(); | |
| 60 | + | assert!(is_single_feed_due(None, 3600, now)); | |
| 61 | + | } | |
| 62 | + | ||
| 63 | + | #[test] | |
| 64 | + | fn is_fetch_due_overdue() { | |
| 65 | + | // Last fetch was 2 hours ago, interval is 1 hour -- due. | |
| 66 | + | let now = Utc::now(); | |
| 67 | + | let two_hours_ago = now - Duration::hours(2); | |
| 68 | + | let ts = two_hours_ago.to_rfc3339(); | |
| 69 | + | assert!(is_single_feed_due(Some(&ts), 3600, now)); | |
| 70 | + | } | |
| 71 | + | ||
| 72 | + | #[test] | |
| 73 | + | fn is_fetch_due_not_yet() { | |
| 74 | + | // Last fetch was 5 minutes ago, interval is 1 hour -- not due. | |
| 75 | + | let now = Utc::now(); | |
| 76 | + | let five_min_ago = now - Duration::minutes(5); | |
| 77 | + | let ts = five_min_ago.to_rfc3339(); | |
| 78 | + | assert!(!is_single_feed_due(Some(&ts), 3600, now)); | |
| 79 | + | } | |
| 80 | + | ||
| 81 | + | // --- any_feed_due --- | |
| 82 | + | ||
| 83 | + | #[test] | |
| 84 | + | fn any_feed_due_empty() { | |
| 85 | + | // No feeds means nothing is due. | |
| 86 | + | let now = Utc::now(); | |
| 87 | + | assert!(!any_feed_due(&[], 3600, now)); | |
| 88 | + | } | |
| 89 | + | ||
| 90 | + | #[test] | |
| 91 | + | fn any_feed_due_one_overdue() { | |
| 92 | + | let now = Utc::now(); | |
| 93 | + | let two_hours_ago = (now - Duration::hours(2)).to_rfc3339(); | |
| 94 | + | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 95 | + | let feeds: Vec<Option<&str>> = vec![ | |
| 96 | + | Some(five_min_ago.as_str()), | |
| 97 | + | Some(two_hours_ago.as_str()), | |
| 98 | + | ]; | |
| 99 | + | assert!(any_feed_due(&feeds, 3600, now)); | |
| 100 | + | } | |
| 101 | + | ||
| 102 | + | #[test] | |
| 103 | + | fn any_feed_due_none_overdue() { | |
| 104 | + | let now = Utc::now(); | |
| 105 | + | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 106 | + | let ten_min_ago = (now - Duration::minutes(10)).to_rfc3339(); | |
| 107 | + | let feeds: Vec<Option<&str>> = vec![ | |
| 108 | + | Some(five_min_ago.as_str()), | |
| 109 | + | Some(ten_min_ago.as_str()), | |
| 110 | + | ]; | |
| 111 | + | assert!(!any_feed_due(&feeds, 3600, now)); | |
| 112 | + | } | |
| 113 | + | ||
| 114 | + | #[test] | |
| 115 | + | fn any_feed_due_never_fetched() { | |
| 116 | + | // One feed never fetched (None) — should be due. | |
| 117 | + | let now = Utc::now(); | |
| 118 | + | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 119 | + | let feeds: Vec<Option<&str>> = vec![ | |
| 120 | + | Some(five_min_ago.as_str()), | |
| 121 | + | None, | |
| 122 | + | ]; | |
| 123 | + | assert!(any_feed_due(&feeds, 3600, now)); | |
| 124 | + | } | |
| 125 | + | ||
| 126 | + | #[test] | |
| 127 | + | fn any_feed_due_all_never_fetched() { | |
| 128 | + | let now = Utc::now(); | |
| 129 | + | let feeds: Vec<Option<&str>> = vec![None, None, None]; | |
| 130 | + | assert!(any_feed_due(&feeds, 3600, now)); | |
| 131 | + | } | |
| 132 | + | ||
| 133 | + | // --- exponential_backoff_secs --- | |
| 134 | + | ||
| 135 | + | #[test] | |
| 136 | + | fn backoff_zero_failures() { | |
| 137 | + | // 2^0 = 1 second | |
| 138 | + | assert_eq!(exponential_backoff_secs(0, 900), 1); | |
| 139 | + | } | |
| 140 | + | ||
| 141 | + | #[test] | |
| 142 | + | fn backoff_one_failure() { | |
| 143 | + | // 2^1 = 2 seconds | |
| 144 | + | assert_eq!(exponential_backoff_secs(1, 900), 2); | |
| 145 | + | } | |
| 146 | + | ||
| 147 | + | #[test] | |
| 148 | + | fn backoff_three_failures() { | |
| 149 | + | // 2^3 = 8 seconds | |
| 150 | + | assert_eq!(exponential_backoff_secs(3, 900), 8); | |
| 151 | + | } | |
| 152 | + | ||
| 153 | + | #[test] | |
| 154 | + | fn backoff_capped_at_max() { | |
| 155 | + | // 2^20 = 1_048_576, but max is 900 → capped at 900 | |
| 156 | + | assert_eq!(exponential_backoff_secs(20, 900), 900); | |
| 157 | + | } | |
| 158 | + | ||
| 159 | + | #[test] | |
| 160 | + | fn backoff_exact_cap() { | |
| 161 | + | // 2^4 = 16, max is 16 → exactly 16 | |
| 162 | + | assert_eq!(exponential_backoff_secs(4, 16), 16); | |
| 163 | + | } | |
| 164 | + | ||
| 165 | + | #[test] | |
| 166 | + | fn backoff_large_failure_count_no_overflow() { | |
| 167 | + | // Very large exponent should not panic, just saturate + cap. | |
| 168 | + | assert_eq!(exponential_backoff_secs(64, 900), 900); | |
| 169 | + | } | |
| 170 | + | } |
| @@ -1,6 +1,6 @@ | |||
| 1 | 1 | //! Application state wrapping the Orchestrator | |
| 2 | + | use bb_core::scheduler::any_feed_due; | |
| 2 | 3 | use bb_core::{Orchestrator, OrchestratorConfig}; | |
| 3 | - | use bb_db::parse_timestamp; | |
| 4 | 4 | use bb_interface::ErrorCategory; | |
| 5 | 5 | use parking_lot::RwLock; | |
| 6 | 6 | use std::collections::HashMap; | |
| @@ -449,23 +449,6 @@ pub fn spawn_stale_cleanup(state: Arc<AppState>) { | |||
| 449 | 449 | state.set_cleanup_handle(handle.inner().abort_handle()); | |
| 450 | 450 | } | |
| 451 | 451 | ||
| 452 | - | /// Pure timing check: is a single feed overdue given its last_fetch timestamp? | |
| 453 | - | /// | |
| 454 | - | /// `last_fetch` is the optional timestamp string from the DB. `None` (never | |
| 455 | - | /// fetched) is always considered due. `interval_secs` is the desired interval. | |
| 456 | - | /// `now` is the current time (passed explicitly for testability). | |
| 457 | - | fn is_single_feed_due( | |
| 458 | - | last_fetch: Option<&str>, | |
| 459 | - | interval_secs: u64, | |
| 460 | - | now: chrono::DateTime<chrono::Utc>, | |
| 461 | - | ) -> bool { | |
| 462 | - | let last = last_fetch | |
| 463 | - | .map(parse_timestamp) | |
| 464 | - | .unwrap_or(chrono::DateTime::UNIX_EPOCH); | |
| 465 | - | let elapsed = now.signed_duration_since(last); | |
| 466 | - | elapsed.num_seconds() >= interval_secs as i64 | |
| 467 | - | } | |
| 468 | - | ||
| 469 | 452 | /// Check if a plugin is due for a fetch by comparing its last_fetch time | |
| 470 | 453 | /// against the configured interval. | |
| 471 | 454 | async fn is_fetch_due( | |
| @@ -492,102 +475,3 @@ async fn is_fetch_due( | |||
| 492 | 475 | .collect(); | |
| 493 | 476 | Ok(any_feed_due(&feed_times, interval_secs, now)) | |
| 494 | 477 | } | |
| 495 | - | ||
| 496 | - | /// Check whether ANY feed in a set is overdue for fetching. | |
| 497 | - | /// | |
| 498 | - | /// A busser's feeds are fetched together (one plugin fetch call returns items | |
| 499 | - | /// for all its feeds), so if any single feed is due we return `true`. | |
| 500 | - | /// | |
| 501 | - | /// Pure function extracted from `is_fetch_due` for testability. | |
| 502 | - | fn any_feed_due( | |
| 503 | - | last_fetches: &[Option<&str>], | |
| 504 | - | interval_secs: u64, | |
| 505 | - | now: chrono::DateTime<chrono::Utc>, | |
| 506 | - | ) -> bool { | |
| 507 | - | last_fetches | |
| 508 | - | .iter() | |
| 509 | - | .any(|ts| is_single_feed_due(*ts, interval_secs, now)) | |
| 510 | - | } | |
| 511 | - | ||
| 512 | - | #[cfg(test)] | |
| 513 | - | mod tests { | |
| 514 | - | use super::*; | |
| 515 | - | use chrono::{Duration, Utc}; | |
| 516 | - | ||
| 517 | - | #[test] | |
| 518 | - | fn is_fetch_due_first_time() { | |
| 519 | - | // Never fetched (None) should always be due. | |
| 520 | - | let now = Utc::now(); | |
| 521 | - | assert!(is_single_feed_due(None, 3600, now)); | |
| 522 | - | } | |
| 523 | - | ||
| 524 | - | #[test] | |
| 525 | - | fn is_fetch_due_overdue() { | |
| 526 | - | // Last fetch was 2 hours ago, interval is 1 hour -- due. | |
| 527 | - | let now = Utc::now(); | |
| 528 | - | let two_hours_ago = now - Duration::hours(2); | |
| 529 | - | let ts = two_hours_ago.to_rfc3339(); | |
| 530 | - | assert!(is_single_feed_due(Some(&ts), 3600, now)); | |
| 531 | - | } | |
| 532 | - | ||
| 533 | - | #[test] | |
| 534 | - | fn is_fetch_due_not_yet() { | |
| 535 | - | // Last fetch was 5 minutes ago, interval is 1 hour -- not due. | |
| 536 | - | let now = Utc::now(); | |
| 537 | - | let five_min_ago = now - Duration::minutes(5); | |
| 538 | - | let ts = five_min_ago.to_rfc3339(); | |
| 539 | - | assert!(!is_single_feed_due(Some(&ts), 3600, now)); | |
| 540 | - | } | |
| 541 | - | ||
| 542 | - | // --- any_feed_due --- | |
| 543 | - | ||
| 544 | - | #[test] | |
| 545 | - | fn any_feed_due_empty() { | |
| 546 | - | // No feeds means nothing is due. | |
| 547 | - | let now = Utc::now(); | |
| 548 | - | assert!(!any_feed_due(&[], 3600, now)); | |
| 549 | - | } | |
| 550 | - | ||
| 551 | - | #[test] | |
| 552 | - | fn any_feed_due_one_overdue() { | |
| 553 | - | let now = Utc::now(); | |
| 554 | - | let two_hours_ago = (now - Duration::hours(2)).to_rfc3339(); | |
| 555 | - | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 556 | - | let feeds: Vec<Option<&str>> = vec![ | |
| 557 | - | Some(five_min_ago.as_str()), | |
| 558 | - | Some(two_hours_ago.as_str()), | |
| 559 | - | ]; | |
| 560 | - | assert!(any_feed_due(&feeds, 3600, now)); | |
| 561 | - | } | |
| 562 | - | ||
| 563 | - | #[test] | |
| 564 | - | fn any_feed_due_none_overdue() { | |
| 565 | - | let now = Utc::now(); | |
| 566 | - | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 567 | - | let ten_min_ago = (now - Duration::minutes(10)).to_rfc3339(); | |
| 568 | - | let feeds: Vec<Option<&str>> = vec![ | |
| 569 | - | Some(five_min_ago.as_str()), | |
| 570 | - | Some(ten_min_ago.as_str()), | |
| 571 | - | ]; | |
| 572 | - | assert!(!any_feed_due(&feeds, 3600, now)); | |
| 573 | - | } | |
| 574 | - | ||
| 575 | - | #[test] | |
| 576 | - | fn any_feed_due_never_fetched() { | |
| 577 | - | // One feed never fetched (None) — should be due. | |
| 578 | - | let now = Utc::now(); | |
| 579 | - | let five_min_ago = (now - Duration::minutes(5)).to_rfc3339(); | |
| 580 | - | let feeds: Vec<Option<&str>> = vec![ | |
| 581 | - | Some(five_min_ago.as_str()), | |
| 582 | - | None, | |
| 583 | - | ]; | |
| 584 | - | assert!(any_feed_due(&feeds, 3600, now)); | |
| 585 | - | } | |
| 586 | - | ||
| 587 | - | #[test] | |
| 588 | - | fn any_feed_due_all_never_fetched() { | |
| 589 | - | let now = Utc::now(); | |
| 590 | - | let feeds: Vec<Option<&str>> = vec![None, None, None]; | |
| 591 | - | assert!(any_feed_due(&feeds, 3600, now)); | |
| 592 | - | } | |
| 593 | - | } |
| @@ -1,5 +1,6 @@ | |||
| 1 | 1 | //! Background sync scheduler with exponential backoff and SSE push notifications. | |
| 2 | 2 | ||
| 3 | + | use bb_core::scheduler::exponential_backoff_secs; | |
| 3 | 4 | use crate::state::AppState; | |
| 4 | 5 | use crate::sync_service; | |
| 5 | 6 | use std::sync::Arc; | |
| @@ -178,16 +179,16 @@ pub fn start_sync_scheduler(app: AppHandle) -> tokio::task::AbortHandle { | |||
| 178 | 179 | } | |
| 179 | 180 | Err(e) => { | |
| 180 | 181 | consecutive_failures += 1; | |
| 181 | - | let backoff_minutes = | |
| 182 | - | std::cmp::min(2u64.pow(consecutive_failures), 15); | |
| 182 | + | let backoff_secs = | |
| 183 | + | exponential_backoff_secs(consecutive_failures, 15) * 60; | |
| 183 | 184 | backoff_until = Some( | |
| 184 | 185 | chrono::Utc::now() | |
| 185 | - | + chrono::Duration::minutes(backoff_minutes as i64), | |
| 186 | + | + chrono::Duration::seconds(backoff_secs as i64), | |
| 186 | 187 | ); | |
| 187 | 188 | warn!( | |
| 188 | 189 | error = %e, | |
| 189 | 190 | attempt = consecutive_failures, | |
| 190 | - | backoff_minutes, | |
| 191 | + | backoff_secs, | |
| 191 | 192 | "Auto-sync failed" | |
| 192 | 193 | ); | |
| 193 | 194 | } |