//! Generic import pipeline: takes an `ImportPayload` and creates MNW entities.
//!
//! Items and subscribers are processed in chunks of 50 rows, with a progress update
//! after each chunk. Individual row failures are recorded in the job's error log but
//! don't abort the import.
use sqlx::PgPool;
use super::{ImportPayload, ImportTier};
use crate::db::{self, ImportJobId, ItemType, PriceCents, ProjectId, UserId};
use crate::error::Result;
/// Number of rows handled between two consecutive progress writes to the job row.
const CHUNK_SIZE: usize = 50;
/// Run the full import pipeline for a job.
///
/// Works through the intermediate payload in four phases:
///
/// 1. subscription tiers,
/// 2. items (with their tags),
/// 3. mailing-list subscribers,
/// 4. transactions.
///
/// Transactions are only counted (as skipped). They are not inserted into the
/// `transactions` table, because no matching buyer accounts exist for imported
/// email addresses.
///
/// Per-row failures are collected into the job's error log instead of aborting the
/// import. Progress is written after the tiers, after every [`CHUNK_SIZE`] items or
/// subscribers, and after the transactions.
///
/// # Errors
///
/// Returns an error if marking the job as processing, creating the import mailing
/// list, or completing the job fails.
/// NOTE(review): on such an early return the job status is left as "processing";
/// presumably the caller is expected to mark it failed — confirm.
#[tracing::instrument(skip_all, name = "import::run_import", fields(job_id = %job_id))]
pub async fn run_import(
    pool: &PgPool,
    job_id: ImportJobId,
    project_id: ProjectId,
    _user_id: UserId,
    payload: ImportPayload,
) -> Result<()> {
    db::imports::update_import_status(pool, job_id, "processing").await?;
    let mut tally = ImportTally::default();

    // ── Phase 1: Tiers ──
    let created_tiers = import_tiers(pool, project_id, &payload.tiers, &mut tally.errors).await;
    tally.add_batch(payload.tiers.len(), created_tiers);
    tally.report(pool, job_id).await;

    // ── Phase 2: Items ──
    for chunk in payload.items.chunks(CHUNK_SIZE) {
        for item in chunk {
            let outcome = import_item(pool, project_id, item).await;
            tally.add_row(outcome, || format!("Item '{}'", item.title));
        }
        tally.report(pool, job_id).await;
    }

    // ── Phase 3: Subscribers (mailing list) ──
    // Only create the list when there is someone to put on it; otherwise every
    // import without subscribers would leave an empty list behind (or fail the
    // whole job if list creation errors).
    if !payload.subscribers.is_empty() {
        let list = db::mailing_lists::create_list(
            pool,
            project_id,
            db::MailingListType::Content,
            "Imported Subscribers",
            Some("Subscribers imported from external platform"),
        )
        .await?;
        for chunk in payload.subscribers.chunks(CHUNK_SIZE) {
            for sub in chunk {
                let outcome =
                    db::mailing_lists::subscribe_by_email(pool, list.id, &sub.email).await;
                tally.add_row(outcome, || format!("Subscriber '{}'", sub.email));
            }
            tally.report(pool, job_id).await;
        }
    }

    // ── Phase 4: Transactions (count only, no DB insert) ──
    // We can't create real transaction rows because imported buyer emails
    // don't correspond to MNW user accounts. We just count them.
    tally.add_batch(payload.transactions.len(), 0);
    tally.report(pool, job_id).await;

    // ── Finalize ──
    let error_count = tally.errors.len();
    let error_log = (!tally.errors.is_empty()).then(|| tally.errors.join("\n"));
    db::imports::complete_import_job(pool, job_id, error_log).await?;
    tracing::info!(
        processed = tally.processed,
        created = tally.created,
        skipped = tally.skipped,
        error_count,
        "import job completed"
    );
    Ok(())
}

/// Running totals for an import job, mirrored into the job's progress columns.
///
/// All counters saturate at `i32::MAX` instead of wrapping.
#[derive(Default)]
struct ImportTally {
    processed: i32,
    created: i32,
    skipped: i32,
    /// Human-readable per-row failures, joined with newlines into the job's error log.
    errors: Vec<String>,
}

impl ImportTally {
    /// Records `rows` processed rows, of which `created` were inserted and the rest skipped.
    fn add_batch(&mut self, rows: usize, created: i32) {
        let rows = i32::try_from(rows).unwrap_or(i32::MAX);
        self.processed = self.processed.saturating_add(rows);
        self.created = self.created.saturating_add(created);
        self.skipped = self.skipped.saturating_add(rows.saturating_sub(created));
    }

