
DOC-2684 feat(loading): new kafka data source; #807


Merged
merged 9 commits on May 8, 2025
2 changes: 1 addition & 1 deletion modules/cluster-and-ha-management/pages/ha-overview.adoc
@@ -82,7 +82,7 @@ Here you find detailed instructions for the removal of a failed node.

== File and Kafka loaders HA with Auto-Restart

Loading jobs for xref:tigergraph-server:data-loading:load-from-kafka.adoc[Kafka] and xref:tigergraph-server:data-loading:load-local-files.adoc[Local Files] will automatically restart the loader job process if it unexpectedly exits.
Loading jobs for xref:tigergraph-server:data-loading:load-from-kafka-mm.adoc[Kafka] and xref:tigergraph-server:data-loading:load-local-files.adoc[Local Files] will automatically restart the loader job process if it unexpectedly exits.
It will then continue to import data from the loading job.

This functionality is enabled by default, but users can disable this feature by setting `LoaderRetryMax=0` through `gadmin config set RESTPP.BasicConfig.Env`.
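
For illustration, a minimal sketch of turning auto-restart off and applying the change (the `LoaderRetryMax=0` value is from this page; the semicolon-delimited env format and the exact apply/restart sequence are assumptions based on common `gadmin` usage):

[console]
----
# Disable loader auto-restart by setting LoaderRetryMax=0 in the RESTPP environment
gadmin config set RESTPP.BasicConfig.Env "LoaderRetryMax=0;"

# Apply the configuration change and restart services so it takes effect
gadmin config apply -y
gadmin restart all -y
----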
1 change: 1 addition & 0 deletions modules/data-loading/nav.adoc
@@ -4,6 +4,7 @@
** xref:load-from-cloud.adoc[from Cloud Storage]
** xref:load-from-warehouse.adoc[from Data Warehouse]
** xref:load-from-kafka.adoc[from External Kafka]
** xref:load-from-kafka-mm.adoc[from External Kafka (Deprecated)]
*** xref:avro-validation-with-kafka.adoc[Avro Data Validation]
*** xref:kafka-ssl-security-guide.adoc[]
** xref:load-from-spark-dataframe.adoc[from Spark]
2 changes: 1 addition & 1 deletion modules/data-loading/pages/avro-validation-with-kafka.adoc
@@ -2,7 +2,7 @@

== Overview

In certain scenarios, users could load data in Avro format to TigerGraph DB, via an external Kafka connector, such as MirrorMakerConnector and experience malformed data errors during this process.
In certain scenarios, users could load data in Avro format to TigerGraph DB via an external Kafka connector, such as MirrorMakerConnector, and experience malformed data errors during this process.
This generated vague error messages and loading failures.

The KafkaConnect feature flag `ErrorTolerance` and the new transformations enable data loading services to handle malformed data and report errors effectively.
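
For context, a sketch of the upstream Kafka Connect error-handling properties that this kind of tolerance builds on (the property names are standard Kafka Connect; whether TigerGraph's `ErrorTolerance` flag maps to them one-to-one is an assumption):

[console]
----
# Skip records that fail conversion or transformation instead of stopping the connector
errors.tolerance=all

# Log each failed record, including message contents, for troubleshooting
errors.log.enable=true
errors.log.include.messages=true
----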
2 changes: 1 addition & 1 deletion modules/data-loading/pages/data-loading-overview.adoc
@@ -49,7 +49,7 @@ xref:load-from-cloud.adoc[cloud storage],
xref:load-from-warehouse.adoc#_bigquery[BigQuery],
xref:load-from-warehouse.adoc#_snowflake[Snowflake],
xref:load-from-warehouse.adoc#_postgresql[PostgreSQL] or
xref:tigergraph-server:data-loading:load-from-kafka.adoc#_configure_the_kafka_source[Kafka]
xref:load-from-kafka.adoc#_configure_the_kafka_source[Kafka]


. *Create a xref:#_loading_jobs[loading job]*.
@@ -1,6 +1,6 @@
= Externalizing Kafka Configurations

Users can utilize external sources, including files, vault, and environment variables, to provide configurations for Kafka connectors when xref:tigergraph-server:data-loading:load-from-kafka.adoc[Loading data from Kafka].
Users can utilize external sources, including files, vault, and environment variables, to provide configurations for Kafka connectors when creating data sources.

Sensitive information such as credentials and security setups is kept secure.
For more information on Kafka security, see https://docs.confluent.io/platform/current/connect/security.html#externalize-secrets[Kafka Connect Security Basics].
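
As an illustration, a sketch that externalizes a secret with upstream Kafka Connect's `FileConfigProvider` (the file path and key name are hypothetical; TigerGraph-specific wiring may differ):

[console]
----
# Register a file-based config provider (standard Kafka Connect mechanism)
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Resolve the keystore password from /etc/kafka/secrets.properties at runtime
ssl.keystore.password=${file:/etc/kafka/secrets.properties:ssl.keystore.password}
----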
3 changes: 1 addition & 2 deletions modules/data-loading/pages/index.adoc
@@ -23,9 +23,8 @@ Instructions for loading files stored in third-party cloud storage
Instructions for loading query results from a data warehouse
(xref:load-from-warehouse.adoc#_bigquery[BigQuery], xref:load-from-warehouse.adoc#_snowflake[Snowflake], and xref:tigergraph-server:data-loading:load-from-warehouse.adoc#_postgresql[PostgreSQL]).

== xref:load-from-kafka.adoc[Load Data from an External Kafka Cluster, in v3.9.3+]
== xref:load-from-kafka.adoc[Load Data from an External Kafka Cluster, in v4.2.0+]
Instructions for loading records from Kafka topics, including CSV, JSON, and Avro formats.
It includes additional instructions on xref:avro-validation-with-kafka.adoc[Avro Data Validation through KafkaConnect] and on how to xref:tigergraph-server:data-loading:kafka-ssl-security-guide.adoc[Set up SSL on Kafka] or xref:tigergraph-server:data-loading:externalizing-kafka-configs.adoc[Externalize Kafka Configs].

== xref:load-from-spark-dataframe.adoc[Load from a Spark DataFrame]

67 changes: 0 additions & 67 deletions modules/data-loading/pages/kafka-ssl-security-guide.adoc
@@ -164,73 +164,6 @@ gadmin config apply -y
gadmin restart all -y
----

=== Instructions on Enabling SSL for MirrorMaker2
The settings below need to be added to the connector configuration:

[NOTE]
====
See xref:load-from-kafka.adoc#_configure_the_kafka_source[Configuration Settings] for more information on connection configurations.
====

* `source.cluster.bootstrap.servers=<Source_Kafka_SSL_Broker_List>`
* `target.cluster.bootstrap.servers=<Target_Kafka_SSL_Broker_List>`
* `source.cluster.security.protocol=SSL`
* `target.cluster.security.protocol=SSL`

.A full connector configuration example, with schema registry:
[console]
----
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=Primary
target.cluster.alias=Secondary
source.cluster.bootstrap.servers=195.0.0.1:30001
target.cluster.bootstrap.servers=127.0.0.1:30001
source.cluster.security.protocol=SSL
source->target.enabled=true
topics=${topic_avro_with_registry}
replication.factor=1
sync.topic.acls.enabled=false
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds=5
secondary.scheduled.rebalance.max.delay.ms=35000
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter
value.converter.schema.registry.url=http://127.0.0.1:8081

[connector_mm]
name=connector_name_with_schema_registry
tasks.max=10
----

=== Instructions on Enabling SSL for Cross-Region Replication

[console]
47 changes: 47 additions & 0 deletions modules/data-loading/pages/load-from-kafka-mm.adoc
@@ -0,0 +1,47 @@
:toc:
:toclevels: 4
= Load from External Kafka
//:page-aliases: data-loading:kafka-loader:kafka-ssl-sasl.adoc


include::partial$load-part1-intro-and-schema.adoc[]

include::partial$load-part2-create-data-source.adoc[]

// Custom data source content for <source type>
include::partial$kafka-mm/kafka-data-source-details.adoc[]

include::partial$load-part3-create-loading-job.adoc[]

// Example loading job for <source type>
include::partial$kafka-mm/kafka-example-loading-job.adoc[]

include::partial$load-part3A-define-filenames.adoc[]

// For <source type>, the following format rules apply for filenames.
include::partial$kafka-mm/kafka-filename-details.adoc[]

include::partial$load-part3B-specify-mapping.adoc[]

// Custom data mapping notes for <source type>
include::partial$kafka-mm/kafka-specify-mapping-details.adoc[]

=== Avro Data Validation

In certain scenarios, users could load data in Avro format to TigerGraph DB via an external Kafka connector, such as MirrorMakerConnector, and experience malformed data errors during this process.
See our documentation on xref:tigergraph-server:data-loading:avro-validation-with-kafka.adoc[] for help.

include::partial$load-part4-run-job.adoc[]

// Custom notes about run loading for <source type>
include::partial$kafka-mm/kafka-run-loading-details.adoc[]

include::partial$load-part5-monitor-and-manage.adoc[]

== Kafka Loader Auto-Restart

See xref:tigergraph-server:cluster-and-ha-management:ha-overview.adoc#_support_file_and_kafka_loader_by_auto_restart[High Availability (HA) Overview].

include::partial$load-part6-known-issues.adoc[]

// Custom known issues for <source type>
11 changes: 0 additions & 11 deletions modules/data-loading/pages/load-from-kafka.adoc
@@ -1,8 +1,6 @@
:toc:
:toclevels: 4
= Load from External Kafka
//:page-aliases: data-loading:kafka-loader:kafka-ssl-sasl.adoc


include::partial$load-part1-intro-and-schema.adoc[]

@@ -26,22 +24,13 @@ include::partial$load-part3B-specify-mapping.adoc[]
// Custom data mapping notes for <source type>
include::partial$kafka/kafka-specify-mapping-details.adoc[]

=== Avro Data Validation

In certain scenarios, users could load data in Avro format to TigerGraph DB, via an external Kafka connector, such as MirrorMakerConnector and experience malformed data errors during this process.
See our documentation on xref:tigergraph-server:data-loading:avro-validation-with-kafka.adoc[] for help.

include::partial$load-part4-run-job.adoc[]

// Custom notes about run loading for <source type>
include::partial$kafka/kafka-run-loading-details.adoc[]

include::partial$load-part5-monitor-and-manage.adoc[]

== Kafka Loader Auto-Restart

See xref:tigergraph-server:cluster-and-ha-management:ha-overview.adoc#_support_file_and_kafka_loader_by_auto_restart[High Availability (HA) Overview].

Comment on lines -41 to -44
Collaborator

@pingxieTG Why do you propose to remove this?
Does the loader HA work differently? Do the parameters mentioned in that section
(https://docs.tigergraph.com/tigergraph-server/4.1/cluster-and-ha-management/ha-overview#_file_and_kafka_loaders_ha_with_auto_restart) no longer apply?

include::partial$load-part6-known-issues.adoc[]

// Custom known issues for <source type>
2 changes: 1 addition & 1 deletion modules/data-loading/pages/manage-data-source.adoc
@@ -1,7 +1,7 @@
= Manage Data Sources
//:page-aliases: data-loading:kafka-loader:manage-data-source.adoc

Loading jobs which using the Kafka Loader
Loading jobs which use the Kafka Loader
(from xref:data-loading:load-from-cloud.adoc[cloud storage],
xref:data-loading:load-from-warehouse.adoc[data warehouses],
or xref:data-loading:load-from-kafka.adoc[external Kafka])