| 415 |
415 |
|
State(state): State<AppState>,
|
| 416 |
416 |
|
AdminUser(admin): AdminUser,
|
| 417 |
417 |
|
) -> Result<Response> {
|
|
418 |
+ |
// Clear the backlog with bounded concurrency instead of one sequential
|
|
419 |
+ |
// round-trip per candidate (Run #21 Performance MODERATE). MAX_INFLIGHT
|
|
420 |
+ |
// caps how many connections this admin action borrows from the pool at once.
|
|
421 |
+ |
const MAX_INFLIGHT: usize = 8;
|
|
422 |
+ |
let mut set: tokio::task::JoinSet<bool> = tokio::task::JoinSet::new();
|
| 418 |
423 |
|
let mut total = 0usize;
|
| 419 |
424 |
|
|
| 420 |
425 |
|
for cand in db::scan_jobs::rescan_candidates_versions(&state.db).await? {
|
| 421 |
|
- |
let id = VersionId::from_uuid(cand.version_id);
|
| 422 |
|
- |
if rescan_version_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
|
| 423 |
|
- |
total += 1;
|
|
426 |
+ |
if set.len() >= MAX_INFLIGHT && let Some(Ok(ok)) = set.join_next().await {
|
|
427 |
+ |
total += ok as usize;
|
| 424 |
428 |
|
}
|
|
429 |
+ |
let state = state.clone();
|
|
430 |
+ |
let admin_id = admin.id;
|
|
431 |
+ |
set.spawn(async move {
|
|
432 |
+ |
rescan_version_inner(&state, VersionId::from_uuid(cand.version_id), admin_id, AdminAction::BulkRescan)
|
|
433 |
+ |
.await
|
|
434 |
+ |
.is_ok()
|
|
435 |
+ |
});
|
| 425 |
436 |
|
}
|
| 426 |
437 |
|
for cand in db::scan_jobs::rescan_candidates_items(&state.db).await? {
|
| 427 |
|
- |
let id = ItemId::from_uuid(cand.item_id);
|
| 428 |
|
- |
if rescan_item_inner(&state, id, admin.id, AdminAction::BulkRescan).await.is_ok() {
|
| 429 |
|
- |
total += 1;
|
|
438 |
+ |
if set.len() >= MAX_INFLIGHT && let Some(Ok(ok)) = set.join_next().await {
|
|
439 |
+ |
total += ok as usize;
|
|
440 |
+ |
}
|
|
441 |
+ |
let state = state.clone();
|
|
442 |
+ |
let admin_id = admin.id;
|
|
443 |
+ |
set.spawn(async move {
|
|
444 |
+ |
rescan_item_inner(&state, ItemId::from_uuid(cand.item_id), admin_id, AdminAction::BulkRescan)
|
|
445 |
+ |
.await
|
|
446 |
+ |
.is_ok()
|
|
447 |
+ |
});
|
|
448 |
+ |
}
|
|
449 |
+ |
while let Some(res) = set.join_next().await {
|
|
450 |
+ |
if let Ok(ok) = res {
|
|
451 |
+ |
total += ok as usize;
|
| 430 |
452 |
|
}
|
| 431 |
453 |
|
}
|
| 432 |
454 |
|
tracing::info!(total, admin_id = %admin.id, "bulk rescan of held queue dispatched");
|
| 456 |
478 |
|
|
| 457 |
479 |
|
let held_items = db::scanning::get_held_items(&state.db).await?;
|
| 458 |
480 |
|
let held_versions = db::scanning::get_held_versions(&state.db).await?;
|
|
481 |
+ |
|
|
482 |
+ |
// Bounded-concurrency promote (Run #21 Performance MODERATE). The note is
|
|
483 |
+ |
// shared across tasks via Arc; MAX_INFLIGHT caps borrowed connections.
|
|
484 |
+ |
const MAX_INFLIGHT: usize = 8;
|
|
485 |
+ |
let note: std::sync::Arc<str> = note.into();
|
|
486 |
+ |
let mut set: tokio::task::JoinSet<bool> = tokio::task::JoinSet::new();
|
| 459 |
487 |
|
let mut total = 0usize;
|
| 460 |
488 |
|
|
| 461 |
|
- |
for v in &held_versions {
|
| 462 |
|
- |
if db::scanning::update_version_scan_status(&state.db, v.version_id, FileScanStatus::Clean).await.is_ok() {
|
| 463 |
|
- |
db::scan_admin_actions::log_version(
|
| 464 |
|
- |
&state.db, v.version_id, admin.id, AdminAction::BulkPromote,
|
| 465 |
|
- |
Some("held_for_review"), Some("clean"), Some(note),
|
| 466 |
|
- |
).await.ok();
|
| 467 |
|
- |
total += 1;
|
|
489 |
+ |
for v in held_versions {
|
|
490 |
+ |
if set.len() >= MAX_INFLIGHT && let Some(Ok(ok)) = set.join_next().await {
|
|
491 |
+ |
total += ok as usize;
|
| 468 |
492 |
|
}
|
|
493 |
+ |
let state = state.clone();
|
|
494 |
+ |
let note = note.clone();
|
|
495 |
+ |
let admin_id = admin.id;
|
|
496 |
+ |
let vid = v.version_id;
|
|
497 |
+ |
set.spawn(async move {
|
|
498 |
+ |
if db::scanning::update_version_scan_status(&state.db, vid, FileScanStatus::Clean).await.is_ok() {
|
|
499 |
+ |
db::scan_admin_actions::log_version(
|
|
500 |
+ |
&state.db, vid, admin_id, AdminAction::BulkPromote,
|
|
501 |
+ |
Some("held_for_review"), Some("clean"), Some(¬e),
|
|
502 |
+ |
).await.ok();
|
|
503 |
+ |
true
|
|
504 |
+ |
} else {
|
|
505 |
+ |
false
|
|
506 |
+ |
}
|
|
507 |
+ |
});
|
|
508 |
+ |
}
|
|
509 |
+ |
for i in held_items {
|
|
510 |
+ |
if set.len() >= MAX_INFLIGHT && let Some(Ok(ok)) = set.join_next().await {
|
|
511 |
+ |
total += ok as usize;
|
|
512 |
+ |
}
|
|
513 |
+ |
let state = state.clone();
|
|
514 |
+ |
let note = note.clone();
|
|
515 |
+ |
let admin_id = admin.id;
|
|
516 |
+ |
let iid = i.item_id;
|
|
517 |
+ |
set.spawn(async move {
|
|
518 |
+ |
if db::scanning::update_item_scan_status(&state.db, iid, FileScanStatus::Clean).await.is_ok() {
|
|
519 |
+ |
db::scan_admin_actions::log_item(
|
|
520 |
+ |
&state.db, iid, admin_id, AdminAction::BulkPromote,
|
|
521 |
+ |
Some("held_for_review"), Some("clean"), Some(¬e),
|
|
522 |
+ |
).await.ok();
|
|
523 |
+ |
true
|
|
524 |
+ |
} else {
|
|
525 |
+ |
false
|
|
526 |
+ |
}
|
|
527 |
+ |
});
|
| 469 |
528 |
|
}
|
| 470 |
|
- |
for i in &held_items {
|
| 471 |
|
- |
if db::scanning::update_item_scan_status(&state.db, i.item_id, FileScanStatus::Clean).await.is_ok() {
|
| 472 |
|
- |
db::scan_admin_actions::log_item(
|
| 473 |
|
- |
&state.db, i.item_id, admin.id, AdminAction::BulkPromote,
|
| 474 |
|
- |
Some("held_for_review"), Some("clean"), Some(note),
|
| 475 |
|
- |
).await.ok();
|
| 476 |
|
- |
total += 1;
|
|
529 |
+ |
while let Some(res) = set.join_next().await {
|
|
530 |
+ |
if let Ok(ok) = res {
|
|
531 |
+ |
total += ok as usize;
|
| 477 |
532 |
|
}
|
| 478 |
533 |
|
}
|
| 479 |
534 |
|
tracing::warn!(total, admin_id = %admin.id, note = %note, "bulk promote of held queue executed");
|