Skip to main content

max / makenotwork

4.7 KB · 160 lines History Blame Raw
1 //! Import API endpoints: start import, check status, list jobs.
2
3 use axum::{
4 extract::{Path, State},
5 response::IntoResponse,
6 Json,
7 };
8 use base64::Engine;
9 use serde::Deserialize;
10 use serde_json::json;
11
12 use crate::{
13 auth::AuthUser,
14 db::{self, ImportJobId, ProjectId},
15 error::{AppError, Result},
16 import::{self, ColumnMapping, ImportSource},
17 AppState,
18 };
19
/// Maximum size of the decoded CSV payload: 10 MB.
///
/// The limit applies to the bytes obtained by base64-decoding `csv_data`,
/// so the encoded request field may be roughly a third longer than this.
const MAX_CSV_SIZE: usize = 10 * 1024 * 1024;
22
23 /// Request body for starting an import.
24 #[derive(Debug, Deserialize)]
25 pub(super) struct StartImportRequest {
26 pub project_id: ProjectId,
27 pub source: ImportSource,
28 pub csv_data: String,
29 pub column_mapping: ColumnMapping,
30 }
31
32 /// Start a new import job from CSV data.
33 ///
34 /// Creates the job in `pending` status, then spawns a background task
35 /// to parse the CSV and run the import pipeline.
36 #[tracing::instrument(skip_all, name = "imports::start_import")]
37 pub(super) async fn start_import(
38 State(state): State<AppState>,
39 AuthUser(user): AuthUser,
40 Json(req): Json<StartImportRequest>,
41 ) -> Result<impl IntoResponse> {
42 user.check_not_suspended()?;
43 user.check_not_sandbox()?;
44
45 // Validate project ownership
46 super::verify_project_ownership(&state, req.project_id, user.id).await?;
47
48 // Only generic_csv is currently supported
49 if req.source != ImportSource::GenericCsv {
50 return Err(AppError::validation(format!(
51 "Import source '{}' is not yet supported. Only 'generic_csv' is available.",
52 req.source,
53 )));
54 }
55
56 // Decode base64 CSV data
57 let csv_bytes = base64::engine::general_purpose::STANDARD
58 .decode(&req.csv_data)
59 .map_err(|_| AppError::validation("Invalid base64-encoded CSV data"))?;
60
61 if csv_bytes.len() > MAX_CSV_SIZE {
62 return Err(AppError::validation(format!(
63 "CSV data exceeds maximum size of {} MB",
64 MAX_CSV_SIZE / (1024 * 1024),
65 )));
66 }
67
68 // Parse CSV into payload
69 let payload = import::csv_converter::parse_csv(&csv_bytes, &req.column_mapping)?;
70 let total_rows = payload.total_rows() as i32;
71
72 // Create import job
73 let job = db::imports::create_import_job(
74 &state.db,
75 user.id,
76 req.project_id,
77 req.source,
78 total_rows,
79 )
80 .await?;
81
82 let job_id = job.id;
83 let pool = state.db.clone();
84 let project_id = req.project_id;
85 let user_id = user.id;
86
87 // Spawn background task for the import pipeline
88 tokio::spawn(async move {
89 if let Err(e) = import::pipeline::run_import(
90 &pool,
91 job_id,
92 project_id,
93 user_id,
94 payload,
95 )
96 .await
97 {
98 tracing::error!(error = %e, job_id = %job_id, "import pipeline failed");
99 let _ = db::imports::fail_import_job(&pool, job_id, &e.to_string()).await;
100 }
101 });
102
103 Ok(Json(json!({ "job_id": job_id })))
104 }
105
106 /// Get the status and progress of an import job.
107 #[tracing::instrument(skip_all, name = "imports::get_import_status")]
108 pub(super) async fn get_import_status(
109 State(state): State<AppState>,
110 AuthUser(user): AuthUser,
111 Path(id): Path<String>,
112 ) -> Result<impl IntoResponse> {
113 let job_id: ImportJobId = id.parse().map_err(|_| AppError::NotFound)?;
114
115 let job = db::imports::get_import_job(&state.db, job_id, user.id)
116 .await?
117 .ok_or(AppError::NotFound)?;
118
119 Ok(Json(json!({
120 "id": job.id,
121 "source": job.source.to_string(),
122 "status": job.status.to_string(),
123 "total_rows": job.total_rows,
124 "processed_rows": job.processed_rows,
125 "created_rows": job.created_rows,
126 "skipped_rows": job.skipped_rows,
127 "error_log": job.error_log,
128 "created_at": job.created_at,
129 "completed_at": job.completed_at,
130 })))
131 }
132
133 /// List all import jobs for the authenticated user.
134 #[tracing::instrument(skip_all, name = "imports::list_imports")]
135 pub(super) async fn list_imports(
136 State(state): State<AppState>,
137 AuthUser(user): AuthUser,
138 ) -> Result<impl IntoResponse> {
139 let jobs = db::imports::list_import_jobs(&state.db, user.id).await?;
140
141 let data: Vec<serde_json::Value> = jobs
142 .into_iter()
143 .map(|j| {
144 json!({
145 "id": j.id,
146 "source": j.source.to_string(),
147 "status": j.status.to_string(),
148 "total_rows": j.total_rows,
149 "processed_rows": j.processed_rows,
150 "created_rows": j.created_rows,
151 "skipped_rows": j.skipped_rows,
152 "created_at": j.created_at,
153 "completed_at": j.completed_at,
154 })
155 })
156 .collect();
157
158 Ok(Json(json!({ "data": data })))
159 }
160