| 1 |
use chrono::{DateTime, Utc}; |
| 2 |
use sqlx::PgPool; |
| 3 |
|
| 4 |
use crate::db::{SyncAppId, UserId}; |
| 5 |
use crate::error::Result; |
| 6 |
|
| 7 |
|
| 8 |
pub struct NewAppSyncSubscription<'a> { |
| 9 |
pub user_id: UserId, |
| 10 |
pub app_id: SyncAppId, |
| 11 |
pub stripe_subscription_id: &'a str, |
| 12 |
pub stripe_customer_id: &'a str, |
| 13 |
|
| 14 |
|
| 15 |
|
| 16 |
pub interval: &'a str, |
| 17 |
pub storage_limit_bytes: i64, |
| 18 |
} |
| 19 |
|
| 20 |
|
| 21 |
#[derive(Debug, sqlx::FromRow)] |
| 22 |
pub struct DbAppSyncSubscription { |
| 23 |
pub stripe_subscription_id: String, |
| 24 |
pub interval: String, |
| 25 |
pub status: String, |
| 26 |
pub storage_limit_bytes: Option<i64>, |
| 27 |
pub pending_storage_limit_bytes: Option<i64>, |
| 28 |
pub current_period_end: Option<DateTime<Utc>>, |
| 29 |
} |
| 30 |
|
| 31 |
|
| 32 |
|
| 33 |
#[tracing::instrument(skip_all)] |
| 34 |
pub async fn get_user_app_subscription( |
| 35 |
pool: &PgPool, |
| 36 |
user_id: UserId, |
| 37 |
app_id: SyncAppId, |
| 38 |
) -> Result<Option<DbAppSyncSubscription>> { |
| 39 |
let row = sqlx::query_as::<_, DbAppSyncSubscription>( |
| 40 |
r#" |
| 41 |
SELECT stripe_subscription_id, |
| 42 |
tier AS interval, |
| 43 |
status, |
| 44 |
storage_limit_bytes, |
| 45 |
pending_storage_limit_bytes, |
| 46 |
current_period_end |
| 47 |
FROM app_sync_subscriptions |
| 48 |
WHERE user_id = $1 AND app_id = $2 |
| 49 |
"#, |
| 50 |
) |
| 51 |
.bind(user_id) |
| 52 |
.bind(app_id) |
| 53 |
.fetch_optional(pool) |
| 54 |
.await?; |
| 55 |
Ok(row) |
| 56 |
} |
| 57 |
|
| 58 |
|
| 59 |
|
| 60 |
|
| 61 |
|
| 62 |
|
| 63 |
|
| 64 |
|
| 65 |
|
| 66 |
|
| 67 |
|
| 68 |
|
| 69 |
|
| 70 |
|
| 71 |
#[tracing::instrument(skip_all)] |
| 72 |
pub async fn create_app_sync_subscription( |
| 73 |
pool: &PgPool, |
| 74 |
sub: &NewAppSyncSubscription<'_>, |
| 75 |
) -> Result<bool> { |
| 76 |
let result = sqlx::query( |
| 77 |
r#" |
| 78 |
INSERT INTO app_sync_subscriptions |
| 79 |
(user_id, app_id, stripe_subscription_id, stripe_customer_id, |
| 80 |
tier, status, storage_limit_bytes) |
| 81 |
VALUES ($1, $2, $3, $4, $5, 'active', $6) |
| 82 |
ON CONFLICT (user_id, app_id) DO UPDATE |
| 83 |
SET stripe_subscription_id = EXCLUDED.stripe_subscription_id, |
| 84 |
stripe_customer_id = EXCLUDED.stripe_customer_id, |
| 85 |
tier = EXCLUDED.tier, |
| 86 |
storage_limit_bytes = EXCLUDED.storage_limit_bytes, |
| 87 |
status = 'active', |
| 88 |
canceled_at = NULL |
| 89 |
WHERE app_sync_subscriptions.stripe_subscription_id != EXCLUDED.stripe_subscription_id |
| 90 |
OR app_sync_subscriptions.status != 'active' |
| 91 |
"#, |
| 92 |
) |
| 93 |
.bind(sub.user_id) |
| 94 |
.bind(sub.app_id) |
| 95 |
.bind(sub.stripe_subscription_id) |
| 96 |
.bind(sub.stripe_customer_id) |
| 97 |
.bind(sub.interval) |
| 98 |
.bind(sub.storage_limit_bytes) |
| 99 |
.execute(pool) |
| 100 |
.await?; |
| 101 |
Ok(result.rows_affected() > 0) |
| 102 |
} |
| 103 |
|
| 104 |
|
| 105 |
|
| 106 |
|
| 107 |
|
| 108 |
|
| 109 |
|
| 110 |
|
| 111 |
#[tracing::instrument(skip_all)] |
| 112 |
pub async fn update_app_sync_subscription_status( |
| 113 |
pool: &PgPool, |
| 114 |
stripe_subscription_id: &str, |
| 115 |
status: &str, |
| 116 |
current_period_end: Option<DateTime<Utc>>, |
| 117 |
) -> Result<()> { |
| 118 |
sqlx::query( |
| 119 |
r#" |
| 120 |
UPDATE app_sync_subscriptions |
| 121 |
SET status = $2, |
| 122 |
current_period_end = COALESCE($3, current_period_end), |
| 123 |
canceled_at = CASE WHEN $2 = 'canceled' THEN NOW() ELSE canceled_at END |
| 124 |
WHERE stripe_subscription_id = $1 |
| 125 |
AND (status != 'canceled' OR $2 = 'canceled') |
| 126 |
"#, |
| 127 |
) |
| 128 |
.bind(stripe_subscription_id) |
| 129 |
.bind(status) |
| 130 |
.bind(current_period_end) |
| 131 |
.execute(pool) |
| 132 |
.await?; |
| 133 |
Ok(()) |
| 134 |
} |
| 135 |
|
| 136 |
|
| 137 |
|
| 138 |
#[tracing::instrument(skip_all)] |
| 139 |
pub async fn get_subscription_by_stripe_id( |
| 140 |
pool: &PgPool, |
| 141 |
stripe_subscription_id: &str, |
| 142 |
) -> Result<Option<(UserId, SyncAppId)>> { |
| 143 |
let row: Option<(UserId, SyncAppId)> = sqlx::query_as( |
| 144 |
r#" |
| 145 |
SELECT user_id, app_id |
| 146 |
FROM app_sync_subscriptions |
| 147 |
WHERE stripe_subscription_id = $1 |
| 148 |
"#, |
| 149 |
) |
| 150 |
.bind(stripe_subscription_id) |
| 151 |
.fetch_optional(pool) |
| 152 |
.await?; |
| 153 |
Ok(row) |
| 154 |
} |
| 155 |
|
| 156 |
|
| 157 |
|
| 158 |
|
| 159 |
#[tracing::instrument(skip_all)] |
| 160 |
pub async fn set_pending_storage_cap( |
| 161 |
pool: &PgPool, |
| 162 |
user_id: UserId, |
| 163 |
app_id: SyncAppId, |
| 164 |
pending_bytes: i64, |
| 165 |
) -> Result<()> { |
| 166 |
sqlx::query( |
| 167 |
r#" |
| 168 |
UPDATE app_sync_subscriptions |
| 169 |
SET pending_storage_limit_bytes = $3 |
| 170 |
WHERE user_id = $1 AND app_id = $2 |
| 171 |
"#, |
| 172 |
) |
| 173 |
.bind(user_id) |
| 174 |
.bind(app_id) |
| 175 |
.bind(pending_bytes) |
| 176 |
.execute(pool) |
| 177 |
.await?; |
| 178 |
Ok(()) |
| 179 |
} |
| 180 |
|
| 181 |
|
| 182 |
|
| 183 |
|
| 184 |
#[tracing::instrument(skip_all)] |
| 185 |
pub async fn apply_pending_storage_cap( |
| 186 |
pool: &PgPool, |
| 187 |
stripe_subscription_id: &str, |
| 188 |
) -> Result<()> { |
| 189 |
sqlx::query( |
| 190 |
r#" |
| 191 |
UPDATE app_sync_subscriptions |
| 192 |
SET storage_limit_bytes = pending_storage_limit_bytes, |
| 193 |
pending_storage_limit_bytes = NULL |
| 194 |
WHERE stripe_subscription_id = $1 |
| 195 |
AND pending_storage_limit_bytes IS NOT NULL |
| 196 |
"#, |
| 197 |
) |
| 198 |
.bind(stripe_subscription_id) |
| 199 |
.execute(pool) |
| 200 |
.await?; |
| 201 |
Ok(()) |
| 202 |
} |
| 203 |
|