| 1 |
|
| 2 |
|
| 3 |
|
| 4 |
|
| 5 |
|
| 6 |
|
| 7 |
|
| 8 |
|
| 9 |
|
| 10 |
|
| 11 |
|
| 12 |
|
| 13 |
|
| 14 |
use crate::config::Config; |
| 15 |
use crate::topology::Topology; |
| 16 |
use anyhow::{Context, Result, bail}; |
| 17 |
use chrono::Utc; |
| 18 |
use sqlx::SqlitePool; |
| 19 |
use std::path::Path; |
| 20 |
use std::sync::Arc; |
| 21 |
use tokio::process::Command; |
| 22 |
|
| 23 |
/// Outcome of a successful [`fetch`]: where the backup came from and where it now lives.
#[derive(Clone, Debug)]
pub struct FetchedBackup {
    /// The backup source URL exactly as configured in the topology.
    pub source: String,
    /// Local filesystem path the backup was written to.
    pub local_path: String,
    /// Size of the fetched file in bytes (`fetch` always sets this to `Some`).
    pub byte_size: Option<i64>,
}
| 29 |
|
| 30 |
|
| 31 |
|
| 32 |
/// A parsed backup source location, as produced by [`parse_source`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum BackupSource {
    /// A file on the local filesystem (`file://<path>`), copied directly.
    File { path: String },
    /// An rsync daemon URL (`rsync://...`), kept verbatim and handed to `rsync` as-is.
    RsyncDaemon { url: String },
    /// A remote file pulled with `rsync` over ssh (`ssh://[user@]host[:port]/path`).
    Ssh {
        /// `host` or `user@host`; used as the prefix of the rsync remote argument.
        user_host: String,
        /// Explicit ssh port; `None` means no `-p` flag is passed to ssh.
        port: Option<u16>,
        /// Remote path; always starts with `/`.
        path: String,
    },
}
| 45 |
|
| 46 |
|
| 47 |
|
| 48 |
pub(crate) fn parse_source(s: &str) -> Result<BackupSource> { |
| 49 |
if let Some(rest) = s.strip_prefix("file://") { |
| 50 |
if rest.is_empty() { |
| 51 |
bail!("file:// URL is missing a path: {s}"); |
| 52 |
} |
| 53 |
return Ok(BackupSource::File { path: rest.into() }); |
| 54 |
} |
| 55 |
if s.starts_with("rsync://") { |
| 56 |
return Ok(BackupSource::RsyncDaemon { url: s.into() }); |
| 57 |
} |
| 58 |
if let Some(rest) = s.strip_prefix("ssh://") { |
| 59 |
let (user_host_port, path_rest) = rest |
| 60 |
.split_once('/') |
| 61 |
.with_context(|| format!("ssh:// URL missing path: {s}"))?; |
| 62 |
if user_host_port.is_empty() { |
| 63 |
bail!("ssh:// URL missing user@host: {s}"); |
| 64 |
} |
| 65 |
let path = format!("/{path_rest}"); |
| 66 |
let (user_host, port) = match user_host_port.rsplit_once(':') { |
| 67 |
Some((uh, p)) => { |
| 68 |
|
| 69 |
|
| 70 |
match p.parse::<u16>() { |
| 71 |
Ok(n) => (uh.to_string(), Some(n)), |
| 72 |
Err(_) => (user_host_port.to_string(), None), |
| 73 |
} |
| 74 |
} |
| 75 |
None => (user_host_port.to_string(), None), |
| 76 |
}; |
| 77 |
if user_host.is_empty() { |
| 78 |
bail!("ssh:// URL has empty host (port {:?})", port); |
| 79 |
} |
| 80 |
return Ok(BackupSource::Ssh { user_host, port, path }); |
| 81 |
} |
| 82 |
bail!("unsupported backup source scheme: {s}"); |
| 83 |
} |
| 84 |
|
| 85 |
pub async fn fetch( |
| 86 |
pool: &SqlitePool, |
| 87 |
_cfg: &Arc<Config>, |
| 88 |
topo: &Arc<Topology>, |
| 89 |
) -> Result<FetchedBackup> { |
| 90 |
let source = topo.backup.source.clone(); |
| 91 |
let local_path = topo.backup.local_path.clone(); |
| 92 |
|
| 93 |
if let Some(parent) = Path::new(&local_path).parent() { |
| 94 |
tokio::fs::create_dir_all(parent).await?; |
| 95 |
} |
| 96 |
|
| 97 |
let parsed = parse_source(&source)?; |
| 98 |
match parsed { |
| 99 |
BackupSource::File { path } => { |
| 100 |
tokio::fs::copy(&path, &local_path) |
| 101 |
.await |
| 102 |
.with_context(|| format!("copy {path} -> {local_path}"))?; |
| 103 |
} |
| 104 |
BackupSource::RsyncDaemon { url } => { |
| 105 |
let out = Command::new("rsync") |
| 106 |
.args(["-az", "--inplace", &url, &local_path]) |
| 107 |
.output() |
| 108 |
.await |
| 109 |
.context("spawning rsync")?; |
| 110 |
anyhow::ensure!( |
| 111 |
out.status.success(), |
| 112 |
"rsync (daemon) failed: {}", |
| 113 |
String::from_utf8_lossy(&out.stderr), |
| 114 |
); |
| 115 |
} |
| 116 |
BackupSource::Ssh { user_host, port, path } => { |
| 117 |
let ssh_cmd = match port { |
| 118 |
Some(p) => format!("ssh -p {p} -o BatchMode=yes -o StrictHostKeyChecking=accept-new"), |
| 119 |
None => "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new".into(), |
| 120 |
}; |
| 121 |
let remote = format!("{user_host}:{path}"); |
| 122 |
let out = Command::new("rsync") |
| 123 |
.args(["-a", "--partial"]) |
| 124 |
.arg("-e").arg(&ssh_cmd) |
| 125 |
.arg(&remote) |
| 126 |
.arg(&local_path) |
| 127 |
.output() |
| 128 |
.await |
| 129 |
.context("spawning rsync")?; |
| 130 |
anyhow::ensure!( |
| 131 |
out.status.success(), |
| 132 |
"rsync (ssh) failed: {}", |
| 133 |
String::from_utf8_lossy(&out.stderr), |
| 134 |
); |
| 135 |
} |
| 136 |
} |
| 137 |
|
| 138 |
let meta = tokio::fs::metadata(&local_path).await?; |
| 139 |
let size = meta.len() as i64; |
| 140 |
|
| 141 |
sqlx::query( |
| 142 |
"INSERT INTO backups (fetched_at, source, local_path, byte_size) VALUES (?, ?, ?, ?)", |
| 143 |
) |
| 144 |
.bind(Utc::now().to_rfc3339()) |
| 145 |
.bind(&source) |
| 146 |
.bind(&local_path) |
| 147 |
.bind(size) |
| 148 |
.execute(pool) |
| 149 |
.await?; |
| 150 |
|
| 151 |
|
| 152 |
|
| 153 |
|
| 154 |
|
| 155 |
sqlx::query( |
| 156 |
"DELETE FROM backups WHERE fetched_at < datetime('now', '-30 days')", |
| 157 |
) |
| 158 |
.execute(pool) |
| 159 |
.await?; |
| 160 |
|
| 161 |
Ok(FetchedBackup { source, local_path, byte_size: Some(size) }) |
| 162 |
} |
| 163 |
|
| 164 |
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `url`, failing the test with the URL and error chain if parsing fails.
    fn parsed(url: &str) -> BackupSource {
        parse_source(url).unwrap_or_else(|e| panic!("{url}: {e:#}"))
    }

    /// Builds an expected `BackupSource::Ssh` value.
    fn ssh(user_host: &str, port: Option<u16>, path: &str) -> BackupSource {
        BackupSource::Ssh { user_host: user_host.to_owned(), port, path: path.to_owned() }
    }

    #[test]
    fn parses_file_url() {
        let expected = BackupSource::File { path: String::from("/opt/backups/latest.sql.gz") };
        assert_eq!(parsed("file:///opt/backups/latest.sql.gz"), expected);
    }

    #[test]
    fn file_url_without_path_errors() {
        assert!(parse_source("file://").is_err());
    }

    #[test]
    fn parses_rsync_daemon_url() {
        let url = "rsync://astra/mnw/latest.sql.gz";
        assert_eq!(parsed(url), BackupSource::RsyncDaemon { url: url.to_owned() });
    }

    #[test]
    fn parses_ssh_url_with_port() {
        assert_eq!(
            parsed("ssh://backup-puller@alpha-west-1:2200/latest.sql.gz"),
            ssh("backup-puller@alpha-west-1", Some(2200), "/latest.sql.gz"),
        );
    }

    #[test]
    fn parses_ssh_url_without_port() {
        assert_eq!(
            parsed("ssh://max@astra/opt/backups/mnw/latest.sql.gz"),
            ssh("max@astra", None, "/opt/backups/mnw/latest.sql.gz"),
        );
    }

    #[test]
    fn ssh_url_without_path_errors() {
        // No '/' after the authority, so there is no remote path.
        assert!(parse_source("ssh://backup-puller@alpha-west-1").is_err());
    }

    #[test]
    fn ssh_url_without_user_host_errors() {
        // The authority between "ssh://" and the first '/' is empty.
        assert!(parse_source("ssh:///latest.sql.gz").is_err());
    }

    #[test]
    fn ssh_url_with_non_numeric_after_colon_treats_as_part_of_host() {
        // A suffix that is not a valid u16 is not a port; it stays in the host string.
        assert_eq!(
            parsed("ssh://user@host:notaport/path"),
            ssh("user@host:notaport", None, "/path"),
        );
    }

    #[test]
    fn rejects_unknown_scheme() {
        for input in ["ftp://example.com/file", "just-a-path.sql.gz", ""] {
            assert!(parse_source(input).is_err(), "expected an error for {input:?}");
        }
    }

    #[test]
    fn ssh_url_preserves_multi_segment_path() {
        let BackupSource::Ssh { path, .. } = parsed("ssh://a@b:22/opt/foo/bar/baz.sql.gz") else {
            panic!("wrong variant");
        };
        assert_eq!(path, "/opt/foo/bar/baz.sql.gz");
    }
}
| 254 |
|