Skip to content

Commit

Permalink
Add Managed Kafka ConnectCluster and Connector resources and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johndee7310 committed Mar 11, 2025
1 parent f181a08 commit 662523a
Show file tree
Hide file tree
Showing 4 changed files with 388 additions and 0 deletions.
145 changes: 145 additions & 0 deletions mmv1/products/managedkafka/ConnectCluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Copyright 2025 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
name: 'ConnectCluster'
description: A Managed Service for Kafka Connect cluster.
min_version: 'beta'
docs:
id_format: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster_id}}'
base_url: 'projects/{{project}}/locations/{{location}}/connectClusters'
self_link: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster_id}}'
create_url: 'projects/{{project}}/locations/{{location}}/connectClusters?connectClusterId={{connect_cluster_id}}'
update_verb: 'PATCH'
update_mask: true
import_format:
- 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster_id}}'
timeouts:
insert_minutes: 60
update_minutes: 30
delete_minutes: 30
autogen_async: true
async:
actions: ['create', 'delete', 'update']
type: 'OpAsync'
operation:
base_url: '{{op_id}}'
timeouts:
result:
resource_inside_response: true
custom_code:
examples:
- name: 'managedkafka_connect_cluster_basic'
primary_resource_id: 'example'
min_version: 'beta'
vars:
connect_cluster_id: 'my-connect-cluster'
cluster_id: 'my-cluster'
parameters:
- name: 'location'
type: String
description: "ID of the location of the Kafka Connect resource. See
https://cloud.google.com/managed-kafka/docs/locations for a list of
supported locations."
required: true
immutable: true
url_param_only: true
- name: 'connectClusterId'
type: String
description: "The ID to use for the Connect Cluster, which will become the final
component of the connect cluster's name. This value is structured like: `my-connect-cluster-id`."
url_param_only: true
required: true
immutable: true
properties:
- name: 'name'
type: String
description: "The name of the connect cluster. Structured like: `projects/PROJECT_ID/locations/LOCATION/connectClusters/CONNECT_CLUSTER_ID`."
output: true
- name: 'kafkaCluster'
type: String
description: "The name of the Kafka cluster this Kafka Connect cluster is attached to. Structured like: `projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID`."
required: true
- name: 'createTime'
type: String
description: "The time when the cluster was created."
output: true
- name: 'updateTime'
type: String
description: "The time when the cluster was last updated."
output: true
- name: 'labels'
type: KeyValueLabels
description: "List of label KEY=VALUE pairs to add. Keys must start with a lowercase
character and contain only hyphens (-), underscores (\_), lowercase
characters, and numbers. Values must contain only hyphens (-),
underscores (\_), lowercase characters, and numbers."
- name: 'capacityConfig'
type: NestedObject
description: "A capacity configuration of a Kafka cluster."
required: true
properties:
- name: vcpuCount
type: String
description: "The number of vCPUs to provision for the cluster. The minimum is 3."
required: true
- name: memoryBytes
type: String
description: "The memory to provision for the cluster in bytes. The CPU:memory ratio (vCPU:GiB) must be between 1:1 and 1:8. Minimum: 3221225472 (3 GiB)."
required: true
- name: 'gcpConfig'
type: NestedObject
description: "Configuration properties for a Kafka Connect cluster deployed to Google Cloud Platform."
required: true
properties:
- name: 'accessConfig'
type: NestedObject
description: "The configuration of access to the Kafka Connect cluster."
required: true
properties:
- name: 'networkConfigs'
type: Array
description: "Virtual Private Cloud (VPC) subnets where IP addresses for the Kafka Connect
cluster are allocated. To make the connect cluster available in a VPC, you must specify at least
one subnet per network. You must specify between 1 and 10 subnets.
Additional subnets may be specified with additional `network_configs` blocks."
required: true
item_type:
type: NestedObject
properties:
- name: 'primarySubnet'
type: String
description: "VPC subnet to make available to the Kafka Connect cluster. Structured like: projects/{project}/regions/{region}/subnetworks/{subnet_id}.
It is used to create a Private Service Connect (PSC) interface for the Kafka Connect workers. It must be located in the same region as the
Kafka Connect cluster. The CIDR range of the subnet must be within the IPv4 address ranges for private networks, as specified in RFC 1918.
The primary subnet CIDR range must have a minimum size of /22 (1024 addresses)."
required: true
diff_suppress_func: 'tpgresource.ProjectNumberDiffSuppress'
- name: 'additionalSubnets'
type: Array
description: "Additional subnets may be specified. They may be in another region, but must be in the
same VPC network. The Connect workers can communicate with
network endpoints in either the primary or additional subnets."
item_type:
type: String
- name: 'dnsDomainNames'
type: Array
description: "Additional DNS domain names from the subnet's network to be made visible to the Connect Cluster. When using
MirrorMaker2, it's necessary to add the bootstrap address's dns domain name of the target cluster to make it visible to
the connector. For example: my-kafka-cluster.us-central1.managedkafka.my-project.cloud.goog"
item_type:
type: String
- name: 'state'
type: String
description: "The current state of the connect cluster. Possible values: `STATE_UNSPECIFIED`, `CREATING`, `ACTIVE`, `DELETING`."
output: true
88 changes: 88 additions & 0 deletions mmv1/products/managedkafka/Connector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright 2025 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
name: 'Connector'
description: A Managed Service for Kafka Connect Connectors.
min_version: 'beta'
docs:
id_format: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster}}/connectors/{{connector_id}}'
base_url: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster}}/connectors'
self_link: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster}}/connectors/{{connector_id}}'
create_url: 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster}}/connectors?connectorId={{connector_id}}'
update_verb: 'PATCH'
update_mask: true
import_format:
- 'projects/{{project}}/locations/{{location}}/connectClusters/{{connect_cluster}}/connectors/{{connector_id}}'
timeouts:
insert_minutes: 60
update_minutes: 30
delete_minutes: 30
examples:
- name: 'managedkafka_connector_basic'
primary_resource_id: 'example'
min_version: 'beta'
vars:
connector_id: 'my-connector'
connect_cluster_id: 'my-connect-cluster'
cluster_id: 'my-cluster'
parameters:
- name: 'location'
type: String
description: "ID of the location of the Kafka Connect resource. See
https://cloud.google.com/managed-kafka/docs/locations for a list of
supported locations."
url_param_only: true
required: true
immutable: true
- name: 'connectCluster'
type: String
description: "The connect cluster name."
url_param_only: true
required: true
immutable: true
- name: 'connectorId'
type: String
description: "The ID to use for the connector, which will become the final
component of the connector's name. This value is structured like: `my-connector-id`."
url_param_only: true
required: true
immutable: true
properties:
- name: 'name'
type: String
description: "The name of the connector. The `connector` segment is used when
connecting directly to the connect cluster. Structured like: `projects/PROJECT_ID/locations/LOCATION/connectClusters/CONNECT_CLUSTER/connectors/CONNECTOR_ID`."
output: true
- name: 'configs'
type: KeyValuePairs
description: "Connector config as keys/values. The keys of the map are connector property names, for example:
`connector.class`, `tasks.max`, `key.converter`."
- name: 'state'
type: String
description: "The current state of the connect. Possible values: `STATE_UNSPECIFIED`, `UNASSIGNED`, `RUNNING`, `PAUSED`, `FAILED`, `RESTARTING`, and `STOPPED`."
output: true
- name: 'taskRestartPolicy'
type: NestedObject
description: "A policy that specifies how to restart the failed connectors/tasks in a Cluster resource. If not set, the failed connectors/tasks won't be restarted."
properties:
- name: 'minimumBackoff'
type: String
description: |
The minimum amount of time to wait before retrying a failed task. This sets a lower bound for the backoff delay.
A duration in seconds with up to nine fractional digits, terminated by 's'. Example: "3.5s".
- name: 'maximumBackoff'
type: String
description: |
The maximum amount of time to wait before retrying a failed task. This sets an upper bound for the backoff delay.
A duration in seconds with up to nine fractional digits, terminated by 's'. Example: "3.5s".
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
provider "google-beta" {
managed_kafka_custom_endpoint = "https://staging-managedkafka.sandbox.googleapis.com/v1/"
}

