//! Session tracking for remote revocation. use sqlx::PgPool; use super::{DbUserSession, UserId, UserSessionId}; use crate::error::Result; /// Insert a tracked session row and return its ID. #[tracing::instrument(skip_all)] pub async fn create_user_session( pool: &PgPool, user_id: UserId, user_agent: Option<&str>, ip_address: Option<&str>, ) -> Result { let row = sqlx::query_scalar::<_, UserSessionId>( "INSERT INTO user_sessions (user_id, user_agent, ip_address) VALUES ($1, $2, $3) RETURNING id", ) .bind(user_id) .bind(user_agent) .bind(ip_address) .fetch_one(pool) .await?; Ok(row) } /// Insert a `kind='pending_2fa'` row for an intermediate session held between /// the password step and the TOTP/backup-code step. Exposed to /// `delete_all_sessions_for_user` so "log out everywhere" sweeps a phisher /// who has the password but not the second factor. #[tracing::instrument(skip_all)] pub async fn create_pending_2fa_session( pool: &PgPool, user_id: UserId, user_agent: Option<&str>, ip_address: Option<&str>, ) -> Result { let row = sqlx::query_scalar::<_, UserSessionId>( "INSERT INTO user_sessions (user_id, user_agent, ip_address, kind) VALUES ($1, $2, $3, 'pending_2fa') RETURNING id", ) .bind(user_id) .bind(user_agent) .bind(ip_address) .fetch_one(pool) .await?; Ok(row) } /// Confirm the pending_2fa tracking row is still present (i.e. wasn't swept /// by `delete_all_sessions_for_user` while the user was at the TOTP prompt). #[tracing::instrument(skip_all)] pub async fn pending_2fa_session_exists( pool: &PgPool, id: UserSessionId, user_id: UserId, ) -> Result { let exists: bool = sqlx::query_scalar( "SELECT EXISTS(SELECT 1 FROM user_sessions WHERE id = $1 AND user_id = $2 AND kind = 'pending_2fa')", ) .bind(id) .bind(user_id) .fetch_one(pool) .await?; Ok(exists) } /// Delete a pending_2fa tracking row. Called when 2FA succeeds (the caller /// then `track_session`s a fresh 'active' row) or when the pending state is /// cleared (expiry, account lockout, navigation away). #[tracing::instrument(skip_all)] pub async fn delete_pending_2fa_session(pool: &PgPool, id: UserSessionId) -> Result<()> { sqlx::query("DELETE FROM user_sessions WHERE id = $1 AND kind = 'pending_2fa'") .bind(id) .execute(pool) .await?; Ok(()) } /// Result of touching a session: whether it exists and the user's current /// suspended status (live from the `users` table, not cached in the session). pub struct TouchResult { /// `false` if the session row was deleted (revoked). pub valid: bool, /// Current `suspended_at IS NOT NULL` from the users table. /// Only meaningful when `valid` is `true`. pub suspended: bool, /// Current `can_create_projects` from the users table. /// Only meaningful when `valid` is `true`. pub can_create_projects: bool, /// Whether the user has an active Fan+ subscription. pub is_fan_plus: bool, /// Active creator tier name (e.g. "SmallFiles"), or None. pub creator_tier: Option, } /// Update `last_active_at`, confirm the session still exists, and return /// the user's current `suspended` status from the `users` table. /// /// This ensures suspension takes effect immediately even if the session /// was created before the admin suspended the user. #[tracing::instrument(skip_all)] pub async fn touch_session(pool: &PgPool, session_id: UserSessionId) -> Result { // Single query: update last_active_at, join users for live status, and check // fan_plus + creator_tier via subqueries (avoids 2 extra round-trips in auth extractor). let row = sqlx::query_as::<_, (bool, bool, bool, Option)>( r#" UPDATE user_sessions us SET last_active_at = NOW() FROM users u WHERE us.id = $1 AND u.id = us.user_id RETURNING u.suspended_at IS NOT NULL, u.can_create_projects, EXISTS(SELECT 1 FROM fan_plus_subscriptions fps WHERE fps.user_id = u.id AND fps.status = 'active'), (SELECT cs.tier FROM creator_subscriptions cs WHERE cs.user_id = u.id AND cs.status = 'active') "#, ) .bind(session_id) .fetch_optional(pool) .await?; match row { Some((suspended, can_create_projects, is_fan_plus, creator_tier)) => Ok(TouchResult { valid: true, suspended, can_create_projects, is_fan_plus, creator_tier, }), None => Ok(TouchResult { valid: false, suspended: false, can_create_projects: false, is_fan_plus: false, creator_tier: None }), } } /// List all active sessions for a user, newest first. #[tracing::instrument(skip_all)] pub async fn get_user_sessions(pool: &PgPool, user_id: UserId) -> Result> { let sessions = sqlx::query_as::<_, DbUserSession>( "SELECT id, user_id, created_at, last_active_at, user_agent, ip_address FROM user_sessions WHERE user_id = $1 ORDER BY last_active_at DESC LIMIT 100", ) .bind(user_id) .fetch_all(pool) .await?; Ok(sessions) } /// Count active sessions for a user. #[tracing::instrument(skip_all)] pub async fn count_user_sessions(pool: &PgPool, user_id: UserId) -> Result { let count = sqlx::query_scalar::<_, i64>( "SELECT COUNT(*) FROM user_sessions WHERE user_id = $1", ) .bind(user_id) .fetch_one(pool) .await?; Ok(count) } /// Delete a single session, scoped to the owning user. Returns `true` if deleted. #[tracing::instrument(skip_all)] pub async fn delete_user_session( pool: &PgPool, session_id: UserSessionId, user_id: UserId, ) -> Result { let rows = sqlx::query( "DELETE FROM user_sessions WHERE id = $1 AND user_id = $2", ) .bind(session_id) .bind(user_id) .execute(pool) .await?; Ok(rows.rows_affected() > 0) } /// Delete a session row, scoped to a specific user. /// /// The user scoping isn't strictly required for correctness in the current /// caller (logout reads its own tracking ID out of the session and we /// trust that), but the unscoped signature was an easy footgun — anyone /// who later wired this up with an attacker-controllable session_id could /// delete arbitrary rows. Requiring user_id in the signature keeps the /// SQL pinned to "this user, this row" so that misuse fails fast. #[tracing::instrument(skip_all)] pub async fn delete_session_by_id( pool: &PgPool, session_id: UserSessionId, user_id: UserId, ) -> Result { let rows = sqlx::query("DELETE FROM user_sessions WHERE id = $1 AND user_id = $2") .bind(session_id) .bind(user_id) .execute(pool) .await?; Ok(rows.rows_affected() > 0) } /// Delete expired session records (inactive longer than the given threshold). /// Returns the number of rows removed. #[tracing::instrument(skip_all)] pub async fn prune_expired_sessions(pool: &PgPool, stale_threshold: chrono::DateTime) -> Result { let result = sqlx::query( "DELETE FROM user_sessions WHERE last_active_at < $1", ) .bind(stale_threshold) .execute(pool) .await?; Ok(result.rows_affected()) } /// Delete all sessions for a user except the current one. Returns count deleted. #[tracing::instrument(skip_all)] pub async fn delete_other_sessions( pool: &PgPool, current_session_id: UserSessionId, user_id: UserId, ) -> Result> { let ids: Vec = sqlx::query_scalar( "DELETE FROM user_sessions WHERE user_id = $1 AND id != $2 RETURNING id", ) .bind(user_id) .bind(current_session_id) .fetch_all(pool) .await?; Ok(ids) } /// Delete ALL sessions for a user. Returns the deleted session IDs (for cache eviction). /// /// Also bumps `users.jwt_invalidated_at` — without this, a stolen SyncKit JWT /// would survive a "log out everywhere" until its natural expiry. Both writes /// run in a single transaction so a partial failure can't leave the JWTs alive /// after the session rows are gone. #[tracing::instrument(skip_all)] pub async fn delete_all_sessions_for_user( pool: &PgPool, user_id: UserId, ) -> Result> { let mut tx = pool.begin().await?; let ids: Vec = sqlx::query_scalar( "DELETE FROM user_sessions WHERE user_id = $1 RETURNING id", ) .bind(user_id) .fetch_all(&mut *tx) .await?; sqlx::query("UPDATE users SET jwt_invalidated_at = NOW() WHERE id = $1") .bind(user_id) .execute(&mut *tx) .await?; tx.commit().await?; Ok(ids) }