Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func applyTopic(
aws.Config{},
config.AdminClientOpts{
ReadOnly: applyConfig.dryRun,
KafkaConnTimeout: applyConfig.shared.connTimeout,
UsernameOverride: applyConfig.shared.saslUsername,
PasswordOverride: applyConfig.shared.saslPassword,
SecretsManagerArnOverride: applyConfig.shared.saslSecretsManagerArn,
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func bootstrapRun(cmd *cobra.Command, args []string) error {
aws.Config{},
config.AdminClientOpts{
ReadOnly: true,
KafkaConnTimeout: bootstrapConfig.shared.connTimeout,
UsernameOverride: bootstrapConfig.shared.saslUsername,
PasswordOverride: bootstrapConfig.shared.saslPassword,
SecretsManagerArnOverride: bootstrapConfig.shared.saslSecretsManagerArn,
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func checkTopicFile(
aws.Config{},
config.AdminClientOpts{
ReadOnly: true,
KafkaConnTimeout: checkConfig.shared.connTimeout,
UsernameOverride: checkConfig.shared.saslUsername,
PasswordOverride: checkConfig.shared.saslPassword,
SecretsManagerArnOverride: checkConfig.shared.saslSecretsManagerArn,
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func createACL(
aws.Config{},
config.AdminClientOpts{
ReadOnly: createConfig.dryRun,
KafkaConnTimeout: createConfig.shared.connTimeout,
UsernameOverride: createConfig.shared.saslUsername,
PasswordOverride: createConfig.shared.saslPassword,
SecretsManagerArnOverride: createConfig.shared.saslSecretsManagerArn,
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
aws.Config{},
config.AdminClientOpts{
ReadOnly: rebalanceConfig.dryRun,
KafkaConnTimeout: rebalanceConfig.shared.connTimeout,
UsernameOverride: rebalanceConfig.shared.saslUsername,
PasswordOverride: rebalanceConfig.shared.saslPassword,
SecretsManagerArnOverride: rebalanceConfig.shared.saslSecretsManagerArn,
Expand Down
6 changes: 6 additions & 0 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,10 @@ func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
os.Getenv("TOPICCTL_SASL_SECRETS_MANAGER_ARN"),
"Secrets Manager ARN to use for credentials if using SASL; will override value set in cluster config",
)
cmd.Flags().DurationVar(
&options.connTimeout,
"conn-timeout",
10*time.Second,
"Kafka connection timeout",
)
}
14 changes: 11 additions & 3 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
defaultTimeout = 5 * time.Second
defaultRequestTimeout = 5 * time.Second

// Used for filtering out default configs
configSourceUnknown int8 = 0
Expand Down Expand Up @@ -138,6 +138,14 @@ func NewBrokerAdminClient(
return adminClient, nil
}

func (c *BrokerAdminClient) requestTimeout() time.Duration {
if c.config.ConnTimeout > 0 {
return c.config.ConnTimeout
}

return defaultRequestTimeout
}

// GetClusterID gets the ID of the cluster.
func (c *BrokerAdminClient) GetClusterID(ctx context.Context) (string, error) {
resp, err := c.getMetadata(ctx, nil)
Expand Down Expand Up @@ -597,7 +605,7 @@ func (c *BrokerAdminClient) AssignPartitions(
req := kafka.AlterPartitionReassignmentsRequest{
Topic: topic,
Assignments: apiAssignments,
Timeout: defaultTimeout,
Timeout: c.requestTimeout(),
}
log.Debugf("AlterPartitionReassignments request: %+v", req)

Expand Down Expand Up @@ -708,7 +716,7 @@ func (c *BrokerAdminClient) RunLeaderElection(
req := kafka.ElectLeadersRequest{
Topic: topic,
Partitions: partitions,
Timeout: defaultTimeout,
Timeout: c.requestTimeout(),
}
log.Debugf("ElectLeaders request: %+v", req)

Expand Down
19 changes: 19 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ func TestBrokerClientControllerID(t *testing.T) {
}, fmt.Sprintf("Received %d, Broker Controller ID should be between 1 and 6.", controllerID))
}

func TestBrokerAdminClientRequestTimeoutDefault(t *testing.T) {
client := &BrokerAdminClient{}

assert.Equal(t, defaultRequestTimeout, client.requestTimeout())
}

func TestBrokerAdminClientRequestTimeoutOverride(t *testing.T) {
customTimeout := 42 * time.Second
client := &BrokerAdminClient{
config: BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
ConnTimeout: customTimeout,
},
},
}

assert.Equal(t, customTimeout, client.requestTimeout())
}

func TestBrokerClientGetClusterID(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
23 changes: 17 additions & 6 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
Config: config,
}

connTimeout := config.ConnTimeout
if connTimeout == 0 {
connTimeout = 10 * time.Second
}

var mechanismClient sasl.Mechanism
var tlsConfig *tls.Config
var err error
Expand Down Expand Up @@ -143,6 +148,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {

if !config.TLS.Enabled {
connector.Dialer = kafka.DefaultDialer
connector.Dialer.Timeout = connTimeout
connector.Dialer.SASLMechanism = mechanismClient
} else {
var certs []tls.Certificate
Expand Down Expand Up @@ -184,7 +190,7 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
}
connector.Dialer = &kafka.Dialer{
SASLMechanism: mechanismClient,
Timeout: 10 * time.Second,
Timeout: connTimeout,
TLS: tlsConfig,
}
}
Expand All @@ -194,13 +200,18 @@ func NewConnector(config ConnectorConfig) (*Connector, error) {
config.TLS.Enabled,
config.SASL.Enabled,
)
transport := &kafka.Transport{
SASL: mechanismClient,
TLS: tlsConfig,
DialTimeout: connTimeout,
}
if connector.Dialer.DialFunc != nil {
transport.Dial = connector.Dialer.DialFunc
}
connector.KafkaClient = &kafka.Client{
Addr: kafka.TCP(config.BrokerAddr),
Transport: &kafka.Transport{
Dial: connector.Dialer.DialFunc,
SASL: mechanismClient,
TLS: tlsConfig,
},
Timeout: connTimeout,
Transport: transport,
}

return connector, nil
Expand Down
137 changes: 137 additions & 0 deletions pkg/admin/connector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package admin

import (
"context"
"errors"
"net"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/protocol"
"github.com/segmentio/kafka-go/protocol/createtopics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewConnectorDefaultTimeout(t *testing.T) {
originalTimeout := kafka.DefaultDialer.Timeout
t.Cleanup(func() { kafka.DefaultDialer.Timeout = originalTimeout })

connector, err := NewConnector(
ConnectorConfig{
BrokerAddr: "localhost:9092",
},
)
require.NoError(t, err)

assert.Equal(t, 10*time.Second, connector.Dialer.Timeout)
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
require.True(t, ok)
assert.Equal(t, 10*time.Second, transport.DialTimeout)
assert.Equal(t, 10*time.Second, connector.KafkaClient.Timeout)
}

func TestNewConnectorCustomTimeout(t *testing.T) {
customTimeout := 3 * time.Second

connector, err := NewConnector(
ConnectorConfig{
BrokerAddr: "localhost:9092",
ConnTimeout: customTimeout,
TLS: TLSConfig{
Enabled: true,
SkipVerify: true,
},
},
)
require.NoError(t, err)

assert.Equal(t, customTimeout, connector.Dialer.Timeout)
assert.NotNil(t, connector.Dialer.TLS)
transport, ok := connector.KafkaClient.Transport.(*kafka.Transport)
require.True(t, ok)
assert.Equal(t, customTimeout, transport.DialTimeout)
assert.Equal(t, customTimeout, connector.KafkaClient.Timeout)
}

func TestConnectorDialerTimeoutHappyPath(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { _ = listener.Close() })

acceptErrCh := make(chan error)
go func() {
defer close(acceptErrCh)
conn, err := listener.Accept()
if err != nil {
acceptErrCh <- err
return
}
if err := conn.Close(); err != nil {
acceptErrCh <- err
}
}()

connector, err := NewConnector(
ConnectorConfig{
BrokerAddr: listener.Addr().String(),
ConnTimeout: 100 * time.Millisecond,
},
)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(t.Context(), time.Second)
defer cancel()

conn, err := connector.Dialer.DialContext(ctx, "tcp", listener.Addr().String())
require.NoError(t, err)
require.NoError(t, conn.Close())

select {
case err, ok := <-acceptErrCh:
if ok {
require.NoError(t, err)
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for listener accept")
}
}

func TestConnectorDialerTimeoutUnhappyPath(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { _ = listener.Close() })

connector, err := NewConnector(
ConnectorConfig{
BrokerAddr: listener.Addr().String(),
ConnTimeout: time.Nanosecond,
},
)
require.NoError(t, err)

_, err = connector.Dialer.DialContext(t.Context(), "tcp", listener.Addr().String())
require.Error(t, err)

var netErr net.Error
if errors.As(err, &netErr) {
require.True(t, netErr.Timeout(), "expected timeout error, got: %v", err)
return
}

require.True(t, errors.Is(err, context.DeadlineExceeded), "expected deadline exceeded, got: %v", err)
}

type captureTransport struct {
t *testing.T
expectedTimeoutMs int32
}

func (c *captureTransport) RoundTrip(_ context.Context, _ net.Addr, req protocol.Message) (protocol.Message, error) {
createReq, ok := req.(*createtopics.Request)
require.True(c.t, ok)
require.Equal(c.t, c.expectedTimeoutMs, createReq.TimeoutMs)

return &createtopics.Response{}, nil
}
42 changes: 32 additions & 10 deletions scripts/set_up_net_alias.sh
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
#!/bin/bash

set -euo pipefail

ADDR=169.254.123.123
NETMASK=255.255.255.0
CIDR=24
echo "Aliasing $ADDR to localhost..."

UNAME=$(uname -a)
case "$UNAME" in
Linux*) sudo ifconfig lo:0 $ADDR netmask 255.255.255.0 up;;
Darwin*) sudo ifconfig lo0 alias $ADDR;;
*) exit
alias_exists_with_ip() {
ip addr show dev lo | grep -q "$ADDR"
}

OS=$(uname -s)
case "$OS" in
Linux*)
if command -v ifconfig >/dev/null 2>&1; then
sudo ifconfig lo:0 "$ADDR" netmask "$NETMASK" up
elif command -v ip >/dev/null 2>&1; then
if alias_exists_with_ip; then
echo "Alias already present on loopback interface."
else
sudo ip addr add "$ADDR/$CIDR" dev lo
fi
else
>&2 echo "Neither ifconfig nor ip is available; cannot create alias."
exit 1
fi
;;
Darwin*)
sudo ifconfig lo0 alias "$ADDR"
;;
*)
>&2 echo "Unsupported platform: $OS"
exit 1
;;
esac

if [[ $? != 0 ]]
then
>&2 echo "Unable to create alias"
exit 1
fi
echo "Alias configured."