diff --git a/docs/catalog/clerk.md b/docs/catalog/clerk.md index 26857c81..ef1ef796 100644 --- a/docs/catalog/clerk.md +++ b/docs/catalog/clerk.md @@ -17,6 +17,7 @@ The Clerk Wrapper is a WebAssembly(Wasm) foreign data wrapper which allows you t | Version | Wasm Package URL | Checksum | Required Wrappers Version | | ------- | --------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | ------------------------- | +| 0.2.1 | `https://github.com/supabase/wrappers/releases/download/wasm_clerk_fdw_v0.2.1/clerk_fdw.wasm` | `tbd` | >=0.5.0 | | 0.2.0 | `https://github.com/supabase/wrappers/releases/download/wasm_clerk_fdw_v0.2.0/clerk_fdw.wasm` | `89337bb11779d4d654cd3e54391aabd02509d213db6995f7dd58951774bf0e37` | >=0.5.0 | | 0.1.0 | `https://github.com/supabase/wrappers/releases/download/wasm_clerk_fdw_v0.1.0/clerk_fdw.wasm` | `613be26b59fa4c074e0b93f0db617fcd7b468d4d02edece0b1f85fdb683ebdc4` | >=0.4.0 | @@ -107,20 +108,27 @@ The full list of foreign table options are below: Supported objects are listed below: -| Object name | -| ------------------------ | -| allowlist_identifiers | -| blocklist_identifiers | -| domains | -| invitations | -| jwt_templates | -| oauth_applications | -| organizations | -| organization_invitations | -| organization_memberships | -| redirect_urls | -| saml_connections | -| users | +| Object name | +| ---------------------------------- | +| allowlist_identifiers | +| billing_payment_attempts | +| billing_plans | +| billing_statement | +| billing_statements | +| billing_subscription_items | +| blocklist_identifiers | +| domains | +| invitations | +| jwt_templates | +| oauth_applications | +| organizations | +| organization_billing_subscriptions | +| organization_invitations | +| organization_memberships | +| redirect_urls | +| saml_connections | +| user_billing_subscriptions | +| users | ## Entities @@ -558,6 +566,244 @@ create foreign table clerk.users ( - The `attrs` column contains additional attributes in JSON format +### User Billing Subscriptions + +This retrieves the billing subscription for a specific user. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/GetUserBillingSubscription) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| --------------------------- | :----: | :----: | :----: | :----: | :------: | +| users/billing/subscription | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.user_billing_subscriptions ( + user_id text, + id text, + status text, + payer_id text, + created_at timestamp, + updated_at timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'users/billing/subscription' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify `user_id` in the WHERE clause + +### Organization Billing Subscriptions + +This retrieves the billing subscription for a specific organization. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/GetOrganizationBillingSubscription) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| --------------------------------- | :----: | :----: | :----: | :----: | :------: | +| organizations/billing/subscription | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.organization_billing_subscriptions ( + organization_id text, + id text, + status text, + payer_id text, + created_at timestamp, + updated_at timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'organizations/billing/subscription' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify `organization_id` in the WHERE clause + +### Billing Plans + +This is a list of all billing plans. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/ListBillingPlans) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------- | :----: | :----: | :----: | :----: | :------: | +| billing/plans | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.billing_plans ( + id text, + name text, + description text, + slug text, + is_default boolean, + is_recurring boolean, + attrs jsonb +) + server clerk_server + options ( + object 'billing/plans' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Billing Subscription Items + +This is a list of all billing subscription items. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/ListBillingSubscriptionItems) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| -------------------------- | :----: | :----: | :----: | :----: | :------: | +| billing/subscription_items | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.billing_subscription_items ( + id text, + status text, + plan_id text, + plan_period text, + payer_id text, + is_free_trial boolean, + created_at timestamp, + updated_at timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'billing/subscription_items' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Billing Statements + +This is a list of all billing statements. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/ListBillingStatements) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------------ | :----: | :----: | :----: | :----: | :------: | +| billing/statements | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.billing_statements ( + id text, + status text, + timestamp timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'billing/statements' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Billing Statement + +This retrieves a specific billing statement by ID. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/GetBillingStatement) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ----------------- | :----: | :----: | :----: | :----: | :------: | +| billing/statement | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.billing_statement ( + statement_id text, + id text, + status text, + timestamp timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'billing/statement' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify `statement_id` in the WHERE clause + +### Billing Payment Attempts + +This retrieves payment attempts for a specific billing statement. + +Ref: [Clerk API docs](https://clerk.com/docs/reference/backend-api/tag/billing#operation/ListBillingStatementPaymentAttempts) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------------------ | :----: | :----: | :----: | :----: | :------: | +| billing/payment_attempts | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table clerk.billing_payment_attempts ( + statement_id text, + id text, + status text, + created_at timestamp, + updated_at timestamp, + attrs jsonb +) + server clerk_server + options ( + object 'billing/payment_attempts' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify `statement_id` in the WHERE clause + ## Query Pushdown Support This FDW doesn't support query pushdown. @@ -623,3 +869,28 @@ select from clerk.users u cross join json_array_elements((attrs->'email_addresses')::json) e; ``` + +### Billing examples + +```sql +-- Query all billing plans +SELECT * FROM clerk.billing_plans; + +-- Query all billing statements +SELECT * FROM clerk.billing_statements; + +-- Query all billing subscription items +SELECT * FROM clerk.billing_subscription_items; + +-- Query a specific statement (requires WHERE clause) +SELECT * FROM clerk.billing_statement WHERE statement_id = 'stmt_xxx'; + +-- Query payment attempts for a statement (requires WHERE clause) +SELECT * FROM clerk.billing_payment_attempts WHERE statement_id = 'stmt_xxx'; + +-- Query subscription for a specific user (requires WHERE clause) +SELECT * FROM clerk.user_billing_subscriptions WHERE user_id = 'user_xxx'; + +-- Query subscription for a specific organization (requires WHERE clause) +SELECT * FROM clerk.organization_billing_subscriptions WHERE organization_id = 'org_xxx'; +``` diff --git a/wasm-wrappers/fdw/Cargo.lock b/wasm-wrappers/fdw/Cargo.lock index 8e5d99a2..9634f8fb 100644 --- a/wasm-wrappers/fdw/Cargo.lock +++ b/wasm-wrappers/fdw/Cargo.lock @@ -86,7 +86,7 @@ dependencies = [ [[package]] name = "clerk_fdw" -version = "0.2.0" +version = "0.2.1" dependencies = [ "serde_json", "wit-bindgen-rt", diff --git a/wasm-wrappers/fdw/clerk_fdw/Cargo.toml b/wasm-wrappers/fdw/clerk_fdw/Cargo.toml index 52a59ac5..3caed156 100644 --- a/wasm-wrappers/fdw/clerk_fdw/Cargo.toml +++ b/wasm-wrappers/fdw/clerk_fdw/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "clerk_fdw" -version = "0.2.0" +version = "0.2.1" edition = { workspace = true } homepage = { workspace = true } rust-version = { workspace = true } diff --git a/wasm-wrappers/fdw/clerk_fdw/src/lib.rs b/wasm-wrappers/fdw/clerk_fdw/src/lib.rs index 76ff9033..dc4b7864 100644 --- a/wasm-wrappers/fdw/clerk_fdw/src/lib.rs +++ b/wasm-wrappers/fdw/clerk_fdw/src/lib.rs @@ -8,7 +8,7 @@ use bindings::{ http, stats, time, types::{ Cell, Column, Context, FdwError, FdwResult, ImportForeignSchemaStmt, OptionsType, Row, - TypeOid, + TypeOid, Value, }, utils, }, @@ -22,6 +22,9 @@ struct ClerkFdw { src_rows: Vec, src_idx: usize, src_offset: usize, + // For parameterized billing endpoints + sub_obj: String, + sub_obj_value: String, } static mut INSTANCE: *mut ClerkFdw = std::ptr::null_mut::(); @@ -51,6 +54,35 @@ impl ClerkFdw { return Ok(Some(Cell::Json(src_row.to_string()))); } + // Handle parameterized endpoint ID columns + match self.object.as_str() { + "users/billing/subscription" => { + if tgt_col_name == "user_id" { + if self.sub_obj == "user_id" { + return Ok(Some(Cell::String(self.sub_obj_value.clone()))); + } + return Ok(None); + } + } + "organizations/billing/subscription" => { + if tgt_col_name == "organization_id" { + if self.sub_obj == "organization_id" { + return Ok(Some(Cell::String(self.sub_obj_value.clone()))); + } + return Ok(None); + } + } + "billing/statement" | "billing/payment_attempts" => { + if tgt_col_name == "statement_id" { + if self.sub_obj == "statement_id" { + return Ok(Some(Cell::String(self.sub_obj_value.clone()))); + } + return Ok(None); + } + } + _ => {} + } + let src = src_row .as_object() .and_then(|v| v.get(&tgt_col_name)) @@ -90,13 +122,114 @@ impl ClerkFdw { } // create a request instance - fn create_request(&self) -> Result { - let qs = [ - "order_by=-created_at".to_string(), - format!("offset={}", self.src_offset), - format!("limit={BATCH_SIZE}"), - ]; - let url = format!("{}/{}?{}", self.base_url, self.object, qs.join("&")); + fn create_request(&mut self, ctx: &Context) -> Result { + let quals = ctx.get_quals(); + + // Standard endpoints with pagination + // Billing endpoints don't support order_by + let is_billing = self.object.starts_with("billing/"); + let qs = if is_billing { + vec![ + format!("offset={}", self.src_offset), + format!("limit={BATCH_SIZE}"), + ] + } else { + vec![ + "order_by=-created_at".to_string(), + format!("offset={}", self.src_offset), + format!("limit={BATCH_SIZE}"), + ] + }; + let mut url = format!("{}/{}?{}", self.base_url, self.object, qs.join("&")); + + // Handle parameterized endpoints + match self.object.as_str() { + "users/billing/subscription" => { + // GET /users/{user_id}/billing/subscription + if let Some(q) = quals + .iter() + .find(|q| q.field() == "user_id" && q.operator() == "=") + { + if let Value::Cell(Cell::String(user_id)) = q.value() { + self.sub_obj = "user_id".to_string(); + self.sub_obj_value = user_id.clone(); + url = format!("{}/users/{}/billing/subscription", self.base_url, user_id); + } else { + return Err("user_id must be a string value".to_string()); + } + } else { + return Err( + "user_id is required in WHERE clause for users/billing/subscription" + .to_string(), + ); + } + } + "organizations/billing/subscription" => { + // GET /organizations/{organization_id}/billing/subscription + if let Some(q) = quals + .iter() + .find(|q| q.field() == "organization_id" && q.operator() == "=") + { + if let Value::Cell(Cell::String(org_id)) = q.value() { + self.sub_obj = "organization_id".to_string(); + self.sub_obj_value = org_id.clone(); + url = format!( + "{}/organizations/{}/billing/subscription", + self.base_url, org_id + ); + } else { + return Err("organization_id must be a string value".to_string()); + } + } else { + return Err("organization_id is required in WHERE clause for organizations/billing/subscription".to_string()); + } + } + "billing/statement" => { + // GET /billing/statements/{statement_id} + if let Some(q) = quals + .iter() + .find(|q| q.field() == "statement_id" && q.operator() == "=") + { + if let Value::Cell(Cell::String(statement_id)) = q.value() { + self.sub_obj = "statement_id".to_string(); + self.sub_obj_value = statement_id.clone(); + url = format!("{}/billing/statements/{}", self.base_url, statement_id); + } else { + return Err("statement_id must be a string value".to_string()); + } + } else { + return Err( + "statement_id is required in WHERE clause for billing/statement" + .to_string(), + ); + } + } + "billing/payment_attempts" => { + // GET /billing/statements/{statement_id}/payment_attempts + if let Some(q) = quals + .iter() + .find(|q| q.field() == "statement_id" && q.operator() == "=") + { + if let Value::Cell(Cell::String(statement_id)) = q.value() { + self.sub_obj = "statement_id".to_string(); + self.sub_obj_value = statement_id.clone(); + url = format!( + "{}/billing/statements/{}/payment_attempts", + self.base_url, statement_id + ); + } else { + return Err("statement_id must be a string value".to_string()); + } + } else { + return Err( + "statement_id is required in WHERE clause for billing/payment_attempts" + .to_string(), + ); + } + } + _ => {} + } + let headers = self.headers.clone(); Ok(http::Request { @@ -125,6 +258,11 @@ impl ClerkFdw { } } + // if encounter the 404 error, we should take it as empty result rather than an error + if resp.status_code == 404 { + return Ok(serde_json::json!([])); + } + // check for errors http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; @@ -139,29 +277,28 @@ impl ClerkFdw { } // fetch source data rows from Clerk API - fn fetch_source_data(&mut self) -> FdwResult { + fn fetch_source_data(&mut self, ctx: &Context) -> FdwResult { self.src_rows.clear(); self.src_idx = 0; // create a request and send it - let req = self.create_request()?; + let req = self.create_request(ctx)?; let resp_json = self.make_request(&req)?; // unify response object to array and save source rows in local batch - let resp_data = if resp_json.is_array() { - resp_json.as_array().cloned() - } else if resp_json.is_object() { - resp_json.pointer("/data").and_then(|v| { - if v.is_array() { - v.as_array().cloned() + let resp_data = resp_json + .pointer("/data") + .and_then(|v| v.as_array().cloned()) + .or_else(|| { + if resp_json.is_object() { + Some(vec![resp_json.clone()]) + } else if resp_json.is_array() { + resp_json.as_array().cloned() } else { - Some(vec![v.clone()]) + None } }) - } else { - Some(vec![resp_json.clone()]) - } - .ok_or("cannot get query result data")?; + .ok_or("cannot get query result data")?; self.src_rows.extend(resp_data); Ok(()) @@ -210,7 +347,11 @@ impl Guest for ClerkFdw { let this = Self::this_mut(); let opts = ctx.get_options(&OptionsType::Table); this.object = opts.require("object")?; - this.fetch_source_data() + this.src_offset = 0; + // Reset parameterized endpoint IDs + this.sub_obj = String::default(); + this.sub_obj_value = String::default(); + this.fetch_source_data(ctx) } fn iter_scan(ctx: &Context, row: &Row) -> Result, FdwError> { @@ -220,15 +361,24 @@ impl Guest for ClerkFdw { while this.src_idx >= this.src_rows.len() { let consumed_cnt = this.src_rows.len(); + // For parameterized billing endpoints, don't paginate (scoped to specific resource) + let is_parameterized = matches!( + this.object.as_str(), + "users/billing/subscription" + | "organizations/billing/subscription" + | "billing/statement" + | "billing/payment_attempts" + ); + // local batch buffer isn't fully filled, means no more source records on remote, // stop the iteration scan - if consumed_cnt < BATCH_SIZE { + if consumed_cnt < BATCH_SIZE || is_parameterized { return Ok(None); } // otherwise, make a new request for the next batch this.src_offset += consumed_cnt; - this.fetch_source_data()?; + this.fetch_source_data(ctx)?; stats::inc_stats(FDW_NAME, stats::Metric::RowsIn, consumed_cnt as i64); stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, consumed_cnt as i64); @@ -245,10 +395,13 @@ impl Guest for ClerkFdw { Ok(Some(0)) } - fn re_scan(_ctx: &Context) -> FdwResult { + fn re_scan(ctx: &Context) -> FdwResult { let this = Self::this_mut(); this.src_offset = 0; - this.fetch_source_data() + // Reset parameterized endpoint IDs + this.sub_obj = String::default(); + this.sub_obj_value = String::default(); + this.fetch_source_data(ctx) } fn end_scan(_ctx: &Context) -> FdwResult { @@ -469,6 +622,112 @@ impl Guest for ClerkFdw { )"#, stmt.server_name, ), + format!( + r#"create foreign table if not exists user_billing_subscriptions ( + user_id text, + id text, + status text, + payer_id text, + created_at timestamp, + updated_at timestamp, + attrs jsonb + ) + server {} options ( + object 'users/billing/subscription' + )"#, + stmt.server_name, + ), + format!( + r#"create foreign table if not exists organization_billing_subscriptions ( + organization_id text, + id text, + status text, + payer_id text, + created_at timestamp, + updated_at timestamp, + attrs jsonb + ) + server {} options ( + object 'organizations/billing/subscription' + )"#, + stmt.server_name, + ), + // GET /billing/plans + format!( + r#"create foreign table if not exists billing_plans ( + id text, + name text, + description text, + slug text, + is_default boolean, + is_recurring boolean, + attrs jsonb + ) + server {} options ( + object 'billing/plans' + )"#, + stmt.server_name, + ), + // GET /billing/subscription_items + format!( + r#"create foreign table if not exists billing_subscription_items ( + id text, + status text, + plan_id text, + plan_period text, + payer_id text, + is_free_trial boolean, + created_at timestamp, + updated_at timestamp, + attrs jsonb + ) + server {} options ( + object 'billing/subscription_items' + )"#, + stmt.server_name, + ), + // GET /billing/statements + format!( + r#"create foreign table if not exists billing_statements ( + id text, + status text, + timestamp timestamp, + attrs jsonb + ) + server {} options ( + object 'billing/statements' + )"#, + stmt.server_name, + ), + // GET /billing/statements/{statement_id} + format!( + r#"create foreign table if not exists billing_statement ( + statement_id text, + id text, + status text, + timestamp timestamp, + attrs jsonb + ) + server {} options ( + object 'billing/statement' + )"#, + stmt.server_name, + ), + // GET /billing/statements/{statement_id}/payment_attempts + format!( + r#"create foreign table if not exists billing_payment_attempts ( + statement_id text, + id text, + status text, + created_at timestamp, + updated_at timestamp, + attrs jsonb + ) + server {} options ( + object 'billing/payment_attempts' + )"#, + stmt.server_name, + ), ]; Ok(ret) }