//! SSE push notification integration tests. //! //! Tests that call `get_streaming` open infinite SSE connections via oneshot. //! The handler tasks outlive the test, preventing the test binary from exiting. //! These tests are marked `#[ignore]` and must be run individually: //! //! cargo test --test integration synckit_sse -- --ignored --test-threads=1 use crate::harness::TestHarness; use makenotwork::db::{SyncAppId, SyncDeviceId, UserId}; use serde::Deserialize; use serde_json::json; use sqlx::PgPool; // ── Response types ── #[derive(Deserialize)] struct AuthResponse { token: String, #[allow(dead_code)] user_id: UserId, app_id: SyncAppId, } #[derive(Deserialize)] struct DeviceResponse { id: SyncDeviceId, } // ── Helpers ── async fn create_sync_app_for_user(pool: &PgPool, user_id: UserId) -> (SyncAppId, String) { let api_key = "test-sse-api-key"; let key_hash = crate::harness::hash_api_key(api_key); let key_prefix = &api_key[..8]; let app_id: SyncAppId = sqlx::query_scalar( "INSERT INTO sync_apps (creator_id, name, api_key_hash, api_key_prefix) VALUES ($1, 'SSE Test App', $2, $3) RETURNING id", ) .bind(user_id) .bind(&key_hash) .bind(key_prefix) .fetch_one(pool) .await .expect("Failed to create sync app"); (app_id, api_key.to_string()) } async fn setup_authenticated(h: &mut TestHarness) -> (String, SyncAppId) { let user_id = h.signup("sse_user", "sse@example.com", "Password1!").await; let (_app_id, api_key) = create_sync_app_for_user(&h.db, user_id).await; let resp = h .client .post_json( "/api/sync/auth", &json!({ "email": "sse@example.com", "password": "Password1!", "api_key": api_key, "key": "test-sdk-key", }) .to_string(), ) .await; assert_eq!(resp.status, 200, "Auth failed: {}", resp.text); let auth: AuthResponse = resp.json(); h.client.set_bearer_token(&auth.token); (auth.token, auth.app_id) } async fn register_device(h: &mut TestHarness, name: &str) -> SyncDeviceId { let resp = h .client .post_json( "/api/sync/devices", &json!({ "device_name": name, "platform": "macos" }).to_string(), ) .await; assert_eq!(resp.status, 200, "Register device failed: {}", resp.text); let dev: DeviceResponse = resp.json(); dev.id } // ── Tests ── #[tokio::test] #[ignore] // Opens infinite SSE stream — run separately async fn sse_subscribe_returns_event_stream() { let mut h = TestHarness::new().await; let (_token, app_id) = setup_authenticated(&mut h).await; let resp = h .client .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp.status, 200, "SSE subscribe failed"); let content_type = resp.header("content-type").unwrap_or(""); assert!( content_type.contains("text/event-stream"), "Expected text/event-stream, got: {}", content_type ); } #[tokio::test] async fn sse_subscribe_without_jwt_returns_401() { let mut h = TestHarness::new().await; let app_id = SyncAppId::new(); let resp = h .client .get(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp.status, 401); } #[tokio::test] async fn sse_subscribe_mismatched_app_id_rejected() { let mut h = TestHarness::new().await; setup_authenticated(&mut h).await; let wrong_app_id = SyncAppId::new(); let resp = h .client .get(&format!("/api/sync/subscribe?app_id={}", wrong_app_id)) .await; assert_eq!( resp.status, 400, "Expected 400 for mismatched app_id: {}", resp.text ); } #[tokio::test] #[ignore] // Opens infinite SSE stream — run separately async fn push_triggers_broadcast_notification() { let mut h = TestHarness::new().await; let user_id = h.signup("notif_user", "notif@example.com", "Password1!").await; let (app_id, api_key) = create_sync_app_for_user(&h.db, user_id).await; let resp = h .client .post_json( "/api/sync/auth", &json!({ "email": "notif@example.com", "password": "Password1!", "api_key": api_key, "key": "test-sdk-key", }) .to_string(), ) .await; assert_eq!(resp.status, 200); let auth: AuthResponse = resp.json(); h.client.set_bearer_token(&auth.token); let device_id = register_device(&mut h, "TestDevice").await; let resp = h .client .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp.status, 200); let resp = h .client .post_json( "/api/sync/push", &json!({ "device_id": device_id, "changes": [{ "table": "tasks", "op": "INSERT", "row_id": "sse-test-1", "timestamp": "2025-01-01T00:00:00Z", "data": {"title": "SSE test"} }] }) .to_string(), ) .await; assert_eq!(resp.status, 200, "Push failed: {}", resp.text); } #[tokio::test] #[ignore] // Opens infinite SSE stream — run separately async fn multiple_subscribers_receive_events() { let mut h = TestHarness::new().await; let (_token, app_id) = setup_authenticated(&mut h).await; let resp1 = h .client .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp1.status, 200); let resp2 = h .client .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp2.status, 200); } #[tokio::test] #[ignore] // Opens infinite SSE stream — run separately async fn sse_stream_includes_keepalive_header() { let mut h = TestHarness::new().await; let (_token, app_id) = setup_authenticated(&mut h).await; let resp = h .client .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id)) .await; assert_eq!(resp.status, 200); let content_type = resp.header("content-type").unwrap_or(""); assert!(content_type.contains("text/event-stream")); }