Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 187 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,9 @@ impl ConnectUtilApp {
}
};

// Validate each confluent_connector resource block individually
// Validate each confluent_connector resource block and module block individually
let mut connector_count = 0;
let mut module_count = 0;
for block in body.blocks() {
if block.identifier() == "resource" {
let labels = block.labels();
Expand All @@ -414,16 +415,41 @@ impl ConnectUtilApp {
let resource_name = labels[1].as_str();
self.validate_resource_block(block.body(), resource_name)?;
}
} else if block.identifier() == "module" {
// Check if this module has connector configuration by looking for config_nonsensitive
let has_connector_config = block.body().attributes().any(|attr| {
attr.key() == "config_nonsensitive" || attr.key() == "config_sensitive"
});
if has_connector_config {
module_count += 1;
let labels = block.labels();
let module_name = if !labels.is_empty() {
labels[0].as_str()
} else {
"unknown"
};
self.validate_module_block(block.body(), module_name)?;
}
}
}

if connector_count == 0 {
let total_count = connector_count + module_count;
if total_count == 0 {
return Err(ConnectUtilError::Config(
"❌ No 'confluent_connector' resources found in file".to_string(),
"❌ No connector configurations found in file (no 'confluent_connector' resources or connector modules)".to_string(),
));
}

println!(" ✅ Validated {} connector resource(s)", connector_count);
if connector_count > 0 && module_count > 0 {
println!(
" ✅ Validated {} connector resource(s) and {} module(s)",
connector_count, module_count
);
} else if connector_count > 0 {
println!(" ✅ Validated {} connector resource(s)", connector_count);
} else {
println!(" ✅ Validated {} connector module(s)", module_count);
}
println!("✅ Terraform structure validation passed!");
Ok(())
}
Expand Down Expand Up @@ -549,6 +575,91 @@ impl ConnectUtilApp {
Ok(())
}

/// Validates a single module block structure
/// Modules use attributes instead of blocks for environment and kafka_cluster
fn validate_module_block(
&self,
body: &Body,
module_name: &str,
) -> Result<(), ConnectUtilError> {
// Check for status field
let mut has_status = false;
for attr in body.attributes() {
if attr.key() == "status" {
has_status = true;
break;
}
}
if !has_status {
return Err(ConnectUtilError::Config(format!(
"❌ Module '{}' missing 'status' field",
module_name
)));
}

// Check for environment or environment_id attribute (modules use attributes, not blocks)
let mut has_environment = false;
for attr in body.attributes() {
if attr.key() == "environment" || attr.key() == "environment_id" {
has_environment = true;
break;
}
}
if !has_environment {
return Err(ConnectUtilError::Config(format!(
"❌ Module '{}' missing 'environment' or 'environment_id' attribute",
module_name
)));
}

// Check for kafka_cluster attribute (modules use attributes, not blocks)
let mut has_kafka_cluster = false;
for attr in body.attributes() {
if attr.key() == "kafka_cluster" {
has_kafka_cluster = true;
break;
}
}
if !has_kafka_cluster {
return Err(ConnectUtilError::Config(format!(
"❌ Module '{}' missing 'kafka_cluster' attribute",
module_name
)));
}

// Check for config_sensitive attribute
let mut has_config_sensitive = false;
for attr in body.attributes() {
if attr.key() == "config_sensitive" {
has_config_sensitive = true;
break;
}
}
if !has_config_sensitive {
return Err(ConnectUtilError::Config(format!(
"❌ Module '{}' missing 'config_sensitive' attribute",
module_name
)));
}

// Check for config_nonsensitive attribute
let mut has_config_nonsensitive = false;
for attr in body.attributes() {
if attr.key() == "config_nonsensitive" {
has_config_nonsensitive = true;
break;
}
}
if !has_config_nonsensitive {
return Err(ConnectUtilError::Config(format!(
"❌ Module '{}' missing 'config_nonsensitive' attribute",
module_name
)));
}

Ok(())
}

pub async fn list_plugins(
&mut self,
filter_type: Option<String>,
Expand Down Expand Up @@ -929,6 +1040,77 @@ resource "confluent_connector" "test_connector" {
);
}

#[tokio::test]
async fn test_validate_terraform_structure_with_modules() {
let app = ConnectUtilApp::new().await.unwrap();

// Test module-based Terraform validation
let terraform = r#"
module "test-connector" {
source = "../../modules/connector"

status = "RUNNING"
environment = var.environment
environment_id = var.environment_id
kafka_cluster = local.cluster

config_sensitive = {
"database.password" = "secret"
}

config_nonsensitive = {
"connector.class" = "MySqlCdcSourceV2"
"name" = "test-connector"
}
}
"#;

let result = app.validate_terraform_structure(terraform);
assert!(
result.is_ok(),
"Module-based Terraform structure should be valid"
);
}

#[tokio::test]
async fn test_validate_terraform_structure_mixed_resources_and_modules() {
let app = ConnectUtilApp::new().await.unwrap();

// Test mixed resource and module blocks
let terraform = r#"
resource "confluent_connector" "test_resource" {
status = "RUNNING"
environment {
id = var.environment_id
}
kafka_cluster {
id = var.kafka_cluster.id
}
config_sensitive = {}
config_nonsensitive = {
"connector.class" = "PostgresSink"
}
}

module "test_module" {
source = "../../modules/connector"
status = "RUNNING"
environment = var.environment
kafka_cluster = local.cluster
config_sensitive = {}
config_nonsensitive = {
"connector.class" = "MySqlCdcSourceV2"
}
}
"#;

let result = app.validate_terraform_structure(terraform);
assert!(
result.is_ok(),
"Mixed resource and module blocks should be valid"
);
}

#[tokio::test]
async fn test_validate_resource_block_missing_status() {
let app = ConnectUtilApp::new().await.unwrap();
Expand Down Expand Up @@ -1183,7 +1365,7 @@ variable "test" {
assert!(result
.unwrap_err()
.to_string()
.contains("No 'confluent_connector' resources found"));
.contains("No connector configurations found"));
}

#[tokio::test]
Expand Down
15 changes: 12 additions & 3 deletions src/connectors/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,18 @@ pub(crate) fn amazon_s3_sink() -> ConnectorDefinition {
false,
Some(vec![
"none".to_string(),
"gzip".to_string(),
"snappy".to_string(),
"lz4".to_string(),
"PARQUET - gzip".to_string(),
"PARQUET - snappy".to_string(),
"PARQUET - lz4".to_string(),
"JSON - gzip".to_string(),
"JSON - snappy".to_string(),
"JSON - lz4".to_string(),
"PROTOBUF - gzip".to_string(),
"PROTOBUF - snappy".to_string(),
"PROTOBUF - lz4".to_string(),
"AVRO - gzip".to_string(),
"AVRO - snappy".to_string(),
"AVRO - lz4".to_string(),
]),
),
config_field(
Expand Down