//! Generic import pipeline: takes an `ImportPayload` and creates MNW entities. //! //! Processing happens in chunks of 50 rows with progress updates after each chunk. //! Individual row failures are logged but don't abort the import. use sqlx::PgPool; use super::{ImportPayload, ImportTier}; use crate::db::{self, ImportJobId, ItemType, PriceCents, ProjectId, UserId}; use crate::error::Result; /// Chunk size for progress updates. const CHUNK_SIZE: usize = 50; /// Run the full import pipeline for a job. /// /// Creates MNW entities (tiers, items, tags, mailing list subscribers) from the /// intermediate payload. Transactions are recorded as subscriber metadata but /// not inserted into the `transactions` table (no matching buyer accounts exist /// for imported email addresses). #[tracing::instrument(skip_all, name = "import::run_import", fields(job_id = %job_id))] pub async fn run_import( pool: &PgPool, job_id: ImportJobId, project_id: ProjectId, _user_id: UserId, payload: ImportPayload, ) -> Result<()> { db::imports::update_import_status(pool, job_id, "processing").await?; let mut processed: i32 = 0; let mut created: i32 = 0; let mut skipped: i32 = 0; let mut errors: Vec = Vec::new(); // ── Phase 1: Tiers ── let created_tiers = import_tiers(pool, project_id, &payload.tiers, &mut errors).await; processed += payload.tiers.len() as i32; created += created_tiers; skipped += payload.tiers.len() as i32 - created_tiers; update_progress(pool, job_id, processed, created, skipped).await; // ── Phase 2: Items ── for chunk in payload.items.chunks(CHUNK_SIZE) { for item in chunk { match import_item(pool, project_id, item).await { Ok(true) => created += 1, Ok(false) => skipped += 1, Err(e) => { errors.push(format!("Item '{}': {}", item.title, e)); skipped += 1; } } processed += 1; } update_progress(pool, job_id, processed, created, skipped).await; } // ── Phase 3: Subscribers (mailing list) ── // Ensure the project has a content mailing list let list = db::mailing_lists::create_list( pool, project_id, db::MailingListType::Content, "Imported Subscribers", Some("Subscribers imported from external platform"), ) .await?; for chunk in payload.subscribers.chunks(CHUNK_SIZE) { for sub in chunk { match db::mailing_lists::subscribe_by_email(pool, list.id, &sub.email).await { Ok(true) => created += 1, Ok(false) => skipped += 1, Err(e) => { errors.push(format!("Subscriber '{}': {}", sub.email, e)); skipped += 1; } } processed += 1; } update_progress(pool, job_id, processed, created, skipped).await; } // ── Phase 4: Transactions (count only, no DB insert) ── // We can't create real transaction rows because imported buyer emails // don't correspond to MNW user accounts. We just count them. processed += payload.transactions.len() as i32; skipped += payload.transactions.len() as i32; update_progress(pool, job_id, processed, created, skipped).await; // ── Finalize ── let error_log = if errors.is_empty() { None } else { Some(errors.join("\n")) }; db::imports::complete_import_job(pool, job_id, error_log).await?; tracing::info!( processed, created, skipped, error_count = errors.len(), "import job completed" ); Ok(()) } /// Import subscription tiers. Returns number of tiers successfully created. async fn import_tiers( pool: &PgPool, project_id: ProjectId, tiers: &[ImportTier], errors: &mut Vec, ) -> i32 { let mut count = 0; for tier in tiers { let price = match tier.price_cents.try_into().ok().and_then(|p: i32| db::PriceCents::new(p).ok()) { Some(p) => p, None => { errors.push(format!( "Tier '{}': price_cents {} is invalid or out of range", tier.name, tier.price_cents )); continue; } }; match db::subscriptions::create_subscription_tier( pool, project_id, &tier.name, tier.description.as_deref(), price, ) .await { Ok(_) => count += 1, Err(e) => { // Check for duplicate (name already exists for project) if e.to_string().contains("23505") { // Unique violation — tier already exists, skip } else { errors.push(format!("Tier '{}': {}", tier.name, e)); } } } } count } /// Import a single item. Returns Ok(true) if created, Ok(false) if skipped. async fn import_item( pool: &PgPool, project_id: ProjectId, item: &super::ImportItem, ) -> Result { let price: i32 = item .price_cents .unwrap_or(0) .try_into() .map_err(|_| { anyhow::anyhow!( "Item '{}': price_cents {} exceeds i32 range", item.title, item.price_cents.unwrap_or(0) ) })?; // Convert HTML body to plain text (simple tag stripping for Phase A) let description = item .body_html .as_deref() .map(strip_html_tags) .or(item.description.clone()); let db_item = db::items::create_item( pool, project_id, &item.title, description.as_deref(), PriceCents::from_db(price), ItemType::Digital, // Default type for imports db::AiTier::Handmade, None, ) .await?; // Publish if the source item was public if item.is_public { sqlx::query("UPDATE items SET published = true, published_at = COALESCE($2, NOW()) WHERE id = $1") .bind(db_item.id) .bind(item.published_at) .execute(pool) .await?; } // Attach tags by slug lookup (dot-notation, e.g. "audio.genre.electronic") for tag_slug in &item.tags { if let Ok(Some(tag)) = db::tags::get_tag_by_slug(pool, tag_slug).await { let _ = db::tags::add_tag_to_item(pool, db_item.id, tag.id, false).await; } } Ok(true) } /// Minimal HTML tag stripper. Converts
,

