Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/leaf/wiring/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ func main() {
specs.Xtrace_Logger,
specs.OT_Logger,
specs.Govector,
specs.HTTP_LoadBalancer,
)
}
45 changes: 45 additions & 0 deletions examples/leaf/wiring/specs/http_lb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package specs

import (
"github.com/blueprint-uservices/blueprint/blueprint/pkg/wiring"
"github.com/blueprint-uservices/blueprint/examples/leaf/workflow/leaf"
"github.com/blueprint-uservices/blueprint/plugins/cmdbuilder"
"github.com/blueprint-uservices/blueprint/plugins/goproc"
"github.com/blueprint-uservices/blueprint/plugins/http"
"github.com/blueprint-uservices/blueprint/plugins/linuxcontainer"
"github.com/blueprint-uservices/blueprint/plugins/loadbalancer"
"github.com/blueprint-uservices/blueprint/plugins/memcached"
"github.com/blueprint-uservices/blueprint/plugins/mongodb"
"github.com/blueprint-uservices/blueprint/plugins/workflow"
)

var HTTP_LoadBalancer = cmdbuilder.SpecOption{
Name: "http_lb",
Description: "Deploys each service in a separate process, communicating using HTTP. Leaf service has 2 replicas and NonLeafService chooses between the two at random.",
Build: makeHTTPLbSpec,
}

func makeHTTPLbSpec(spec wiring.WiringSpec) ([]string, error) {
leaf_cache := memcached.Container(spec, "leaf_cache")
leaf_db := mongodb.Container(spec, "leaf_db")

leaf_service1 := workflow.Service[*leaf.LeafServiceImpl](spec, "leaf1_service", leaf_cache, leaf_db)
leaf_proc1 := leaf_service1 + "_process"
http.Deploy(spec, leaf_service1)
leaf_proc1 = goproc.CreateProcess(spec, leaf_proc1, leaf_service1)
leaf_cntr1 := linuxcontainer.Deploy(spec, leaf_service1)
leaf_service2 := workflow.Service[*leaf.LeafServiceImpl](spec, "leaf2_service", leaf_cache, leaf_db)
leaf_proc2 := leaf_service2 + "_process"
http.Deploy(spec, leaf_service2)
leaf_proc2 = goproc.CreateProcess(spec, leaf_proc2, leaf_service2)
leaf_cntr2 := linuxcontainer.Deploy(spec, leaf_service2)

leaf_lb := loadbalancer.Create(spec, "LeafServices", []string{leaf_service1, leaf_service2})

nonleaf_service := workflow.Service[leaf.NonLeafService](spec, "nonleaf_service", leaf_lb)
nonleaf_proc := nonleaf_service + "_process"
http.Deploy(spec, nonleaf_service)
nonleaf_proc = goproc.CreateProcess(spec, nonleaf_proc, nonleaf_service)
nonleaf_cntr := linuxcontainer.Deploy(spec, nonleaf_service)
return []string{leaf_cntr1, leaf_cntr2, nonleaf_cntr}, nil
}
3 changes: 3 additions & 0 deletions plugins/loadbalancer/README.MD
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Client-Side LoadBalancer Plugin

Adds a client-side loadbalancer in front of N instances of a service type. As a pre-requisite, the instances SHOULD export the same interface.
163 changes: 163 additions & 0 deletions plugins/loadbalancer/ir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package loadbalancer

import (
"fmt"
"path/filepath"
"strings"

"github.com/blueprint-uservices/blueprint/blueprint/pkg/blueprint"
"github.com/blueprint-uservices/blueprint/blueprint/pkg/blueprint/stringutil"
"github.com/blueprint-uservices/blueprint/blueprint/pkg/coreplugins/service"
"github.com/blueprint-uservices/blueprint/blueprint/pkg/ir"
"github.com/blueprint-uservices/blueprint/plugins/golang"
"github.com/blueprint-uservices/blueprint/plugins/golang/gocode"
"github.com/blueprint-uservices/blueprint/plugins/golang/gogen"
)

type LoadBalancerClient struct {
golang.Service
golang.GeneratesFuncs

BalancerName string
Clients []golang.Service
ContainedNodes []ir.IRNode

outputPackage string
}

func newLoadBalancerClient(name string, arg_nodes []ir.IRNode) (*LoadBalancerClient, error) {
clients := []golang.Service{}
for _, n := range arg_nodes {
if c, ok := n.(golang.Service); ok {
clients = append(clients, c)
} else {
return nil, blueprint.Errorf("Expected all clients to load balancer to be golang Services")
}
}
return &LoadBalancerClient{
BalancerName: name,
Clients: clients,
ContainedNodes: arg_nodes,
outputPackage: "lb",
}, nil
}

func (node *LoadBalancerClient) Name() string {
return node.BalancerName
}

func (node *LoadBalancerClient) String() string {
var b strings.Builder
b.WriteString(fmt.Sprintf("%v = LoadBalancer() {\n", node.BalancerName))
var children []string
for _, child := range node.ContainedNodes {
children = append(children, child.String())
}
b.WriteString(stringutil.Indent(strings.Join(children, "\n"), 2))
b.WriteString("\n}")
return b.String()
}

