max / makenotwork
1 file changed,
+15 insertions,
-1 deletion
| @@ -116,6 +116,11 @@ pub fn spawn_pool(n: usize, ctx: Arc<WorkerContext>, shutdown_rx: tokio::sync::w | |||
| 116 | 116 | ||
| 117 | 117 | /// Run a single scan job end-to-end. On success the job is marked done; the | |
| 118 | 118 | /// caller marks failed if this returns an error. | |
| 119 | + | /// | |
| 120 | + | /// On pipeline error (e.g. S3 download failure), reset the entity from | |
| 121 | + | /// Scanning back to HeldForReview before bubbling the error up. Otherwise | |
| 122 | + | /// the entity stays stuck at Scanning forever — a real regression we hit | |
| 123 | + | /// in production with stale s3_keys. | |
| 119 | 124 | #[tracing::instrument(skip_all, fields(%job_id = job.id, target_kind = %job.target_kind, %target_id = job.target_id, attempts = job.attempts))] | |
| 120 | 125 | async fn process_job(ctx: &WorkerContext, job: ScanJob) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { | |
| 121 | 126 | let job_id = job.id; | |
| @@ -128,7 +133,16 @@ async fn process_job(ctx: &WorkerContext, job: ScanJob) -> Result<(), Box<dyn st | |||
| 128 | 133 | // queue panel. | |
| 129 | 134 | update_entity_status(&ctx.db, kind, target_id, FileScanStatus::Scanning).await.ok(); | |
| 130 | 135 | ||
| 131 | - | let entity_status = run_pipeline_and_decide(ctx, &job, file_type).await?; | |
| 136 | + | let entity_status = match run_pipeline_and_decide(ctx, &job, file_type).await { | |
| 137 | + | Ok(s) => s, | |
| 138 | + | Err(e) => { | |
| 139 | + | // Pipeline blew up — most often a stale s3_key. Reset entity to | |
| 140 | + | // HeldForReview so admins see it on the dashboard and decide | |
| 141 | + | // whether to delete the orphan record. | |
| 142 | + | update_entity_status(&ctx.db, kind, target_id, FileScanStatus::HeldForReview).await.ok(); | |
| 143 | + | return Err(e); | |
| 144 | + | } | |
| 145 | + | }; | |
| 132 | 146 | update_entity_status(&ctx.db, kind, target_id, entity_status).await?; | |
| 133 | 147 | ||
| 134 | 148 | db::scan_jobs::mark_done(&ctx.db, job_id).await?; |