//! Import job CRUD: create, update progress, complete, list. use sqlx::PgPool; use super::id_types::{ImportJobId, ProjectId, UserId}; use super::models::DbImportJob; use crate::import::ImportSource; use crate::error::Result; /// Create a new import job in `pending` status. #[tracing::instrument(skip_all)] pub async fn create_import_job( pool: &PgPool, user_id: UserId, project_id: ProjectId, source: ImportSource, total_rows: i32, ) -> Result { let job = sqlx::query_as::<_, DbImportJob>( r#" INSERT INTO import_jobs (user_id, project_id, source, total_rows) VALUES ($1, $2, $3, $4) RETURNING * "#, ) .bind(user_id) .bind(project_id) .bind(source) .bind(total_rows) .fetch_one(pool) .await?; Ok(job) } /// Update the processing progress of an import job. #[tracing::instrument(skip_all)] pub async fn update_import_progress( pool: &PgPool, job_id: ImportJobId, processed_rows: i32, created_rows: i32, skipped_rows: i32, ) -> Result<()> { sqlx::query( r#" UPDATE import_jobs SET processed_rows = $2, created_rows = $3, skipped_rows = $4 WHERE id = $1 "#, ) .bind(job_id) .bind(processed_rows) .bind(created_rows) .bind(skipped_rows) .execute(pool) .await?; Ok(()) } /// Update the status of an import job. #[tracing::instrument(skip_all)] pub async fn update_import_status( pool: &PgPool, job_id: ImportJobId, status: &str, ) -> Result<()> { sqlx::query("UPDATE import_jobs SET status = $2 WHERE id = $1") .bind(job_id) .bind(status) .execute(pool) .await?; Ok(()) } /// Mark an import job as completed with optional error log. #[tracing::instrument(skip_all)] pub async fn complete_import_job( pool: &PgPool, job_id: ImportJobId, error_log: Option, ) -> Result<()> { sqlx::query( r#" UPDATE import_jobs SET status = 'completed', completed_at = NOW(), error_log = $2 WHERE id = $1 "#, ) .bind(job_id) .bind(error_log) .execute(pool) .await?; Ok(()) } /// Mark an import job as failed with an error message. #[tracing::instrument(skip_all)] pub async fn fail_import_job( pool: &PgPool, job_id: ImportJobId, error: &str, ) -> Result<()> { sqlx::query( r#" UPDATE import_jobs SET status = 'failed', completed_at = NOW(), error_log = $2 WHERE id = $1 "#, ) .bind(job_id) .bind(error) .execute(pool) .await?; Ok(()) } /// Get a single import job by ID, scoped to a user. #[tracing::instrument(skip_all)] pub async fn get_import_job( pool: &PgPool, job_id: ImportJobId, user_id: UserId, ) -> Result> { let job = sqlx::query_as::<_, DbImportJob>( "SELECT * FROM import_jobs WHERE id = $1 AND user_id = $2", ) .bind(job_id) .bind(user_id) .fetch_optional(pool) .await?; Ok(job) } /// List import jobs for a user, most recent first. #[tracing::instrument(skip_all)] pub async fn list_import_jobs( pool: &PgPool, user_id: UserId, ) -> Result> { let jobs = sqlx::query_as::<_, DbImportJob>( "SELECT * FROM import_jobs WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20", ) .bind(user_id) .fetch_all(pool) .await?; Ok(jobs) }