| 1 |
use tokio::task::JoinHandle; |
| 2 |
use tracing::info; |
| 3 |
|
| 4 |
use pom::alerts::Alerter; |
| 5 |
use pom::checks::cors; |
| 6 |
use pom::config::Config; |
| 7 |
use pom::db; |
| 8 |
|
| 9 |
pub(crate) fn spawn_cors_tasks( |
| 10 |
config: &Config, |
| 11 |
pool: &sqlx::SqlitePool, |
| 12 |
cancel: &tokio_util::sync::CancellationToken, |
| 13 |
alerter: &Option<Alerter>, |
| 14 |
) -> Vec<JoinHandle<()>> { |
| 15 |
let interval_secs = config.serve.cors_check_interval_secs; |
| 16 |
let mut handles = Vec::new(); |
| 17 |
|
| 18 |
for name in config.target_names() { |
| 19 |
let target_config = config.get_target(&name).unwrap().clone(); |
| 20 |
if target_config.cors.is_empty() { |
| 21 |
continue; |
| 22 |
} |
| 23 |
let cors_checks = target_config.cors.clone(); |
| 24 |
let label = target_config.label.clone(); |
| 25 |
let pool = pool.clone(); |
| 26 |
let alerter = alerter.clone(); |
| 27 |
let cancel = cancel.clone(); |
| 28 |
let n = cors_checks.len(); |
| 29 |
|
| 30 |
info!("{name}: CORS check every {interval_secs}s ({n} endpoints)"); |
| 31 |
|
| 32 |
handles.push(tokio::spawn(async move { |
| 33 |
let mut interval = tokio::time::interval( |
| 34 |
std::time::Duration::from_secs(interval_secs), |
| 35 |
); |
| 36 |
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); |
| 37 |
let mut prev_failed: std::collections::HashSet<String> = std::collections::HashSet::new(); |
| 38 |
|
| 39 |
interval.tick().await; |
| 40 |
loop { |
| 41 |
tokio::select! { |
| 42 |
_ = cancel.cancelled() => break, |
| 43 |
_ = interval.tick() => {} |
| 44 |
} |
| 45 |
let results = cors::check_cors(&name, &cors_checks).await; |
| 46 |
|
| 47 |
for result in &results { |
| 48 |
if let Err(e) = db::insert_cors_check(&pool, result).await { |
| 49 |
tracing::error!("{}: failed to store CORS check for {}: {e}", name, result.url); |
| 50 |
} |
| 51 |
} |
| 52 |
|
| 53 |
let current_failed: std::collections::HashSet<String> = results |
| 54 |
.iter() |
| 55 |
.filter(|r| !r.passes) |
| 56 |
.map(|r| r.url.clone()) |
| 57 |
.collect(); |
| 58 |
|
| 59 |
let ok_count = results.iter().filter(|r| r.passes).count(); |
| 60 |
info!("{name}: CORS {ok_count}/{n} pass"); |
| 61 |
|
| 62 |
if let Some(ref alerter) = alerter { |
| 63 |
|
| 64 |
let new_failures: Vec<&pom::types::CorsCheckResult> = results |
| 65 |
.iter() |
| 66 |
.filter(|r| !r.passes && !prev_failed.contains(&r.url)) |
| 67 |
.collect(); |
| 68 |
if !new_failures.is_empty() { |
| 69 |
let owned: Vec<pom::types::CorsCheckResult> = new_failures.into_iter().cloned().collect(); |
| 70 |
alerter.send_cors_failure_alert(&name, &label, &owned).await; |
| 71 |
} |
| 72 |
|
| 73 |
|
| 74 |
if !prev_failed.is_empty() && current_failed.is_empty() { |
| 75 |
alerter.send_cors_recovery_alert(&name, &label).await; |
| 76 |
} |
| 77 |
} |
| 78 |
|
| 79 |
prev_failed = current_failed; |
| 80 |
} |
| 81 |
})); |
| 82 |
} |
| 83 |
|
| 84 |
handles |
| 85 |
} |
| 86 |
|