    /// Records one row's outcome.
    ///
    /// - `Ok(true)` counts as created.
    /// - `Ok(false)` counts as skipped.
    /// - `Err(e)` counts as skipped and logs `"{describe()}: {e}"`.
    fn add_row<E: std::fmt::Display>(
        &mut self,
        outcome: std::result::Result<bool, E>,
        describe: impl FnOnce() -> String,
    ) {
        self.processed = self.processed.saturating_add(1);
        match outcome {
            Ok(true) => self.created = self.created.saturating_add(1),
            Ok(false) => self.skipped = self.skipped.saturating_add(1),
            Err(e) => {
                self.errors.push(format!("{}: {}", describe(), e));
                self.skipped = self.skipped.saturating_add(1);
            }
        }
    }

    /// Writes the current counters to the job row (best-effort, see `update_progress`).
    async fn report(&self, pool: &PgPool, job_id: ImportJobId) {
        update_progress(pool, job_id, self.processed, self.created, self.skipped).await;
    }
}
/// Import subscription tiers. Returns number of tiers successfully created.
async fn import_tiers(
pool: &PgPool,
project_id: ProjectId,
tiers: &[ImportTier],
errors: &mut Vec,
) -> i32 {
let mut count = 0;
for tier in tiers {
let price = match tier.price_cents.try_into().ok().and_then(|p: i32| db::PriceCents::new(p).ok()) {
Some(p) => p,
None => {
errors.push(format!(
"Tier '{}': price_cents {} is invalid or out of range",
tier.name, tier.price_cents
));
continue;
}
};
match db::subscriptions::create_subscription_tier(
pool,
project_id,
&tier.name,
tier.description.as_deref(),
price,
)
.await
{
Ok(_) => count += 1,
Err(e) => {
// Check for duplicate (name already exists for project)
if e.to_string().contains("23505") {
// Unique violation — tier already exists, skip
} else {
errors.push(format!("Tier '{}': {}", tier.name, e));
}
}
}
}
count
}
/// Import a single item. Returns Ok(true) if created, Ok(false) if skipped.
async fn import_item(
pool: &PgPool,
project_id: ProjectId,
item: &super::ImportItem,
) -> Result {
let price: i32 = item
.price_cents
.unwrap_or(0)
.try_into()
.map_err(|_| {
anyhow::anyhow!(
"Item '{}': price_cents {} exceeds i32 range",
item.title,
item.price_cents.unwrap_or(0)
)
})?;
// Convert HTML body to plain text (simple tag stripping for Phase A)
let description = item
.body_html
.as_deref()
.map(strip_html_tags)
.or(item.description.clone());
let db_item = db::items::create_item(
pool,
project_id,
&item.title,
description.as_deref(),
PriceCents::from_db(price),
ItemType::Digital, // Default type for imports
db::AiTier::Handmade,
None,
)
.await?;
// Publish if the source item was public
if item.is_public {
sqlx::query("UPDATE items SET published = true, published_at = COALESCE($2, NOW()) WHERE id = $1")
.bind(db_item.id)
.bind(item.published_at)
.execute(pool)
.await?;
}
// Attach tags by slug lookup (dot-notation, e.g. "audio.genre.electronic")
for tag_slug in &item.tags {
if let Ok(Some(tag)) = db::tags::get_tag_by_slug(pool, tag_slug).await {
let _ = db::tags::add_tag_to_item(pool, db_item.id, tag.id, false).await;
}
}
Ok(true)
}
/// Upper bound, in bytes, on the text produced by [`strip_html_tags`]; guards against
/// multi-MB HTML bodies.
const MAX_STRIPPED_LEN: usize = 512 * 1024; // 512 KiB

/// Element names whose opening or closing tag ends the current line in the text output.
const LINE_BREAK_TAGS: &[&str] = &[
    "br", "p", "div", "li", "ul", "ol", "h1", "h2", "h3", "h4", "h5", "h6", "tr", "table",
    "blockquote", "pre", "hr", "section", "article",
];

/// Whitespace owed before the next visible character; a line break outranks a space.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum PendingWhitespace {
    Nothing,
    Space,
    Line,
}