# create another subnet to provider for MKC connect cluster
resource "google_compute_subnetwork" "another_subnet" {
name = "another-subnet"
ip_cidr_range = "10.2.0.0/16"
region = "us-central1"
network = google_compute_network.gmk_network.id
}

resource "google_compute_network" "gmk_network" {
name = "test_network"
auto_create_subnetworks = false
}

# create GMK cluster instance
resource "google_managed_kafka_cluster" "gmk_cluster" {
cluster_id = "{{index $.Vars "cluster_id"}}"
location = "us-central1"
capacity_config {
vcpu_count = 3
memory_bytes = 3221225472
}
gcp_config {
access_config {
network_configs {
subnet = "projects/${data.google_project.project.project_id}/regions/us-central1/subnetworks/default"
}
}
}

provider = google-beta
}

resource "google_managed_kafka_connect_cluster" "{{$.PrimaryResourceId}}" {
connect_cluster_id = "{{index $.Vars "connect_cluster_id"}}"
kafka_cluster = "projects/${data.google_project.project.project_id}/locations/us-central1/clusters/${google_managed_kafka_cluster.gmk_cluster.cluster_id}"
location = "us-central1"
capacity_config {
vcpu_count = 12
memory_bytes = 21474836480
}
gcp_config {
access_config {
network_configs {
primary_subnet = "projects/${data.google_project.project.project_id}/regions/us-central1/subnetworks/default"
additional_subnets = [google_compute_subnetwork.another_subnet.self_link]
dns_domain_names = ["${google_managed_kafka_cluster.gmk_cluster.cluster_id}.us-central1.managedkafka-staging.${data.google_project.project.project_id}.cloud-staging.goog"]
}
}
}
labels = {
key = "value"
}

# Add the depends_on attribute here
depends_on = [google_managed_kafka_cluster.gmk_cluster]

provider = google-beta
}

