max / makenotwork
1 file changed,
+135 insertions,
-55 deletions
| @@ -57,6 +57,50 @@ fn admin_url() -> String { | |||
| 57 | 57 | .unwrap_or_else(|_| "postgres://localhost/postgres".to_string()) | |
| 58 | 58 | } | |
| 59 | 59 | ||
| 60 | + | /// Advisory-lock key serializing template setup across every process/connection | |
| 61 | + | /// on the cluster (`std::sync::Once` only covers one process). Concurrent test | |
| 62 | + | /// binaries must not both drop+recreate the shared template. Arbitrary, stable. | |
| 63 | + | const TEMPLATE_LOCK_KEY: i64 = 0x6D6E_775F_7470_6C00; // "mnw_tpl\0" | |
| 64 | + | ||
| 65 | + | /// Latest migration version embedded in this binary (head of `migrations/`). | |
| 66 | + | fn latest_migration_version() -> i64 { | |
| 67 | + | sqlx::migrate!("./migrations") | |
| 68 | + | .iter() | |
| 69 | + | .map(|m| m.version) | |
| 70 | + | .max() | |
| 71 | + | .unwrap_or(0) | |
| 72 | + | } | |
| 73 | + | ||
| 74 | + | /// True if `template` already exists and its applied-migration head matches the | |
| 75 | + | /// code's latest migration — a clone of it is up to date, so we reuse it instead | |
| 76 | + | /// of dropping + rebuilding. Any error / missing table => "not current" => | |
| 77 | + | /// caller rebuilds. | |
| 78 | + | async fn template_is_current(admin_url: &str, template: &str) -> bool { | |
| 79 | + | let Ok(mut conn) = PgConnection::connect(admin_url).await else { | |
| 80 | + | return false; | |
| 81 | + | }; | |
| 82 | + | let exists = sqlx::query_as::<_, (i32,)>("SELECT 1 FROM pg_database WHERE datname = $1") | |
| 83 | + | .bind(template) | |
| 84 | + | .fetch_optional(&mut conn) | |
| 85 | + | .await | |
| 86 | + | .ok() | |
| 87 | + | .flatten() | |
| 88 | + | .is_some(); | |
| 89 | + | if !exists { | |
| 90 | + | return false; | |
| 91 | + | } | |
| 92 | + | let tpl_url = replace_db_name(admin_url, template); | |
| 93 | + | let Ok(mut tconn) = PgConnection::connect(&tpl_url).await else { | |
| 94 | + | return false; | |
| 95 | + | }; | |
| 96 | + | matches!( | |
| 97 | + | sqlx::query_as::<_, (Option<i64>,)>("SELECT MAX(version) FROM _sqlx_migrations") | |
| 98 | + | .fetch_one(&mut tconn) | |
| 99 | + | .await, | |
| 100 | + | Ok((Some(v),)) if v == latest_migration_version() | |
| 101 | + | ) | |
| 102 | + | } | |
| 103 | + | ||
| 60 | 104 | /// Create the template database with all migrations. Runs in a dedicated | |
| 61 | 105 | /// single-threaded tokio runtime so it works from any context (including | |
| 62 | 106 | /// inside `#[tokio::test]` and plain `#[test]`). | |
| @@ -90,53 +134,77 @@ fn ensure_template() { | |||
| 90 | 134 | let template = format!("{TEMPLATE_DB_BASE}_{}", sanitize_role(&role)); | |
| 91 | 135 | let _ = TEMPLATE_NAME.set(template.clone()); | |
| 92 | 136 | ||
| 93 | - | // Drop stale template (migrations may have changed) before | |
| 94 | - | // recreating. We own it, so this succeeds — and we PROPAGATE the | |
| 95 | - | // error rather than swallowing it, so an ownership/lock failure | |
| 96 | - | // surfaces here instead of cascading into a CREATE "already exists" | |
| 97 | - | // that poisons this `Once` and fails the entire suite opaquely. | |
| 98 | - | conn.execute(format!( | |
| 99 | - | "DROP DATABASE IF EXISTS \"{template}\" WITH (FORCE)" | |
| 100 | - | ).as_str()) | |
| 101 | - | .await | |
| 102 | - | .unwrap_or_else(|e| panic!( | |
| 103 | - | "drop stale template {template}: {e} \ | |
| 104 | - | (if owned by a different role, drop it as the postgres superuser)" | |
| 105 | - | )); | |
| 106 | - | ||
| 107 | - | conn.execute(format!( | |
| 108 | - | "CREATE DATABASE \"{template}\"" | |
| 109 | - | ).as_str()) | |
| 110 | - | .await | |
| 111 | - | .expect("create template database"); | |
| 112 | - | ||
| 113 | - | // Connect to the template and run all migrations | |
| 114 | - | let tpl_url = replace_db_name(&admin, &template); | |
| 115 | - | let tpl_pool = PgPoolOptions::new() | |
| 116 | - | .max_connections(2) | |
| 117 | - | .acquire_timeout(Duration::from_secs(10)) | |
| 118 | - | .connect(&tpl_url) | |
| 137 | + | // Serialize template setup across ALL processes on this cluster, not | |
| 138 | + | // just this process's `Once`. Without it, concurrent test binaries | |
| 139 | + | // each unconditionally drop+recreate the SAME shared-role template, | |
| 140 | + | // so one drops the template another is mid-clone from — the mass | |
| 141 | + | // "template database ... does not exist" flake on the deploy gate. | |
| 142 | + | // Held until `conn` drops at the end of this block. | |
| 143 | + | sqlx::query("SELECT pg_advisory_lock($1)") | |
| 144 | + | .bind(TEMPLATE_LOCK_KEY) | |
| 145 | + | .execute(&mut conn) | |
| 119 | 146 | .await | |
| 120 | - | .expect("connect to template database"); | |
| 121 | - | ||
| 122 | - | let t_migrate = std::time::Instant::now(); | |
| 123 | - | sqlx::migrate!("./migrations") | |
| 124 | - | .run(&tpl_pool) | |
| 147 | + | .expect("acquire template advisory lock"); | |
| 148 | + | ||
| 149 | + | // Reuse the template when it's already migration-current. Dropping a | |
| 150 | + | // live template is what races concurrent clones, and a FORCE drop | |
| 151 | + | // also fails when another role holds connections we can't terminate | |
| 152 | + | // ("permission denied to terminate process"). Only rebuild when the | |
| 153 | + | // template is missing or its migration head is stale. | |
| 154 | + | if template_is_current(&admin, &template).await { | |
| 155 | + | eprintln!("[test-harness] Reusing current template DB {template}"); | |
| 156 | + | } else { | |
| 157 | + | // We hold the cross-process lock, so no clone can be reading it. | |
| 158 | + | conn.execute(format!( | |
| 159 | + | "DROP DATABASE IF EXISTS \"{template}\" WITH (FORCE)" | |
| 160 | + | ).as_str()) | |
| 125 | 161 | .await | |
| 126 | - | .expect("run migrations on template"); | |
| 127 | - | let migrate_ms = t_migrate.elapsed().as_millis(); | |
| 128 | - | ||
| 129 | - | // Also create the session store table | |
| 130 | - | let session_store = tower_sessions_sqlx_store::PostgresStore::new(tpl_pool.clone()); | |
| 131 | - | session_store.migrate().await.expect("session store migration on template"); | |
| 132 | - | ||
| 133 | - | tpl_pool.close().await; | |
| 134 | - | ||
| 135 | - | let total_ms = t0.elapsed().as_millis(); | |
| 136 | - | eprintln!( | |
| 137 | - | "[test-harness] Template DB created in {}ms (migrations: {}ms)", | |
| 138 | - | total_ms, migrate_ms | |
| 139 | - | ); | |
| 162 | + | .unwrap_or_else(|e| panic!( | |
| 163 | + | "drop stale template {template}: {e} \ | |
| 164 | + | (if owned by a different role, drop it as the postgres superuser)" | |
| 165 | + | )); | |
| 166 | + | ||
| 167 | + | conn.execute(format!( | |
| 168 | + | "CREATE DATABASE \"{template}\"" | |
| 169 | + | ).as_str()) | |
| 170 | + | .await | |
| 171 | + | .expect("create template database"); | |
| 172 | + | ||
| 173 | + | // Connect to the template and run all migrations | |
| 174 | + | let tpl_url = replace_db_name(&admin, &template); | |
| 175 | + | let tpl_pool = PgPoolOptions::new() | |
| 176 | + | .max_connections(2) | |
| 177 | + | .acquire_timeout(Duration::from_secs(10)) | |
| 178 | + | .connect(&tpl_url) | |
| 179 | + | .await | |
| 180 | + | .expect("connect to template database"); | |
| 181 | + | ||
| 182 | + | let t_migrate = std::time::Instant::now(); | |
| 183 | + | sqlx::migrate!("./migrations") | |
| 184 | + | .run(&tpl_pool) | |
| 185 | + | .await | |
| 186 | + | .expect("run migrations on template"); | |
| 187 | + | let migrate_ms = t_migrate.elapsed().as_millis(); | |
| 188 | + | ||
| 189 | + | // Also create the session store table | |
| 190 | + | let session_store = tower_sessions_sqlx_store::PostgresStore::new(tpl_pool.clone()); | |
| 191 | + | session_store.migrate().await.expect("session store migration on template"); | |
| 192 | + | ||
| 193 | + | tpl_pool.close().await; | |
| 194 | + | ||
| 195 | + | let total_ms = t0.elapsed().as_millis(); | |
| 196 | + | eprintln!( | |
| 197 | + | "[test-harness] Template DB created in {}ms (migrations: {}ms)", | |
| 198 | + | total_ms, migrate_ms | |
| 199 | + | ); | |
| 200 | + | } | |
| 201 | + | ||
| 202 | + | // Release explicitly (also released when `conn` drops); the lock was | |
| 203 | + | // held across the whole reuse-or-rebuild window above. | |
| 204 | + | let _ = sqlx::query("SELECT pg_advisory_unlock($1)") | |
| 205 | + | .bind(TEMPLATE_LOCK_KEY) | |
| 206 | + | .execute(&mut conn) | |
| 207 | + | .await; | |
| 140 | 208 | }); | |
| 141 | 209 | }); | |
| 142 | 210 | } | |
| @@ -174,16 +242,28 @@ impl TestDb { | |||
| 174 | 242 | // can drop it. Matches the template's ownership. | |
| 175 | 243 | assume_shared_role(&mut admin_conn).await; | |
| 176 | 244 | ||
| 177 | - | admin_conn | |
| 178 | - | .execute( | |
| 179 | - | format!( | |
| 180 | - | "CREATE DATABASE \"{db_name}\" TEMPLATE \"{}\"", | |
| 181 | - | template_name() | |
| 182 | - | ) | |
| 183 | - | .as_str(), | |
| 184 | - | ) | |
| 185 | - | .await | |
| 186 | - | .expect("Failed to create test database from template"); | |
| 245 | + | // Clone from the template, retrying on transient contention. Postgres | |
| 246 | + | // serializes `CREATE DATABASE ... TEMPLATE t`: a concurrent clone of the | |
| 247 | + | // same template (under the full-suite parallel stampede) fails with | |
| 248 | + | // "source database ... is being accessed by other users". Retry rather | |
| 249 | + | // than fail the test on that transient. | |
| 250 | + | let create_sql = format!( | |
| 251 | + | "CREATE DATABASE \"{db_name}\" TEMPLATE \"{}\"", | |
| 252 | + | template_name() | |
| 253 | + | ); | |
| 254 | + | let mut attempt = 0u32; | |
| 255 | + | loop { | |
| 256 | + | match admin_conn.execute(create_sql.as_str()).await { | |
| 257 | + | Ok(_) => break, | |
| 258 | + | Err(_) if attempt < 8 => { | |
| 259 | + | attempt += 1; | |
| 260 | + | tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await; | |
| 261 | + | } | |
| 262 | + | Err(e) => panic!( | |
| 263 | + | "Failed to create test database from template after {attempt} retries: {e}" | |
| 264 | + | ), | |
| 265 | + | } | |
| 266 | + | } | |
| 187 | 267 | ||
| 188 | 268 | let test_url = replace_db_name(&admin, &db_name); | |
| 189 | 269 |