64 changes: 64 additions & 0 deletions docs/integrations/data-ingestion/clickpipes/mongodb/faq.md
@@ -0,0 +1,64 @@
---
sidebar_label: 'FAQ'
description: 'Frequently asked questions about ClickPipes for MongoDB.'
slug: /integrations/clickpipes/mongodb/faq
sidebar_position: 2
title: 'ClickPipes for MongoDB FAQ'
---

# ClickPipes for MongoDB FAQ

### Can I query for individual fields in the JSON datatype? {#can-i-query-for-individual-fields-in-the-json-datatype}

For direct access to top-level fields, such as `user_id` in `{"user_id": 123}`, you can use **dot notation**:
```sql
SELECT doc.user_id AS user_id FROM your_table;
```
For direct access to nested object fields, such as `{"address": { "city": "San Francisco", "state": "CA" }}`, use the `^` operator:
```sql
SELECT doc.^address.city AS city FROM your_table;
```
For aggregations, cast the field to the appropriate type with the `CAST` function or `::` syntax:
```sql
SELECT sum(doc.shipping.cost::Float32) AS total_shipping_cost FROM your_table;
```
To learn more about working with JSON, see our [Working with JSON guide](./quickstart).

### How do I flatten the nested MongoDB documents in ClickHouse? {#how-do-i-flatten-the-nested-mongodb-documents-in-clickhouse}

MongoDB documents are replicated as the JSON type in ClickHouse by default, preserving the nested structure. If you want to flatten this data into columns, you have several options: normal views, materialized views, or query-time access.

1. **Normal Views**: Use normal views to encapsulate flattening logic (see the sketch after this list).
2. **Materialized Views**: For smaller datasets, you can use refreshable materialized views with the [`FINAL` modifier](/sql-reference/statements/select/from#final-modifier) to periodically flatten and deduplicate data. For larger datasets, we recommend using incremental materialized views without `FINAL` to flatten the data in real time, and then deduplicating at query time.
3. **Query-time Access**: Instead of flattening, use dot notation to access nested fields directly in queries.
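
A minimal sketch of the first option, assuming a replicated table named `your_table` whose documents land in the `doc` column (the `customer_id` and `shipping.cost` field names are hypothetical):

```sql
-- Normal view flattening selected JSON fields into typed columns;
-- the field names are illustrative, not part of the ClickPipes schema.
CREATE VIEW your_table_flat AS
SELECT
    _id,
    doc.customer_id::Int64 AS customer_id,
    doc.shipping.cost::Float32 AS shipping_cost
FROM your_table FINAL
WHERE _peerdb_is_deleted = 0;
```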

For detailed examples, see our [Working with JSON guide](./quickstart).

### Can I connect MongoDB databases that don't have a public IP or are in private networks? {#can-i-connect-mongodb-databases-that-dont-have-a-public-ip-or-are-in-private-networks}

We support AWS PrivateLink for connecting to MongoDB databases that don't have a public IP or are in private networks. Azure Private Link and GCP Private Service Connect are currently not supported.

### What happens if I delete a database/table from my MongoDB database? {#what-happens-if-i-delete-a-database-table-from-my-mongodb-database}

When you delete a database/table from MongoDB, ClickPipes will continue running, but the dropped database/table will stop replicating changes. The corresponding tables in ClickHouse are preserved.

### How does MongoDB CDC Connector handle transactions? {#how-does-mongodb-cdc-connector-handle-transactions}

Each document change within a transaction is processed individually and applied to ClickHouse. Changes are applied in the order they appear in the oplog, and only committed changes are replicated to ClickHouse. If a MongoDB transaction is rolled back, those changes won't appear in the change stream.

For more examples, see our [Working with JSON guide](./quickstart).

### How do I handle the `resume of change stream was not possible, as the resume point may no longer be in the oplog.` error? {#resume-point-may-no-longer-be-in-the-oplog-error}

This error typically occurs when the oplog is truncated and the ClickPipe is unable to resume the change stream at the expected point. To resolve this issue, [resync the ClickPipe](./resync.md). To prevent this issue from recurring, we recommend [increasing the oplog retention period](./source/atlas#enable-oplog-retention) (or [here](./source/generic#enable-oplog-retention) if you are on self-managed MongoDB).

### How is replication managed? {#how-is-replication-managed}

We use MongoDB's native Change Streams API to track changes in the database. The Change Streams API provides a resumable stream of database changes by leveraging MongoDB's oplog (operations log). The ClickPipe uses MongoDB's resume tokens to track its position in the oplog and ensure that every change is replicated to ClickHouse.

### Which read preference should I use? {#which-read-preference-should-i-use}

Which read preference to use depends on your specific use case. If you want to minimize the load on your primary node, we recommend using `secondaryPreferred` read preference. If you want to optimize ingestion latency, we recommend using `primaryPreferred` read preference. For more details, see [MongoDB documentation](https://www.mongodb.com/docs/manual/core/read-preference/#read-preference-modes-1).

### Does the MongoDB ClickPipe support Sharded Cluster? {#does-the-mongodb-clickpipe-support-sharded-cluster}
Yes, the MongoDB ClickPipe supports both replica sets and sharded clusters.
@@ -19,7 +19,7 @@ import Image from '@theme/IdealImage';
<BetaBadge/>

:::info
Currently, ingesting data from MongoDB to ClickHouse Cloud via ClickPipes is in Private Preview.
Ingesting data from MongoDB to ClickHouse Cloud via ClickPipes is in public beta.
:::

:::note
128 changes: 91 additions & 37 deletions docs/integrations/data-ingestion/clickpipes/mongodb/quickstart.md
@@ -42,7 +42,7 @@ MongoDB CDC Connector replicates MongoDB documents to ClickHouse using the nativ
Row 1:
──────
_id: "68a4df4b9fe6c73b541703b0"
_full_document: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
doc: {"_id":"68a4df4b9fe6c73b541703b0","customer_id":"98765","items":[{"category":"electronics","price":149.99},{"category":"accessories","price":24.99}],"order_date":"2025-08-19T20:32:11.705Z","order_id":"ORD-001234","shipping":{"city":"Seattle","cost":19.99,"method":"express"},"status":"completed","total_amount":299.97}
_peerdb_synced_at: 2025-08-19 20:50:42.005000000
_peerdb_is_deleted: 0
_peerdb_version: 0
@@ -55,15 +55,15 @@ The replicated tables use this standard schema:
```shell
┌─name───────────────┬─type──────────┐
│ _id                │ String        │
│ _full_document     │ JSON          │
│ doc                │ JSON          │
│ _peerdb_synced_at  │ DateTime64(9) │
│ _peerdb_version    │ Int64         │
│ _peerdb_is_deleted │ Int8          │
└────────────────────┴───────────────┘
```

- `_id`: Primary key from MongoDB
- `_full_document`: MongoDB document replicated as JSON data type
- `doc`: MongoDB document replicated as JSON data type
- `_peerdb_synced_at`: Records when the row was last synced
- `_peerdb_version`: Tracks the version of the row; incremented when the row is updated or deleted
- `_peerdb_is_deleted`: Marks whether the row is deleted
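
As a quick illustration (a sketch, assuming the replicated table is named `t1` as in the examples that follow), the metadata columns can be used to inspect sync state and filter out rows deleted upstream:

```sql
-- Most recently synced rows, excluding documents deleted in MongoDB.
SELECT
    _id,
    _peerdb_version,
    _peerdb_synced_at
FROM t1
WHERE _peerdb_is_deleted = 0
ORDER BY _peerdb_synced_at DESC
LIMIT 10;
```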
@@ -72,7 +72,7 @@ The replicated tables use this standard schema:

ClickPipes maps MongoDB collections into ClickHouse using the `ReplacingMergeTree` table engine family. With this engine, updates are modeled as inserts with a newer version (`_peerdb_version`) of the document for a given primary key (`_id`), enabling efficient handling of updates, replaces, and deletes as versioned inserts.

`ReplacingMergeTree` clears out duplicates asynchronously in the background. To guarantee the absence of duplicates for the same row, use the [`FINAL` modifier](https://clickhouse.com/docs/sql-reference/statements/select/from#final-modifier). For example:
`ReplacingMergeTree` clears out duplicates asynchronously in the background. To guarantee the absence of duplicates for the same row, use the [`FINAL` modifier](/sql-reference/statements/select/from#final-modifier). For example:

```sql
SELECT * FROM t1 FINAL;
@@ -99,21 +99,21 @@ You can directly query JSON fields using dot syntax:

```sql title="Query"
SELECT
_full_document.order_id,
_full_document.shipping.method
doc.order_id,
doc.shipping.method
FROM t1;
```

```shell title="Result"
┌─_full_document.order_id─┬─_full_document.shipping.method─┐
│ ORD-001234              │ express                        │
└─────────────────────────┴────────────────────────────────┘
┌─doc.order_id─┬─doc.shipping.method─┐
│ ORD-001234   │ express             │
└──────────────┴─────────────────────┘
```

When querying _nested object fields_ using dot syntax, make sure to add the [`^`](https://clickhouse.com/docs/sql-reference/data-types/newjson#reading-json-sub-objects-as-sub-columns) operator:

```sql title="Query"
SELECT _full_document.^shipping as shipping_info FROM t1;
SELECT doc.^shipping as shipping_info FROM t1;
```

```shell title="Result"
@@ -127,7 +127,7 @@ SELECT _full_document.^shipping as shipping_info FROM t1;
In ClickHouse, each field in JSON has the `Dynamic` type. The `Dynamic` type allows ClickHouse to store values of any type without knowing the type in advance. You can verify this with the `toTypeName` function:

```sql title="Query"
SELECT toTypeName(_full_document.customer_id) AS type FROM t1;
SELECT toTypeName(doc.customer_id) AS type FROM t1;
```

```shell title="Result"
@@ -139,7 +139,7 @@ SELECT toTypeName(_full_document.customer_id) AS type FROM t1;
To examine the underlying data type(s) for a field, use the `dynamicType` function. Note that it's possible to have different data types for the same field name in different rows:

```sql title="Query"
SELECT dynamicType(_full_document.customer_id) AS type FROM t1;
SELECT dynamicType(doc.customer_id) AS type FROM t1;
```

```shell title="Result"
@@ -153,7 +153,7 @@ SELECT dynamicType(_full_document.customer_id) AS type FROM t1;
**Example 1: Date parsing**

```sql title="Query"
SELECT parseDateTimeBestEffortOrNull(_full_document.order_date) AS order_date FROM t1;
SELECT parseDateTimeBestEffortOrNull(doc.order_date) AS order_date FROM t1;
```

```shell title="Result"
@@ -166,8 +166,8 @@

```sql title="Query"
SELECT multiIf(
_full_document.total_amount < 100, 'less_than_100',
_full_document.total_amount < 1000, 'less_than_1000',
doc.total_amount < 100, 'less_than_100',
doc.total_amount < 1000, 'less_than_1000',
'1000+') AS spendings
FROM t1;
```
@@ -181,7 +181,7 @@ FROM t1;
**Example 3: Array operations**

```sql title="Query"
SELECT length(_full_document.items) AS item_count FROM t1;
SELECT length(doc.items) AS item_count FROM t1;
```

```shell title="Result"
@@ -195,14 +195,14 @@ SELECT length(_full_document.items) AS item_count FROM t1;
[Aggregation functions](https://clickhouse.com/docs/sql-reference/aggregate-functions/combinators) in ClickHouse don't work with the `Dynamic` type directly. For example, if you attempt to use the `sum` function directly on a `Dynamic` field, you get the following error:

```sql
SELECT sum(_full_document.shipping.cost) AS shipping_cost FROM t1;
SELECT sum(doc.shipping.cost) AS shipping_cost FROM t1;
-- DB::Exception: Illegal type Dynamic of argument for aggregate function sum. (ILLEGAL_TYPE_OF_ARGUMENT)
```

To use aggregation functions, cast the field to the appropriate type with the `CAST` function or `::` syntax:

```sql title="Query"
SELECT sum(_full_document.shipping.cost::Float32) AS shipping_cost FROM t1;
SELECT sum(doc.shipping.cost::Float32) AS shipping_cost FROM t1;
```

```shell title="Result"
@@ -224,14 +224,14 @@ You can create normal views on top of the JSON table to encapsulate flattening/c
```sql
CREATE VIEW v1 AS
SELECT
CAST(_full_document._id, 'String') AS object_id,
CAST(_full_document.order_id, 'String') AS order_id,
CAST(_full_document.customer_id, 'Int64') AS customer_id,
CAST(_full_document.status, 'String') AS status,
CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date,
_full_document.^shipping AS shipping_info,
_full_document.items AS items
CAST(doc._id, 'String') AS object_id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
```
@@ -266,11 +266,11 @@ LIMIT 10;

### Refreshable materialized view {#refreshable-materialized-view}

You can also create [Refreshable Materialized Views](https://clickhouse.com/docs/materialized-view/refreshable-materialized-view), which enable you to schedule query execution for deduplicating rows and storing the results in a flattened destination table. With each scheduled refresh, the destination table is replaced with the latest query results.
You can create [Refreshable Materialized Views](https://clickhouse.com/docs/materialized-view/refreshable-materialized-view), which enable you to schedule query execution for deduplicating rows and storing the results in a flattened destination table. With each scheduled refresh, the destination table is replaced with the latest query results.

The key advantage of this method is that the query using the `FINAL` keyword runs only once during the refresh, eliminating the need for subsequent queries on the destination table to use `FINAL`.

However, a drawback is that the data in the destination table is only as up-to-date as the most recent refresh. For many use cases, refresh intervals ranging from several minutes to a few hours provide a good balance between data freshness and query performance.
A drawback is that the data in the destination table is only as up-to-date as the most recent refresh. For many use cases, refresh intervals ranging from several minutes to a few hours provide a good balance between data freshness and query performance.

```sql
CREATE TABLE flattened_t1 (
@@ -287,16 +287,16 @@ ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW mv1 REFRESH EVERY 1 HOUR TO flattened_t1 AS
CREATE MATERIALIZED VIEW rmv REFRESH EVERY 1 HOUR TO flattened_t1 AS
SELECT
CAST(_full_document._id, 'String') AS _id,
CAST(_full_document.order_id, 'String') AS order_id,
CAST(_full_document.customer_id, 'Int64') AS customer_id,
CAST(_full_document.status, 'String') AS status,
CAST(_full_document.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(_full_document.order_date, 3), 'DATETIME(3)') AS order_date,
_full_document.^shipping AS shipping_info,
_full_document.items AS items
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items AS items
FROM t1 FINAL
WHERE _peerdb_is_deleted = 0;
```
@@ -313,3 +313,57 @@ GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
```

### Incremental materialized view {#incremental-materialized-view}

If you want to access flattened columns in real-time, you can create [Incremental Materialized Views](https://clickhouse.com/docs/materialized-view/incremental-materialized-view). If your table has frequent updates, it's not recommended to use the `FINAL` modifier in your materialized view as every update will trigger a merge. Instead, you can deduplicate the data at query time by building a normal view on top of the materialized view.

```sql
CREATE TABLE flattened_t1 (
`_id` String,
`order_id` String,
`customer_id` Int64,
`status` String,
`total_amount` Decimal(18, 2),
`order_date` DateTime64(3),
`shipping_info` JSON,
`items` Dynamic,
`_peerdb_version` Int64,
`_peerdb_synced_at` DateTime64(9),
`_peerdb_is_deleted` Int8
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY _id
ORDER BY _id;

CREATE MATERIALIZED VIEW imv TO flattened_t1 AS
SELECT
CAST(doc._id, 'String') AS _id,
CAST(doc.order_id, 'String') AS order_id,
CAST(doc.customer_id, 'Int64') AS customer_id,
CAST(doc.status, 'String') AS status,
CAST(doc.total_amount, 'Decimal64(2)') AS total_amount,
CAST(parseDateTime64BestEffortOrNull(doc.order_date, 3), 'DATETIME(3)') AS order_date,
doc.^shipping AS shipping_info,
doc.items,
_peerdb_version,
_peerdb_synced_at,
_peerdb_is_deleted
FROM t1;

CREATE VIEW flattened_t1_final AS
SELECT * FROM flattened_t1 FINAL WHERE _peerdb_is_deleted = 0;
```

You can now query the view `flattened_t1_final` as follows:

```sql
SELECT
customer_id,
sum(total_amount)
FROM flattened_t1_final
WHERE shipping_info.city = 'Seattle'
GROUP BY customer_id
ORDER BY customer_id DESC
LIMIT 10;
```
@@ -20,7 +20,7 @@ import Image from '@theme/IdealImage';
<BetaBadge/>

:::info
Currently, ingesting data from MySQL to ClickHouse Cloud via ClickPipes is in public beta.
Ingesting data from MySQL to ClickHouse Cloud via ClickPipes is in public beta.
:::

You can use ClickPipes to ingest data from your source MySQL database into ClickHouse Cloud. The source MySQL database can be hosted on-premises or in the cloud using services like Amazon RDS, Google Cloud SQL, and others.
2 changes: 2 additions & 0 deletions scripts/aspell-dict-file.txt
@@ -1103,3 +1103,5 @@ explorative
ReceiveMessage
--docs/integrations/data-ingestion/clickpipes/postgres/postgres_generated_columns.md--
RelationMessage
--docs/integrations/data-ingestion/clickpipes/mongodb/faq.md--
resumable
1 change: 1 addition & 0 deletions sidebars.js
@@ -630,6 +630,7 @@ const sidebars = {
"integrations/data-ingestion/clickpipes/mongodb/datatypes",
"integrations/data-ingestion/clickpipes/mongodb/quickstart",
"integrations/data-ingestion/clickpipes/mongodb/lifecycle",
"integrations/data-ingestion/clickpipes/mongodb/faq",
{
type: "category",
label: "Operations",