Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sample_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"api_token": "api_token",
"forms_id": ["ids"],
"start_date": "2019-01-01T00:00:00Z"
}
8 changes: 4 additions & 4 deletions tap_formkeep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = ['api_token', 'form_id', 'start_date']
REQUIRED_CONFIG_KEYS = ['api_token', 'form_ids', 'start_date']

def do_discover(client, config):
    """
    Run discovery mode: build the catalog from live form data and emit
    it to stdout as JSON.

    :param client: API client used to fetch sample submissions per form.
    :param config: Tap configuration dict (provides form_ids, api_token, ...).
    """
    LOGGER.info("Starting discover")
    catalog = discover(client, config)
    # Singer discovery contract: the catalog goes to stdout as JSON.
    json.dump(catalog.to_dict(), sys.stdout, indent=2)
    LOGGER.info("Finished discover")

Expand All @@ -31,7 +31,7 @@ def main():

with Client(parsed_args.config) as client:
if parsed_args.discover:
do_discover()
do_discover(client, parsed_args.config)
elif parsed_args.catalog:
sync(
client=client,
Expand Down
6 changes: 5 additions & 1 deletion tap_formkeep/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

LOGGER = get_logger()
REQUEST_TIMEOUT = 300
DEFAULT_USER_AGENT = 'Singer.io FormKeep Tap'

def raise_for_error(response: requests.Response) -> None:
"""Raises the associated response exception. Takes in a response object,
Expand Down Expand Up @@ -64,7 +65,10 @@ def check_api_credentials(self) -> None:

def authenticate(self, headers: Dict, params: Dict) -> Tuple[Dict, Dict]:
    """Return (headers, params) with authentication headers applied.

    Adds the FormKeep token auth header and a User-Agent (configurable
    via the ``user_agent`` config key, falling back to the tap default).
    The incoming ``headers`` dict is shallow-copied, never mutated;
    ``params`` is passed through unchanged.
    """
    # Make a shallow copy to avoid mutating the caller's dictionary.
    headers = dict(headers) if headers is not None else {}
    headers["Authorization"] = f"Token {self.config['api_token']}"
    headers["User-Agent"] = self.config.get("user_agent", DEFAULT_USER_AGENT)
    return headers, params

def make_request(
Expand Down
13 changes: 6 additions & 7 deletions tap_formkeep/discover.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import singer
from singer import metadata
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_formkeep.schema import get_schemas
from tap_formkeep.schema import get_dynamic_schema

LOGGER = singer.get_logger()


def discover() -> Catalog:
def discover(client, config) -> Catalog:
"""
Run the discovery mode, prepare the catalog file and return the catalog.
Dynamically discover all forms from formkeep and build the catalog.
"""
schemas, field_metadata = get_schemas()
dynamic_schemas, dynamic_field_metadata = get_dynamic_schema(client, config)
catalog = Catalog([])

for stream_name, schema_dict in schemas.items():
for stream_name, schema_dict in dynamic_schemas.items():
try:
schema = Schema.from_dict(schema_dict)
mdata = field_metadata[stream_name]
mdata = dynamic_field_metadata[stream_name]
except Exception as err:
LOGGER.error(err)
LOGGER.error("stream_name: {}".format(stream_name))
Expand All @@ -36,4 +36,3 @@ def discover() -> Catalog:
)

return catalog

115 changes: 115 additions & 0 deletions tap_formkeep/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Dict, Tuple
from singer import metadata
from tap_formkeep.streams import STREAMS
import re
import ast

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -78,3 +80,116 @@ def get_schemas() -> Tuple[Dict, Dict]:

return schemas, field_metadata

# Matches calendar dates in ISO format YYYY-MM-DD (e.g. 2024-06-01).
DATE_REGEX = re.compile(r"^\d{4}-\d{2}-\d{2}$")
# Matches clock times HH:MM or HH:MM:SS (e.g. 14:30, 14:30:59).
TIME_REGEX = re.compile(r"^\d{2}:\d{2}(:\d{2})?$")
# Matches datetimes with a 'T' or space separator and an optional 'Z',
# numeric offset, or ' UTC' suffix (e.g. 2024-06-01T14:30:59Z,
# 2024-06-01 14:30:59+02:00, 2024-06-01T14:30:59 UTC).
DATETIME_REGEX = re.compile(
    r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(Z|[\+\-]\d{2}:\d{2}| UTC)?$"
)


def infer_type(value):
    """Infer a JSON-schema type fragment for a single sample value.

    Every inferred type is nullable (``["null", <type>]``). Strings that
    look like dates or datetimes get ``"format": "date-time"``; dicts and
    lists recurse (list item type is taken from the first element; an
    empty list defaults to string items). Any unrecognized Python type
    falls back to a plain nullable string.
    """
    if value is None:
        return {"type": ["null", "string"]}

    # bool must be checked before int: bool is a subclass of int.
    if isinstance(value, bool):
        return {"type": ["null", "boolean"]}

    if isinstance(value, int):
        return {"type": ["null", "integer"]}

    if isinstance(value, float):
        return {"type": ["null", "number"]}

    if isinstance(value, str):
        if DATETIME_REGEX.match(value) or DATE_REGEX.match(value):
            return {"type": ["null", "string"], "format": "date-time"}
        # Times and all other strings are plain nullable strings.
        return {"type": ["null", "string"]}

    # --- Recursive dict ---
    if isinstance(value, dict):
        return {
            "type": ["null", "object"],
            "properties": {k: infer_type(v) for k, v in value.items()},
        }

    # --- Recursive list ---
    if isinstance(value, list):
        # NOTE(review): item type is inferred from the first element only;
        # a heterogeneous list may get an incomplete schema.
        if value:
            item_type = infer_type(value[0])
        else:
            # Empty list -> unknown items, default to string.
            item_type = {"type": ["null", "string"]}
        return {"type": ["null", "array"], "items": item_type}

    # Unknown Python type: fall back to string.
    return {"type": ["null", "string"]}


def get_dynamic_schema(client, config):
    """Build per-form schemas and Singer metadata by sampling live data.

    For each configured form id, fetches the first page of submissions
    and infers the ``data`` object's property types from the first
    submission. Forms with no submissions are skipped entirely.

    :param client: API client exposing ``make_request`` and ``base_url``
        (a template containing ``{form_id}``).
    :param config: Tap config; ``form_ids`` may be a list or a
        stringified list such as ``'["a", "b"]'``.
    :return: (schemas, field_metadata) dicts keyed by form id.
    """
    schemas = {}
    field_metadata = {}

    raw_ids = config.get("form_ids", [])

    if isinstance(raw_ids, str):
        # Config values may arrive as a stringified Python/JSON list.
        try:
            form_ids = ast.literal_eval(raw_ids)
        except (ValueError, SyntaxError):
            # Not a literal: fall back to a comma-separated list of ids.
            form_ids = [part.strip() for part in raw_ids.split(",") if part.strip()]
    else:
        form_ids = raw_ids

    for form_id in form_ids:
        response = client.make_request(
            method="GET",
            endpoint=client.base_url.format(form_id=form_id),
            params={"page": 1, "include_attachments": "true"},
        )

        submissions = response.get("submissions", [])
        if not submissions:
            # No data to sample from: this form gets no stream.
            continue

        # NOTE(review): schema is inferred from the first submission only;
        # fields absent or null there will be typed as plain strings.
        first_submission = submissions[0]
        data_obj = first_submission.get("data", {})

        data_properties = {key: infer_type(val) for key, val in data_obj.items()}

        schema = {
            "type": "object",
            "properties": {
                "id": {"type": ["null", "integer"]},
                "created_at": {"type": ["null", "string"], "format": "date-time"},
                "spam": {"type": ["null", "boolean"]},
                "data": {
                    "type": "object",
                    "properties": data_properties,
                },
            },
        }

        schemas[form_id] = schema

        # Build standard Singer metadata and force the replication key
        # to always be emitted.
        mdata = metadata.get_standard_metadata(
            schema=schema,
            key_properties=["id"],
            valid_replication_keys=["created_at"],
            replication_method="INCREMENTAL",
        )
        mdata = metadata.to_map(mdata)
        mdata = metadata.write(
            mdata, ('properties', "created_at"), 'inclusion', 'automatic'
        )
        field_metadata[form_id] = metadata.to_list(mdata)

    return schemas, field_metadata
6 changes: 1 addition & 5 deletions tap_formkeep/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
from tap_formkeep.streams.submissions import Submissions

STREAMS = {
"submissions": Submissions,
}

STREAMS = {}
43 changes: 30 additions & 13 deletions tap_formkeep/streams/abstracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

LOGGER = get_logger()

DEFAULT_PAGE_SIZE = 25


class BaseStream(ABC):
"""
Expand All @@ -28,7 +30,6 @@ class BaseStream(ABC):

url_endpoint = ""
path = ""
page_size = 25
next_page_key = "next_page"
headers = {'Accept': 'application/json', 'Content-Type': 'application/json'}
children = []
Expand All @@ -43,7 +44,8 @@ def __init__(self, client=None, catalog=None) -> None:
self.schema = catalog.schema.to_dict()
self.metadata = metadata.to_map(catalog.metadata)
self.child_to_sync = []
self.params = {'api_token': 'api_token', 'page': 1, 'page_limit': 25, 'spam': 'False', 'startdate': ''}
self.page_size = self.client.config.get("page_size", DEFAULT_PAGE_SIZE)
self.params = {'page': 1, 'page_limit': self.page_size}
self.data_payload = {}

@property
Expand Down Expand Up @@ -99,21 +101,32 @@ def sync(

def get_records(self) -> Iterator:
    """Fetch every page for the stream and yield raw records.

    Requests page 1 first to learn ``meta.pagination.total_pages`` from
    the response, yields its records, then walks pages 2..total_pages
    sequentially. Missing pagination metadata is treated as a single
    page.
    """
    self.params["page"] = 1
    response = self.client.make_request(
        self.http_method,
        self.url_endpoint,
        self.params,
        self.headers,
        body=json.dumps(self.data_payload),
        path=self.path,
    )

    raw_records = response.get(self.data_key, [])
    pagination = response.get("meta", {}).get("pagination", {})
    total_pages = pagination.get("total_pages", 1)
    yield from raw_records

    for page in range(2, total_pages + 1):
        self.params["page"] = page
        response = self.client.make_request(
            self.http_method,
            self.url_endpoint,
            self.params,
            self.headers,
            body=json.dumps(self.data_payload),
            path=self.path,
        )
        yield from response.get(self.data_key, [])

def write_schema(self) -> None:
def update_params(self, **kwargs) -> None:
    """
    Update params for the stream.

    Merges default pagination params (page 1, configured page size) into
    ``self.params``; existing entries take precedence over the defaults,
    and the supplied kwargs override everything.
    """
    defaults = {
        "page": 1,
        "page_limit": self.page_size,
    }
    self.params = {**defaults, **self.params}
    self.params.update(kwargs)

def update_data_payload(self, **kwargs) -> None:
def get_url_endpoint(self, parent_obj: Dict = None) -> str:
    """
    Get the URL endpoint for the stream.

    An explicitly set ``url_endpoint`` wins; otherwise the client's
    ``base_url`` template is filled with this stream's ``path`` as the
    form id. ``parent_obj`` is accepted for interface compatibility with
    child streams and is unused here.
    """
    return self.url_endpoint or self.client.base_url.format(form_id=self.path)


class IncrementalStream(BaseStream):
Expand Down Expand Up @@ -189,8 +208,7 @@ def sync(
"""Implementation for `type: Incremental` stream."""
bookmark_date = self.get_bookmark(state, self.tap_stream_id)
current_max_bookmark_date = bookmark_date
self.update_params(updated_since=bookmark_date)
self.update_data_payload(parent_obj=parent_obj)
self.update_params(startdate=bookmark_date)
self.url_endpoint = self.get_url_endpoint(parent_obj)

with metrics.record_counter(self.tap_stream_id) as counter:
Expand Down Expand Up @@ -299,4 +317,3 @@ def get_bookmark(self, state: Dict, stream: str, key: Any = None) -> int:
self.bookmark_value = super().get_bookmark(state, stream)

return self.bookmark_value

8 changes: 0 additions & 8 deletions tap_formkeep/streams/submissions.py

This file was deleted.

Loading