Skip to main content

max / makenotwork

3.4 KB · 155 lines History Blame Raw
1 //! Import job CRUD: create, update progress, complete, list.
2
3 use sqlx::PgPool;
4
5 use super::id_types::{ImportJobId, ProjectId, UserId};
6 use super::models::DbImportJob;
7 use crate::import::ImportSource;
8 use crate::error::Result;
9
10 /// Create a new import job in `pending` status.
11 #[tracing::instrument(skip_all)]
12 pub async fn create_import_job(
13 pool: &PgPool,
14 user_id: UserId,
15 project_id: ProjectId,
16 source: ImportSource,
17 total_rows: i32,
18 ) -> Result<DbImportJob> {
19 let job = sqlx::query_as::<_, DbImportJob>(
20 r#"
21 INSERT INTO import_jobs (user_id, project_id, source, total_rows)
22 VALUES ($1, $2, $3, $4)
23 RETURNING *
24 "#,
25 )
26 .bind(user_id)
27 .bind(project_id)
28 .bind(source)
29 .bind(total_rows)
30 .fetch_one(pool)
31 .await?;
32
33 Ok(job)
34 }
35
36 /// Update the processing progress of an import job.
37 #[tracing::instrument(skip_all)]
38 pub async fn update_import_progress(
39 pool: &PgPool,
40 job_id: ImportJobId,
41 processed_rows: i32,
42 created_rows: i32,
43 skipped_rows: i32,
44 ) -> Result<()> {
45 sqlx::query(
46 r#"
47 UPDATE import_jobs
48 SET processed_rows = $2, created_rows = $3, skipped_rows = $4
49 WHERE id = $1
50 "#,
51 )
52 .bind(job_id)
53 .bind(processed_rows)
54 .bind(created_rows)
55 .bind(skipped_rows)
56 .execute(pool)
57 .await?;
58
59 Ok(())
60 }
61
62 /// Update the status of an import job.
63 #[tracing::instrument(skip_all)]
64 pub async fn update_import_status(
65 pool: &PgPool,
66 job_id: ImportJobId,
67 status: &str,
68 ) -> Result<()> {
69 sqlx::query("UPDATE import_jobs SET status = $2 WHERE id = $1")
70 .bind(job_id)
71 .bind(status)
72 .execute(pool)
73 .await?;
74
75 Ok(())
76 }
77
78 /// Mark an import job as completed with optional error log.
79 #[tracing::instrument(skip_all)]
80 pub async fn complete_import_job(
81 pool: &PgPool,
82 job_id: ImportJobId,
83 error_log: Option<String>,
84 ) -> Result<()> {
85 sqlx::query(
86 r#"
87 UPDATE import_jobs
88 SET status = 'completed', completed_at = NOW(), error_log = $2
89 WHERE id = $1
90 "#,
91 )
92 .bind(job_id)
93 .bind(error_log)
94 .execute(pool)
95 .await?;
96
97 Ok(())
98 }
99
100 /// Mark an import job as failed with an error message.
101 #[tracing::instrument(skip_all)]
102 pub async fn fail_import_job(
103 pool: &PgPool,
104 job_id: ImportJobId,
105 error: &str,
106 ) -> Result<()> {
107 sqlx::query(
108 r#"
109 UPDATE import_jobs
110 SET status = 'failed', completed_at = NOW(), error_log = $2
111 WHERE id = $1
112 "#,
113 )
114 .bind(job_id)
115 .bind(error)
116 .execute(pool)
117 .await?;
118
119 Ok(())
120 }
121
122 /// Get a single import job by ID, scoped to a user.
123 #[tracing::instrument(skip_all)]
124 pub async fn get_import_job(
125 pool: &PgPool,
126 job_id: ImportJobId,
127 user_id: UserId,
128 ) -> Result<Option<DbImportJob>> {
129 let job = sqlx::query_as::<_, DbImportJob>(
130 "SELECT * FROM import_jobs WHERE id = $1 AND user_id = $2",
131 )
132 .bind(job_id)
133 .bind(user_id)
134 .fetch_optional(pool)
135 .await?;
136
137 Ok(job)
138 }
139
140 /// List import jobs for a user, most recent first.
141 #[tracing::instrument(skip_all)]
142 pub async fn list_import_jobs(
143 pool: &PgPool,
144 user_id: UserId,
145 ) -> Result<Vec<DbImportJob>> {
146 let jobs = sqlx::query_as::<_, DbImportJob>(
147 "SELECT * FROM import_jobs WHERE user_id = $1 ORDER BY created_at DESC LIMIT 20",
148 )
149 .bind(user_id)
150 .fetch_all(pool)
151 .await?;
152
153 Ok(jobs)
154 }
155