Skip to main content

max / makenotwork

6.3 KB · 221 lines History Blame Raw
1 //! SSE push notification integration tests.
2 //!
3 //! Tests that call `get_streaming` open infinite SSE connections via oneshot.
4 //! The handler tasks outlive the test, preventing the test binary from exiting.
5 //! These tests are marked `#[ignore]` and must be run individually:
6 //!
7 //! cargo test --test integration synckit_sse -- --ignored --test-threads=1
8
9 use crate::harness::TestHarness;
10 use makenotwork::db::{SyncAppId, SyncDeviceId, UserId};
11 use serde::Deserialize;
12 use serde_json::json;
13 use sqlx::PgPool;
14
15 // ── Response types ──
16
17 #[derive(Deserialize)]
18 struct AuthResponse {
19 token: String,
20 #[allow(dead_code)]
21 user_id: UserId,
22 app_id: SyncAppId,
23 }
24
25 #[derive(Deserialize)]
26 struct DeviceResponse {
27 id: SyncDeviceId,
28 }
29
30 // ── Helpers ──
31
32 async fn create_sync_app_for_user(pool: &PgPool, user_id: UserId) -> (SyncAppId, String) {
33 let api_key = "test-sse-api-key";
34 let key_hash = crate::harness::hash_api_key(api_key);
35 let key_prefix = &api_key[..8];
36 let app_id: SyncAppId = sqlx::query_scalar(
37 "INSERT INTO sync_apps (creator_id, name, api_key_hash, api_key_prefix) VALUES ($1, 'SSE Test App', $2, $3) RETURNING id",
38 )
39 .bind(user_id)
40 .bind(&key_hash)
41 .bind(key_prefix)
42 .fetch_one(pool)
43 .await
44 .expect("Failed to create sync app");
45 (app_id, api_key.to_string())
46 }
47
48 async fn setup_authenticated(h: &mut TestHarness) -> (String, SyncAppId) {
49 let user_id = h.signup("sse_user", "sse@example.com", "Password1!").await;
50 let (_app_id, api_key) = create_sync_app_for_user(&h.db, user_id).await;
51
52 let resp = h
53 .client
54 .post_json(
55 "/api/sync/auth",
56 &json!({
57 "email": "sse@example.com",
58 "password": "Password1!",
59 "api_key": api_key,
60 "key": "test-sdk-key",
61 })
62 .to_string(),
63 )
64 .await;
65 assert_eq!(resp.status, 200, "Auth failed: {}", resp.text);
66
67 let auth: AuthResponse = resp.json();
68 h.client.set_bearer_token(&auth.token);
69
70 (auth.token, auth.app_id)
71 }
72
73 async fn register_device(h: &mut TestHarness, name: &str) -> SyncDeviceId {
74 let resp = h
75 .client
76 .post_json(
77 "/api/sync/devices",
78 &json!({ "device_name": name, "platform": "macos" }).to_string(),
79 )
80 .await;
81 assert_eq!(resp.status, 200, "Register device failed: {}", resp.text);
82 let dev: DeviceResponse = resp.json();
83 dev.id
84 }
85
86 // ── Tests ──
87
88 #[tokio::test]
89 #[ignore] // Opens infinite SSE stream — run separately
90 async fn sse_subscribe_returns_event_stream() {
91 let mut h = TestHarness::new().await;
92 let (_token, app_id) = setup_authenticated(&mut h).await;
93
94 let resp = h
95 .client
96 .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id))
97 .await;
98
99 assert_eq!(resp.status, 200, "SSE subscribe failed");
100 let content_type = resp.header("content-type").unwrap_or("");
101 assert!(
102 content_type.contains("text/event-stream"),
103 "Expected text/event-stream, got: {}",
104 content_type
105 );
106 }
107
108 #[tokio::test]
109 async fn sse_subscribe_without_jwt_returns_401() {
110 let mut h = TestHarness::new().await;
111
112 let app_id = SyncAppId::new();
113 let resp = h
114 .client
115 .get(&format!("/api/sync/subscribe?app_id={}", app_id))
116 .await;
117 assert_eq!(resp.status, 401);
118 }
119
120 #[tokio::test]
121 async fn sse_subscribe_mismatched_app_id_rejected() {
122 let mut h = TestHarness::new().await;
123 setup_authenticated(&mut h).await;
124
125 let wrong_app_id = SyncAppId::new();
126 let resp = h
127 .client
128 .get(&format!("/api/sync/subscribe?app_id={}", wrong_app_id))
129 .await;
130 assert_eq!(
131 resp.status, 400,
132 "Expected 400 for mismatched app_id: {}",
133 resp.text
134 );
135 }
136
137 #[tokio::test]
138 #[ignore] // Opens infinite SSE stream — run separately
139 async fn push_triggers_broadcast_notification() {
140 let mut h = TestHarness::new().await;
141 let user_id = h.signup("notif_user", "notif@example.com", "Password1!").await;
142 let (app_id, api_key) = create_sync_app_for_user(&h.db, user_id).await;
143
144 let resp = h
145 .client
146 .post_json(
147 "/api/sync/auth",
148 &json!({
149 "email": "notif@example.com",
150 "password": "Password1!",
151 "api_key": api_key,
152 "key": "test-sdk-key",
153 })
154 .to_string(),
155 )
156 .await;
157 assert_eq!(resp.status, 200);
158 let auth: AuthResponse = resp.json();
159 h.client.set_bearer_token(&auth.token);
160 let device_id = register_device(&mut h, "TestDevice").await;
161
162 let resp = h
163 .client
164 .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id))
165 .await;
166 assert_eq!(resp.status, 200);
167
168 let resp = h
169 .client
170 .post_json(
171 "/api/sync/push",
172 &json!({
173 "device_id": device_id,
174 "changes": [{
175 "table": "tasks",
176 "op": "INSERT",
177 "row_id": "sse-test-1",
178 "timestamp": "2025-01-01T00:00:00Z",
179 "data": {"title": "SSE test"}
180 }]
181 })
182 .to_string(),
183 )
184 .await;
185 assert_eq!(resp.status, 200, "Push failed: {}", resp.text);
186 }
187
188 #[tokio::test]
189 #[ignore] // Opens infinite SSE stream — run separately
190 async fn multiple_subscribers_receive_events() {
191 let mut h = TestHarness::new().await;
192 let (_token, app_id) = setup_authenticated(&mut h).await;
193
194 let resp1 = h
195 .client
196 .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id))
197 .await;
198 assert_eq!(resp1.status, 200);
199
200 let resp2 = h
201 .client
202 .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id))
203 .await;
204 assert_eq!(resp2.status, 200);
205 }
206
207 #[tokio::test]
208 #[ignore] // Opens infinite SSE stream — run separately
209 async fn sse_stream_includes_keepalive_header() {
210 let mut h = TestHarness::new().await;
211 let (_token, app_id) = setup_authenticated(&mut h).await;
212
213 let resp = h
214 .client
215 .get_streaming(&format!("/api/sync/subscribe?app_id={}", app_id))
216 .await;
217 assert_eq!(resp.status, 200);
218 let content_type = resp.header("content-type").unwrap_or("");
219 assert!(content_type.contains("text/event-stream"));
220 }
221