Skip to main content

max / makenotwork

9.2 KB · 307 lines History Blame Raw
1 //! Generic import pipeline: takes an `ImportPayload` and creates MNW entities.
2 //!
3 //! Processing happens in chunks of 50 rows with progress updates after each chunk.
4 //! Individual row failures are logged but don't abort the import.
5
6 use sqlx::PgPool;
7
8 use super::{ImportPayload, ImportTier};
9 use crate::db::{self, ImportJobId, ItemType, PriceCents, ProjectId, UserId};
10 use crate::error::Result;
11
12 /// Chunk size for progress updates.
13 const CHUNK_SIZE: usize = 50;
14
15 /// Run the full import pipeline for a job.
16 ///
17 /// Creates MNW entities (tiers, items, tags, mailing list subscribers) from the
18 /// intermediate payload. Transactions are recorded as subscriber metadata but
19 /// not inserted into the `transactions` table (no matching buyer accounts exist
20 /// for imported email addresses).
21 #[tracing::instrument(skip_all, name = "import::run_import", fields(job_id = %job_id))]
22 pub async fn run_import(
23 pool: &PgPool,
24 job_id: ImportJobId,
25 project_id: ProjectId,
26 _user_id: UserId,
27 payload: ImportPayload,
28 ) -> Result<()> {
29 db::imports::update_import_status(pool, job_id, "processing").await?;
30
31 let mut processed: i32 = 0;
32 let mut created: i32 = 0;
33 let mut skipped: i32 = 0;
34 let mut errors: Vec<String> = Vec::new();
35
36 // ── Phase 1: Tiers ──
37 let created_tiers = import_tiers(pool, project_id, &payload.tiers, &mut errors).await;
38 processed += payload.tiers.len() as i32;
39 created += created_tiers;
40 skipped += payload.tiers.len() as i32 - created_tiers;
41 update_progress(pool, job_id, processed, created, skipped).await;
42
43 // ── Phase 2: Items ──
44 for chunk in payload.items.chunks(CHUNK_SIZE) {
45 for item in chunk {
46 match import_item(pool, project_id, item).await {
47 Ok(true) => created += 1,
48 Ok(false) => skipped += 1,
49 Err(e) => {
50 errors.push(format!("Item '{}': {}", item.title, e));
51 skipped += 1;
52 }
53 }
54 processed += 1;
55 }
56 update_progress(pool, job_id, processed, created, skipped).await;
57 }
58
59 // ── Phase 3: Subscribers (mailing list) ──
60 // Ensure the project has a content mailing list
61 let list = db::mailing_lists::create_list(
62 pool,
63 project_id,
64 db::MailingListType::Content,
65 "Imported Subscribers",
66 Some("Subscribers imported from external platform"),
67 )
68 .await?;
69
70 for chunk in payload.subscribers.chunks(CHUNK_SIZE) {
71 for sub in chunk {
72 match db::mailing_lists::subscribe_by_email(pool, list.id, &sub.email).await {
73 Ok(true) => created += 1,
74 Ok(false) => skipped += 1,
75 Err(e) => {
76 errors.push(format!("Subscriber '{}': {}", sub.email, e));
77 skipped += 1;
78 }
79 }
80 processed += 1;
81 }
82 update_progress(pool, job_id, processed, created, skipped).await;
83 }
84
85 // ── Phase 4: Transactions (count only, no DB insert) ──
86 // We can't create real transaction rows because imported buyer emails
87 // don't correspond to MNW user accounts. We just count them.
88 processed += payload.transactions.len() as i32;
89 skipped += payload.transactions.len() as i32;
90 update_progress(pool, job_id, processed, created, skipped).await;
91
92 // ── Finalize ──
93 let error_log = if errors.is_empty() {
94 None
95 } else {
96 Some(errors.join("\n"))
97 };
98
99 db::imports::complete_import_job(pool, job_id, error_log).await?;
100
101 tracing::info!(
102 processed,
103 created,
104 skipped,
105 error_count = errors.len(),
106 "import job completed"
107 );
108
109 Ok(())
110 }
111
112 /// Import subscription tiers. Returns number of tiers successfully created.
113 async fn import_tiers(
114 pool: &PgPool,
115 project_id: ProjectId,
116 tiers: &[ImportTier],
117 errors: &mut Vec<String>,
118 ) -> i32 {
119 let mut count = 0;
120 for tier in tiers {
121 let price = match tier.price_cents.try_into().ok().and_then(|p: i32| db::PriceCents::new(p).ok()) {
122 Some(p) => p,
123 None => {
124 errors.push(format!(
125 "Tier '{}': price_cents {} is invalid or out of range",
126 tier.name, tier.price_cents
127 ));
128 continue;
129 }
130 };
131 match db::subscriptions::create_subscription_tier(
132 pool,
133 project_id,
134 &tier.name,
135 tier.description.as_deref(),
136 price,
137 )
138 .await
139 {
140 Ok(_) => count += 1,
141 Err(e) => {
142 // Check for duplicate (name already exists for project)
143 if e.to_string().contains("23505") {
144 // Unique violation — tier already exists, skip
145 } else {
146 errors.push(format!("Tier '{}': {}", tier.name, e));
147 }
148 }
149 }
150 }
151 count
152 }
153
154 /// Import a single item. Returns Ok(true) if created, Ok(false) if skipped.
155 async fn import_item(
156 pool: &PgPool,
157 project_id: ProjectId,
158 item: &super::ImportItem,
159 ) -> Result<bool> {
160 let price: i32 = item
161 .price_cents
162 .unwrap_or(0)
163 .try_into()
164 .map_err(|_| {
165 anyhow::anyhow!(
166 "Item '{}': price_cents {} exceeds i32 range",
167 item.title,
168 item.price_cents.unwrap_or(0)
169 )
170 })?;
171
172 // Convert HTML body to plain text (simple tag stripping for Phase A)
173 let description = item
174 .body_html
175 .as_deref()
176 .map(strip_html_tags)
177 .or(item.description.clone());
178
179 let db_item = db::items::create_item(
180 pool,
181 project_id,
182 &item.title,
183 description.as_deref(),
184 PriceCents::from_db(price),
185 ItemType::Digital, // Default type for imports
186 db::AiTier::Handmade,
187 None,
188 )
189 .await?;
190
191 // Publish if the source item was public
192 if item.is_public {
193 sqlx::query("UPDATE items SET published = true, published_at = COALESCE($2, NOW()) WHERE id = $1")
194 .bind(db_item.id)
195 .bind(item.published_at)
196 .execute(pool)
197 .await?;
198 }
199
200 // Attach tags by slug lookup (dot-notation, e.g. "audio.genre.electronic")
201 for tag_slug in &item.tags {
202 if let Ok(Some(tag)) = db::tags::get_tag_by_slug(pool, tag_slug).await {
203 let _ = db::tags::add_tag_to_item(pool, db_item.id, tag.id, false).await;
204 }
205 }
206
207 Ok(true)
208 }
209
/// Minimal HTML-to-text conversion.
///
/// * `<br>` (in any spelling: `<br/>`, `<br />`, `<BR>`, with attributes) and
///   the closing tags `</p>`, `</li>`, `</div>` become a newline.
/// * Every other tag is removed.
/// * Runs of consecutive newlines collapse into one; other whitespace is kept.
/// * The result is trimmed and capped at 512 KB, cut on a char boundary.
///
/// A `>` that does not close a tag is ordinary text and is preserved. A `<`
/// always opens a tag; if it is never closed, the rest of the input is
/// dropped. Quoted attribute values are not parsed, so a `>` inside an
/// attribute ends the tag early. HTML entities are left untouched.
fn strip_html_tags(html: &str) -> String {
    // Cap output length to guard against multi-MB HTML bodies.
    const MAX_STRIPPED_LEN: usize = 512 * 1024; // 512 KB
    // Only the start of a tag is needed to recognize its name, so the tag
    // buffer stays tiny even when a tag (or an unterminated `<`) is huge.
    const TAG_PREFIX_LEN: usize = 16;

    let mut result = String::with_capacity(html.len());
    let mut tag = String::with_capacity(TAG_PREFIX_LEN);
    let mut in_tag = false;
    let mut last_was_newline = false;

    for ch in html.chars() {
        if in_tag {
            match ch {
                '>' => {
                    in_tag = false;
                    if is_line_break_tag(&tag) && !last_was_newline {
                        result.push('\n');
                        last_was_newline = true;
                    }
                }
                // A second '<' before the tag closed: start over from here.
                '<' => tag.clear(),
                _ => {
                    if tag.len() < TAG_PREFIX_LEN {
                        tag.push(ch);
                    }
                }
            }
            continue;
        }

        match ch {
            '<' => {
                in_tag = true;
                tag.clear();
            }
            '\n' => {
                if !last_was_newline {
                    result.push('\n');
                    last_was_newline = true;
                }
            }
            // Includes a literal '>' outside of any tag.
            _ => {
                result.push(ch);
                last_was_newline = false;
            }
        }
    }

    let trimmed = result.trim();
    if trimmed.len() > MAX_STRIPPED_LEN {
        // Truncate on a char boundary; index 0 is always one, so this stops.
        let mut end = MAX_STRIPPED_LEN;
        while !trimmed.is_char_boundary(end) {
            end -= 1;
        }
        trimmed[..end].to_string()
    } else {
        trimmed.to_string()
    }
}