/// Minimal HTML-to-plain-text converter for imported item bodies.
///
/// - Tags:
///   - `<br>` (any case, with or without attributes or a self-closing `/`) ends the
///     current line.
///   - The opening or closing tag of an element in [`LINE_BREAK_TAGS`] also ends the
///     current line.
///   - Every other tag, including comments, is dropped.
///   - An unterminated `<` drops the rest of the input.
/// - Whitespace:
///   - Line breaks already present in the source are kept.
///   - Consecutive line breaks collapse to one.
///   - Runs of other whitespace collapse to a single space.
///   - Whitespace at the start and end of every line is removed.
/// - Character references:
///   - `&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`, `&nbsp;`, `&#NNN;` and `&#xHH;`
///     are decoded.
///   - Anything else starting with `&` is kept verbatim.
/// - Non-whitespace control characters (e.g. NUL, which PostgreSQL `text` cannot
///   store) are dropped.
/// - The result is capped at [`MAX_STRIPPED_LEN`] bytes, cut on a char boundary.
fn strip_html_tags(html: &str) -> String {
    let mut text = String::with_capacity(html.len().min(MAX_STRIPPED_LEN));
    let mut pending = PendingWhitespace::Nothing;
    let mut pos = 0;

    // `pos` always sits on a char boundary: it only advances by whole chars or past
    // an ASCII '>' / ';'.
    while let Some(ch) = html[pos..].chars().next() {
        pos += ch.len_utf8();
        let ch = match ch {
            '<' => {
                let len = match html[pos..].find('>') {
                    Some(len) => len,
                    None => break, // unterminated tag: drop the rest
                };
                if tag_breaks_line(&html[pos..pos + len]) {
                    pending = PendingWhitespace::Line;
                }
                pos += len + 1;
                continue;
            }
            '&' => match decode_char_ref(&html[pos..]) {
                Some((decoded, consumed)) => {
                    pos += consumed;
                    decoded
                }
                None => '&',
            },
            other => other,
        };

        if ch == '\n' || ch == '\r' {
            pending = PendingWhitespace::Line;
        } else if ch.is_whitespace() {
            pending = pending.max(PendingWhitespace::Space);
        } else if !ch.is_control() {
            // Separators are only written *before* visible text, which trims both ends.
            if !text.is_empty() {
                match pending {
                    PendingWhitespace::Line => text.push('\n'),
                    PendingWhitespace::Space => text.push(' '),
                    PendingWhitespace::Nothing => {}
                }
            }
            pending = PendingWhitespace::Nothing;
            text.push(ch);
            if text.len() >= MAX_STRIPPED_LEN {
                break; // no point scanning the rest of a huge body
            }
        }
    }

    if text.len() > MAX_STRIPPED_LEN {
        let mut end = MAX_STRIPPED_LEN;
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        text.truncate(end);
        // The cut may land just after a separator.
        let kept = text.trim_end().len();
        text.truncate(kept);
    }
    text
}

/// Returns true when the markup between `<` and `>` is a `<br>` or an opening/closing
/// tag of an element listed in [`LINE_BREAK_TAGS`] (compared case-insensitively).
fn tag_breaks_line(markup: &str) -> bool {
    let markup = markup.trim_start();
    let markup = markup.strip_prefix('/').unwrap_or(markup);
    let name_end = markup
        .find(|c: char| !c.is_ascii_alphanumeric())
        .unwrap_or(markup.len());
    let name = &markup[..name_end];
    LINE_BREAK_TAGS.iter().any(|tag| name.eq_ignore_ascii_case(tag))
}

/// Decodes the character reference at the start of `rest` (the input just after `&`).
///
/// Returns the decoded character and the number of bytes consumed, including the
/// terminating `;`. Returns `None` when `rest` does not start with a supported reference.
fn decode_char_ref(rest: &str) -> Option<(char, usize)> {
    // `#x10FFFF` (8 bytes) is the longest reference handled, so look no further than that.
    let semi = rest.bytes().take(9).position(|b| b == b';')?;
    let name = &rest[..semi];
    let decoded = match name {
        "amp" => '&',
        "lt" => '<',
        "gt" => '>',
        "quot" => '"',
        "apos" => '\'',
        "nbsp" => ' ',
        _ => {
            let digits = name.strip_prefix('#')?;
            let code = match digits.strip_prefix('x').or_else(|| digits.strip_prefix('X')) {
                Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                None => digits.parse::<u32>().ok()?,
            };
            char::from_u32(code)?
        }
    };
    Some((decoded, semi + 1))
}
/// Update progress on the import job.
async fn update_progress(pool: &PgPool, job_id: ImportJobId, processed: i32, created: i32, skipped: i32) {
if let Err(e) = db::imports::update_import_progress(pool, job_id, processed, created, skipped).await {
tracing::warn!(error = %e, "failed to update import progress");
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn strip_html_basic() {
        assert_eq!(strip_html_tags("<p>Hello</p>"), "Hello");
    }

    #[test]
    fn strip_html_br_tags() {
        assert_eq!(strip_html_tags("line1<br>line2"), "line1\nline2");
        assert_eq!(strip_html_tags("line1<br/>line2"), "line1\nline2");
    }

    #[test]
    fn strip_html_nested() {
        assert_eq!(
            strip_html_tags("<div><span>Hello <strong>world</strong></span></div>"),
            "Hello world"
        );
    }

    #[test]
    fn strip_html_collapses_newlines() {
        assert_eq!(
            strip_html_tags("<p>One</p><p>Two</p><p>Three</p>"),
            "One\nTwo\nThree"
        );
    }

    #[test]
    fn strip_html_empty() {
        assert_eq!(strip_html_tags(""), "");
        assert_eq!(strip_html_tags("<br>"), "");
    }

    #[test]
    fn strip_html_caps_length() {
        let huge = "a".repeat(600 * 1024);
        assert_eq!(strip_html_tags(&huge).len(), 512 * 1024);
    }
}