Skip to main content

max / makenotwork

10.1 KB · 353 lines History Blame Raw
1 //! Build pipeline queries: configs, builds, status updates.
2
3 use sqlx::PgPool;
4
5 use super::models::*;
6 use super::{BuildConfigId, BuildId, BuildStatus, OtaReleaseId, SyncAppId};
7 use crate::error::Result;
8
9 // ── Build configs ──
10
11 /// Create a new build config for a sync app.
12 #[tracing::instrument(skip_all)]
13 pub async fn create_build_config(
14 pool: &PgPool,
15 app_id: SyncAppId,
16 repo_id: super::GitRepoId,
17 build_command: &str,
18 artifact_path: &str,
19 signing_key_path: &str,
20 targets: &[String],
21 ) -> Result<DbBuildConfig> {
22 let config = sqlx::query_as::<_, DbBuildConfig>(
23 r#"
24 INSERT INTO ota_build_configs (app_id, repo_id, build_command, artifact_path, signing_key_path, targets)
25 VALUES ($1, $2, $3, $4, $5, $6)
26 RETURNING *
27 "#,
28 )
29 .bind(app_id)
30 .bind(repo_id)
31 .bind(build_command)
32 .bind(artifact_path)
33 .bind(signing_key_path)
34 .bind(targets)
35 .fetch_one(pool)
36 .await?;
37
38 Ok(config)
39 }
40
41 /// Get the build config for a sync app.
42 #[tracing::instrument(skip_all)]
43 pub async fn get_build_config_by_app(
44 pool: &PgPool,
45 app_id: SyncAppId,
46 ) -> Result<Option<DbBuildConfig>> {
47 let config = sqlx::query_as::<_, DbBuildConfig>(
48 "SELECT * FROM ota_build_configs WHERE app_id = $1",
49 )
50 .bind(app_id)
51 .fetch_optional(pool)
52 .await?;
53
54 Ok(config)
55 }
56
57 /// Get an enabled build config by repo ID (for hook trigger lookup).
58 #[tracing::instrument(skip_all)]
59 pub async fn get_build_config_by_repo(
60 pool: &PgPool,
61 repo_id: super::GitRepoId,
62 ) -> Result<Option<DbBuildConfig>> {
63 let config = sqlx::query_as::<_, DbBuildConfig>(
64 "SELECT * FROM ota_build_configs WHERE repo_id = $1 AND enabled = true",
65 )
66 .bind(repo_id)
67 .fetch_optional(pool)
68 .await?;
69
70 Ok(config)
71 }
72
73 /// Update a build config.
74 #[tracing::instrument(skip_all)]
75 pub async fn update_build_config(
76 pool: &PgPool,
77 config_id: BuildConfigId,
78 build_command: &str,
79 artifact_path: &str,
80 signing_key_path: &str,
81 targets: &[String],
82 enabled: bool,
83 ) -> Result<DbBuildConfig> {
84 let config = sqlx::query_as::<_, DbBuildConfig>(
85 r#"
86 UPDATE ota_build_configs
87 SET build_command = $2, artifact_path = $3, signing_key_path = $4,
88 targets = $5, enabled = $6, updated_at = now()
89 WHERE id = $1
90 RETURNING *
91 "#,
92 )
93 .bind(config_id)
94 .bind(build_command)
95 .bind(artifact_path)
96 .bind(signing_key_path)
97 .bind(targets)
98 .bind(enabled)
99 .fetch_one(pool)
100 .await?;
101
102 Ok(config)
103 }
104
105 /// Delete a build config (cascades to builds).
106 #[tracing::instrument(skip_all)]
107 pub async fn delete_build_config(pool: &PgPool, config_id: BuildConfigId) -> Result<bool> {
108 let result = sqlx::query("DELETE FROM ota_build_configs WHERE id = $1")
109 .bind(config_id)
110 .execute(pool)
111 .await?;
112
113 Ok(result.rows_affected() > 0)
114 }
115
116 // ── Builds ──
117
118 /// Create a new pending build.
119 #[tracing::instrument(skip_all)]
120 pub async fn create_build(
121 pool: &PgPool,
122 config_id: BuildConfigId,
123 app_id: SyncAppId,
124 version: &str,
125 tag: &str,
126 triggered_by: &str,
127 ) -> Result<DbBuild> {
128 let build = sqlx::query_as::<_, DbBuild>(
129 r#"
130 INSERT INTO ota_builds (config_id, app_id, version, tag, triggered_by)
131 VALUES ($1, $2, $3, $4, $5)
132 RETURNING *
133 "#,
134 )
135 .bind(config_id)
136 .bind(app_id)
137 .bind(version)
138 .bind(tag)
139 .bind(triggered_by)
140 .fetch_one(pool)
141 .await?;
142
143 Ok(build)
144 }
145
146 /// Get a build by ID.
147 #[tracing::instrument(skip_all)]
148 pub async fn get_build(pool: &PgPool, build_id: BuildId) -> Result<Option<DbBuild>> {
149 let build = sqlx::query_as::<_, DbBuild>(
150 "SELECT * FROM ota_builds WHERE id = $1",
151 )
152 .bind(build_id)
153 .fetch_optional(pool)
154 .await?;
155
156 Ok(build)
157 }
158
159 /// Return the current `octet_length` of a build's log column, plus whether
160 /// it already ends with `marker` (server-side string compare so the full
161 /// log column never travels back over the wire).
162 ///
163 /// Used by `append_log_bounded` to make the cap check without pulling the
164 /// 5 MiB log row into memory on every line.
165 #[tracing::instrument(skip_all)]
166 pub async fn get_build_log_size(
167 pool: &PgPool,
168 build_id: BuildId,
169 marker: &str,
170 ) -> Result<Option<(i64, bool)>> {
171 let row: Option<(i64, bool)> = sqlx::query_as(
172 "SELECT octet_length(log)::BIGINT, right(log, char_length($2)) = $2 \
173 FROM ota_builds WHERE id = $1",
174 )
175 .bind(build_id)
176 .bind(marker)
177 .fetch_optional(pool)
178 .await?;
179
180 Ok(row)
181 }
182
183 /// List builds for an app, newest first.
184 #[tracing::instrument(skip_all)]
185 pub async fn list_builds_by_app(
186 pool: &PgPool,
187 app_id: SyncAppId,
188 limit: i64,
189 ) -> Result<Vec<DbBuild>> {
190 let builds = sqlx::query_as::<_, DbBuild>(
191 "SELECT * FROM ota_builds WHERE app_id = $1 ORDER BY created_at DESC LIMIT $2",
192 )
193 .bind(app_id)
194 .bind(limit)
195 .fetch_all(pool)
196 .await?;
197
198 Ok(builds)
199 }
200
201 /// Check if a config has any active (pending or running) build.
202 #[tracing::instrument(skip_all)]
203 pub async fn has_active_build(pool: &PgPool, config_id: BuildConfigId) -> Result<bool> {
204 let count: (i64,) = sqlx::query_as(
205 "SELECT COUNT(*) FROM ota_builds WHERE config_id = $1 AND status IN ('pending', 'running')",
206 )
207 .bind(config_id)
208 .fetch_one(pool)
209 .await?;
210
211 Ok(count.0 > 0)
212 }
213
214 /// Update build status with conditional timestamps.
215 #[tracing::instrument(skip_all)]
216 pub async fn update_build_status(
217 pool: &PgPool,
218 build_id: BuildId,
219 status: BuildStatus,
220 error_message: Option<&str>,
221 ) -> Result<()> {
222 match status {
223 BuildStatus::Running => {
224 sqlx::query(
225 "UPDATE ota_builds SET status = $2, started_at = now() WHERE id = $1",
226 )
227 .bind(build_id)
228 .bind(status)
229 .execute(pool)
230 .await?;
231 }
232 BuildStatus::Succeeded | BuildStatus::Failed | BuildStatus::Cancelled => {
233 // Gate on a non-terminal source status so the stale-build reaper
234 // (`fail_stale_running_builds`) and a real terminal write can't
235 // race: if the reaper just flipped the row to 'failed', the
236 // successful builder's write must no-op rather than clobber it.
237 // Pending IS a legitimate source — cancelling a build that never
238 // started must still transition pending → cancelled.
239 let result = sqlx::query(
240 "UPDATE ota_builds SET status = $2, finished_at = now(), error_message = $3 WHERE id = $1 AND status IN ('pending', 'running')",
241 )
242 .bind(build_id)
243 .bind(status)
244 .bind(error_message)
245 .execute(pool)
246 .await?;
247 if result.rows_affected() == 0 {
248 tracing::warn!(build_id = %build_id, target_status = ?status,
249 "build status terminal write skipped — row already terminal (likely reaper-set)");
250 }
251 }
252 BuildStatus::Pending => {
253 sqlx::query("UPDATE ota_builds SET status = $2 WHERE id = $1")
254 .bind(build_id)
255 .bind(status)
256 .execute(pool)
257 .await?;
258 }
259 }
260
261 Ok(())
262 }
263
264 /// Atomically claim the oldest pending build if no build is currently running.
265 ///
266 /// Sets status to 'running' and started_at in a single UPDATE with a subquery,
267 /// eliminating the TOCTOU race between checking for running builds and fetching
268 /// a pending one. `FOR UPDATE SKIP LOCKED` means concurrent callers never block
269 ///; the loser simply gets no row.
270 #[tracing::instrument(skip_all)]
271 pub async fn claim_pending_build(pool: &PgPool) -> Result<Option<DbBuild>> {
272 let result = sqlx::query_as::<_, DbBuild>(
273 r#"
274 UPDATE ota_builds
275 SET status = 'running', started_at = now()
276 WHERE id = (
277 SELECT id FROM ota_builds
278 WHERE status = 'pending'
279 AND NOT EXISTS (SELECT 1 FROM ota_builds WHERE status = 'running')
280 ORDER BY created_at ASC
281 LIMIT 1
282 FOR UPDATE SKIP LOCKED
283 )
284 RETURNING *
285 "#,
286 )
287 .fetch_optional(pool)
288 .await;
289
290 // Multi-replica: the NOT EXISTS subquery races between replicas. The
291 // `ota_builds_single_running` partial unique index is the backstop —
292 // the loser's UPDATE surfaces as a 23505 unique violation, which means
293 // a peer claimed first. Treat as "nothing to claim this tick".
294 match result {
295 Ok(build) => Ok(build),
296 Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("23505") => {
297 tracing::info!("claim_pending_build lost the running-slot race; another replica claimed");
298 Ok(None)
299 }
300 Err(e) => Err(e.into()),
301 }
302 }
303
304 /// Mark any builds that have been "running" longer than the timeout as failed.
305 ///
306 /// Returns the number of builds marked as failed.
307 #[tracing::instrument(skip_all)]
308 pub async fn fail_stale_running_builds(pool: &PgPool, timeout_secs: i64) -> Result<u64> {
309 let result = sqlx::query(
310 r#"
311 UPDATE ota_builds
312 SET status = 'failed',
313 finished_at = now(),
314 error_message = 'Build timed out (stale running status)'
315 WHERE status = 'running'
316 AND started_at < now() - make_interval(secs => $1)
317 "#,
318 )
319 .bind(timeout_secs as f64)
320 .execute(pool)
321 .await?;
322
323 Ok(result.rows_affected())
324 }
325
326 /// Append a line to the build log.
327 #[tracing::instrument(skip_all)]
328 pub async fn append_build_log(pool: &PgPool, build_id: BuildId, line: &str) -> Result<()> {
329 sqlx::query("UPDATE ota_builds SET log = log || $2 WHERE id = $1")
330 .bind(build_id)
331 .bind(line)
332 .execute(pool)
333 .await?;
334
335 Ok(())
336 }
337
338 /// Set the release ID on a build (after successful artifact upload).
339 #[tracing::instrument(skip_all)]
340 pub async fn set_build_release(
341 pool: &PgPool,
342 build_id: BuildId,
343 release_id: OtaReleaseId,
344 ) -> Result<()> {
345 sqlx::query("UPDATE ota_builds SET release_id = $2 WHERE id = $1")
346 .bind(build_id)
347 .bind(release_id)
348 .execute(pool)
349 .await?;
350
351 Ok(())
352 }
353