diff --git a/modules/cluster-and-ha-management/pages/ha-overview.adoc b/modules/cluster-and-ha-management/pages/ha-overview.adoc index 22e00700..ff41bc7c 100644 --- a/modules/cluster-and-ha-management/pages/ha-overview.adoc +++ b/modules/cluster-and-ha-management/pages/ha-overview.adoc @@ -82,7 +82,7 @@ Here you find detailed Instructions for the removal of a failed node. == File and Kafka loaders HA with Auto-Restart -Loading jobs for xref:tigergraph-server:data-loading:load-from-kafka.adoc[Kafka] and xref:tigergraph-server:data-loading:load-local-files.adoc[Local Files] will automatically restart the loader job process if it unexpectedly exits. +Loading jobs for xref:tigergraph-server:data-loading:load-from-kafka-mm.adoc[Kafka] and xref:tigergraph-server:data-loading:load-local-files.adoc[Local Files] will automatically restart the loader job process if it unexpectedly exits. And will then continue to import data from the loading job. This functionality is enabled by default, but users can disable this feature by setting `LoaderRetryMax=0` through `gadmin config set RESTPP.BasicConfig.Env`. diff --git a/modules/data-loading/nav.adoc b/modules/data-loading/nav.adoc index e8c20995..7f21db7b 100644 --- a/modules/data-loading/nav.adoc +++ b/modules/data-loading/nav.adoc @@ -4,6 +4,7 @@ ** xref:load-from-cloud.adoc[from Cloud Storage] ** xref:load-from-warehouse.adoc[from Data Warehouse] ** xref:load-from-kafka.adoc[from External Kafka] +** xref:load-from-kafka-mm.adoc[from External Kafka (Deprecated)] *** xref:avro-validation-with-kafka.adoc[Avro Data Validation] *** xref:kafka-ssl-security-guide.adoc[] ** xref:load-from-spark-dataframe.adoc[from Spark] diff --git a/modules/data-loading/pages/avro-validation-with-kafka.adoc b/modules/data-loading/pages/avro-validation-with-kafka.adoc index 3a9853bd..4649eebe 100644 --- a/modules/data-loading/pages/avro-validation-with-kafka.adoc +++ b/modules/data-loading/pages/avro-validation-with-kafka.adoc @@ -2,7 +2,7 @@ == Overview -In certain scenarios, users could load data in Avro format to TigerGraph DB, via an external Kafka connector, such as MirrorMakerConnector and experience malformed data errors during this process. +In certain scenarios, users could load data in Avro format to TigerGraph DB via an external Kafka connector, such as MirrorMakerConnector, and experience malformed data errors during this process. This generated vague error messages and loading failures. The KafkaConnect feature flag `ErrorTolerance` and the new transformations enables data loading services to handle malformed data and report errors effectively. diff --git a/modules/data-loading/pages/data-loading-overview.adoc b/modules/data-loading/pages/data-loading-overview.adoc index 03708b69..33447c3c 100644 --- a/modules/data-loading/pages/data-loading-overview.adoc +++ b/modules/data-loading/pages/data-loading-overview.adoc @@ -49,7 +49,7 @@ xref:load-from-cloud.adoc[cloud storage], xref:load-from-warehouse.adoc#_bigquery[BigQuery], xref:load-from-warehouse.adoc#_snowflake[Snowflake], xref:load-from-warehouse.adoc#_postgresql[PostgreSQL] or -xref:tigergraph-server:data-loading:load-from-kafka.adoc#_configure_the_kafka_source[Kafka] +xref:load-from-kafka.adoc#_configure_the_kafka_source[Kafka] . *Create a xref:#_loading_jobs[loading job]*. 
diff --git a/modules/data-loading/pages/externalizing-kafka-configs.adoc b/modules/data-loading/pages/externalizing-kafka-configs.adoc index 6b1aa73f..3a82e489 100644 --- a/modules/data-loading/pages/externalizing-kafka-configs.adoc +++ b/modules/data-loading/pages/externalizing-kafka-configs.adoc @@ -1,6 +1,6 @@ = Externalizing Kafka Configurations -Users can utilize external sources, including files, vault, and environment variables, to provide configurations for Kafka connectors when xref:tigergraph-server:data-loading:load-from-kafka.adoc[Loading data from Kafka]. +Users can utilize external sources, including files, vault, and environment variables, to provide configurations for Kafka connectors when creating data sources. Sensitive information such as credentials and security setups are kept secure. For more information on Kafka security see https://docs.confluent.io/platform/current/connect/security.html#externalize-secrets[Kafka Connect Security Basics]. diff --git a/modules/data-loading/pages/index.adoc b/modules/data-loading/pages/index.adoc index ea96b68e..c38eee06 100644 --- a/modules/data-loading/pages/index.adoc +++ b/modules/data-loading/pages/index.adoc @@ -23,9 +23,8 @@ Instructions for loading files stored in third-party cloud storage Instructions for loading query results from a data warehouse (xref:load-from-warehouse.adoc#_bigquery[BigQuery], xref:load-from-warehouse.adoc#_snowflake[Snowflake], and xref:tigergraph-server:data-loading:load-from-warehouse.adoc#_postgresql[PostgreSql]). -== xref:load-from-kafka.adoc[Load Data from an External Kafka Cluster, in v3.9.3+] +== xref:load-from-kafka.adoc[Load Data from an External Kafka Cluster, in v4.2.0+] Instructions for loading records from Kafka topics including CSV, JSON and Avro formats. -With additional instructions on xref:avro-validation-with-kafka.adoc[Avro Data Validation through KafkaConnect] and how to xref:tigergraph-server:data-loading:kafka-ssl-security-guide.adoc[Set up SSL on Kafka] or xref:tigergraph-server:data-loading:externalizing-kafka-configs.adoc[Externalize Kafka Configs]. == xref:load-from-spark-dataframe.adoc[Load from a Spark DataFrame] diff --git a/modules/data-loading/pages/kafka-ssl-security-guide.adoc b/modules/data-loading/pages/kafka-ssl-security-guide.adoc index 208fa5a6..1072bebb 100644 --- a/modules/data-loading/pages/kafka-ssl-security-guide.adoc +++ b/modules/data-loading/pages/kafka-ssl-security-guide.adoc @@ -164,73 +164,6 @@ gadmin config apply -y gadmin restart all -y ---- -=== Instructions on Enabling SSL for MirrorMaker2 -Settings below need be added to connector configuration: - -[NOTE] -==== -See xref:load-from-kafka.adoc#_configure_the_kafka_source[Configuration Settings] for more information on connection configurations. 
-==== - -* `source.cluster.bootstrap.servers=` -* `target.cluster.bootstrap.servers=` -* `source.cluster.security.protocol=SSL` -* `target.cluster.security.protocol=SSL` - -.A full connector configuration example, with schema registry: -[console] ----- -connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector - -source.cluster.alias=Primary - -target.cluster.alias=Secondary - -source.cluster.bootstrap.servers=195.0.0.1:30001 - -target.cluster.bootstrap.servers=127.0.0.1:30001 - -source.cluster.security.protocol=SSL - -source->target.enabled=true - -topics=${topic_avro_with_registry} - -replication.factor=1 - -sync.topic.acls.enabled=false - -checkpoints.topic.replication.factor=1 - -heartbeats.topic.replication.factor=1 - -offset-syncs.topic.replication.factor=1 - -offset.storage.replication.factor=1 - -status.storage.replication.factor=1 - -config.storage.replication.factor=1 - -emit.heartbeats.interval.seconds=5 - -secondary.scheduled.rebalance.max.delay.ms=35000 - -key.converter=org.apache.kafka.connect.converters.ByteArrayConverter - -header.converter=org.apache.kafka.connect.converters.ByteArrayConverter - -value.converter=com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter - -value.converter.schema.registry.url=http://127.0.0.1:8081 - -[connector_mm] - -name=connector_name_with_schema_registry - -tasks.max=10 ----- - === Instructions on Enabling SSL for Cross-Region Replication [console] diff --git a/modules/data-loading/pages/load-from-kafka-mm.adoc b/modules/data-loading/pages/load-from-kafka-mm.adoc new file mode 100644 index 00000000..59c79b37 --- /dev/null +++ b/modules/data-loading/pages/load-from-kafka-mm.adoc @@ -0,0 +1,47 @@ +:toc: +:toclevels: 4 += Load from External Kafka +//:page-aliases: data-loading:kafka-loader:kafka-ssl-sasl.adoc + + +include::partial$load-part1-intro-and-schema.adoc[] + +include::partial$load-part2-create-data-source.adoc[] + +// Custom data source content for +include::partial$kafka-mm/kafka-data-source-details.adoc[] + +include::partial$load-part3-create-loading-job.adoc[] + +// Example loading job for +include::partial$kafka-mm/kafka-example-loading-job.adoc[] + +include::partial$load-part3A-define-filenames.adoc[] + +// For , the following format rules apply for filenames. +include::partial$kafka-mm/kafka-filename-details.adoc[] + +include::partial$load-part3B-specify-mapping.adoc[] + +// Custom data mapping notes for +include::partial$kafka-mm/kafka-specify-mapping-details.adoc[] + +=== Avro Data Validation + +In certain scenarios, users could load data in Avro format to TigerGraph DB via an external Kafka connector, such as MirrorMakerConnector, and experience malformed data errors during this process. +See our documentation on xref:tigergraph-server:data-loading:avro-validation-with-kafka.adoc[] for help. + +include::partial$load-part4-run-job.adoc[] + +// Custom notes about run loading for +include::partial$kafka-mm/kafka-run-loading-details.adoc[] + +include::partial$load-part5-monitor-and-manage.adoc[] + +== Kafka Loader Auto-Restart + +See xref:tigergraph-server:cluster-and-ha-management:ha-overview.adoc#_support_file_and_kafka_loader_by_auto_restart[High Availability (HA) Overview]. 
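+
+For example, to turn auto-restart off for loader processes, the retry limit can be set to zero and the services restarted. This is a minimal sketch of the `gadmin` steps; the exact value syntax accepted by `RESTPP.BasicConfig.Env` is an assumption and may differ by version:
+
+[source,console]
+----
+# Assumed value format: LoaderRetryMax=0 disables loader auto-restart
+gadmin config set RESTPP.BasicConfig.Env "LoaderRetryMax=0"
+gadmin config apply -y
+gadmin restart all -y
+----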
+ +include::partial$load-part6-known-issues.adoc[] + +// Custom known issues for \ No newline at end of file diff --git a/modules/data-loading/pages/load-from-kafka.adoc b/modules/data-loading/pages/load-from-kafka.adoc index 125bd874..3a59d879 100644 --- a/modules/data-loading/pages/load-from-kafka.adoc +++ b/modules/data-loading/pages/load-from-kafka.adoc @@ -1,8 +1,6 @@ :toc: :toclevels: 4 = Load from External Kafka -//:page-aliases: data-loading:kafka-loader:kafka-ssl-sasl.adoc - include::partial$load-part1-intro-and-schema.adoc[] @@ -26,11 +24,6 @@ include::partial$load-part3B-specify-mapping.adoc[] // Custom data mapping notes for include::partial$kafka/kafka-specify-mapping-details.adoc[] -=== Avro Data Validation - -In certain scenarios, users could load data in Avro format to TigerGraph DB, via an external Kafka connector, such as MirrorMakerConnector and experience malformed data errors during this process. -See our documentation on xref:tigergraph-server:data-loading:avro-validation-with-kafka.adoc[] for help. - include::partial$load-part4-run-job.adoc[] // Custom notes about run loading for @@ -38,10 +31,6 @@ include::partial$kafka/kafka-run-loading-details.adoc[] include::partial$load-part5-monitor-and-manage.adoc[] -== Kafka Loader Auto-Restart - -See xref:tigergraph-server:cluster-and-ha-management:ha-overview.adoc#_support_file_and_kafka_loader_by_auto_restart[High Availability (HA) Overview]. - include::partial$load-part6-known-issues.adoc[] // Custom known issues for \ No newline at end of file diff --git a/modules/data-loading/pages/manage-data-source.adoc b/modules/data-loading/pages/manage-data-source.adoc index 2d92fd36..702cbf96 100644 --- a/modules/data-loading/pages/manage-data-source.adoc +++ b/modules/data-loading/pages/manage-data-source.adoc @@ -1,7 +1,7 @@ = Manage Data Sources //:page-aliases: data-loading:kafka-loader:manage-data-source.adoc -Loading jobs which using the Kafka Loader +Loading jobs which use the Kafka Loader (from xref:data-loading:load-from-cloud.adoc[cloud storage], xref:data-loading:load-from-warehouse.adoc[data warehouses], or xref:data-loading:load-from-kafka.adoc[external Kafka]) diff --git a/modules/data-loading/partials/kafka-mm/kafka-data-source-details.adoc b/modules/data-loading/partials/kafka-mm/kafka-data-source-details.adoc new file mode 100644 index 00000000..e890e15e --- /dev/null +++ b/modules/data-loading/partials/kafka-mm/kafka-data-source-details.adoc @@ -0,0 +1,260 @@ +=== Configure the Kafka source + +The TigerGraph connector to external Kafka sources makes use of https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330[Apache Kafka Mirrormaker]. + +[NOTE] +==== +In addition, users can utilize external sources to provide configurations for Kafka connectors. These sources include files, Vault, and environment variables. See xref:data-loading:externalizing-kafka-configs.adoc[]. +==== + +To configure the data source object, the minimum requirement is the address of the external source Kafka cluster: + +[source,json,linenum] +.Data source configuration for external Kafka +---- +{ + "type": "mirrormaker", + "source.cluster.bootstrap.servers": "" +} +---- + +==== Configuration Settings + +[%header,cols="1,2"] +|=== +| Field | Description + +| `.security.protocol` +| Protocol used to communicate with brokers. +Valid values are: `PLAINTEXT`, `SSL, `SASL_PLAINTEXT`, `SASL_SSL`. +The default is `PLAINTEXT`. + +| `.sasl.mechanism` +| SASL mechanism used for client connections. 
+This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. +Third party providers require configuring `.sasl.kerberos.service.name` and `.sasl.client.callback.handler.class` +as well as placing the third party jar under `$(gadmin config get System.Approot)/kafka/libs/`. + +| `.sasl.kerberos.service.name` +| The Kerberos principal name used by your Kafka brokers. +This could be defined in either JAAS configuration or Kafka’s configuration. + +| `.sasl.jaas.config` +| JAAS login context parameters for SASL connections in the format used by JAAS configuration files. +See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html[JAAS Login Configuration File] for details. + +| `.sasl.client.callback.handler.class` +| Name of java handler class required for third party `.sasl.mechanism` which must be placed under `$(gadmin config get System.Approot)/kafka/libs/`. + +| `.ssl.endpoint.identification.algorithm` +| The endpoint identification algorithm used to validate server hostname in the server certificate. Default is `https`. +If the value is set to an empty string, this will disable server host name verification. + +| `.ssl.keystore.location` +| The location of the key store file. + +| `.ssl.keystore.password` +| The password of the key store file. + +| `.ssl.key.password` +| The password of the private key in the key store file or the PEM key specified in `ssl.keystore.key`. + +| `.ssl.truststore.location` +| The location of the trust store file. + +| `.ssl.truststore.password` +| The password for the trust store file. +|=== + +===== Component Prefix +Replace the `` with the appropriate identifier: + +* `[admin | producer | consumer]` +* `[source | target].cluster` +* `[source | target].cluster.[admin | producer | consumer]` + +===== Security Protocols + +If the source cluster is configured for SSL or SASL protocols, you need to provide the following SSL/SASL credentials in order to communicate with the source cluster. + +* If the source cluster uses SASL, you need to upload the keytab of each Kerberos principal to every node of your TigerGraph cluster at the same absolute path. +* If the source cluster uses SSL, see our documentation xref:tigergraph-server:data-loading:kafka-ssl-security-guide.adoc[] +* If the source cluster uses SASL *and* SSL, you need to upload the keytab of each Kerberos principal, as well as the key store and truststore to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes. + +The following configurations are required for admin, producer and consumer. Kafka allows SSL settings overriding, respecting security settings with the following precedence: + +* `generic.ssl.setting` < `source/target.cluster.ssl.setting` < `admin/producer/consumer.ssl.setting`. + +If both source and target clusters are sharing the same SSL settings, user can set generic settings for both source/target clusters and all the rols(admin/producer/consumer). + +For example, user can set `ssl.keystore.location="/path/to/key/store"` instead of: + +* `source.cluster.ssl.keystore.location="/path/to/key/store"` +* `admin.ssl.keystore.location="/path/to/key/store"` +* `source.cluster.admin.ssl.keystore.location="/path/to/key/store"`. + +If source and target clusters have different SSL settings, it is possible to set cluster wide SSL configs. 
+
+For example, user can set: `target.cluster.ssl.truststore.password="/password/for/trust/store"` instead of:
+
+* `target.cluster.producer.ssl.truststore.password="/password/for/trust/store"`.
+
+Note: SSL is now well supported by TigerGraph; we recommend setting up regular SSL rather than SASL + PlainText/SSL.
+
+==== Supported Configuration Examples
+===== PLAINTEXT
+[source,json,linenum]
+----
+{
+ "type": "mirrormaker",
+ "source.cluster.bootstrap.servers": ":"
+}
+----
+
+===== SSL
+Need to configure:
+
+* `.security.protocol`
+* `.ssl.`
+
+[source,json,linenum]
+----
+{
+ "type": "mirrormaker",
+ "source.cluster.bootstrap.servers": ":",
+
+ "consumer.security.protocol": "SSL",
+ "consumer.ssl.endpoint.identification.algorithm": "none",
+ "consumer.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "consumer.ssl.keystore.password": "******",
+ "consumer.ssl.key.password": "******",
+ "consumer.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "consumer.ssl.truststore.password": "******",
+
+ "source.admin.security.protocol": "SSL",
+ "source.admin.ssl.endpoint.identification.algorithm": "none",
+ "source.admin.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "source.admin.ssl.keystore.password": "******",
+ "source.admin.ssl.key.password": "******",
+ "source.admin.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "source.admin.ssl.truststore.password": "******",
+
+ "producer.security.protocol": "SSL",
+ "producer.ssl.endpoint.identification.algorithm": "none",
+ "producer.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "producer.ssl.keystore.password": "******",
+ "producer.ssl.key.password": "******",
+ "producer.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "producer.ssl.truststore.password": "******"
+}
+----
+
+===== SASL_PLAINTEXT
+Need to configure:
+
+* `.security.protocol`
+* `.sasl.`
+
+[source,json,linenum]
+----
+{
+ "type": "mirrormaker",
+ "source.cluster.bootstrap.servers": ":",
+
+ "consumer.security.protocol": "SASL_PLAINTEXT",
+ "consumer.sasl.mechanism": "",
+ "consumer.sasl.jaas.config": "",
+
+ "source.admin.security.protocol": "SASL_PLAINTEXT",
+ "source.admin.sasl.mechanism": "",
+ "source.admin.sasl.jaas.config": "",
+
+ "producer.security.protocol": "SASL_PLAINTEXT",
+ "producer.sasl.mechanism": "",
+ "producer.sasl.jaas.config": ""
+}
+----
+
+===== SASL_SSL
+Need to configure:
+
+* `.security.protocol`
+* `.sasl.`
+* `.ssl.`
+
+[source,json,linenum]
+----
+{
+ "type": "mirrormaker",
+ "source.cluster.bootstrap.servers": ":",
+
+ "consumer.security.protocol": "SASL_SSL",
+ "consumer.sasl.mechanism": "",
+ "consumer.sasl.jaas.config": "",
+ "consumer.ssl.endpoint.identification.algorithm": "none",
+ "consumer.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "consumer.ssl.keystore.password": "******",
+ "consumer.ssl.key.password": "******",
+ "consumer.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "consumer.ssl.truststore.password": "******",
+
+ "source.admin.security.protocol": "SASL_PLAINTEXT",
+ "source.admin.sasl.mechanism": "",
+ "source.admin.sasl.jaas.config": "",
+ "source.admin.ssl.endpoint.identification.algorithm": "none",
+ "source.admin.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "source.admin.ssl.keystore.password": "******",
+ "source.admin.ssl.key.password": "******",
+ "source.admin.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "source.admin.ssl.truststore.password": "******",
+
+ "producer.security.protocol": "SASL_PLAINTEXT",
+ "producer.sasl.mechanism": "",
+ "producer.sasl.jaas.config": "",
+ "producer.ssl.endpoint.identification.algorithm": "none",
+ "producer.ssl.keystore.location": "/path/to/client.keystore.jks",
+ "producer.ssl.keystore.password": "******",
+ "producer.ssl.key.password": "******",
+ "producer.ssl.truststore.location": "/path/to/client.truststore.jks",
+ "producer.ssl.truststore.password": "******"
+}
+----
+
+===== Third Party SASL Mechanism
+For both `SASL` and `SASL_SSL`, when a third party mechanism is used, it is necessary to:
+
+* Include the `.sasl.jaas.config` in addition to the `.sasl.client.callback.handler.class` in the configuration
+* Place the third party jar under `$(gadmin config get System.Approot)/kafka/libs/`
+
+[source,json,linenum]
+.Example SASL Configuration with third party mechanism
+----
+{
+ "type": "mirrormaker",
+ "source.cluster.bootstrap.servers": ":",
+
+ "consumer.security.protocol": "SASL_PLAINTEXT",
+ "consumer.sasl.mechanism": "",
+ "consumer.sasl.jaas.config": "",
+ "consumer.sasl.client.callback.handler.class": "",
+
+ "source.admin.security.protocol": "SASL_PLAINTEXT",
+ "source.admin.sasl.mechanism": "",
+ "source.admin.sasl.jaas.config": "",
+ "source.admin.sasl.client.callback.handler.class": "",
+
+ "producer.security.protocol": "SASL_PLAINTEXT",
+ "producer.sasl.mechanism": "",
+ "producer.sasl.jaas.config": "",
+ "producer.sasl.client.callback.handler.class": ""
+}
+----
+
+==== Schema Registry Service
+If there is a https://docs.confluent.io/platform/current/schema-registry/index.html[schema registry service] containing the record schema of the source topic, please add it to the data source configuration:
+
+[source,json]
+"value.converter.schema.registry.url": "schema_registry_url"
+
+[NOTE]
+Currently, only Avro schema is supported.
\ No newline at end of file
diff --git a/modules/data-loading/partials/kafka-mm/kafka-example-loading-job.adoc b/modules/data-loading/partials/kafka-mm/kafka-example-loading-job.adoc
new file mode 100644
index 00000000..48696f47
--- /dev/null
+++ b/modules/data-loading/partials/kafka-mm/kafka-example-loading-job.adoc
@@ -0,0 +1,26 @@
+=== Example loading job from external Kafka
+
+The following is an example loading job from an external Kafka cluster.
+
+[source,gsql,linenums]
+.Example loading job from external Kafka
+----
+USE GRAPH ldbc_snb
+CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH ldbc_snb
+CREATE LOADING JOB load_data FOR GRAPH ldbc_snb {
+ DEFINE FILENAME file_Comment = "$s1:topic_Comment";
+ DEFINE FILENAME file_Person = "$s1:topic_Person";
+ DEFINE FILENAME file_Comment_hasCreator_Person =
+ "$s1:topic_Comment_hasCreator_Person";
+ LOAD file_Comment
+ TO VERTEX Comment
+ VALUES ($1, $0, $2, $3, $4, $5) USING separator="|";
+ LOAD file_Person
+ TO VERTEX Person
+ VALUES ($1, $2, $3, $4, $5, $0, $6, $7,
+ SPLIT($8,";"), SPLIT($9,";")) USING separator="|";
+ LOAD file_Comment_hasCreator_Person
+ TO EDGE HAS_CREATOR
+ VALUES ($1, $2) USING separator="|";
+}
+----
\ No newline at end of file
diff --git a/modules/data-loading/partials/kafka-mm/kafka-filename-details.adoc b/modules/data-loading/partials/kafka-mm/kafka-filename-details.adoc
new file mode 100644
index 00000000..035c2af0
--- /dev/null
+++ b/modules/data-loading/partials/kafka-mm/kafka-filename-details.adoc
@@ -0,0 +1,66 @@
+NOTE: While a loading job may have multiple `FILENAME` variables, they must all refer to the same `DATA_SOURCE` object.
+
+==== Kafka file descriptors
+The file descriptor has three valid formats.
+You can simply provide the Kafka topic name and use default settings. Or, you can provide configuration details including the topic, either in a JSON file or as inline JSON content.
+
+[source,php,linenum]
+DEFINE FILENAME file_name = "$[data source name]:[topic]";
+DEFINE FILENAME file_name = "$[data source name]:[json config file]";
+DEFINE FILENAME file_name = "$[data source name]:[inline json content]";
+
+For example:
+
+[source,go]
+----
+// Format 1: topic only
+DEFINE FILENAME file_Person = "$s1:topic_Person";
+
+// Format 2: topic and configuration file
+DEFINE FILENAME file_Person = "$s1:myfile.json";
+
+// Format 3: topic and inline configuration
+DEFINE FILENAME file_Person="""$s1:{
+ "topic": "topic_Person",
+ "tasks.max": "10"
+}""";
+----
+
+==== Filename parameters
+
+These are the required and optional configuration parameters:
+
+[%header,cols="1,4,1,4"]
+|===
+|Parameter |Description |Required? |Default value
+
+| topic
+| The source topic name
+| Required
+| N/A
+
+| tasks.max
+| The maximum number of tasks used to consume the source topic.
+You can increase this value when the source topic contains multiple partitions.
+| Optional
+| 1
+
+| num.partitions
+| The number of partitions to use.
+When loading data, each partition is distributed evenly across each node.
+If one filename contains much more data than others, consider using a larger partition number.
+| Optional
+| 3
+
+| value.converter
+| Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka.
+This controls the format of the values in messages written to or read from Kafka.
+If records are in Avro format with Schema Registry service, use `com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter`.
+If records are in Avro format without using Schema Registry service, use `com.tigergraph.kafka.connect.converters.TigerGraphAvroConverterWithoutSchemaRegistry`.
+If records are in plaintext or JSON format, use `org.apache.kafka.connect.converters.ByteArrayConverter`.
+| Optional
+| N/A
+|===
+
+[NOTE]
+For Avro data with schema registry, you must set both `value.converter.schema.registry.url` when defining the DATA_SOURCE and `value.converter` when defining a FILENAME.
\ No newline at end of file
diff --git a/modules/data-loading/partials/kafka-mm/kafka-run-loading-details.adoc b/modules/data-loading/partials/kafka-mm/kafka-run-loading-details.adoc
new file mode 100644
index 00000000..7bb794f3
--- /dev/null
+++ b/modules/data-loading/partials/kafka-mm/kafka-run-loading-details.adoc
@@ -0,0 +1,2 @@
+[NOTE]
+A loading job from an external Kafka cluster runs in streaming mode regardless of the `EOF` setting, i.e., it continuously reads new records from the source topic.
\ No newline at end of file
diff --git a/modules/data-loading/partials/kafka-mm/kafka-specify-mapping-details.adoc b/modules/data-loading/partials/kafka-mm/kafka-specify-mapping-details.adoc
new file mode 100644
index 00000000..8f45ed77
--- /dev/null
+++ b/modules/data-loading/partials/kafka-mm/kafka-specify-mapping-details.adoc
@@ -0,0 +1,13 @@
+[NOTE]
+====
+When loading JSON or Avro data,
+
+* The USING option JSON_FILE="true" must be included.
+* Refer to JSON keys (or Avro field names) instead of column index numbers.
+ +E.g., + +[source,php] +LOAD file_Comment TO VERTEX Comment + VALUES ($"id", $"content") USING JSON_FILE="TRUE" +==== diff --git a/modules/data-loading/partials/kafka/kafka-data-source-details.adoc b/modules/data-loading/partials/kafka/kafka-data-source-details.adoc index e890e15e..c1e48792 100644 --- a/modules/data-loading/partials/kafka/kafka-data-source-details.adoc +++ b/modules/data-loading/partials/kafka/kafka-data-source-details.adoc @@ -1,260 +1,119 @@ === Configure the Kafka source -The TigerGraph connector to external Kafka sources makes use of https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330[Apache Kafka Mirrormaker]. - -[NOTE] -==== -In addition, users can utilize external sources to provide configurations for Kafka connectors. These sources include files, Vault, and environment variables. See xref:data-loading:externalizing-kafka-configs.adoc[]. -==== - -To configure the data source object, the minimum requirement is the address of the external source Kafka cluster: +To configure the data source object for Kafka, the minimum requirement is the address of the external source Kafka cluster: [source,json,linenum] -.Data source configuration for external Kafka +.Data source configuration for an external Kafka source ---- { - "type": "mirrormaker", - "source.cluster.bootstrap.servers": "" + "type": "kafka", + "bootstrap.servers": "{broker-addresses}" } ---- -==== Configuration Settings - -[%header,cols="1,2"] -|=== -| Field | Description - -| `.security.protocol` -| Protocol used to communicate with brokers. -Valid values are: `PLAINTEXT`, `SSL, `SASL_PLAINTEXT`, `SASL_SSL`. -The default is `PLAINTEXT`. - -| `.sasl.mechanism` -| SASL mechanism used for client connections. -This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. -Third party providers require configuring `.sasl.kerberos.service.name` and `.sasl.client.callback.handler.class` -as well as placing the third party jar under `$(gadmin config get System.Approot)/kafka/libs/`. - -| `.sasl.kerberos.service.name` -| The Kerberos principal name used by your Kafka brokers. -This could be defined in either JAAS configuration or Kafka’s configuration. - -| `.sasl.jaas.config` -| JAAS login context parameters for SASL connections in the format used by JAAS configuration files. -See https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html[JAAS Login Configuration File] for details. - -| `.sasl.client.callback.handler.class` -| Name of java handler class required for third party `.sasl.mechanism` which must be placed under `$(gadmin config get System.Approot)/kafka/libs/`. - -| `.ssl.endpoint.identification.algorithm` -| The endpoint identification algorithm used to validate server hostname in the server certificate. Default is `https`. -If the value is set to an empty string, this will disable server host name verification. - -| `.ssl.keystore.location` -| The location of the key store file. - -| `.ssl.keystore.password` -| The password of the key store file. - -| `.ssl.key.password` -| The password of the private key in the key store file or the PEM key specified in `ssl.keystore.key`. +==== Additional Configurations -| `.ssl.truststore.location` -| The location of the trust store file. +These configurations are consistent with the Java Kafka consumer client settings. -| `.ssl.truststore.password` -| The password for the trust store file. 
-|=== +To customize the consumer for authentication or advanced configurations, refer to the official Kafka documentation: link:https://kafka.apache.org/documentation/#consumerconfigs[Kafka Consumer Configurations]. -===== Component Prefix -Replace the `` with the appropriate identifier: +==== Examples for SSL Encryption and Authentication -* `[admin | producer | consumer]` -* `[source | target].cluster` -* `[source | target].cluster.[admin | producer | consumer]` +Each configuration must include the `type` and `bootstrap.servers` properties to specify the Kafka data source type and broker addresses. -===== Security Protocols +===== SSL Configuration +To configure SSL for secure communication with the Kafka cluster, include the following properties: -If the source cluster is configured for SSL or SASL protocols, you need to provide the following SSL/SASL credentials in order to communicate with the source cluster. - -* If the source cluster uses SASL, you need to upload the keytab of each Kerberos principal to every node of your TigerGraph cluster at the same absolute path. -* If the source cluster uses SSL, see our documentation xref:tigergraph-server:data-loading:kafka-ssl-security-guide.adoc[] -* If the source cluster uses SASL *and* SSL, you need to upload the keytab of each Kerberos principal, as well as the key store and truststore to every node of your TigerGraph cluster. Each file must be at the same absolute path on all nodes. - -The following configurations are required for admin, producer and consumer. Kafka allows SSL settings overriding, respecting security settings with the following precedence: - -* `generic.ssl.setting` < `source/target.cluster.ssl.setting` < `admin/producer/consumer.ssl.setting`. - -If both source and target clusters are sharing the same SSL settings, user can set generic settings for both source/target clusters and all the rols(admin/producer/consumer). - -For example, user can set `ssl.keystore.location="/path/to/key/store"` instead of: - -* `source.cluster.ssl.keystore.location="/path/to/key/store"` -* `admin.ssl.keystore.location="/path/to/key/store"` -* `source.cluster.admin.ssl.keystore.location="/path/to/key/store"`. - -If source and target clusters have different SSL settings, it is possible to set cluster wide SSL configs. - -For example, user can set: `target.cluster.ssl.truststore.password="/password/for/trust/store"` instead of: - -* `target.cluster.producer.ssl.trust.password="/password/for/trust/store"`. - -Note: SSL is now well supported by TigerGraph, we recommend users to set up regular SSL rather than SASL + PlainText/SSL. 
- -==== Supported Configuration Examples -===== PLAINTEXT -[source,json,linenum] +[source,json] ---- { - "type": "mirrormaker", - "source.cluster.bootstrap.servers": ":" + "type": "kafka", + "bootstrap.servers": "{broker-addresses}", + "security.protocol": "SSL", + "ssl.truststore.location": "{truststore-location}", + "ssl.truststore.password": "{truststore-password}", + "ssl.keystore.location": "{keystore-location}", + "ssl.keystore.password": "{keystore-password}", + "ssl.key.password": "{key-password}" } ---- -===== SSL -Need to configure: +===== SASL_PLAINTEXT Configuration +For SASL authentication over plaintext, use the following configuration: -* `.security.protocol` -* `.ssl.` - -[source,json,linenum] +[source,json] ---- { - "type": "mirrormaker", - "source.cluster.bootstrap.servers": ":", - - "consumer.security.protocol": "SSL", - "consumer.ssl.endpoint.identification.algorithm": "none", - "consumer.ssl.keystore.location": "/path/to/client.keystore.jks", - "consumer.ssl.keystore.password": "******", - "consumer.ssl.key.password": "******", - "consumer.ssl.truststore.location": "/path/to/client.truststore.jks", - "consumer.ssl.truststore.password": "******", - - "source.admin.security.protocol": "SSL", - "source.admin.ssl.endpoint.identification.algorithm": "none", - "source.admin.ssl.keystore.location": "/path/to/client.keystore.jks", - "source.admin.ssl.keystore.password": "******", - "source.admin.ssl.key.password": "******", - "source.admin.ssl.truststore.location": "/path/to/client.truststore.jks", - "source.admin.ssl.truststore.password": "******", - - "producer.security.protocol": "SSL", - "producer.ssl.endpoint.identification.algorithm": "none", - "producer.ssl.keystore.location": "/path/to/client.keystore.jks", - "producer.ssl.keystore.password": "******", - "producer.ssl.key.password": "******", - "producer.ssl.truststore.location": "/path/to/client.truststore.jks", - "producer.ssl.truststore.password": "******" + "type": "kafka", + "bootstrap.servers": "{broker-addresses}", + "security.protocol": "SASL_PLAINTEXT", + "sasl.mechanism": "PLAIN", + "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"{password}\";" } ---- -===== SASL_PLAINTEXT -Need to configure: - -* `.security.protocol` -* `.sasl.` +===== SASL_SSL Configuration +For SASL authentication over SSL, use the following configuration: -[source,json,linenum] +[source,json] ---- { - "type": "mirrormaker", - "source.cluster.bootstrap.servers": ":", - - "consumer.security.protocol": "SASL_PLAINTEXT", - "consumer.sasl.mechanism": "", - "consumer.sasl.jaas.config": "", - - "source.admin.security.protocol": "SASL_PLAINTEXT", - "source.admin.sasl.mechanism": "", - "source.admin.sasl.jaas.config": "", - - "producer.security.protocol": "SASL_PLAINTEXT", - "producer.sasl.mechanism": "", - "producer.sasl.jaas.config": "", + "type": "kafka", + "bootstrap.servers": "{broker-addresses}", + "security.protocol": "SASL_SSL", + "sasl.mechanism": "PLAIN", + "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{username}\" password=\"{password}\";", + "ssl.truststore.location": "{truststore-location}", + "ssl.truststore.password": "{truststore-password}" } ---- -===== SASL_SSL -Need to configure: - -* `.security.protocol` -* `.sasl.` -* `.ssl.` +===== Amazon MSK Configuration +For Amazon MSK with IAM authentication, use the following configuration: -[source,json,linenum] +[source,json] ---- { - "type": "mirrormaker", - 
"source.cluster.bootstrap.servers": ":", - - "consumer.security.protocol": "SASL_SSL", - "consumer.sasl.mechanism": "", - "consumer.sasl.jaas.config": "", - "consumer.ssl.endpoint.identification.algorithm": "none", - "consumer.ssl.keystore.location": "/path/to/client.keystore.jks", - "consumer.ssl.keystore.password": "******", - "consumer.ssl.key.password": "******", - "consumer.ssl.truststore.location": "/path/to/client.truststore.jks", - "consumer.ssl.truststore.password": "******", - - "source.admin.security.protocol": "SASL_PLAINTEXT", - "source.admin.sasl.mechanism": "", - "source.admin.sasl.jaas.config": "", - "source.admin.ssl.endpoint.identification.algorithm": "none", - "source.admin.ssl.keystore.location": "/path/to/client.keystore.jks", - "source.admin.ssl.keystore.password": "******", - "source.admin.ssl.key.password": "******", - "source.admin.ssl.truststore.location": "/path/to/client.truststore.jks", - "source.admin.ssl.truststore.password": "******", - - "producer.security.protocol": "SASL_PLAINTEXT", - "producer.sasl.mechanism": "", - "producer.sasl.jaas.config": "", - "producer.ssl.endpoint.identification.algorithm": "none", - "producer.ssl.keystore.location": "/path/to/client.keystore.jks", - "producer.ssl.keystore.password": "******", - "producer.ssl.key.password": "******", - "producer.ssl.truststore.location": "/path/to/client.truststore.jks", - "producer.ssl.truststore.password": "******" + "type": "kafka", + "bootstrap.servers": "{broker-addresses}", + "security.protocol": "SASL_SSL", + "sasl.mechanism": "AWS_MSK_IAM", + "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", + "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", + "ssl.truststore.location": "{truststore-location}", + "ssl.truststore.password": "{truststore-password}" } ---- -===== Third Party SASL Mechanism -For both `SASL` and `SASL_SSL` when a third party mechanism it is necessary to: - -* Include the `.sasl.jaas.config` in addition to the `.sasl.client.callback.handler.class` in the configuration -* Place the third party jar under `$(gadmin config get System.Approot)/kafka/libs/` +===== Google Cloud Kafka Configuration +For Kafka on Google Cloud with SASL authentication, use the following configuration: -[source,json,linenum] -.Example SASL Configuration with third party mechanism +[source,json] ---- { - "type": "mirrormaker", - "source.cluster.bootstrap.servers": ":", - - "consumer.security.protocol": "SASL_PLAINTEXT", - "consumer.sasl.mechanism": "", - "consumer.sasl.jaas.config": "", - "consumer.sasl.client.callback.handler.class": "", - - "source.admin.security.protocol": "SASL_PLAINTEXT", - "source.admin.sasl.mechanism": "", - "source.admin.sasl.jaas.config": "", - "source.admin.sasl.client.callback.handler.class": "", - - "producer.security.protocol": "SASL_PLAINTEXT", - "producer.sasl.mechanism": "", - "producer.sasl.jaas.config": "", - "producer.sasl.client.callback.handler.class": "" + "type": "kafka", + "bootstrap.servers": "{broker-addresses}", + "security.protocol": "SASL_SSL", + "sasl.mechanism": "OAUTHBEARER", + "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;", + "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler", + "ssl.truststore.location": "{truststore-location}", + "ssl.truststore.password": "{truststore-password}" } ---- ==== Schema Registry Service -If there is a 
https://docs.confluent.io/platform/current/schema-registry/index.html[schema registry service] containing the record schema of the source topic, please add it to the data source configuration:
+To load Avro data through a https://docs.confluent.io/platform/current/schema-registry/index.html[schema registry service], include the schema registry details in the data source configuration:
 
 [source,json]
-"value.converter.schema.registry.url": "schema_registry_url"
+{
+ "type": "kafka",
+ "bootstrap.servers": "{broker-addresses}",
+ "value.converter.schema.registry.url": "{schema-registry-url}",
+ "value.converter.schema.registry.basic.auth.credentials.source": "USER_INFO",
+ "value.converter.schema.registry.basic.auth.user.info": "{username}:{password}",
+ "value.converter":"com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter"
+}
 
 [NOTE]
 Currently, only Avro schema is supported.
\ No newline at end of file
diff --git a/modules/data-loading/partials/kafka/kafka-filename-details.adoc b/modules/data-loading/partials/kafka/kafka-filename-details.adoc
index d258902a..c4c6aadc 100644
--- a/modules/data-loading/partials/kafka/kafka-filename-details.adoc
+++ b/modules/data-loading/partials/kafka/kafka-filename-details.adoc
@@ -1,6 +1,6 @@
-NOTE: While a loading job may have multiple `FILENAME` variables , they must all refer to the same `DATA_SOURCE` object.
+NOTE: While a loading job may have multiple `FILENAME` variables, they must all refer to the same `DATA_SOURCE` object.
 
-==== Kafka file descriptors
+==== Filename Descriptors
 The file descriptor has three valid formats.
 You can simply provide the Kafka topic name and use default settings. Or, you can provide configuration details including the topic, either in a JSON file or as inline JSON content.
 
@@ -22,10 +22,12 @@ DEFINE FILENAME file_Person = "$s1:myfile.json";
 
 // Format 3: topic and inline configuration
 DEFINE FILENAME file_Person="""$s1:{
  "topic": "topic_Person",
- "tasks.max": "10"
+ "partition.offsets": "0:0,1:3,2:-1"
 }""";
 ----
 
+To load a topic with more granular control, you can define the filename in JSON format using the parameters described in the following "Filename parameters" section.
+
 ==== Filename parameters
 
 These are the required and optional configuration parameters:
@@ -39,6 +41,19 @@ These are the required and optional configuration parameters:
 | Yes
 | N/A
 
+| time
+| The start timestamp to consume from. -1 means consume from the latest offset; -2 means consume from the beginning. If unset, consume from the beginning. If `partition.offsets` is set, this setting will be ignored.
+| Optional
+| -2
+
+| partition.offsets
+| Specifies the partition offsets to consume from, formatted as a comma-separated list of `partition:offset` pairs.
+For example, `0:0,1:3,2:-1` indicates that partition 0 starts at offset 0, partition 1 starts at offset 3, and partition 2 starts at the latest offset.
+An offset of -1 means the latest offset; an offset of -2 means the beginning of the partition.
+If not specified, all partitions are consumed from the beginning.
+| Optional
+| N/A
+
 | tasks.max
 | The maximum number of tasks used to consume the source topic.
 You can increase this value when the source topic contains multiple partitions.
@@ -91,5 +106,23 @@ If one filename contains much more data than others, consider increasing this va
 |===
 
+For example:
+
+[source,go]
+----
+// Example 1: Specify the start timestamp to consume from
+DEFINE FILENAME file_Person="""$s1:{
+ "topic": "topic_Person",
+ "time": "1745203693324"
+}""";
+
+// Example 2: Specify partition offsets: partition 0 starts at offset 200, partition 1 starts at offset 100, and partition 2 starts at the latest offset
+DEFINE FILENAME file_Person="""$s1:{
+ "topic": "topic_Person",
+ "partition.offsets": "0:200,1:100,2:-1"
+}""";
+----
+
+
 [NOTE]
-For Avro data with schema registry,you must set both `value.converter.schema.registry.url` when defining the DATA_SOURCE and `value.converter` when defining a FILENAME.
\ No newline at end of file
+For Avro data with schema registry, you must set both `value.converter.schema.registry.url` when defining the DATA_SOURCE and `value.converter` when defining a FILENAME.
\ No newline at end of file
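+
+For example, the following minimal sketch pairs the two settings for an Avro topic; the graph, topic, and attribute names are illustrative placeholders rather than a schema from this guide:
+
+[source,gsql]
+----
+// ds_config.json (the DATA_SOURCE half) carries the registry URL, e.g.:
+//   { "type": "kafka",
+//     "bootstrap.servers": "{broker-addresses}",
+//     "value.converter.schema.registry.url": "{schema-registry-url}" }
+USE GRAPH ldbc_snb
+CREATE DATA_SOURCE s1 = "ds_config.json" FOR GRAPH ldbc_snb
+CREATE LOADING JOB load_person_avro FOR GRAPH ldbc_snb {
+  // The FILENAME half selects the converter class for the topic:
+  DEFINE FILENAME file_Person = """$s1:{
+    "topic": "topic_Person",
+    "value.converter": "com.tigergraph.kafka.connect.converters.TigerGraphAvroConverter"
+  }""";
+  LOAD file_Person TO VERTEX Person
+    VALUES ($"id", $"firstName") USING JSON_FILE="true";
+}
+----
+
+This mirrors the note above: the schema registry URL is defined once on the DATA_SOURCE, while the converter is chosen per FILENAME.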