Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ type Client struct {
closer func() error
}

// MultiClient is a Celestia client with support for multiple endpoints
type MultiClient struct {
*Client
multiReadClient *MultiReadClient
multiGRPCClient *MultiGRPCClient
}

// New initializes the Celestia client. It connects to the Celestia consensus nodes and Bridge
// nodes. Any changes to the keyring are not visible to the client. The client needs to be
// reinitialized to pick up new keys. Client should be closed after use by calling Close().
Expand Down Expand Up @@ -84,6 +91,95 @@ func New(ctx context.Context, cfg Config, kr keyring.Keyring) (*Client, error) {
return cl, nil
}

// NewMultiEndpoint initializes a Celestia client with support for multiple endpoints.
// It provides failover capabilities for both bridge DA and core gRPC connections.
func NewMultiEndpoint(ctx context.Context, cfg Config, kr keyring.Keyring) (*MultiClient, error) {
// Create multi-read client for bridge DA endpoints
multiReadClient, err := NewMultiReadClient(ctx, cfg.ReadConfig)
if err != nil {
return nil, fmt.Errorf("failed to create multi-read client: %w", err)
}

// Get the primary read client
primaryReadClient := multiReadClient.GetClient()
if primaryReadClient == nil {
return nil, errors.New("no read clients available")
}

// Create the base client with primary read client
baseClient := &Client{
ReadClient: *primaryReadClient,
}

// Validate config
err = cfg.Validate()
if err != nil {
multiReadClient.Close()
return nil, err
}
if kr == nil {
multiReadClient.Close()
return nil, errors.New("keyring is nil")
}

// Create multi-gRPC client for core endpoints
multiGRPCClient, err := NewMultiGRPCClient(cfg.SubmitConfig.CoreGRPCConfig)
if err != nil {
multiReadClient.Close()
return nil, fmt.Errorf("failed to create multi-gRPC client: %w", err)
}

// Initialize transaction client with primary gRPC connection
err = baseClient.initTxClient(ctx, cfg.SubmitConfig, multiGRPCClient.GetConnection(), kr)
if err != nil {
multiReadClient.Close()
multiGRPCClient.Close()
return nil, fmt.Errorf("failed to initialize transaction client: %w", err)
}

// Create multi-client
multiClient := &MultiClient{
Client: baseClient,
multiReadClient: multiReadClient,
multiGRPCClient: multiGRPCClient,
}

// Override the closer to close all connections
multiClient.closer = func() error {
var errs []error

// Close multi-read client
if err := multiReadClient.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close multi-read client: %w", err))
}

// Close multi-gRPC client
if err := multiGRPCClient.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close multi-gRPC client: %w", err))
}

// Note: State service is managed by the ServiceBreaker in nodebuilder
// and doesn't need explicit stopping in the client

if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

return multiClient, nil
}

// GetReadClients returns all available read clients for advanced usage
func (mc *MultiClient) GetReadClients() []*ReadClient {
return mc.multiReadClient.GetAllClients()
}

// GetGRPCConnections returns all available gRPC connections for advanced usage
func (mc *MultiClient) GetGRPCConnections() []*grpc.ClientConn {
return mc.multiGRPCClient.GetAllConnections()
}