/// Whether `tag` (the text between `<` and `>`, possibly truncated) is one of
/// the tags rendered as a line break: `br`, or closing `p` / `li` / `div`.
fn is_line_break_tag(tag: &str) -> bool {
    let (closing, rest) = match tag.strip_prefix('/') {
        Some(rest) => (true, rest),
        None => (false, tag),
    };
    // The tag name ends at the first whitespace, '/', or other punctuation.
    let name_len = rest
        .find(|c: char| !c.is_ascii_alphanumeric())
        .unwrap_or(rest.len());
    let name = &rest[..name_len];

    if closing {
        ["p", "li", "div"]
            .iter()
            .any(|block| name.eq_ignore_ascii_case(block))
    } else {
        name.eq_ignore_ascii_case("br")
    }
}
262
263 /// Update progress on the import job.
264 async fn update_progress(pool: &PgPool, job_id: ImportJobId, processed: i32, created: i32, skipped: i32) {
265 if let Err(e) = db::imports::update_import_progress(pool, job_id, processed, created, skipped).await {
266 tracing::warn!(error = %e, "failed to update import progress");
267 }
268 }
269
#[cfg(test)]
mod tests {
    use super::*;

    /// A single paragraph loses its tags and the trailing newline is trimmed.
    #[test]
    fn strip_html_basic() {
        let stripped = strip_html_tags("<p>Hello</p>");
        assert_eq!(stripped, "Hello");
    }

    /// Both `<br>` spellings turn into exactly one newline.
    #[test]
    fn strip_html_br_tags() {
        for input in ["line1<br>line2", "line1<br/>line2"] {
            assert_eq!(strip_html_tags(input), "line1\nline2");
        }
    }

    /// Nested inline and block tags leave only the text behind.
    #[test]
    fn strip_html_nested() {
        let input = "<div><p>Hello <strong>world</strong></p></div>";
        assert_eq!(strip_html_tags(input), "Hello world");
    }

    /// Adjacent paragraphs are separated by a single newline each.
    #[test]
    fn strip_html_collapses_newlines() {
        let input = "<p>One</p><p>Two</p><p>Three</p>";
        let expected = ["One", "Two", "Three"].join("\n");
        assert_eq!(strip_html_tags(input), expected);
    }

    /// Empty input and input made only of line breaks both yield "".
    #[test]
    fn strip_html_empty() {
        for input in ["", "<br><br><br>"] {
            assert_eq!(strip_html_tags(input), "");
        }
    }
}
307