data "google_project" "project" {
provider = google-beta
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
provider "google-beta" {
managed_kafka_custom_endpoint = "https://staging-managedkafka.sandbox.googleapis.com/v1/"
}

# create another subnet to provider for MKC connect cluster
resource "google_compute_subnetwork" "another_subnet" {
name = "another-subnet"
ip_cidr_range = "10.2.0.0/16"
region = "us-central1"
network = google_compute_network.gmk_network.id
}

resource "google_compute_network" "gmk_network" {
name = "test_network"
auto_create_subnetworks = false
}

# create GMK cluster instance
resource "google_managed_kafka_cluster" "gmk_cluster" {
cluster_id = "{{index $.Vars "cluster_id"}}"
location = "us-central1"
capacity_config {
vcpu_count = 3
memory_bytes = 3221225472
}
gcp_config {
access_config {
network_configs {
subnet = "projects/${data.google_project.project.project_id}/regions/us-central1/subnetworks/default"
}
}
}

provider = google-beta
}

# create MKC cluster instance
resource "google_managed_kafka_connect_cluster" "mkc_cluster" {
connect_cluster_id = "{{index $.Vars "connect_cluster_id"}}"
kafka_cluster = "projects/${data.google_project.project.project_id}/locations/us-central1/clusters/${google_managed_kafka_cluster.gmk_cluster.cluster_id}"
location = "us-central1"
capacity_config {
vcpu_count = 12
memory_bytes = 21474836480
}
gcp_config {
access_config {
network_configs {
primary_subnet = "projects/${data.google_project.project.project_id}/regions/us-central1/subnetworks/default"
additional_subnets = [google_compute_subnetwork.another_subnet.self_link]
dns_domain_names = ["${google_managed_kafka_cluster.gmk_cluster.cluster_id}.us-central1.managedkafka-staging.${data.google_project.project.project_id}.cloud-staging.goog"]
}
}
}
labels = {
key = "value"
}

# Add the depends_on attribute here
depends_on = [google_managed_kafka_cluster.gmk_cluster]

provider = google-beta
}

resource "google_managed_kafka_connector" "{{$.PrimaryResourceId}}" {
connector_id = "{{index $.Vars "connector_id"}}"
connect_cluster = google_managed_kafka_connect_cluster.mkc_cluster.connect_cluster_id
location = "us-central1"
configs = {
"connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
"tasks.max" = "1"
"topics" = "gmk-topic"
"name" = "{{index $.Vars "connector_id"}}"
"value.converter" = "org.apache.kafka.connect.storage.StringConverter"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
"cps.topic" = "terraform-provider-test-topic",
"cps.project" = "${data.google_project.project.project_id}"
}
task_restart_policy {
maximum_backoff = "60s"
minimum_backoff = "1800s"
}

provider = google-beta
}

data "google_project" "project" {
provider = google-beta
}

0 comments on commit 662523a

Please sign in to comment.