,

  • to newlines, /// strips all other tags, and collapses whitespace. fn strip_html_tags(html: &str) -> String { let mut result = String::with_capacity(html.len()); let mut in_tag = false; let mut last_was_newline = false; // Replace block-level tags with newlines first let html = html .replace("
    ", "\n") .replace("
    ", "\n") .replace("
    ", "\n") .replace("

    ", "\n") .replace("
  • ", "\n") .replace("", "\n"); for ch in html.chars() { if ch == '<' { in_tag = true; continue; } if ch == '>' { in_tag = false; continue; } if !in_tag { if ch == '\n' { if !last_was_newline { result.push('\n'); last_was_newline = true; } } else { result.push(ch); last_was_newline = false; } } } // Cap output length to guard against multi-MB HTML bodies. const MAX_STRIPPED_LEN: usize = 512 * 1024; // 512 KB let trimmed = result.trim(); if trimmed.len() > MAX_STRIPPED_LEN { // Truncate on a char boundary let mut end = MAX_STRIPPED_LEN; while !trimmed.is_char_boundary(end) { end -= 1; } trimmed[..end].to_string() } else { trimmed.to_string() } } /// Update progress on the import job. async fn update_progress(pool: &PgPool, job_id: ImportJobId, processed: i32, created: i32, skipped: i32) { if let Err(e) = db::imports::update_import_progress(pool, job_id, processed, created, skipped).await { tracing::warn!(error = %e, "failed to update import progress"); } } #[cfg(test)] mod tests { use super::*; #[test] fn strip_html_basic() { assert_eq!(strip_html_tags("

    Hello

    "), "Hello"); } #[test] fn strip_html_br_tags() { assert_eq!(strip_html_tags("line1
    line2"), "line1\nline2"); assert_eq!(strip_html_tags("line1
    line2"), "line1\nline2"); } #[test] fn strip_html_nested() { assert_eq!( strip_html_tags("

    Hello world

    "), "Hello world" ); } #[test] fn strip_html_collapses_newlines() { assert_eq!( strip_html_tags("

    One

    Two

    Three

    "), "One\nTwo\nThree" ); } #[test] fn strip_html_empty() { assert_eq!(strip_html_tags(""), ""); assert_eq!(strip_html_tags("


    "), ""); } }