//! Build pipeline queries: configs, builds, status updates. use sqlx::PgPool; use super::models::*; use super::{BuildConfigId, BuildId, BuildStatus, OtaReleaseId, SyncAppId}; use crate::error::Result; // ── Build configs ── /// Create a new build config for a sync app. #[tracing::instrument(skip_all)] pub async fn create_build_config( pool: &PgPool, app_id: SyncAppId, repo_id: super::GitRepoId, build_command: &str, artifact_path: &str, signing_key_path: &str, targets: &[String], ) -> Result { let config = sqlx::query_as::<_, DbBuildConfig>( r#" INSERT INTO ota_build_configs (app_id, repo_id, build_command, artifact_path, signing_key_path, targets) VALUES ($1, $2, $3, $4, $5, $6) RETURNING * "#, ) .bind(app_id) .bind(repo_id) .bind(build_command) .bind(artifact_path) .bind(signing_key_path) .bind(targets) .fetch_one(pool) .await?; Ok(config) } /// Get the build config for a sync app. #[tracing::instrument(skip_all)] pub async fn get_build_config_by_app( pool: &PgPool, app_id: SyncAppId, ) -> Result> { let config = sqlx::query_as::<_, DbBuildConfig>( "SELECT * FROM ota_build_configs WHERE app_id = $1", ) .bind(app_id) .fetch_optional(pool) .await?; Ok(config) } /// Get an enabled build config by repo ID (for hook trigger lookup). #[tracing::instrument(skip_all)] pub async fn get_build_config_by_repo( pool: &PgPool, repo_id: super::GitRepoId, ) -> Result> { let config = sqlx::query_as::<_, DbBuildConfig>( "SELECT * FROM ota_build_configs WHERE repo_id = $1 AND enabled = true", ) .bind(repo_id) .fetch_optional(pool) .await?; Ok(config) } /// Update a build config. 
///
/// Overwrites `build_command`, `artifact_path`, `signing_key_path`,
/// `targets` and `enabled` on the row with id `config_id`, bumps
/// `updated_at` to `now()`, and returns the updated row.
///
/// # Errors
///
/// Returns an error if the query fails. Because the row is read back with
/// `fetch_one`, an unknown `config_id` is reported as an error rather than
/// as `None`.
#[tracing::instrument(skip_all)]
pub async fn update_build_config(
    pool: &PgPool,
    config_id: BuildConfigId,
    build_command: &str,
    artifact_path: &str,
    signing_key_path: &str,
    targets: &[String],
    enabled: bool,
) -> Result<DbBuildConfig> {
    let config = sqlx::query_as::<_, DbBuildConfig>(
        r#"
        UPDATE ota_build_configs
        SET build_command = $2,
            artifact_path = $3,
            signing_key_path = $4,
            targets = $5,
            enabled = $6,
            updated_at = now()
        WHERE id = $1
        RETURNING *
        "#,
    )
    .bind(config_id)
    .bind(build_command)
    .bind(artifact_path)
    .bind(signing_key_path)
    .bind(targets)
    .bind(enabled)
    .fetch_one(pool)
    .await?;

    Ok(config)
}

/// Delete a build config (cascades to builds).
///
/// Returns `true` when a row was deleted and `false` when no config had
/// this id. The cascade itself is not visible in this query — presumably it
/// comes from a foreign key on `ota_builds`; confirm against the schema.
///
/// # Errors
///
/// Returns an error if the DELETE fails.
#[tracing::instrument(skip_all)]
pub async fn delete_build_config(pool: &PgPool, config_id: BuildConfigId) -> Result<bool> {
    let result = sqlx::query("DELETE FROM ota_build_configs WHERE id = $1")
        .bind(config_id)
        .execute(pool)
        .await?;

    Ok(result.rows_affected() > 0)
}

// ── Builds ──

/// Create a new pending build.
///
/// Inserts a row into `ota_builds` and returns it as stored. This INSERT
/// does not set `status`; the "pending" state presumably comes from the
/// column default — confirm against the schema.
///
/// # Errors
///
/// Returns an error if the INSERT fails.
#[tracing::instrument(skip_all)]
pub async fn create_build(
    pool: &PgPool,
    config_id: BuildConfigId,
    app_id: SyncAppId,
    version: &str,
    tag: &str,
    triggered_by: &str,
) -> Result<DbBuild> {
    let build = sqlx::query_as::<_, DbBuild>(
        r#"
        INSERT INTO ota_builds (config_id, app_id, version, tag, triggered_by)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING *
        "#,
    )
    .bind(config_id)
    .bind(app_id)
    .bind(version)
    .bind(tag)
    .bind(triggered_by)
    .fetch_one(pool)
    .await?;

    Ok(build)
}

/// Get a build by ID.
///
/// Returns `Ok(None)` when no build has this id.
///
/// # Errors
///
/// Returns an error if the query fails.
#[tracing::instrument(skip_all)]
pub async fn get_build(pool: &PgPool, build_id: BuildId) -> Result<Option<DbBuild>> {
    let build = sqlx::query_as::<_, DbBuild>("SELECT * FROM ota_builds WHERE id = $1")
        .bind(build_id)
        .fetch_optional(pool)
        .await?;

    Ok(build)
}

