max / makenotwork
1 file changed,
+17 insertions,
-10 deletions
| @@ -35,19 +35,26 @@ struct SseConnectionGuard { | |||
| 35 | 35 | ||
| 36 | 36 | impl Drop for SseConnectionGuard { | |
| 37 | 37 | fn drop(&mut self) { | |
| 38 | - | if let Some(counter) = self.sse_connections.get(&self.user_id) { | |
| 38 | + | // Decrement connection counter, then remove entry if it hit zero. | |
| 39 | + | // Must drop the read guard before calling remove() to avoid deadlocking | |
| 40 | + | // on the same DashMap shard. | |
| 41 | + | let should_remove_connection = self.sse_connections.get(&self.user_id).map(|counter| { | |
| 39 | 42 | let prev = counter.value().fetch_sub(1, Ordering::AcqRel); | |
| 40 | - | // If this was the last connection for this user, remove the entry | |
| 41 | - | if prev <= 1 { | |
| 42 | - | self.sse_connections.remove(&self.user_id); | |
| 43 | - | } | |
| 43 | + | prev <= 1 | |
| 44 | + | }).unwrap_or(false); | |
| 45 | + | ||
| 46 | + | if should_remove_connection { | |
| 47 | + | self.sse_connections.remove(&self.user_id); | |
| 44 | 48 | } | |
| 45 | - | // Prune sync_notify channel if no receivers remain | |
| 49 | + | ||
| 50 | + | // Prune sync_notify channel if no receivers remain. | |
| 51 | + | // Same pattern: read first, drop guard, then remove. | |
| 46 | 52 | let key = (self.app_id, self.user_id); | |
| 47 | - | if let Some(entry) = self.sync_notify.get(&key) | |
| 48 | - | && entry.value().receiver_count() == 0 | |
| 49 | - | { | |
| 50 | - | drop(entry); | |
| 53 | + | let should_remove_notify = self.sync_notify.get(&key) | |
| 54 | + | .map(|entry| entry.value().receiver_count() == 0) | |
| 55 | + | .unwrap_or(false); | |
| 56 | + | ||
| 57 | + | if should_remove_notify { | |
| 51 | 58 | self.sync_notify.remove(&key); | |
| 52 | 59 | } | |
| 53 | 60 | } |