diff --git a/cluster/activation.go b/cluster/activation.go index faf3b3fb..eaf5de42 100644 --- a/cluster/activation.go +++ b/cluster/activation.go @@ -65,3 +65,19 @@ type ActivationDetails struct { func SelectRandomMember(details ActivationDetails) *Member { return details.Members[rand.Intn(len(details.Members))] } + +// SelectRegionMember selects a member in the specified region of the cluster. +func SelectRegionMember(details ActivationDetails) *Member { + members := make([]*Member, 0, len(details.Members)) + for _, member := range details.Members { + if member.Region == details.Region { + members = append(members, member) + } + } + + if len(members) == 0 { + return nil + } + + return members[rand.Intn(len(members))] +} diff --git a/cluster/consul_provider.go b/cluster/consul_provider.go index 7d4dde30..a64ad8aa 100644 --- a/cluster/consul_provider.go +++ b/cluster/consul_provider.go @@ -20,12 +20,14 @@ const ( ) type ConsulProviderConfig struct { - address string + address string + serviceName string } func NewConsulProviderConfig() ConsulProviderConfig { return ConsulProviderConfig{ - address: "127.0.0.1:8500", + address: "127.0.0.1:8500", + serviceName: "holywood_actor", } } @@ -34,6 +36,11 @@ func (c ConsulProviderConfig) WithAddress(address string) ConsulProviderConfig { return c } +func (c ConsulProviderConfig) WithServiceName(serviceName string) ConsulProviderConfig { + c.serviceName = serviceName + return c +} + type ConsulProvider struct { config ConsulProviderConfig cluster *Cluster @@ -98,7 +105,7 @@ func (p *ConsulProvider) registerService() error { reg := &api.AgentServiceRegistration{ ID: p.memberID(), - Name: "hollywood_actor", + Name: p.config.serviceName, Tags: p.cluster.kindsToString(), Address: host, Port: port, @@ -117,7 +124,7 @@ func (p *ConsulProvider) registerService() error { func (p *ConsulProvider) watch() { query := map[string]any{ "type": "service", - "service": "hollywood_actor", + "service": p.config.serviceName, "passingonly": true, } @@ -147,9 +154,10 @@ func (p *ConsulProvider) onUpdate(index watch.BlockingParamVal, msg any) { if len(entry.Checks) > 0 && entry.Checks.AggregatedStatus() == api.HealthPassing { port := strconv.Itoa(entry.Service.Port) member := &Member{ - ID: entry.Service.Meta["name"], - Host: entry.Service.Address + ":" + port, - Kinds: entry.Service.Tags, + ID: entry.Service.Meta["name"], + Host: entry.Service.Address + ":" + port, + Region: entry.Node.Datacenter, + Kinds: entry.Service.Tags, } members = append(members, member) }