/// Return the current `octet_length` of a build's log column, plus whether
/// it already ends with `marker` (server-side string compare so the full
/// log column never travels back over the wire).
/// /// Used by `append_log_bounded` to make the cap check without pulling the /// 5 MiB log row into memory on every line. #[tracing::instrument(skip_all)] pub async fn get_build_log_size( pool: &PgPool, build_id: BuildId, marker: &str, ) -> Result> { let row: Option<(i64, bool)> = sqlx::query_as( "SELECT octet_length(log)::BIGINT, right(log, char_length($2)) = $2 \ FROM ota_builds WHERE id = $1", ) .bind(build_id) .bind(marker) .fetch_optional(pool) .await?; Ok(row) } /// List builds for an app, newest first. #[tracing::instrument(skip_all)] pub async fn list_builds_by_app( pool: &PgPool, app_id: SyncAppId, limit: i64, ) -> Result> { let builds = sqlx::query_as::<_, DbBuild>( "SELECT * FROM ota_builds WHERE app_id = $1 ORDER BY created_at DESC LIMIT $2", ) .bind(app_id) .bind(limit) .fetch_all(pool) .await?; Ok(builds) } /// Check if a config has any active (pending or running) build. #[tracing::instrument(skip_all)] pub async fn has_active_build(pool: &PgPool, config_id: BuildConfigId) -> Result { let count: (i64,) = sqlx::query_as( "SELECT COUNT(*) FROM ota_builds WHERE config_id = $1 AND status IN ('pending', 'running')", ) .bind(config_id) .fetch_one(pool) .await?; Ok(count.0 > 0) } /// Update build status with conditional timestamps. #[tracing::instrument(skip_all)] pub async fn update_build_status( pool: &PgPool, build_id: BuildId, status: BuildStatus, error_message: Option<&str>, ) -> Result<()> { match status { BuildStatus::Running => { sqlx::query( "UPDATE ota_builds SET status = $2, started_at = now() WHERE id = $1", ) .bind(build_id) .bind(status) .execute(pool) .await?; } BuildStatus::Succeeded | BuildStatus::Failed | BuildStatus::Cancelled => { // Gate on a non-terminal source status so the stale-build reaper // (`fail_stale_running_builds`) and a real terminal write can't // race: if the reaper just flipped the row to 'failed', the // successful builder's write must no-op rather than clobber it. 
// Pending IS a legitimate source — cancelling a build that never // started must still transition pending → cancelled. let result = sqlx::query( "UPDATE ota_builds SET status = $2, finished_at = now(), error_message = $3 WHERE id = $1 AND status IN ('pending', 'running')", ) .bind(build_id) .bind(status) .bind(error_message) .execute(pool) .await?; if result.rows_affected() == 0 { tracing::warn!(build_id = %build_id, target_status = ?status, "build status terminal write skipped — row already terminal (likely reaper-set)"); } } BuildStatus::Pending => { sqlx::query("UPDATE ota_builds SET status = $2 WHERE id = $1") .bind(build_id) .bind(status) .execute(pool) .await?; } } Ok(()) } /// Atomically claim the oldest pending build if no build is currently running. /// /// Sets status to 'running' and started_at in a single UPDATE with a subquery, /// eliminating the TOCTOU race between checking for running builds and fetching /// a pending one. `FOR UPDATE SKIP LOCKED` means concurrent callers never block ///; the loser simply gets no row. #[tracing::instrument(skip_all)] pub async fn claim_pending_build(pool: &PgPool) -> Result> { let result = sqlx::query_as::<_, DbBuild>( r#" UPDATE ota_builds SET status = 'running', started_at = now() WHERE id = ( SELECT id FROM ota_builds WHERE status = 'pending' AND NOT EXISTS (SELECT 1 FROM ota_builds WHERE status = 'running') ORDER BY created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED ) RETURNING * "#, ) .fetch_optional(pool) .await; // Multi-replica: the NOT EXISTS subquery races between replicas. The // `ota_builds_single_running` partial unique index is the backstop — // the loser's UPDATE surfaces as a 23505 unique violation, which means // a peer claimed first. Treat as "nothing to claim this tick". 
match result { Ok(build) => Ok(build), Err(sqlx::Error::Database(e)) if e.code().as_deref() == Some("23505") => { tracing::info!("claim_pending_build lost the running-slot race; another replica claimed"); Ok(None) } Err(e) => Err(e.into()), } } /// Mark any builds that have been "running" longer than the timeout as failed. /// /// Returns the number of builds marked as failed. #[tracing::instrument(skip_all)] pub async fn fail_stale_running_builds(pool: &PgPool, timeout_secs: i64) -> Result { let result = sqlx::query( r#" UPDATE ota_builds SET status = 'failed', finished_at = now(), error_message = 'Build timed out (stale running status)' WHERE status = 'running' AND started_at < now() - make_interval(secs => $1) "#, ) .bind(timeout_secs as f64) .execute(pool) .await?; Ok(result.rows_affected()) } /// Append a line to the build log. #[tracing::instrument(skip_all)] pub async fn append_build_log(pool: &PgPool, build_id: BuildId, line: &str) -> Result<()> { sqlx::query("UPDATE ota_builds SET log = log || $2 WHERE id = $1") .bind(build_id) .bind(line) .execute(pool) .await?; Ok(()) } /// Set the release ID on a build (after successful artifact upload). #[tracing::instrument(skip_all)] pub async fn set_build_release( pool: &PgPool, build_id: BuildId, release_id: OtaReleaseId, ) -> Result<()> { sqlx::query("UPDATE ota_builds SET release_id = $2 WHERE id = $1") .bind(build_id) .bind(release_id) .execute(pool) .await?; Ok(()) }