| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
use tokio::task::JoinHandle; |
| 6 |
use tracing::{info, warn}; |
| 7 |
|
| 8 |
use pom::alerts::Alerter; |
| 9 |
use pom::checks::scan_pipeline; |
| 10 |
use pom::config::Config; |
| 11 |
|
| 12 |
pub(crate) fn spawn_scan_pipeline_tasks( |
| 13 |
config: &Config, |
| 14 |
cancel: &tokio_util::sync::CancellationToken, |
| 15 |
alerter: &Option<Alerter>, |
| 16 |
) -> Vec<JoinHandle<()>> { |
| 17 |
let mut handles = Vec::new(); |
| 18 |
|
| 19 |
for name in config.target_names() { |
| 20 |
let target_config = config.get_target(&name).unwrap().clone(); |
| 21 |
let Some(sp_config) = target_config.scan_pipeline else { continue }; |
| 22 |
|
| 23 |
let name = name.clone(); |
| 24 |
let label = target_config.label.clone(); |
| 25 |
let alerter = alerter.clone(); |
| 26 |
let cancel = cancel.clone(); |
| 27 |
|
| 28 |
info!( |
| 29 |
"{name}: scan-pipeline check every {}s against {}", |
| 30 |
sp_config.interval_secs, sp_config.base_url, |
| 31 |
); |
| 32 |
|
| 33 |
handles.push(tokio::spawn(async move { |
| 34 |
let mut interval = tokio::time::interval( |
| 35 |
std::time::Duration::from_secs(sp_config.interval_secs), |
| 36 |
); |
| 37 |
interval.tick().await; |
| 38 |
|
| 39 |
|
| 40 |
|
| 41 |
let mut previous_status: Option<String> = None; |
| 42 |
|
| 43 |
loop { |
| 44 |
tokio::select! { |
| 45 |
_ = cancel.cancelled() => break, |
| 46 |
_ = interval.tick() => {} |
| 47 |
} |
| 48 |
|
| 49 |
let result = scan_pipeline::check_scan_pipeline( |
| 50 |
&name, &sp_config.base_url, sp_config.timeout_secs, |
| 51 |
).await; |
| 52 |
|
| 53 |
if result.issues.is_empty() && result.error.is_none() { |
| 54 |
info!("{name}: scan pipeline operational (queue p={}/r={}, held={})", |
| 55 |
result.queue_pending, result.queue_running, result.held_total); |
| 56 |
} else { |
| 57 |
warn!( |
| 58 |
target = %name, |
| 59 |
status = %result.status, |
| 60 |
issues = ?result.issues, |
| 61 |
"scan pipeline non-operational" |
| 62 |
); |
| 63 |
} |
| 64 |
|
| 65 |
|
| 66 |
if let Some(ref alerter) = alerter { |
| 67 |
let prev_ok = previous_status.as_deref().is_none_or(|s| s == "operational"); |
| 68 |
let now_ok = result.status == "operational"; |
| 69 |
|
| 70 |
if prev_ok && !now_ok { |
| 71 |
alerter.send_scan_pipeline_alert( |
| 72 |
&name, &label, &result.status, &result.issues, |
| 73 |
).await; |
| 74 |
} else if !prev_ok && now_ok { |
| 75 |
alerter.send_scan_pipeline_recovery(&name, &label).await; |
| 76 |
} |
| 77 |
} |
| 78 |
|
| 79 |
previous_status = Some(result.status); |
| 80 |
} |
| 81 |
})); |
| 82 |
} |
| 83 |
|
| 84 |
handles |
| 85 |
} |
| 86 |
|