diff --git a/cmd/topicctl/subcmd/apply.go b/cmd/topicctl/subcmd/apply.go index 8b9c362a..206805e3 100644 --- a/cmd/topicctl/subcmd/apply.go +++ b/cmd/topicctl/subcmd/apply.go @@ -227,6 +227,7 @@ func applyTopic( aws.Config{}, config.AdminClientOpts{ ReadOnly: applyConfig.dryRun, + KafkaConnTimeout: applyConfig.shared.connTimeout, UsernameOverride: applyConfig.shared.saslUsername, PasswordOverride: applyConfig.shared.saslPassword, SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn, diff --git a/cmd/topicctl/subcmd/bootstrap.go b/cmd/topicctl/subcmd/bootstrap.go index 8b090bdd..5cf12b3f 100644 --- a/cmd/topicctl/subcmd/bootstrap.go +++ b/cmd/topicctl/subcmd/bootstrap.go @@ -82,6 +82,7 @@ func bootstrapRun(cmd *cobra.Command, args []string) error { aws.Config{}, config.AdminClientOpts{ ReadOnly: true, + KafkaConnTimeout: bootstrapConfig.shared.connTimeout, UsernameOverride: bootstrapConfig.shared.saslUsername, PasswordOverride: bootstrapConfig.shared.saslPassword, SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn, diff --git a/cmd/topicctl/subcmd/check.go b/cmd/topicctl/subcmd/check.go index 37117bfb..afccccae 100644 --- a/cmd/topicctl/subcmd/check.go +++ b/cmd/topicctl/subcmd/check.go @@ -140,6 +140,7 @@ func checkTopicFile( aws.Config{}, config.AdminClientOpts{ ReadOnly: true, + KafkaConnTimeout: checkConfig.shared.connTimeout, UsernameOverride: checkConfig.shared.saslUsername, PasswordOverride: checkConfig.shared.saslPassword, SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn, diff --git a/cmd/topicctl/subcmd/create.go b/cmd/topicctl/subcmd/create.go index b83c2507..2eebb926 100644 --- a/cmd/topicctl/subcmd/create.go +++ b/cmd/topicctl/subcmd/create.go @@ -153,6 +153,7 @@ func createACL( aws.Config{}, config.AdminClientOpts{ ReadOnly: createConfig.dryRun, + KafkaConnTimeout: createConfig.shared.connTimeout, UsernameOverride: createConfig.shared.saslUsername, PasswordOverride: createConfig.shared.saslPassword, SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn, diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 2654c0d1..8f6d01ce 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -128,6 +128,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { aws.Config{}, config.AdminClientOpts{ ReadOnly: rebalanceConfig.dryRun, + KafkaConnTimeout: rebalanceConfig.shared.connTimeout, UsernameOverride: rebalanceConfig.shared.saslUsername, PasswordOverride: rebalanceConfig.shared.saslPassword, SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn, diff --git a/cmd/topicctl/subcmd/shared.go b/cmd/topicctl/subcmd/shared.go index 57817f25..d6823a64 100644 --- a/cmd/topicctl/subcmd/shared.go +++ b/cmd/topicctl/subcmd/shared.go @@ -320,4 +320,10 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) { os.Getenv("TOPICCTL_SASL_SECRETS_MANAGER_ARN"), "Secrets Manager ARN to use for credentials if using SASL; will override value set in cluster config", ) + cmd.Flags().DurationVar( + &options.connTimeout, + "conn-timeout", + 10*time.Second, + "Kafka connection timeout", + ) } diff --git a/pkg/admin/brokerclient.go b/pkg/admin/brokerclient.go index 5b69699f..b333cae6 100644 --- a/pkg/admin/brokerclient.go +++ b/pkg/admin/brokerclient.go @@ -15,7 +15,7 @@ import ( ) const ( - defaultTimeout = 5 * time.Second + defaultRequestTimeout = 5 * time.Second // Used for filtering out default configs configSourceUnknown int8 = 0 @@ -138,6 +138,14 @@ func NewBrokerAdminClient( return adminClient, nil } +func (c *BrokerAdminClient) requestTimeout() time.Duration { + if c.config.ConnTimeout > 0 { + return c.config.ConnTimeout + } + + return defaultRequestTimeout +} + // GetClusterID gets the ID of the cluster. func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error) { resp, err := c.getMetadata(ctx, nil) @@ -597,7 +605,7 @@ func (c *BrokerAdminClient) AssignPartitions( req := kafka.AlterPartitionReassignmentsRequest{ Topic: topic, Assignments: apiAssignments, - Timeout: defaultTimeout, + Timeout: c.requestTimeout(), } log.Debugf("AlterPartitionReassignments request: %+v", req) @@ -708,7 +716,7 @@ func (c *BrokerAdminClient) RunLeaderElection( req := kafka.ElectLeadersRequest{ Topic: topic, Partitions: partitions, - Timeout: defaultTimeout, + Timeout: c.requestTimeout(), } log.Debugf("ElectLeaders request: %+v", req) diff --git a/pkg/admin/brokerclient_test.go b/pkg/admin/brokerclient_test.go index 856fbe18..2ee6f486 100644 --- a/pkg/admin/brokerclient_test.go +++ b/pkg/admin/brokerclient_test.go @@ -44,6 +44,25 @@ func TestBrokerClientControllerID(t *testing.T) { }, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID)) } +func TestBrokerAdminClientRequestTimeoutDefault(t *testing.T) { + client := &BrokerAdminClient{} + + assert.Equal(t, defaultRequestTimeout, client.requestTimeout()) +} + +func TestBrokerAdminClientRequestTimeoutOverride(t *testing.T) { + customTimeout := 42 * time.Second + client := &BrokerAdminClient{ + config: BrokerAdminClientConfig{ + ConnectorConfig: ConnectorConfig{ + ConnTimeout: customTimeout, + }, + }, + } + + assert.Equal(t, customTimeout, client.requestTimeout()) +} + func TestBrokerClientGetClusterID(t *testing.T) { if !util.CanTestBrokerAdmin() { t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set") diff --git a/pkg/admin/connector.go b/pkg/admin/connector.go index b5edb8db..b278c2f6 100644 --- a/pkg/admin/connector.go +++ b/pkg/admin/connector.go @@ -73,6 +73,11 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { Config: config, } + connTimeout := config.ConnTimeout + if connTimeout == 0 { + connTimeout = 10 * time.Second + } + var mechanismClient sasl.Mechanism var tlsConfig *tls.Config var err error @@ -143,6 +148,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { if !config.TLS.Enabled { connector.Dialer = kafka.DefaultDialer + connector.Dialer.Timeout = connTimeout connector.Dialer.SASLMechanism = mechanismClient } else { var certs []tls.Certificate @@ -184,7 +190,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { } connector.Dialer = &kafka.Dialer{ SASLMechanism: mechanismClient, - Timeout: 10 * time.Second, + Timeout: connTimeout, TLS: tlsConfig, } } @@ -194,13 +200,18 @@ func NewConnector(config ConnectorConfig) (*Connector, error) { config.TLS.Enabled, config.SASL.Enabled, ) + transport := &kafka.Transport{ + SASL: mechanismClient, + TLS: tlsConfig, + DialTimeout: connTimeout, + } + if connector.Dialer.DialFunc != nil { + transport.Dial = connector.Dialer.DialFunc + } connector.KafkaClient = &kafka.Client{ Addr: kafka.TCP(config.BrokerAddr), - Transport: &kafka.Transport{ - Dial: connector.Dialer.DialFunc, - SASL: mechanismClient, - TLS: tlsConfig, - }, + Timeout: connTimeout, + Transport: transport, } return connector, nil diff --git a/pkg/admin/connector_test.go b/pkg/admin/connector_test.go new file mode 100644 index 00000000..8b792725 --- /dev/null +++ b/pkg/admin/connector_test.go @@ -0,0 +1,137 @@ +package admin + +import ( + "context" + "errors" + "net" + "testing" + "time" + + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/protocol" + "github.com/segmentio/kafka-go/protocol/createtopics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewConnectorDefaultTimeout(t *testing.T) { + originalTimeout := kafka.DefaultDialer.Timeout + t.Cleanup(func() { kafka.DefaultDialer.Timeout = originalTimeout }) + + connector, err := NewConnector( + ConnectorConfig{ + BrokerAddr: "localhost:9092", + }, + ) + require.NoError(t, err) + + assert.Equal(t, 10*time.Second, connector.Dialer.Timeout) + transport, ok := connector.KafkaClient.Transport.(*kafka.Transport) + require.True(t, ok) + assert.Equal(t, 10*time.Second, transport.DialTimeout) + assert.Equal(t, 10*time.Second, connector.KafkaClient.Timeout) +} + +func TestNewConnectorCustomTimeout(t *testing.T) { + customTimeout := 3 * time.Second + + connector, err := NewConnector( + ConnectorConfig{ + BrokerAddr: "localhost:9092", + ConnTimeout: customTimeout, + TLS: TLSConfig{ + Enabled: true, + SkipVerify: true, + }, + }, + ) + require.NoError(t, err) + + assert.Equal(t, customTimeout, connector.Dialer.Timeout) + assert.NotNil(t, connector.Dialer.TLS) + transport, ok := connector.KafkaClient.Transport.(*kafka.Transport) + require.True(t, ok) + assert.Equal(t, customTimeout, transport.DialTimeout) + assert.Equal(t, customTimeout, connector.KafkaClient.Timeout) +} + +func TestConnectorDialerTimeoutHappyPath(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + acceptErrCh := make(chan error) + go func() { + defer close(acceptErrCh) + conn, err := listener.Accept() + if err != nil { + acceptErrCh <- err + return + } + if err := conn.Close(); err != nil { + acceptErrCh <- err + } + }() + + connector, err := NewConnector( + ConnectorConfig{ + BrokerAddr: listener.Addr().String(), + ConnTimeout: 100 * time.Millisecond, + }, + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + defer cancel() + + conn, err := connector.Dialer.DialContext(ctx, "tcp", listener.Addr().String()) + require.NoError(t, err) + require.NoError(t, conn.Close()) + + select { + case err, ok := <-acceptErrCh: + if ok { + require.NoError(t, err) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for listener accept") + } +} + +func TestConnectorDialerTimeoutUnhappyPath(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + connector, err := NewConnector( + ConnectorConfig{ + BrokerAddr: listener.Addr().String(), + ConnTimeout: time.Nanosecond, + }, + ) + require.NoError(t, err) + + _, err = connector.Dialer.DialContext(t.Context(), "tcp", listener.Addr().String()) + require.Error(t, err) + + var netErr net.Error + if errors.As(err, &netErr) { + require.True(t, netErr.Timeout(), "expected timeout error, got: %v", err) + return + } + + require.True(t, errors.Is(err, context.DeadlineExceeded), "expected deadline exceeded, got: %v", err) +} + +type captureTransport struct { + t *testing.T + expectedTimeoutMs int32 +} + +func (c *captureTransport) RoundTrip(_ context.Context, _ net.Addr, req protocol.Message) (protocol.Message, error) { + createReq, ok := req.(*createtopics.Request) + require.True(c.t, ok) + require.Equal(c.t, c.expectedTimeoutMs, createReq.TimeoutMs) + + return &createtopics.Response{}, nil +} diff --git a/scripts/set_up_net_alias.sh b/scripts/set_up_net_alias.sh index b76b24bd..f166b706 100755 --- a/scripts/set_up_net_alias.sh +++ b/scripts/set_up_net_alias.sh @@ -1,17 +1,39 @@ #!/bin/bash +set -euo pipefail + ADDR=169.254.123.123 +NETMASK=255.255.255.0 +CIDR=24 echo "Aliasing $ADDR to localhost..." -UNAME=$(uname -a) -case "$UNAME" in - Linux*) sudo ifconfig lo:0 $ADDR netmask 255.255.255.0 up;; - Darwin*) sudo ifconfig lo0 alias $ADDR;; - *) exit +alias_exists_with_ip() { + ip addr show dev lo | grep -q "$ADDR" +} + +OS=$(uname -s) +case "$OS" in + Linux*) + if command -v ifconfig >/dev/null 2>&1; then + sudo ifconfig lo:0 "$ADDR" netmask "$NETMASK" up + elif command -v ip >/dev/null 2>&1; then + if alias_exists_with_ip; then + echo "Alias already present on loopback interface." + else + sudo ip addr add "$ADDR/$CIDR" dev lo + fi + else + >&2 echo "Neither ifconfig nor ip is available; cannot create alias." + exit 1 + fi + ;; + Darwin*) + sudo ifconfig lo0 alias "$ADDR" + ;; + *) + >&2 echo "Unsupported platform: $OS" + exit 1 + ;; esac -if [[ $? != 0 ]] -then - >&2 echo "Unable to create alias" - exit 1 -fi +echo "Alias configured."