//! Import API endpoints: start import, check status, list jobs. use axum::{ extract::{Path, State}, response::IntoResponse, Json, }; use base64::Engine; use serde::Deserialize; use serde_json::json; use crate::{ auth::AuthUser, db::{self, ImportJobId, ProjectId}, error::{AppError, Result}, import::{self, ColumnMapping, ImportSource}, AppState, }; /// Maximum CSV size: 10 MB (base64-encoded). const MAX_CSV_SIZE: usize = 10 * 1024 * 1024; /// Request body for starting an import. #[derive(Debug, Deserialize)] pub(super) struct StartImportRequest { pub project_id: ProjectId, pub source: ImportSource, pub csv_data: String, pub column_mapping: ColumnMapping, } /// Start a new import job from CSV data. /// /// Creates the job in `pending` status, then spawns a background task /// to parse the CSV and run the import pipeline. #[tracing::instrument(skip_all, name = "imports::start_import")] pub(super) async fn start_import( State(state): State, AuthUser(user): AuthUser, Json(req): Json, ) -> Result { user.check_not_suspended()?; user.check_not_sandbox()?; // Validate project ownership super::verify_project_ownership(&state, req.project_id, user.id).await?; // Only generic_csv is currently supported if req.source != ImportSource::GenericCsv { return Err(AppError::validation(format!( "Import source '{}' is not yet supported. Only 'generic_csv' is available.", req.source, ))); } // Decode base64 CSV data let csv_bytes = base64::engine::general_purpose::STANDARD .decode(&req.csv_data) .map_err(|_| AppError::validation("Invalid base64-encoded CSV data"))?; if csv_bytes.len() > MAX_CSV_SIZE { return Err(AppError::validation(format!( "CSV data exceeds maximum size of {} MB", MAX_CSV_SIZE / (1024 * 1024), ))); } // Parse CSV into payload let payload = import::csv_converter::parse_csv(&csv_bytes, &req.column_mapping)?; let total_rows = payload.total_rows() as i32; // Create import job let job = db::imports::create_import_job( &state.db, user.id, req.project_id, req.source, total_rows, ) .await?; let job_id = job.id; let pool = state.db.clone(); let project_id = req.project_id; let user_id = user.id; // Spawn background task for the import pipeline tokio::spawn(async move { if let Err(e) = import::pipeline::run_import( &pool, job_id, project_id, user_id, payload, ) .await { tracing::error!(error = %e, job_id = %job_id, "import pipeline failed"); let _ = db::imports::fail_import_job(&pool, job_id, &e.to_string()).await; } }); Ok(Json(json!({ "job_id": job_id }))) } /// Get the status and progress of an import job. #[tracing::instrument(skip_all, name = "imports::get_import_status")] pub(super) async fn get_import_status( State(state): State, AuthUser(user): AuthUser, Path(id): Path, ) -> Result { let job_id: ImportJobId = id.parse().map_err(|_| AppError::NotFound)?; let job = db::imports::get_import_job(&state.db, job_id, user.id) .await? .ok_or(AppError::NotFound)?; Ok(Json(json!({ "id": job.id, "source": job.source.to_string(), "status": job.status.to_string(), "total_rows": job.total_rows, "processed_rows": job.processed_rows, "created_rows": job.created_rows, "skipped_rows": job.skipped_rows, "error_log": job.error_log, "created_at": job.created_at, "completed_at": job.completed_at, }))) } /// List all import jobs for the authenticated user. #[tracing::instrument(skip_all, name = "imports::list_imports")] pub(super) async fn list_imports( State(state): State, AuthUser(user): AuthUser, ) -> Result { let jobs = db::imports::list_import_jobs(&state.db, user.id).await?; let data: Vec = jobs .into_iter() .map(|j| { json!({ "id": j.id, "source": j.source.to_string(), "status": j.status.to_string(), "total_rows": j.total_rows, "processed_rows": j.processed_rows, "created_rows": j.created_rows, "skipped_rows": j.skipped_rows, "created_at": j.created_at, "completed_at": j.completed_at, }) }) .collect(); Ok(Json(json!({ "data": data }))) }