Skip to main content

max / makenotwork

8.5 KB · 254 lines History Blame Raw
1 //! Fetch the prod backup that `migration_dry_run` runs against.
2 //!
3 //! Sources supported:
4 //! - `file:///abs/path/to/dump.sql.gz` — local copy (dev).
5 //! - `rsync://host/module/path` — rsync daemon protocol.
6 //! - `ssh://user@host[:port]/path/file.sql.gz` — rsync-over-ssh. Used to pull
7 //! prod backups from
8 //! `backup-puller@alpha-west-1`.
9 //!
10 //! The fetch is command-driven: the operator triggers it via /backup/fetch, it
11 //! is not implicit in promote. That keeps the slowest, most failure-prone step
12 //! visible in the TUI rather than buried inside a deploy.
13
14 use crate::config::Config;
15 use crate::topology::Topology;
16 use anyhow::{Context, Result, bail};
17 use chrono::Utc;
18 use sqlx::SqlitePool;
19 use std::path::Path;
20 use std::sync::Arc;
21 use tokio::process::Command;
22
/// Result of a completed backup fetch, as returned by `fetch`.
#[derive(Clone, Debug)]
pub struct FetchedBackup {
    /// The `backup.source` URL the backup was pulled from.
    pub source: String,
    /// Local filesystem path the backup was written to.
    pub local_path: String,
    /// Size of the local file in bytes, when known.
    pub byte_size: Option<i64>,
}
29
/// A `backup.source` URL broken down by transport.
///
/// Every field is an owned `String`, so a parsed value does not borrow from
/// the (possibly short-lived) config string it was built from.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum BackupSource {
    /// `file://` source: a plain local copy.
    File {
        /// Everything that followed the `file://` prefix.
        path: String,
    },
    /// `rsync://` source: rsync daemon protocol.
    RsyncDaemon {
        /// The complete, unmodified URL; rsync interprets it itself.
        url: String,
    },
    /// `ssh://` source: rsync tunnelled over ssh.
    Ssh {
        /// The `user@host` part of the URL, without the port.
        user_host: String,
        /// Explicit ssh port, if the URL carried one.
        port: Option<u16>,
        /// Remote path, including its leading `/`.
        path: String,
    },
}
45
46 /// Parse a `backup.source` URL into a `BackupSource`. Rejects unsupported
47 /// schemes and malformed `ssh://` URLs (no path part).
48 pub(crate) fn parse_source(s: &str) -> Result<BackupSource> {
49 if let Some(rest) = s.strip_prefix("file://") {
50 if rest.is_empty() {
51 bail!("file:// URL is missing a path: {s}");
52 }
53 return Ok(BackupSource::File { path: rest.into() });
54 }
55 if s.starts_with("rsync://") {
56 return Ok(BackupSource::RsyncDaemon { url: s.into() });
57 }
58 if let Some(rest) = s.strip_prefix("ssh://") {
59 let (user_host_port, path_rest) = rest
60 .split_once('/')
61 .with_context(|| format!("ssh:// URL missing path: {s}"))?;
62 if user_host_port.is_empty() {
63 bail!("ssh:// URL missing user@host: {s}");
64 }
65 let path = format!("/{path_rest}");
66 let (user_host, port) = match user_host_port.rsplit_once(':') {
67 Some((uh, p)) => {
68 // Heuristic: trailing `:digits` after the final `:` is the port.
69 // Anything else (IPv6 literal, etc.) gets left alone.
70 match p.parse::<u16>() {
71 Ok(n) => (uh.to_string(), Some(n)),
72 Err(_) => (user_host_port.to_string(), None),
73 }
74 }
75 None => (user_host_port.to_string(), None),
76 };
77 if user_host.is_empty() {
78 bail!("ssh:// URL has empty host (port {:?})", port);
79 }
80 return Ok(BackupSource::Ssh { user_host, port, path });
81 }
82 bail!("unsupported backup source scheme: {s}");
83 }
84
85 pub async fn fetch(
86 pool: &SqlitePool,
87 _cfg: &Arc<Config>,
88 topo: &Arc<Topology>,
89 ) -> Result<FetchedBackup> {
90 let source = topo.backup.source.clone();
91 let local_path = topo.backup.local_path.clone();
92
93 if let Some(parent) = Path::new(&local_path).parent() {
94 tokio::fs::create_dir_all(parent).await?;
95 }
96
97 let parsed = parse_source(&source)?;
98 match parsed {
99 BackupSource::File { path } => {
100 tokio::fs::copy(&path, &local_path)
101 .await
102 .with_context(|| format!("copy {path} -> {local_path}"))?;
103 }
104 BackupSource::RsyncDaemon { url } => {
105 let out = Command::new("rsync")
106 .args(["-az", "--inplace", &url, &local_path])
107 .output()
108 .await
109 .context("spawning rsync")?;
110 anyhow::ensure!(
111 out.status.success(),
112 "rsync (daemon) failed: {}",
113 String::from_utf8_lossy(&out.stderr),
114 );
115 }
116 BackupSource::Ssh { user_host, port, path } => {
117 let ssh_cmd = match port {
118 Some(p) => format!("ssh -p {p} -o BatchMode=yes -o StrictHostKeyChecking=accept-new"),
119 None => "ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new".into(),
120 };
121 let remote = format!("{user_host}:{path}");
122 let out = Command::new("rsync")
123 .args(["-a", "--partial"])
124 .arg("-e").arg(&ssh_cmd)
125 .arg(&remote)
126 .arg(&local_path)
127 .output()
128 .await
129 .context("spawning rsync")?;
130 anyhow::ensure!(
131 out.status.success(),
132 "rsync (ssh) failed: {}",
133 String::from_utf8_lossy(&out.stderr),
134 );
135 }
136 }
137
138 let meta = tokio::fs::metadata(&local_path).await?;
139 let size = meta.len() as i64;
140
141 sqlx::query(
142 "INSERT INTO backups (fetched_at, source, local_path, byte_size) VALUES (?, ?, ?, ?)",
143 )
144 .bind(Utc::now().to_rfc3339())
145 .bind(&source)
146 .bind(&local_path)
147 .bind(size)
148 .execute(pool)
149 .await?;
150
151 // Retention: prune rows fetched more than 30 days ago. The on-disk file
152 // is overwritten each fetch (single `local_path`), so old rows reference
153 // a path that no longer exists — keep the table from growing for no
154 // good reason.
155 sqlx::query(
156 "DELETE FROM backups WHERE fetched_at < datetime('now', '-30 days')",
157 )
158 .execute(pool)
159 .await?;
160
161 Ok(FetchedBackup { source, local_path, byte_size: Some(size) })
162 }
163
/// Unit tests for `parse_source`: one accepted example per scheme, plus the
/// rejection and fallback paths of the `ssh://` host/port heuristic.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_file_url() {
        let s = parse_source("file:///opt/backups/latest.sql.gz").unwrap();
        assert_eq!(s, BackupSource::File { path: "/opt/backups/latest.sql.gz".into() });
    }

    #[test]
    fn file_url_without_path_errors() {
        assert!(parse_source("file://").is_err());
    }

    #[test]
    fn parses_rsync_daemon_url() {
        let s = parse_source("rsync://astra/mnw/latest.sql.gz").unwrap();
        assert_eq!(
            s,
            BackupSource::RsyncDaemon { url: "rsync://astra/mnw/latest.sql.gz".into() }
        );
    }

    #[test]
    fn parses_ssh_url_with_port() {
        let s = parse_source("ssh://backup-puller@alpha-west-1:2200/latest.sql.gz").unwrap();
        assert_eq!(
            s,
            BackupSource::Ssh {
                user_host: "backup-puller@alpha-west-1".into(),
                port: Some(2200),
                path: "/latest.sql.gz".into(),
            }
        );
    }

    #[test]
    fn parses_ssh_url_without_port() {
        let s = parse_source("ssh://max@astra/opt/backups/mnw/latest.sql.gz").unwrap();
        assert_eq!(
            s,
            BackupSource::Ssh {
                user_host: "max@astra".into(),
                port: None,
                path: "/opt/backups/mnw/latest.sql.gz".into(),
            }
        );
    }

    #[test]
    fn ssh_url_without_path_errors() {
        // `split_once('/')` — `ssh://user@host` has no `/` after the scheme.
        assert!(parse_source("ssh://backup-puller@alpha-west-1").is_err());
    }

    #[test]
    fn ssh_url_without_user_host_errors() {
        // Empty user@host: `ssh:///foo`. Caught by the empty-prefix check.
        assert!(parse_source("ssh:///latest.sql.gz").is_err());
    }

    #[test]
    fn ssh_url_with_port_but_empty_host_errors() {
        // `:22` is non-empty, so the prefix check passes; once the port is
        // split off nothing is left for the host.
        assert!(parse_source("ssh://:22/latest.sql.gz").is_err());
    }

    #[test]
    fn ssh_url_with_non_numeric_after_colon_treats_as_part_of_host() {
        // `host:notaport` should NOT parse `notaport` as a port. Leave the
        // colon part of user_host; libssh/rsync will reject if truly wrong.
        let s = parse_source("ssh://user@host:notaport/path").unwrap();
        assert_eq!(
            s,
            BackupSource::Ssh {
                user_host: "user@host:notaport".into(),
                port: None,
                path: "/path".into(),
            }
        );
    }

    #[test]
    fn ssh_url_with_out_of_range_port_treats_as_part_of_host() {
        // 70000 does not fit in a u16, so it takes the same fallback as a
        // non-numeric suffix.
        let s = parse_source("ssh://user@host:70000/path").unwrap();
        assert_eq!(
            s,
            BackupSource::Ssh {
                user_host: "user@host:70000".into(),
                port: None,
                path: "/path".into(),
            }
        );
    }

    #[test]
    fn ssh_url_with_bracketed_ipv6_host() {
        // With a port: the final `:` separates the port from the literal.
        let with_port = parse_source("ssh://user@[::1]:2222/path").unwrap();
        assert_eq!(
            with_port,
            BackupSource::Ssh {
                user_host: "user@[::1]".into(),
                port: Some(2222),
                path: "/path".into(),
            }
        );

        // Without a port: `1]` is not a number, so the literal stays intact.
        let without_port = parse_source("ssh://user@[::1]/path").unwrap();
        assert_eq!(
            without_port,
            BackupSource::Ssh {
                user_host: "user@[::1]".into(),
                port: None,
                path: "/path".into(),
            }
        );
    }

    #[test]
    fn rejects_unknown_scheme() {
        assert!(parse_source("ftp://example.com/file").is_err());
        assert!(parse_source("just-a-path.sql.gz").is_err());
        assert!(parse_source("").is_err());
    }

    #[test]
    fn ssh_url_preserves_multi_segment_path() {
        let s = parse_source("ssh://a@b:22/opt/foo/bar/baz.sql.gz").unwrap();
        match s {
            BackupSource::Ssh { path, .. } => assert_eq!(path, "/opt/foo/bar/baz.sql.gz"),
            _ => panic!("wrong variant"),
        }
    }
}
254