func (lb *LoadBalancerClient) GetInterface(ctx ir.BuildContext) (service.ServiceInterface, error) {
// LoadBalancer doesn't modify the interface! As all clients must have the same interface, we can simply return the interface of any of the clients.
return lb.Clients[0].GetInterface(ctx)
}

func (lb *LoadBalancerClient) AddInterfaces(module golang.ModuleBuilder) error {
for _, node := range lb.ContainedNodes {
if n, valid := node.(golang.ProvidesInterface); valid {
if err := n.AddInterfaces(module); err != nil {
return err
}
}
}
return nil
}

func (lb *LoadBalancerClient) AddInstantiation(builder golang.NamespaceBuilder) error {
if builder.Visited(lb.BalancerName) {
return nil
}

iface, err := golang.GetGoInterface(builder, lb.Clients[0])
if err != nil {
return err
}

constructor := &gocode.Constructor{
Package: builder.Module().Info().Name + "/" + lb.outputPackage,
Func: gocode.Func{
Name: fmt.Sprintf("New_%v_LoadBalancer", iface.BaseName),
Arguments: []gocode.Variable{
{Name: "ctx", Type: &gocode.UserType{Package: "context", Name: "Context"}},
{Name: "clients", Type: &gocode.Ellipsis{EllipsisOf: iface}},
},
},
}

return builder.DeclareConstructor(lb.BalancerName, constructor, lb.ContainedNodes)
}

func (lb *LoadBalancerClient) GenerateFuncs(module golang.ModuleBuilder) error {
if module.Visited(lb.BalancerName) {
return nil
}

pkg, err := module.CreatePackage(lb.outputPackage)
if err != nil {
return err
}

iface, err := golang.GetGoInterface(module, lb.Clients[0])
if err != nil {
return err
}

args := &clientArgs{}
args.LBName = lb.BalancerName
args.PackageShortName = lb.outputPackage
args.Imports = gogen.NewImports(pkg.Name)
args.ServiceName = iface.BaseName
args.Service = iface
lbFileName := filepath.Join(module.Info().Path, args.PackageShortName, lb.BalancerName+".go")
args.Imports.AddPackages("context", "github.com/blueprint-uservices/blueprint/runtime/plugins/loadbalancer")

return gogen.ExecuteTemplateToFile("lb_client_constructor", lbTemplate, args, lbFileName)
}

type clientArgs struct {
LBName string
ServiceName string
PackageShortName string
Imports *gogen.Imports
Service *gocode.ServiceInterface
}

var lbTemplate = `// This file is auto-generated by the Blueprint loadbalancer plugin
package {{.PackageShortName}}

{{.Imports}}

type {{.LBName}} struct {
balancer *loadbalancer.LoadBalancer[{{NameOf .Service.UserType}}]
}

func New_{{.ServiceName}}_LoadBalancer(ctx context.Context, clients ...{{NameOf .Service.UserType}}) (*{{.LBName}}, error) {
handler := &{{.LBName}}{}
clients_arr := []{{NameOf .Service.UserType}}{}
for _, c := range clients {
clients_arr = append(clients_arr, c)
}
handler.balancer = loadbalancer.NewLoadBalancer[{{NameOf .Service.UserType}}](ctx, clients_arr)
return handler, nil
}

{{$service := .Service -}}
{{$receiver := .LBName -}}
{{ range $_, $f := .Service.Methods }}
func (lbalancer *{{$receiver}}) {{SignatureWithRetVars $f}} {
client := lbalancer.balancer.PickClient(ctx)
return client.{{$f.Name}}({{ArgVars $f "ctx"}})
}
{{end}}
`
28 changes: 28 additions & 0 deletions plugins/loadbalancer/wiring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package loadbalancer

import (
"github.com/blueprint-uservices/blueprint/blueprint/pkg/ir"
"github.com/blueprint-uservices/blueprint/blueprint/pkg/wiring"
)

// Creates a client-side load-balancer for multiple instances of a service. The list of services must be provided as an argument at compile-time when using this plugin.
func Create(spec wiring.WiringSpec, serviceGroupName string, services []string) string {
loadbalancer_name := serviceGroupName + ".lb"
spec.Define(loadbalancer_name, &LoadBalancerClient{}, func(namespace wiring.Namespace) (ir.IRNode, error) {
var arg_nodes []ir.IRNode
for _, arg_name := range services {
var arg ir.IRNode
if err := namespace.Get(arg_name, &arg); err != nil {
return nil, err
}
arg_nodes = append(arg_nodes, arg)
}

return newLoadBalancerClient(serviceGroupName, arg_nodes)
})

dstName := loadbalancer_name + ".dst"
spec.Alias(dstName, loadbalancer_name)

return loadbalancer_name
}
20 changes: 20 additions & 0 deletions runtime/plugins/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package loadbalancer

import (
"context"
"math/rand"
)

type LoadBalancer[T any] struct {
Clients []T
}

func NewLoadBalancer[T any](ctx context.Context, clients []T) *LoadBalancer[T] {
return &LoadBalancer[T]{Clients: clients}
}

func (this *LoadBalancer[T]) PickClient(ctx context.Context) T {
// TODO: Support more policies!
randIndex := rand.Intn(len(this.Clients))
return this.Clients[randIndex]
}