func (c *Client) initTxClient(
ctx context.Context,
submitCfg SubmitConfig,
Expand Down
213 changes: 213 additions & 0 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,219 @@ func setupMockRPCServer(t *testing.T, ctx context.Context) (*nodebuilder.Node, *
return nd, mockAPI, adminToken, cleanup
}

// TestMultiEndpointClient tests the multi-endpoint client functionality
func TestMultiEndpointClient(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

// Setup mock server with authed RPC
server, _, authToken, cleanup := setupMockRPCServer(t, ctx)
defer cleanup()

// Get the server URL
serverURL := "http://" + server.RPCServer.ListenAddr()

// Temporary directory for client storage
tmpDir, err := os.MkdirTemp("", "celestia-client-test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// Create keyring
kr, err := KeyringWithNewKey(KeyringConfig{
KeyName: "test_key",
BackendName: keyring.BackendTest,
}, tmpDir)
require.NoError(t, err)

// Test that multi-endpoint client creation fails gracefully with invalid config
invalidCfg := Config{
ReadConfig: ReadConfig{
BridgeDAAddr: serverURL,
AdditionalBridgeDAAddrs: []string{
"", // Empty address should be invalid
},
DAAuthToken: authToken,
EnableDATLS: false,
},
SubmitConfig: SubmitConfig{
DefaultKeyName: "test_key",
Network: "mocha-4",
CoreGRPCConfig: CoreGRPCConfig{
Addr: "localhost:9090",
TLSEnabled: false,
AuthToken: "",
},
},
}
_, err = NewMultiEndpoint(ctx, invalidCfg, kr)
require.Error(t, err)

// Test that multi-endpoint client creation fails gracefully with invalid gRPC config
invalidCfg = Config{
ReadConfig: ReadConfig{
BridgeDAAddr: serverURL,
DAAuthToken: authToken,
EnableDATLS: false,
},
SubmitConfig: SubmitConfig{
DefaultKeyName: "test_key",
Network: "mocha-4",
CoreGRPCConfig: CoreGRPCConfig{
Addr: "localhost:9090",
AdditionalCoreGRPCConfigs: []CoreGRPCConfig{
{
Addr: "", // Empty address should be invalid
},
},
TLSEnabled: false,
AuthToken: "",
},
},
}
_, err = NewMultiEndpoint(ctx, invalidCfg, kr)
require.Error(t, err)

// Test successful multi-read client creation (without gRPC to avoid connection issues)
multiReadClient, err := NewMultiReadClient(ctx, ReadConfig{
BridgeDAAddr: serverURL,
AdditionalBridgeDAAddrs: []string{
serverURL, // Same server for testing, in real usage would be different
},
DAAuthToken: authToken,
EnableDATLS: false,
})
require.NoError(t, err)
require.NotNil(t, multiReadClient)
defer multiReadClient.Close()

// Test that we have multiple read clients
readClients := multiReadClient.GetAllClients()
require.GreaterOrEqual(t, len(readClients), 1)

// Test that the client works normally
require.NotNil(t, readClients[0].Header)
require.NotNil(t, readClients[0].Share)
require.NotNil(t, readClients[0].Blob)
require.NotNil(t, readClients[0].Blobstream)
}

// TestReadConfigValidation tests validation of ReadConfig with multiple endpoints
func TestReadConfigValidation(t *testing.T) {
tests := []struct {
name string
cfg ReadConfig
expectErr bool
}{
{
name: "valid config with additional endpoints",
cfg: ReadConfig{
BridgeDAAddr: "http://localhost:26658",
AdditionalBridgeDAAddrs: []string{
"http://backup1:26658",
"http://backup2:26658",
},
DAAuthToken: "token",
},
expectErr: false,
},
{
name: "invalid additional endpoint",
cfg: ReadConfig{
BridgeDAAddr: "http://localhost:26658",
AdditionalBridgeDAAddrs: []string{
"", // Empty address should be invalid
},
DAAuthToken: "token",
},
expectErr: true,
},
{
name: "empty additional endpoints",
cfg: ReadConfig{
BridgeDAAddr: "http://localhost:26658",
AdditionalBridgeDAAddrs: []string{},
DAAuthToken: "token",
},
expectErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

// TestCoreGRPCConfigValidation tests validation of CoreGRPCConfig with multiple endpoints
func TestCoreGRPCConfigValidation(t *testing.T) {
tests := []struct {
name string
cfg CoreGRPCConfig
expectErr bool
}{
{
name: "valid config with additional endpoints",
cfg: CoreGRPCConfig{
Addr: "localhost:9090",
AdditionalCoreGRPCConfigs: []CoreGRPCConfig{
{
Addr: "backup1:9090",
TLSEnabled: true,
AuthToken: "token",
},
{
Addr: "backup2:9090",
TLSEnabled: false,
AuthToken: "",
},
},
TLSEnabled: true,
AuthToken: "token",
},
expectErr: false,
},
{
name: "invalid additional endpoint",
cfg: CoreGRPCConfig{
Addr: "localhost:9090",
AdditionalCoreGRPCConfigs: []CoreGRPCConfig{
{
Addr: "", // Empty address should be invalid
},
},
TLSEnabled: false,
},
expectErr: true,
},
{
name: "empty additional endpoints",
cfg: CoreGRPCConfig{
Addr: "localhost:9090",
AdditionalCoreGRPCConfigs: []CoreGRPCConfig{},
TLSEnabled: false,
},
expectErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

func addAuth(t *testing.T) (fx.Option, string) {
// Generate JWT signer and verifier for the server
key := make([]byte, 32)
Expand Down
Loading
Loading