//! Fetch the prod backup that `migration_dry_run` runs against. //! //! Sources supported: //! - `file:///abs/path/to/dump.sql.gz` — local copy (dev). //! - `rsync://host/module/path` — rsync daemon protocol. //! - `ssh://user@host[:port]/path/file.sql.gz` — rsync-over-ssh. Used to pull //! prod backups from //! `backup-puller@alpha-west-1`. //! //! The fetch is command-driven: the operator triggers it via /backup/fetch, it //! is not implicit in promote. That keeps the slowest, most failure-prone step //! visible in the TUI rather than buried inside a deploy. use crate::config::Config; use crate::topology::Topology; use anyhow::{Context, Result, bail}; use chrono::Utc; use sqlx::SqlitePool; use std::path::Path; use std::sync::Arc; use tokio::process::Command; #[derive(Debug, Clone)] pub struct FetchedBackup { pub source: String, pub local_path: String, pub byte_size: Option, } /// Parsed `backup.source` URL. Owned strings so the parsed form outlives the /// (possibly transient) URL we read from config. #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum BackupSource { /// Local file copy. Path follows the `file://` prefix. File { path: String }, /// rsync daemon protocol. Full URL stays intact (rsync handles it). RsyncDaemon { url: String }, /// rsync-over-ssh. Port is optional. Ssh { user_host: String, port: Option, path: String, }, } /// Parse a `backup.source` URL into a `BackupSource`. Rejects unsupported /// schemes and malformed `ssh://` URLs (no path part). pub(crate) fn parse_source(s: &str) -> Result { if let Some(rest) = s.strip_prefix("file://") { if rest.is_empty() { bail!("file:// URL is missing a path: {s}"); } return Ok(BackupSource::File { path: rest.into() }); } if s.starts_with("rsync://") { return Ok(BackupSource::RsyncDaemon { url: s.into() }); } if let Some(rest) = s.strip_prefix("ssh://") { let (user_host_port, path_rest) = rest .split_once('/') .with_context(|| format!("ssh:// URL missing path: {s}"))?; if user_host_port.is_empty() { bail!("ssh:// URL missing user@host: {s}"); } let path = format!("/{path_rest}"); let (user_host, port) = match user_host_port.rsplit_once(':') { Some((uh, p)) => { // Heuristic: trailing `:digits` after the final `:` is the port. // Anything else (IPv6 literal, etc.) gets left alone. match p.parse::() { Ok(n) => (uh.to_string(), Some(n)), Err(_) => (user_host_port.to_string(), None), } } None => (user_host_port.to_string(), None), }; if user_host.is_empty() { bail!("ssh:// URL has empty host (port {:?})", port); } return Ok(BackupSource::Ssh { user_host, port, path }); } bail!("unsupported backup source scheme: {s}"); } pub async fn fetch( pool: &SqlitePool, _cfg: &Arc, topo: &Arc, ) -> Result { let source = topo.backup.source.clone(); let local_path = topo.backup.local_path.clone(); if let Some(parent) = Path::new(&local_path).parent() { tokio::fs::create_dir_all(parent).await?; } let parsed = parse_source(&source)?; match parsed { BackupSource::File { path } => { tokio::fs::copy(&path, &local_path) .await .with_context(|| format!("copy {path} -> {local_path}"))?; } BackupSource::RsyncDaemon { url } => { let out = Command::new("rsync") .args(["-az", "--inplace", &url, &local_path]) .output() .await .context("spawning rsync")?; anyhow::ensure!( out.status.success(), "rsync (daemon) failed: {}", String::from_utf8_lossy(&out.stderr), ); } BackupSource::Ssh { user_host, port, path } => { let ssh_cmd = match port { Some(p) => format!("ssh -p {p} -o BatchMode=yes -o StrictHostKeyChecking=accept-new"), None => "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new".into(), }; let remote = format!("{user_host}:{path}"); let out = Command::new("rsync") .args(["-a", "--partial"]) .arg("-e").arg(&ssh_cmd) .arg(&remote) .arg(&local_path) .output() .await .context("spawning rsync")?; anyhow::ensure!( out.status.success(), "rsync (ssh) failed: {}", String::from_utf8_lossy(&out.stderr), ); } } let meta = tokio::fs::metadata(&local_path).await?; let size = meta.len() as i64; sqlx::query( "INSERT INTO backups (fetched_at, source, local_path, byte_size) VALUES (?, ?, ?, ?)", ) .bind(Utc::now().to_rfc3339()) .bind(&source) .bind(&local_path) .bind(size) .execute(pool) .await?; // Retention: prune rows fetched more than 30 days ago. The on-disk file // is overwritten each fetch (single `local_path`), so old rows reference // a path that no longer exists — keep the table from growing for no // good reason. sqlx::query( "DELETE FROM backups WHERE fetched_at < datetime('now', '-30 days')", ) .execute(pool) .await?; Ok(FetchedBackup { source, local_path, byte_size: Some(size) }) } #[cfg(test)] mod tests { use super::*; #[test] fn parses_file_url() { let s = parse_source("file:///opt/backups/latest.sql.gz").unwrap(); assert_eq!(s, BackupSource::File { path: "/opt/backups/latest.sql.gz".into() }); } #[test] fn file_url_without_path_errors() { assert!(parse_source("file://").is_err()); } #[test] fn parses_rsync_daemon_url() { let s = parse_source("rsync://astra/mnw/latest.sql.gz").unwrap(); assert_eq!(s, BackupSource::RsyncDaemon { url: "rsync://astra/mnw/latest.sql.gz".into() }); } #[test] fn parses_ssh_url_with_port() { let s = parse_source("ssh://backup-puller@alpha-west-1:2200/latest.sql.gz").unwrap(); assert_eq!( s, BackupSource::Ssh { user_host: "backup-puller@alpha-west-1".into(), port: Some(2200), path: "/latest.sql.gz".into(), } ); } #[test] fn parses_ssh_url_without_port() { let s = parse_source("ssh://max@astra/opt/backups/mnw/latest.sql.gz").unwrap(); assert_eq!( s, BackupSource::Ssh { user_host: "max@astra".into(), port: None, path: "/opt/backups/mnw/latest.sql.gz".into(), } ); } #[test] fn ssh_url_without_path_errors() { // `split_once('/')` — `ssh://user@host` has no `/` after the scheme. assert!(parse_source("ssh://backup-puller@alpha-west-1").is_err()); } #[test] fn ssh_url_without_user_host_errors() { // Empty user@host: `ssh:///foo`. Caught by the empty-prefix check. assert!(parse_source("ssh:///latest.sql.gz").is_err()); } #[test] fn ssh_url_with_non_numeric_after_colon_treats_as_part_of_host() { // `host:notaport` should NOT parse `notaport` as a port. Leave the // colon part of user_host; libssh/rsync will reject if truly wrong. let s = parse_source("ssh://user@host:notaport/path").unwrap(); assert_eq!( s, BackupSource::Ssh { user_host: "user@host:notaport".into(), port: None, path: "/path".into(), } ); } #[test] fn rejects_unknown_scheme() { assert!(parse_source("ftp://example.com/file").is_err()); assert!(parse_source("just-a-path.sql.gz").is_err()); assert!(parse_source("").is_err()); } #[test] fn ssh_url_preserves_multi_segment_path() { let s = parse_source("ssh://a@b:22/opt/foo/bar/baz.sql.gz").unwrap(); match s { BackupSource::Ssh { path, .. } => assert_eq!(path, "/opt/foo/bar/baz.sql.gz"), _ => panic!("wrong variant"), } } }