Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plan-generator): improve memory and cpu usage #1684

Merged
merged 8 commits into from
Mar 12, 2025
50 changes: 48 additions & 2 deletions router/cmd/plan_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ package cmd
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/dustin/go-humanize"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/logging"
"github.com/wundergraph/cosmo/router/pkg/plan_generator"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)

func PlanGenerator(args []string) {
Expand All @@ -27,6 +34,7 @@ func PlanGenerator(args []string) {
f.BoolVar(&cfg.OutputReport, "print-report", false, "write a report.json file, with all the query plans and errors sorted by file name")
f.BoolVar(&cfg.FailOnPlanError, "fail-on-error", false, "if at least one plan fails, the command exit code will be 1")
f.BoolVar(&cfg.FailFast, "fail-fast", false, "stop as soon as possible if a plan fails")
f.StringVar(&cfg.LogLevel, "log-level", "warning", "log level to use (debug, info, warning, error, panic, fatal)")

if err := f.Parse(args[1:]); err != nil {
f.PrintDefaults()
Expand All @@ -51,8 +59,46 @@ func PlanGenerator(args []string) {
)
defer stop()

err := plan_generator.PlanGenerator(ctxNotify, cfg)
logLevel, err := logging.ZapLogLevelFromString(cfg.LogLevel)
if err != nil {
log.Fatalf("Error during command plan-generator: %s", err)
log.Fatalf("Could not parse log level: %s", err)
}

logger := logging.New(false, false, logLevel).
With(
zap.String("service", "@wundergraph/query-plan"),
zap.String("service_version", core.Version),
)
cfg.Logger = logger

// Automatically set GOMAXPROCS to avoid CPU throttling on containerized environments
_, err = maxprocs.Set(maxprocs.Logger(func(msg string, args ...interface{}) {
logger.Info(fmt.Sprintf(msg, args...))
}))
if err != nil {
logger.Fatal(fmt.Sprintf("could not set max GOMAXPROCS: %s", err.Error()))
}

if os.Getenv("GOMEMLIMIT") != "" {
logger.Info(fmt.Sprintf("GOMEMLIMIT set by user %s", os.Getenv("GOMEMLIMIT")))
} else {
// Automatically set GOMEMLIMIT to 90% of the available memory.
// This is an effort to prevent the router from being killed by OOM (Out Of Memory)
// when the system is under memory pressure e.g. when GC is not able to free memory fast enough.
// More details: https://tip.golang.org/doc/gc-guide#Memory_limit
mLimit, err := memlimit.SetGoMemLimitWithOpts(
memlimit.WithRatio(0.9),
memlimit.WithProvider(memlimit.FromCgroupHybrid),
)
if err == nil {
logger.Info(fmt.Sprintf("GOMEMLIMIT set automatically to %s", humanize.Bytes(uint64(mLimit))))
} else {
logger.Info(fmt.Sprintf("GOMEMLIMIT was not set. Please set it manually to around 90%% of the available memory to prevent OOM kills %s", err.Error()))
}
}

err = plan_generator.PlanGenerator(ctxNotify, cfg)
if err != nil {
logger.Fatal(fmt.Sprintf("Error during command plan-generator: %s", err.Error()))
}
}
52 changes: 35 additions & 17 deletions router/core/plan_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/postprocess"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
"go.uber.org/zap"

"github.com/wundergraph/cosmo/router/pkg/config"

Expand All @@ -28,40 +29,40 @@ import (

type PlanGenerator struct {
planConfiguration *plan.Configuration
planner *plan.Planner
definition *ast.Document
}

func NewPlanGenerator(configFilePath string) (*PlanGenerator, error) {
pg := &PlanGenerator{}
if err := pg.loadConfiguration(configFilePath); err != nil {
return nil, err
}
type Planner struct {
planner *plan.Planner
definition *ast.Document
}

planner, err := plan.NewPlanner(*pg.planConfiguration)
func NewPlanner(planConfiguration *plan.Configuration, definition *ast.Document) (*Planner, error) {
planner, err := plan.NewPlanner(*planConfiguration)
if err != nil {
return nil, fmt.Errorf("failed to create planner: %w", err)
}
pg.planner = planner

return pg, nil
return &Planner{
planner: planner,
definition: definition,
}, nil
}

func (pg *PlanGenerator) PlanOperation(operationFilePath string) (string, error) {
operation, err := pg.parseOperation(operationFilePath)
func (pl *Planner) PlanOperation(operationFilePath string) (string, error) {
operation, err := pl.parseOperation(operationFilePath)
if err != nil {
return "", fmt.Errorf("failed to parse operation: %w", err)
}

rawPlan, err := pg.planOperation(operation)
rawPlan, err := pl.planOperation(operation)
if err != nil {
return "", fmt.Errorf("failed to plan operation: %w", err)
}

return rawPlan.PrettyPrint(), nil
}

func (pg *PlanGenerator) planOperation(operation *ast.Document) (*resolve.FetchTreeQueryPlanNode, error) {
func (pl *Planner) planOperation(operation *ast.Document) (*resolve.FetchTreeQueryPlanNode, error) {
report := operationreport.Report{}

var operationName []byte
Expand All @@ -77,10 +78,10 @@ func (pg *PlanGenerator) planOperation(operation *ast.Document) (*resolve.FetchT
return nil, errors.New("operation name not found")
}

astnormalization.NormalizeNamedOperation(operation, pg.definition, operationName, &report)
astnormalization.NormalizeNamedOperation(operation, pl.definition, operationName, &report)

// create and postprocess the plan
preparedPlan := pg.planner.Plan(operation, pg.definition, string(operationName), &report, plan.IncludeQueryPlanInResponse())
preparedPlan := pl.planner.Plan(operation, pl.definition, string(operationName), &report, plan.IncludeQueryPlanInResponse())
if report.HasErrors() {
return nil, errors.New(report.Error())
}
Expand All @@ -94,7 +95,7 @@ func (pg *PlanGenerator) planOperation(operation *ast.Document) (*resolve.FetchT
return &resolve.FetchTreeQueryPlanNode{}, nil
}

func (pg *PlanGenerator) parseOperation(operationFilePath string) (*ast.Document, error) {
func (pl *Planner) parseOperation(operationFilePath string) (*ast.Document, error) {
content, err := os.ReadFile(operationFilePath)
if err != nil {
return nil, err
Expand All @@ -108,6 +109,23 @@ func (pg *PlanGenerator) parseOperation(operationFilePath string) (*ast.Document
return &doc, nil
}

func NewPlanGenerator(configFilePath string, logger *zap.Logger) (*PlanGenerator, error) {
pg := &PlanGenerator{}
if err := pg.loadConfiguration(configFilePath); err != nil {
return nil, err
}

if logger != nil {
pg.planConfiguration.Logger = log.NewZapLogger(logger, log.DebugLevel)
}

return pg, nil
}

func (pg *PlanGenerator) GetPlanner() (*Planner, error) {
return NewPlanner(pg.planConfiguration, pg.definition)
}

func (pg *PlanGenerator) loadConfiguration(configFilePath string) error {
routerConfig, err := execution_config.FromFile(configFilePath)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions router/pkg/plan_generator/plan_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/wundergraph/cosmo/router/core"
"go.uber.org/zap"
)

const ReportFileName = "report.json"
Expand All @@ -29,6 +30,8 @@ type QueryPlanConfig struct {
OutputReport bool
FailOnPlanError bool
FailFast bool
LogLevel string
Logger *zap.Logger
}

type QueryPlanResults struct {
Expand Down Expand Up @@ -98,16 +101,20 @@ func PlanGenerator(ctx context.Context, cfg QueryPlanConfig) error {
ctxError, cancelError := context.WithCancelCause(ctx)
defer cancelError(nil)

pg, err := core.NewPlanGenerator(executionConfigPath, cfg.Logger)
if err != nil {
return fmt.Errorf("failed to create plan generator: %v", err)
}

var planError atomic.Bool
wg := sync.WaitGroup{}
wg.Add(cfg.Concurrency)
for i := 0; i < cfg.Concurrency; i++ {
go func(i int) {
defer wg.Done()
pg, err := core.NewPlanGenerator(executionConfigPath)
planner, err := pg.GetPlanner()
if err != nil {
cancelError(fmt.Errorf("failed to create plan generator: %v", err))
return
cancelError(fmt.Errorf("failed to get planner: %v", err))
}
for {
select {
Expand All @@ -128,7 +135,7 @@ func PlanGenerator(ctx context.Context, cfg QueryPlanConfig) error {

queryFilePath := filepath.Join(queriesPath, queryFile.Name())

outContent, err := pg.PlanOperation(queryFilePath)
outContent, err := planner.PlanOperation(queryFilePath)
res := QueryPlanResult{
FileName: queryFile.Name(),
Plan: outContent,
Expand Down
Loading