Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cluster/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,19 @@ type ActivationDetails struct {
func SelectRandomMember(details ActivationDetails) *Member {
return details.Members[rand.Intn(len(details.Members))]
}

// SelectRegionMember selects a member in the specified region of the cluster.
func SelectRegionMember(details ActivationDetails) *Member {
members := make([]*Member, 0, len(details.Members))
for _, member := range details.Members {
if member.Region == details.Region {
members = append(members, member)
}
}

if len(members) == 0 {
return nil
}

return members[rand.Intn(len(members))]
}
22 changes: 15 additions & 7 deletions cluster/consul_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ const (
)

type ConsulProviderConfig struct {
address string
address string
serviceName string
}

func NewConsulProviderConfig() ConsulProviderConfig {
return ConsulProviderConfig{
address: "127.0.0.1:8500",
address: "127.0.0.1:8500",
serviceName: "holywood_actor",
}
}

Expand All @@ -34,6 +36,11 @@ func (c ConsulProviderConfig) WithAddress(address string) ConsulProviderConfig {
return c
}

func (c ConsulProviderConfig) WithServiceName(serviceName string) ConsulProviderConfig {
c.serviceName = serviceName
return c
}

type ConsulProvider struct {
config ConsulProviderConfig
cluster *Cluster
Expand Down Expand Up @@ -98,7 +105,7 @@ func (p *ConsulProvider) registerService() error {

reg := &api.AgentServiceRegistration{
ID: p.memberID(),
Name: "hollywood_actor",
Name: p.config.serviceName,
Tags: p.cluster.kindsToString(),
Address: host,
Port: port,
Expand All @@ -117,7 +124,7 @@ func (p *ConsulProvider) registerService() error {
func (p *ConsulProvider) watch() {
query := map[string]any{
"type": "service",
"service": "hollywood_actor",
"service": p.config.serviceName,
"passingonly": true,
}

Expand Down Expand Up @@ -147,9 +154,10 @@ func (p *ConsulProvider) onUpdate(index watch.BlockingParamVal, msg any) {
if len(entry.Checks) > 0 && entry.Checks.AggregatedStatus() == api.HealthPassing {
port := strconv.Itoa(entry.Service.Port)
member := &Member{
ID: entry.Service.Meta["name"],
Host: entry.Service.Address + ":" + port,
Kinds: entry.Service.Tags,
ID: entry.Service.Meta["name"],
Host: entry.Service.Address + ":" + port,
Region: entry.Node.Datacenter,
Kinds: entry.Service.Tags,
}
members = append(members, member)
}
Expand Down