From 0d79a2cc5a7f1cf316cf553aa5a0a39cd7278773 Mon Sep 17 00:00:00 2001 From: mdrakos Date: Fri, 22 Nov 2024 16:49:41 -0800 Subject: [PATCH 1/7] Add basic message queue --- cmd/state-svc/internal/messages/message.go | 14 + cmd/state-svc/internal/messages/queue.go | 62 +++ cmd/state-svc/internal/resolver/resolver.go | 44 ++- .../internal/server/generated/generated.go | 352 ++++++++++++++++++ cmd/state-svc/schema/schema.graphqls | 7 + .../cmdtree/exechandlers/cmdcall/cmdcall.go | 1 - .../exechandlers/messages/messenger.go | 42 +++ cmd/state/main.go | 4 + internal/graph/generated.go | 6 + internal/locale/locales/en-us.yaml | 4 + internal/messages/topics.go | 5 + pkg/platform/api/svc/request/message.go | 21 ++ pkg/platform/authentication/auth.go | 26 +- pkg/platform/model/svc.go | 8 + 14 files changed, 588 insertions(+), 8 deletions(-) create mode 100644 cmd/state-svc/internal/messages/message.go create mode 100644 cmd/state-svc/internal/messages/queue.go create mode 100644 cmd/state/internal/cmdtree/exechandlers/messages/messenger.go create mode 100644 internal/messages/topics.go create mode 100644 pkg/platform/api/svc/request/message.go diff --git a/cmd/state-svc/internal/messages/message.go b/cmd/state-svc/internal/messages/message.go new file mode 100644 index 0000000000..dd5cc7cce9 --- /dev/null +++ b/cmd/state-svc/internal/messages/message.go @@ -0,0 +1,14 @@ +package messages + +import ( + "github.com/ActiveState/cli/internal/graph" + "github.com/google/uuid" +) + +func NewMessage(topic string, message string) *graph.Message { + return &graph.Message{ + ID: uuid.New().String(), + Topic: topic, + Message: message, + } +} diff --git a/cmd/state-svc/internal/messages/queue.go b/cmd/state-svc/internal/messages/queue.go new file mode 100644 index 0000000000..8df85add65 --- /dev/null +++ b/cmd/state-svc/internal/messages/queue.go @@ -0,0 +1,62 @@ +package messages + +import ( + "github.com/ActiveState/cli/internal/errs" + "github.com/ActiveState/cli/internal/graph" +) + +type Queue struct { + queue map[string]map[string]*graph.Message +} + +func NewQueue() *Queue { + return &Queue{ + queue: make(map[string]map[string]*graph.Message), + } +} + +func (q *Queue) Queue(topic string, message string) error { + if _, ok := q.queue[topic]; !ok { + q.queue[topic] = make(map[string]*graph.Message) + } + msg := NewMessage(topic, message) + q.queue[topic][msg.ID] = msg + return nil +} + +func (q *Queue) Messages() ([]*graph.Message, error) { + var messages []*graph.Message + for _, topic := range q.queue { + for _, message := range topic { + messages = append(messages, message) + } + } + return messages, nil +} + +func (q *Queue) Dequeue(messageIDs []string) error { + for _, messageID := range messageIDs { + err := q.dequeueMessages(messageID) + if err != nil { + return errs.Wrap(err, "failed to dequeue message") + } + } + return nil +} + +func (q *Queue) dequeueMessages(messageID string) error { + for _, topic := range q.queue { + for _, msg := range topic { + if msg.ID != messageID { + continue + } + delete(topic, messageID) + return nil + } + } + return nil +} + +func (q *Queue) Close() error { + return nil +} diff --git a/cmd/state-svc/internal/resolver/resolver.go b/cmd/state-svc/internal/resolver/resolver.go index 4f218fefb8..a0fabc2d2f 100644 --- a/cmd/state-svc/internal/resolver/resolver.go +++ b/cmd/state-svc/internal/resolver/resolver.go @@ -3,6 +3,7 @@ package resolver import ( "context" "encoding/json" + "errors" "os" "runtime/debug" "sort" @@ -11,6 +12,7 @@ import ( "github.com/ActiveState/cli/cmd/state-svc/internal/graphqltypes" "github.com/ActiveState/cli/cmd/state-svc/internal/hash" + "github.com/ActiveState/cli/cmd/state-svc/internal/messages" "github.com/ActiveState/cli/cmd/state-svc/internal/notifications" "github.com/ActiveState/cli/cmd/state-svc/internal/rtwatcher" genserver "github.com/ActiveState/cli/cmd/state-svc/internal/server/generated" @@ -35,7 +37,8 @@ import ( type Resolver struct { cfg *config.Instance - messages *notifications.Notifications + notifications *notifications.Notifications + messages *messages.Queue updatePoller *poller.Poller authPoller *poller.Poller projectIDCache *projectcache.ID @@ -50,11 +53,22 @@ type Resolver struct { // var _ genserver.ResolverRoot = &Resolver{} // Must implement ResolverRoot func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Resolver, error) { - msg, err := notifications.New(cfg, auth) + notif, err := notifications.New(cfg, auth) if err != nil { return nil, errs.Wrap(err, "Could not initialize messages") } + msg := messages.NewQueue() + // TODO: Check error and publish message + if err := auth.Sync(); err != nil { + if errors.Is(err, &authentication.ErrInvalidToken{}) { + // TODO: Add common message and topics to state tool internal libraries? + msg.Queue("error.auth", "Invalid API token") + } else { + logging.Warning("Could not sync authenticated state: %s", err.Error()) + } + } + upchecker := updater.NewDefaultChecker(cfg, an) pollUpdate := poller.New(1*time.Hour, func() (interface{}, error) { defer func() { @@ -88,6 +102,7 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res anForClient := sync.New(anaConsts.SrcStateTool, cfg, auth, nil) return &Resolver{ cfg, + notif, msg, pollUpdate, pollAuth, @@ -102,7 +117,7 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res } func (r *Resolver) Close() error { - r.messages.Close() + r.notifications.Close() r.updatePoller.Close() r.authPoller.Close() r.anForClient.Close() @@ -250,7 +265,28 @@ func (r *Resolver) ReportRuntimeUsage(_ context.Context, pid int, exec, source s func (r *Resolver) CheckNotifications(ctx context.Context, command string, flags []string) ([]*graph.NotificationInfo, error) { defer func() { panics.LogAndPanic(recover(), debug.Stack()) }() logging.Debug("Check notifications resolver") - return r.messages.Check(command, flags) + return r.notifications.Check(command, flags) +} + +func (r *Resolver) CheckMessages(ctx context.Context) ([]*graph.Message, error) { + logging.Debug("Check messages resolver") + var messages []*graph.Message + var err error + + defer func() { + var sentMessageIDs []string + for _, msg := range messages { + sentMessageIDs = append(sentMessageIDs, msg.ID) + } + r.messages.Dequeue(sentMessageIDs) + panics.LogAndPanic(recover(), debug.Stack()) + }() + + messages, err = r.messages.Messages() + if err != nil { + return nil, errs.Wrap(err, "Could not get messages") + } + return messages, nil } func (r *Resolver) ConfigChanged(ctx context.Context, key string) (*graph.ConfigChangedResponse, error) { diff --git a/cmd/state-svc/internal/server/generated/generated.go b/cmd/state-svc/internal/server/generated/generated.go index 9e2bbb7947..d6c2bfac4c 100644 --- a/cmd/state-svc/internal/server/generated/generated.go +++ b/cmd/state-svc/internal/server/generated/generated.go @@ -79,6 +79,12 @@ type ComplexityRoot struct { User func(childComplexity int) int } + Message struct { + ID func(childComplexity int) int + Message func(childComplexity int) int + Topic func(childComplexity int) int + } + Mutation struct { SetCache func(childComplexity int, key string, value string, expiry int) int } @@ -112,6 +118,7 @@ type ComplexityRoot struct { Query struct { AnalyticsEvent func(childComplexity int, category string, action string, source string, label *string, dimensionsJSON string) int AvailableUpdate func(childComplexity int, desiredChannel string, desiredVersion string) int + CheckMessages func(childComplexity int) int CheckNotifications func(childComplexity int, command string, flags []string) int ConfigChanged func(childComplexity int, key string) int FetchLogTail func(childComplexity int) int @@ -158,6 +165,7 @@ type QueryResolver interface { AnalyticsEvent(ctx context.Context, category string, action string, source string, label *string, dimensionsJSON string) (*graph.AnalyticsEventResponse, error) ReportRuntimeUsage(ctx context.Context, pid int, exec string, source string, dimensionsJSON string) (*graph.ReportRuntimeUsageResponse, error) CheckNotifications(ctx context.Context, command string, flags []string) ([]*graph.NotificationInfo, error) + CheckMessages(ctx context.Context) ([]*graph.Message, error) ConfigChanged(ctx context.Context, key string) (*graph.ConfigChangedResponse, error) FetchLogTail(ctx context.Context) (string, error) GetProcessesInUse(ctx context.Context, execDir string) ([]*graph.ProcessInfo, error) @@ -283,6 +291,27 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JWT.User(childComplexity), true + case "Message.id": + if e.complexity.Message.ID == nil { + break + } + + return e.complexity.Message.ID(childComplexity), true + + case "Message.message": + if e.complexity.Message.Message == nil { + break + } + + return e.complexity.Message.Message(childComplexity), true + + case "Message.topic": + if e.complexity.Message.Topic == nil { + break + } + + return e.complexity.Message.Topic(childComplexity), true + case "Mutation.setCache": if e.complexity.Mutation.SetCache == nil { break @@ -417,6 +446,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.Query.AvailableUpdate(childComplexity, args["desiredChannel"].(string), args["desiredVersion"].(string)), true + case "Query.checkMessages": + if e.complexity.Query.CheckMessages == nil { + break + } + + return e.complexity.Query.CheckMessages(childComplexity), true + case "Query.checkNotifications": if e.complexity.Query.CheckNotifications == nil { break @@ -762,6 +798,12 @@ type NotificationInfo { placement: NotificationPlacementType! } +type Message { + id: String! + topic: String! + message: String! +} + type Organization { URLname: String! role: String! @@ -797,6 +839,7 @@ type Query { analyticsEvent(category: String!, action: String!, source: String!, label: String, dimensionsJson: String!): AnalyticsEventResponse reportRuntimeUsage(pid: Int!, exec: String!, source: String!, dimensionsJson: String!): ReportRuntimeUsageResponse checkNotifications(command: String!, flags: [String!]!): [NotificationInfo!]! + checkMessages: [Message!]! configChanged(key: String!): ConfigChangedResponse fetchLogTail: String! getProcessesInUse(execDir: String!): [ProcessInfo!]! @@ -1757,6 +1800,138 @@ func (ec *executionContext) fieldContext_JWT_user(_ context.Context, field graph return fc, nil } +func (ec *executionContext) _Message_id(ctx context.Context, field graphql.CollectedField, obj *graph.Message) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Message_id(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.ID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Message_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Message", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Message_topic(ctx context.Context, field graphql.CollectedField, obj *graph.Message) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Message_topic(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Topic, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Message_topic(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Message", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _Message_message(ctx context.Context, field graphql.CollectedField, obj *graph.Message) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Message_message(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.Message, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Message_message(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Message", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Mutation_setCache(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Mutation_setCache(ctx, field) if err != nil { @@ -2769,6 +2944,58 @@ func (ec *executionContext) fieldContext_Query_checkNotifications(ctx context.Co return fc, nil } +func (ec *executionContext) _Query_checkMessages(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_checkMessages(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().CheckMessages(rctx) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]*graph.Message) + fc.Result = res + return ec.marshalNMessage2ᚕᚖgithubᚗcomᚋActiveStateᚋcliᚋinternalᚋgraphᚐMessageᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_checkMessages(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "id": + return ec.fieldContext_Message_id(ctx, field) + case "topic": + return ec.fieldContext_Message_topic(ctx, field) + case "message": + return ec.fieldContext_Message_message(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type Message", field.Name) + }, + } + return fc, nil +} + func (ec *executionContext) _Query_configChanged(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_configChanged(ctx, field) if err != nil { @@ -5779,6 +6006,55 @@ func (ec *executionContext) _JWT(ctx context.Context, sel ast.SelectionSet, obj return out } +var messageImplementors = []string{"Message"} + +func (ec *executionContext) _Message(ctx context.Context, sel ast.SelectionSet, obj *graph.Message) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, messageImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("Message") + case "id": + out.Values[i] = ec._Message_id(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "topic": + out.Values[i] = ec._Message_topic(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "message": + out.Values[i] = ec._Message_message(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var mutationImplementors = []string{"Mutation"} func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -6169,6 +6445,28 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "checkMessages": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_checkMessages(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "configChanged": field := field @@ -6942,6 +7240,60 @@ func (ec *executionContext) marshalNInt2int(ctx context.Context, sel ast.Selecti return res } +func (ec *executionContext) marshalNMessage2ᚕᚖgithubᚗcomᚋActiveStateᚋcliᚋinternalᚋgraphᚐMessageᚄ(ctx context.Context, sel ast.SelectionSet, v []*graph.Message) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNMessage2ᚖgithubᚗcomᚋActiveStateᚋcliᚋinternalᚋgraphᚐMessage(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + +func (ec *executionContext) marshalNMessage2ᚖgithubᚗcomᚋActiveStateᚋcliᚋinternalᚋgraphᚐMessage(ctx context.Context, sel ast.SelectionSet, v *graph.Message) graphql.Marshaler { + if v == nil { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + return graphql.Null + } + return ec._Message(ctx, sel, v) +} + func (ec *executionContext) marshalNNotificationInfo2ᚕᚖgithubᚗcomᚋActiveStateᚋcliᚋinternalᚋgraphᚐNotificationInfoᚄ(ctx context.Context, sel ast.SelectionSet, v []*graph.NotificationInfo) graphql.Marshaler { ret := make(graphql.Array, len(v)) var wg sync.WaitGroup diff --git a/cmd/state-svc/schema/schema.graphqls b/cmd/state-svc/schema/schema.graphqls index 606eb5cfd3..da0edcb343 100644 --- a/cmd/state-svc/schema/schema.graphqls +++ b/cmd/state-svc/schema/schema.graphqls @@ -62,6 +62,12 @@ type NotificationInfo { placement: NotificationPlacementType! } +type Message { + id: String! + topic: String! + message: String! +} + type Organization { URLname: String! role: String! @@ -97,6 +103,7 @@ type Query { analyticsEvent(category: String!, action: String!, source: String!, label: String, dimensionsJson: String!): AnalyticsEventResponse reportRuntimeUsage(pid: Int!, exec: String!, source: String!, dimensionsJson: String!): ReportRuntimeUsageResponse checkNotifications(command: String!, flags: [String!]!): [NotificationInfo!]! + checkMessages: [Message!]! configChanged(key: String!): ConfigChangedResponse fetchLogTail: String! getProcessesInUse(execDir: String!): [ProcessInfo!]! diff --git a/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go b/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go index 0956722559..049785f03b 100644 --- a/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go +++ b/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go @@ -26,7 +26,6 @@ func (c *CmdCall) OnExecStart(cmd *captain.Command, _ []string) error { return errs.Wrap(err, "before-command event run failure") } return nil - } func (c *CmdCall) OnExecStop(cmd *captain.Command, _ []string) error { diff --git a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go new file mode 100644 index 0000000000..0e7feb5ff4 --- /dev/null +++ b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go @@ -0,0 +1,42 @@ +package messages + +import ( + "context" + + "github.com/ActiveState/cli/internal/captain" + "github.com/ActiveState/cli/internal/errs" + "github.com/ActiveState/cli/internal/output" + "github.com/ActiveState/cli/pkg/platform/model" +) + +type Messenger struct { + out output.Outputer + svcModel *model.SvcModel +} + +func New(out output.Outputer, svcModel *model.SvcModel) *Messenger { + return &Messenger{ + out: out, + svcModel: svcModel, + } +} + +func (m *Messenger) OnExecStart(_ *captain.Command, _ []string) error { + if m.out.Type().IsStructured() { + // No point showing messages on structured output (eg. json) + return nil + } + + messages, err := m.svcModel.CheckMessages(context.Background()) + if err != nil { + return errs.Wrap(err, "Could not get messages") + } + + for _, message := range messages { + m.out.Notice("") // Line break before + // TODO: Add handling for different message types + m.out.Notice(message.Message) + } + + return nil +} diff --git a/cmd/state/main.go b/cmd/state/main.go index 2788e8d1b9..97863dfa00 100644 --- a/cmd/state/main.go +++ b/cmd/state/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/ActiveState/cli/cmd/state/internal/cmdtree" + "github.com/ActiveState/cli/cmd/state/internal/cmdtree/exechandlers/messages" "github.com/ActiveState/cli/cmd/state/internal/cmdtree/exechandlers/notifier" anAsync "github.com/ActiveState/cli/internal/analytics/client/async" anaConst "github.com/ActiveState/cli/internal/analytics/constants" @@ -251,6 +252,9 @@ func run(args []string, isInteractive bool, cfg *config.Instance, out output.Out cmds.OnExecStart(notifier.OnExecStart) cmds.OnExecStop(notifier.OnExecStop) + messenger := messages.New(out, svcmodel) + cmds.OnExecStart(messenger.OnExecStart) + // Auto update to latest state tool version if possible. if updated, err := autoUpdate(svcmodel, args, childCmd, cfg, an, out); err == nil && updated { return nil // command will be run by updated exe diff --git a/internal/graph/generated.go b/internal/graph/generated.go index f61461ffa3..7ab57470b1 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -40,6 +40,12 @@ type Jwt struct { User *User `json:"user"` } +type Message struct { + ID string `json:"id"` + Topic string `json:"topic"` + Message string `json:"message"` +} + type Mutation struct { } diff --git a/internal/locale/locales/en-us.yaml b/internal/locale/locales/en-us.yaml index 7030506100..e055560253 100644 --- a/internal/locale/locales/en-us.yaml +++ b/internal/locale/locales/en-us.yaml @@ -929,6 +929,10 @@ err_read_projectfile: other: The activestate.yaml at {{.V0}} could not be read. err_auth_fail_totp: other: A two-factor authentication code is required. +err_invalid_token: + other: Invalid API token +err_invalid_credentials: + other: Invalid credentials cve_title: other: Vulnerability Summary cve_description: diff --git a/internal/messages/topics.go b/internal/messages/topics.go new file mode 100644 index 0000000000..a7150e1634 --- /dev/null +++ b/internal/messages/topics.go @@ -0,0 +1,5 @@ +package messages + +const ( + TopicErrorAuth = "error.auth" +) diff --git a/pkg/platform/api/svc/request/message.go b/pkg/platform/api/svc/request/message.go new file mode 100644 index 0000000000..373cea2a3b --- /dev/null +++ b/pkg/platform/api/svc/request/message.go @@ -0,0 +1,21 @@ +package request + +type MessageRequest struct { +} + +func NewMessageRequest() *MessageRequest { + return &MessageRequest{} +} + +func (m *MessageRequest) Query() string { + return `query { + checkMessages { + topic + message + } + }` +} + +func (m *MessageRequest) Vars() (map[string]interface{}, error) { + return map[string]interface{}{}, nil +} diff --git a/pkg/platform/authentication/auth.go b/pkg/platform/authentication/auth.go index 8e1a1f8db3..a8379cac2c 100644 --- a/pkg/platform/authentication/auth.go +++ b/pkg/platform/authentication/auth.go @@ -34,6 +34,8 @@ type ErrUnauthorized struct{ *locale.LocalizedError } type ErrTokenRequired struct{ *locale.LocalizedError } +type ErrInvalidToken struct{ *locale.LocalizedError } + var errNotYetGranted = locale.NewInputError("err_auth_device_noauth") // jwtLifetime is the lifetime of the JWT. This is defined by the API, but the API doesn't communicate this. @@ -45,6 +47,7 @@ type Auth struct { client *mono_client.Mono clientAuth *runtime.ClientAuthInfoWriter bearerToken string + envToken string user *mono_models.User cfg Configurable lastRenewal *time.Time @@ -93,6 +96,7 @@ func New(cfg Configurable) *Auth { auth := &Auth{ cfg: cfg, jwtLifetime: jwtLifetime, + envToken: os.Getenv(constants.APIKeyEnvVarName), } return auth @@ -249,6 +253,11 @@ func (s *Auth) AuthenticateWithModel(credentials *mono_models.Credentials) error return errs.AddTips(&ErrUnauthorized{locale.WrapExternalError(err, "err_unauthorized")}, tips...) case *apiAuth.PostLoginRetryWith: return errs.AddTips(&ErrTokenRequired{locale.WrapExternalError(err, "err_auth_fail_totp")}, tips...) + case *apiAuth.PostLoginBadRequest: + if credentials.Token != "" { + return errs.AddTips(&ErrInvalidToken{locale.WrapExternalError(err, "err_invalid_token")}, tips...) + } + return errs.AddTips(locale.WrapExternalError(err, "err_invalid_credentials"), tips...) default: if os.IsTimeout(err) { return locale.NewExternalError("err_api_auth_timeout", "Timed out waiting for authentication response. Please try again.") @@ -303,9 +312,20 @@ func (s *Auth) AuthenticateWithDevicePolling(deviceCode strfmt.UUID, interval ti // AuthenticateWithToken will try to authenticate using the given token func (s *Auth) AuthenticateWithToken(token string) error { logging.Debug("AuthenticateWithToken") - return s.AuthenticateWithModel(&mono_models.Credentials{ + err := s.AuthenticateWithModel(&mono_models.Credentials{ Token: token, }) + if err != nil { + var invalidTokenErr *ErrInvalidToken + if errors.As(err, &invalidTokenErr) && s.envToken != "" { + logging.Debug("Invalid token, clearing stored token") + s.envToken = "" + return errs.Wrap(err, "Invalid API token") + } + return errs.Wrap(err, "Failed to authenticate with token") + } + + return nil } // UpdateSession authenticates with the given access token obtained via a Platform @@ -464,9 +484,9 @@ func (s *Auth) NewAPIKey(name string) (string, error) { } func (s *Auth) AvailableAPIToken() (v string) { - if tkn := os.Getenv(constants.APIKeyEnvVarName); tkn != "" { + if s.envToken != "" { logging.Debug("Using API token passed via env var") - return tkn + return s.envToken } return s.cfg.GetString(ApiTokenConfigKey) } diff --git a/pkg/platform/model/svc.go b/pkg/platform/model/svc.go index 5cfca41cfa..4f8929cad7 100644 --- a/pkg/platform/model/svc.go +++ b/pkg/platform/model/svc.go @@ -138,6 +138,14 @@ func (m *SvcModel) CheckNotifications(ctx context.Context, command string, flags return resp, nil } +func (m *SvcModel) CheckMessages(ctx context.Context) ([]*graph.Message, error) { + resp := []*graph.Message{} + if err := m.request(ctx, request.NewMessageRequest(), &resp); err != nil { + return nil, errs.Wrap(err, "Error sending messages request") + } + return resp, nil +} + func (m *SvcModel) ConfigChanged(ctx context.Context, key string) error { defer profile.Measure("svc:ConfigChanged", time.Now()) From dfe3a31e8c76094f1a8b02eab8bd2175d8278655 Mon Sep 17 00:00:00 2001 From: mdrakos Date: Tue, 26 Nov 2024 14:52:12 -0800 Subject: [PATCH 2/7] Properly sync auth and display messages --- cmd/state-svc/internal/messages/queue.go | 6 +-- cmd/state-svc/internal/resolver/resolver.go | 29 +++++++------ cmd/state-svc/main.go | 3 -- .../exechandlers/messages/messenger.go | 42 +++++++++++++++++-- internal/locale/locales/en-us.yaml | 4 ++ internal/messages/topics.go | 5 ++- 6 files changed, 66 insertions(+), 23 deletions(-) diff --git a/cmd/state-svc/internal/messages/queue.go b/cmd/state-svc/internal/messages/queue.go index 8df85add65..e224e50437 100644 --- a/cmd/state-svc/internal/messages/queue.go +++ b/cmd/state-svc/internal/messages/queue.go @@ -3,6 +3,7 @@ package messages import ( "github.com/ActiveState/cli/internal/errs" "github.com/ActiveState/cli/internal/graph" + "github.com/ActiveState/cli/internal/logging" ) type Queue struct { @@ -20,6 +21,7 @@ func (q *Queue) Queue(topic string, message string) error { q.queue[topic] = make(map[string]*graph.Message) } msg := NewMessage(topic, message) + logging.Debug("Queued message: %s, %s", msg.ID, msg.Message) q.queue[topic][msg.ID] = msg return nil } @@ -56,7 +58,3 @@ func (q *Queue) dequeueMessages(messageID string) error { } return nil } - -func (q *Queue) Close() error { - return nil -} diff --git a/cmd/state-svc/internal/resolver/resolver.go b/cmd/state-svc/internal/resolver/resolver.go index a0fabc2d2f..fbd72961c9 100644 --- a/cmd/state-svc/internal/resolver/resolver.go +++ b/cmd/state-svc/internal/resolver/resolver.go @@ -26,6 +26,7 @@ import ( "github.com/ActiveState/cli/internal/graph" "github.com/ActiveState/cli/internal/logging" configMediator "github.com/ActiveState/cli/internal/mediators/config" + msgs "github.com/ActiveState/cli/internal/messages" "github.com/ActiveState/cli/internal/poller" "github.com/ActiveState/cli/internal/rtutils/ptr" "github.com/ActiveState/cli/internal/runbits/panics" @@ -59,16 +60,6 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res } msg := messages.NewQueue() - // TODO: Check error and publish message - if err := auth.Sync(); err != nil { - if errors.Is(err, &authentication.ErrInvalidToken{}) { - // TODO: Add common message and topics to state tool internal libraries? - msg.Queue("error.auth", "Invalid API token") - } else { - logging.Warning("Could not sync authenticated state: %s", err.Error()) - } - } - upchecker := updater.NewDefaultChecker(cfg, an) pollUpdate := poller.New(1*time.Hour, func() (interface{}, error) { defer func() { @@ -88,11 +79,23 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res } pollAuth := poller.New(time.Duration(int64(time.Millisecond)*pollRate), func() (interface{}, error) { + logging.Debug("Polling for authenticated state") defer func() { panics.LogAndPanic(recover(), debug.Stack()) }() if auth.SyncRequired() { - return nil, auth.Sync() + logging.Debug("Sync required") + if err := auth.Sync(); err != nil { + logging.Debug("Syncing authenticated state: %s", err.Error()) + var invalidTokenErr *authentication.ErrInvalidToken + if errors.As(err, &invalidTokenErr) { + logging.Debug("Queuing invalid API token error") + msg.Queue(msgs.TopicErrorAuthToken, "Invalid API token") + } else { + logging.Warning("Could not sync authenticated state: %s", err.Error()) + } + } + return nil, nil } return nil, nil }) @@ -278,7 +281,9 @@ func (r *Resolver) CheckMessages(ctx context.Context) ([]*graph.Message, error) for _, msg := range messages { sentMessageIDs = append(sentMessageIDs, msg.ID) } - r.messages.Dequeue(sentMessageIDs) + if err := r.messages.Dequeue(sentMessageIDs); err != nil { + logging.Error("Could not dequeue messages: %s", errs.JoinMessage(err)) + } panics.LogAndPanic(recover(), debug.Stack()) }() diff --git a/cmd/state-svc/main.go b/cmd/state-svc/main.go index 0bdfb950ff..bc99a39a9a 100644 --- a/cmd/state-svc/main.go +++ b/cmd/state-svc/main.go @@ -206,9 +206,6 @@ func run(cfg *config.Instance) error { }, func(ccmd *captain.Command, args []string) error { logging.Debug("Running CmdForeground") - if err := auth.Sync(); err != nil { - logging.Warning("Could not sync authenticated state: %s", err.Error()) - } return runForeground(cfg, an, auth, foregroundArgText) }, ), diff --git a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go index 0e7feb5ff4..c9d1381869 100644 --- a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go +++ b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go @@ -2,9 +2,14 @@ package messages import ( "context" + "strings" "github.com/ActiveState/cli/internal/captain" "github.com/ActiveState/cli/internal/errs" + "github.com/ActiveState/cli/internal/graph" + "github.com/ActiveState/cli/internal/locale" + "github.com/ActiveState/cli/internal/logging" + msgs "github.com/ActiveState/cli/internal/messages" "github.com/ActiveState/cli/internal/output" "github.com/ActiveState/cli/pkg/platform/model" ) @@ -22,8 +27,8 @@ func New(out output.Outputer, svcModel *model.SvcModel) *Messenger { } func (m *Messenger) OnExecStart(_ *captain.Command, _ []string) error { + logging.Debug("Checking for messages") if m.out.Type().IsStructured() { - // No point showing messages on structured output (eg. json) return nil } @@ -31,12 +36,43 @@ func (m *Messenger) OnExecStart(_ *captain.Command, _ []string) error { if err != nil { return errs.Wrap(err, "Could not get messages") } + logging.Debug("Found %d messages", len(messages)) for _, message := range messages { m.out.Notice("") // Line break before - // TODO: Add handling for different message types - m.out.Notice(message.Message) + + segments := strings.Split(message.Topic, ".") + if len(segments) > 0 { + switch segments[0] { + case msgs.TopicError: + m.handleErrorMessages(message) + case msgs.TopicInfo: + logging.Info("State Service reported an info message: %s", message.Message) + m.out.Notice(message.Message) + default: + logging.Debug("State Service reported an unknown message: %s", message.Topic) + m.out.Notice(message.Message) // fallback to notice for unknown types + } + } + + m.out.Notice("") // Line break after } return nil } + +func (m *Messenger) handleErrorMessages(message *graph.Message) { + switch message.Topic { + case msgs.TopicErrorAuth: + logging.Error("State Service reported an authentication error: %s", message.Message) + err := locale.NewError("err_svc_message", "The State Service reported an authentication error: {{.V0}}", message.Message) + m.out.Error(err) + case msgs.TopicErrorAuthToken: + logging.Error("State Service reported an authentication token error: %s", message.Message) + err := locale.NewError("err_svc_invalid_token", "", message.Message) + m.out.Error(err) + default: + logging.Error("State Service reported an unknown error: %s", message.Topic) + m.out.Error(locale.NewError("err_svc_unknown_message", "The State Service reported an unknown error: {{.V0}}", message.Message)) + } +} diff --git a/internal/locale/locales/en-us.yaml b/internal/locale/locales/en-us.yaml index e055560253..dca9b26835 100644 --- a/internal/locale/locales/en-us.yaml +++ b/internal/locale/locales/en-us.yaml @@ -1580,3 +1580,7 @@ config_set_help: other: "To SET the value for a specific config key run '[ACTIONABLE]state config set [/RESET]'" err_clean_in_use: other: "Could not clean your cache as you appear to have a runtime in use. Please stop any running State Tool processes and project runtime processes and try again." +err_svc_invalid_token: + other: | + The State Service reported an invalid API token error: {{.V0}} + Please check your API token and try again. diff --git a/internal/messages/topics.go b/internal/messages/topics.go index a7150e1634..638e69797a 100644 --- a/internal/messages/topics.go +++ b/internal/messages/topics.go @@ -1,5 +1,8 @@ package messages const ( - TopicErrorAuth = "error.auth" + TopicError = "error" + TopicInfo = "info" + TopicErrorAuth = "error.auth" + TopicErrorAuthToken = "error.auth.token" ) From c969fe63eedf58f32db121ba17a8e4b55c03661f Mon Sep 17 00:00:00 2001 From: mdrakos Date: Tue, 26 Nov 2024 14:52:19 -0800 Subject: [PATCH 3/7] Add integration test --- test/integration/auth_int_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/integration/auth_int_test.go b/test/integration/auth_int_test.go index c34dab906a..e2f9e56166 100644 --- a/test/integration/auth_int_test.go +++ b/test/integration/auth_int_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/ActiveState/cli/internal/constants" "github.com/ActiveState/cli/internal/testhelpers/suite" "github.com/ActiveState/termtest" "github.com/google/uuid" @@ -112,6 +113,27 @@ func (suite *AuthIntegrationTestSuite) TestAuth_JsonOutput() { suite.authOutput("json") } +func (suite *AuthIntegrationTestSuite) TestAuth_InvalidToken() { + suite.OnlyRunForTags(tagsuite.Auth, tagsuite.Critical) + ts := e2e.New(suite.T(), false) + defer ts.Close() + + cp := ts.SpawnWithOpts(e2e.OptArgs("--version"), e2e.OptAppendEnv(constants.APIKeyEnvVarName+"=bad-token")) + // Message is displayed + cp.Expect("The State Service reported an invalid API token error: Invalid API token") + cp.Expect("Please check your API token and try again.") + // The version information is still displayed + cp.Expect("ActiveState CLI") + cp.Expect("Version") + cp.ExpectExitCode(0) + + // Running the command again shows no error message as the token has been cleared + cp = ts.SpawnWithOpts(e2e.OptArgs("--version")) + cp.ExpectExitCode(0) + cp.Expect("ActiveState CLI") + cp.Expect("Version") +} + func TestAuthIntegrationTestSuite(t *testing.T) { suite.Run(t, new(AuthIntegrationTestSuite)) } From 393d3ca055a3e7695125edd3e86ecac85a21dda4 Mon Sep 17 00:00:00 2001 From: mdrakos Date: Tue, 26 Nov 2024 15:11:11 -0800 Subject: [PATCH 4/7] Add unit tests --- cmd/state-svc/internal/messages/queue_test.go | 169 ++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 cmd/state-svc/internal/messages/queue_test.go diff --git a/cmd/state-svc/internal/messages/queue_test.go b/cmd/state-svc/internal/messages/queue_test.go new file mode 100644 index 0000000000..eb5bb8f995 --- /dev/null +++ b/cmd/state-svc/internal/messages/queue_test.go @@ -0,0 +1,169 @@ +package messages + +import ( + "testing" + + "github.com/ActiveState/cli/internal/graph" + "github.com/stretchr/testify/assert" +) + +func TestNewQueue(t *testing.T) { + q := NewQueue() + assert.NotNil(t, q) + assert.Empty(t, q.queue) +} + +func TestQueue_Queue(t *testing.T) { + tests := []struct { + name string + messages []graph.Message + want map[string]int // topic -> expected message count + }{ + { + name: "queue first message", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + }, + want: map[string]int{"topic1": 1}, + }, + { + name: "queue second message in same topic", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + {Topic: "topic1", Message: "message2"}, + }, + want: map[string]int{"topic1": 2}, + }, + { + name: "queue message in different topic", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + {Topic: "topic2", Message: "message3"}, + }, + want: map[string]int{"topic1": 1, "topic2": 1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewQueue() + + for _, m := range tt.messages { + err := q.Queue(m.Topic, m.Message) + assert.NoError(t, err) + } + + for topic, count := range tt.want { + assert.Len(t, q.queue[topic], count) + } + }) + } +} + +func TestQueue_Messages(t *testing.T) { + tests := []struct { + name string + messages []graph.Message + wantCount int + }{ + { + name: "empty queue", + messages: nil, + wantCount: 0, + }, + { + name: "single message", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + }, + wantCount: 1, + }, + { + name: "multiple messages across topics", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + {Topic: "topic1", Message: "message2"}, + {Topic: "topic2", Message: "message3"}, + }, + wantCount: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewQueue() + + for _, m := range tt.messages { + err := q.Queue(m.Topic, m.Message) + assert.NoError(t, err) + } + + msgs, err := q.Messages() + assert.NoError(t, err) + assert.Len(t, msgs, tt.wantCount) + }) + } +} + +func TestQueue_Dequeue(t *testing.T) { + tests := []struct { + name string + messages []graph.Message + dequeueIDs []string + wantRemaining int + }{ + { + name: "dequeue single message", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + }, + dequeueIDs: nil, // Will be populated during test with actual message ID + wantRemaining: 0, + }, + { + name: "dequeue multiple messages", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + {Topic: "topic2", Message: "message2"}, + }, + dequeueIDs: nil, // Will be populated during test with actual message IDs + wantRemaining: 0, + }, + { + name: "dequeue non-existent message", + messages: []graph.Message{ + {Topic: "topic1", Message: "message1"}, + }, + dequeueIDs: []string{"non-existent-id"}, + wantRemaining: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewQueue() + + for _, m := range tt.messages { + err := q.Queue(m.Topic, m.Message) + assert.NoError(t, err) + } + + if tt.dequeueIDs == nil { + msgs, err := q.Messages() + assert.NoError(t, err) + + tt.dequeueIDs = make([]string, len(msgs)) + for i, msg := range msgs { + tt.dequeueIDs[i] = msg.ID + } + } + + err := q.Dequeue(tt.dequeueIDs) + assert.NoError(t, err) + + remaining, err := q.Messages() + assert.NoError(t, err) + assert.Len(t, remaining, tt.wantRemaining) + }) + } +} From e4beb1ed644a789382ca03a5f62df643dd06f76d Mon Sep 17 00:00:00 2001 From: mdrakos Date: Tue, 26 Nov 2024 15:40:23 -0800 Subject: [PATCH 5/7] Ignore log errors --- test/integration/auth_int_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/auth_int_test.go b/test/integration/auth_int_test.go index e2f9e56166..0f0339e25c 100644 --- a/test/integration/auth_int_test.go +++ b/test/integration/auth_int_test.go @@ -132,6 +132,7 @@ func (suite *AuthIntegrationTestSuite) TestAuth_InvalidToken() { cp.ExpectExitCode(0) cp.Expect("ActiveState CLI") cp.Expect("Version") + ts.IgnoreLogErrors() } func TestAuthIntegrationTestSuite(t *testing.T) { From 14d1435dc7ad9f177641c78ad42c31b471374943 Mon Sep 17 00:00:00 2001 From: mitchell Date: Wed, 26 Feb 2025 17:41:53 -0500 Subject: [PATCH 6/7] Show simple warning message from the message queue. --- cmd/state-svc/internal/resolver/resolver.go | 3 ++- .../cmdtree/exechandlers/messages/messenger.go | 16 +++------------- internal/locale/locales/en-us.yaml | 6 +----- test/integration/auth_int_test.go | 3 +-- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/cmd/state-svc/internal/resolver/resolver.go b/cmd/state-svc/internal/resolver/resolver.go index fbd72961c9..f4611ebe77 100644 --- a/cmd/state-svc/internal/resolver/resolver.go +++ b/cmd/state-svc/internal/resolver/resolver.go @@ -24,6 +24,7 @@ import ( "github.com/ActiveState/cli/internal/constants" "github.com/ActiveState/cli/internal/errs" "github.com/ActiveState/cli/internal/graph" + "github.com/ActiveState/cli/internal/locale" "github.com/ActiveState/cli/internal/logging" configMediator "github.com/ActiveState/cli/internal/mediators/config" msgs "github.com/ActiveState/cli/internal/messages" @@ -90,7 +91,7 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res var invalidTokenErr *authentication.ErrInvalidToken if errors.As(err, &invalidTokenErr) { logging.Debug("Queuing invalid API token error") - msg.Queue(msgs.TopicErrorAuthToken, "Invalid API token") + msg.Queue(msgs.TopicErrorAuthToken, locale.Tl("err_invalid_token_try_again", "Invalid API token. Please check your API token and try again.")) } else { logging.Warning("Could not sync authenticated state: %s", err.Error()) } diff --git a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go index c9d1381869..bb8e3b9565 100644 --- a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go +++ b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go @@ -62,17 +62,7 @@ func (m *Messenger) OnExecStart(_ *captain.Command, _ []string) error { } func (m *Messenger) handleErrorMessages(message *graph.Message) { - switch message.Topic { - case msgs.TopicErrorAuth: - logging.Error("State Service reported an authentication error: %s", message.Message) - err := locale.NewError("err_svc_message", "The State Service reported an authentication error: {{.V0}}", message.Message) - m.out.Error(err) - case msgs.TopicErrorAuthToken: - logging.Error("State Service reported an authentication token error: %s", message.Message) - err := locale.NewError("err_svc_invalid_token", "", message.Message) - m.out.Error(err) - default: - logging.Error("State Service reported an unknown error: %s", message.Topic) - m.out.Error(locale.NewError("err_svc_unknown_message", "The State Service reported an unknown error: {{.V0}}", message.Message)) - } + logging.Error("State Service reported a %s error: %s", message.Topic, message.Message) + err := locale.NewError("err_svc_message", "[WARNING]Warning:[/RESET] {{.V0}}", message.Message) + m.out.Error(err) } diff --git a/internal/locale/locales/en-us.yaml b/internal/locale/locales/en-us.yaml index b72785f065..8dc354a931 100644 --- a/internal/locale/locales/en-us.yaml +++ b/internal/locale/locales/en-us.yaml @@ -933,7 +933,7 @@ err_read_projectfile: err_auth_fail_totp: other: A two-factor authentication code is required. err_invalid_token: - other: Invalid API token + other: Invalid API token. err_invalid_credentials: other: Invalid credentials cve_title: @@ -1593,7 +1593,3 @@ config_set_help: other: "To SET the value for a specific config key run '[ACTIONABLE]state config set [/RESET]'" err_clean_in_use: other: "Could not clean your cache as you appear to have a runtime in use. Please stop any running State Tool processes and project runtime processes and try again." -err_svc_invalid_token: - other: | - The State Service reported an invalid API token error: {{.V0}} - Please check your API token and try again. diff --git a/test/integration/auth_int_test.go b/test/integration/auth_int_test.go index 0f0339e25c..321444e38c 100644 --- a/test/integration/auth_int_test.go +++ b/test/integration/auth_int_test.go @@ -120,8 +120,7 @@ func (suite *AuthIntegrationTestSuite) TestAuth_InvalidToken() { cp := ts.SpawnWithOpts(e2e.OptArgs("--version"), e2e.OptAppendEnv(constants.APIKeyEnvVarName+"=bad-token")) // Message is displayed - cp.Expect("The State Service reported an invalid API token error: Invalid API token") - cp.Expect("Please check your API token and try again.") + cp.Expect("Warning: Invalid API token") // The version information is still displayed cp.Expect("ActiveState CLI") cp.Expect("Version") From cdff989af08474a46d34591fffcfe2bfa4c93785 Mon Sep 17 00:00:00 2001 From: mitchell Date: Thu, 27 Feb 2025 15:29:45 -0500 Subject: [PATCH 7/7] Reduce message queue from double-map to a single list. --- cmd/state-svc/internal/messages/message.go | 14 -- cmd/state-svc/internal/messages/queue.go | 60 ------- cmd/state-svc/internal/messages/queue_test.go | 169 ------------------ cmd/state-svc/internal/resolver/resolver.go | 37 ++-- .../internal/server/generated/generated.go | 60 ------- cmd/state-svc/schema/schema.graphqls | 1 - .../cmdtree/exechandlers/cmdcall/cmdcall.go | 1 + .../exechandlers/messages/messenger.go | 2 +- internal/graph/generated.go | 1 - 9 files changed, 16 insertions(+), 329 deletions(-) delete mode 100644 cmd/state-svc/internal/messages/message.go delete mode 100644 cmd/state-svc/internal/messages/queue.go delete mode 100644 cmd/state-svc/internal/messages/queue_test.go diff --git a/cmd/state-svc/internal/messages/message.go b/cmd/state-svc/internal/messages/message.go deleted file mode 100644 index dd5cc7cce9..0000000000 --- a/cmd/state-svc/internal/messages/message.go +++ /dev/null @@ -1,14 +0,0 @@ -package messages - -import ( - "github.com/ActiveState/cli/internal/graph" - "github.com/google/uuid" -) - -func NewMessage(topic string, message string) *graph.Message { - return &graph.Message{ - ID: uuid.New().String(), - Topic: topic, - Message: message, - } -} diff --git a/cmd/state-svc/internal/messages/queue.go b/cmd/state-svc/internal/messages/queue.go deleted file mode 100644 index e224e50437..0000000000 --- a/cmd/state-svc/internal/messages/queue.go +++ /dev/null @@ -1,60 +0,0 @@ -package messages - -import ( - "github.com/ActiveState/cli/internal/errs" - "github.com/ActiveState/cli/internal/graph" - "github.com/ActiveState/cli/internal/logging" -) - -type Queue struct { - queue map[string]map[string]*graph.Message -} - -func NewQueue() *Queue { - return &Queue{ - queue: make(map[string]map[string]*graph.Message), - } -} - -func (q *Queue) Queue(topic string, message string) error { - if _, ok := q.queue[topic]; !ok { - q.queue[topic] = make(map[string]*graph.Message) - } - msg := NewMessage(topic, message) - logging.Debug("Queued message: %s, %s", msg.ID, msg.Message) - q.queue[topic][msg.ID] = msg - return nil -} - -func (q *Queue) Messages() ([]*graph.Message, error) { - var messages []*graph.Message - for _, topic := range q.queue { - for _, message := range topic { - messages = append(messages, message) - } - } - return messages, nil -} - -func (q *Queue) Dequeue(messageIDs []string) error { - for _, messageID := range messageIDs { - err := q.dequeueMessages(messageID) - if err != nil { - return errs.Wrap(err, "failed to dequeue message") - } - } - return nil -} - -func (q *Queue) dequeueMessages(messageID string) error { - for _, topic := range q.queue { - for _, msg := range topic { - if msg.ID != messageID { - continue - } - delete(topic, messageID) - return nil - } - } - return nil -} diff --git a/cmd/state-svc/internal/messages/queue_test.go b/cmd/state-svc/internal/messages/queue_test.go deleted file mode 100644 index eb5bb8f995..0000000000 --- a/cmd/state-svc/internal/messages/queue_test.go +++ /dev/null @@ -1,169 +0,0 @@ -package messages - -import ( - "testing" - - "github.com/ActiveState/cli/internal/graph" - "github.com/stretchr/testify/assert" -) - -func TestNewQueue(t *testing.T) { - q := NewQueue() - assert.NotNil(t, q) - assert.Empty(t, q.queue) -} - -func TestQueue_Queue(t *testing.T) { - tests := []struct { - name string - messages []graph.Message - want map[string]int // topic -> expected message count - }{ - { - name: "queue first message", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - }, - want: map[string]int{"topic1": 1}, - }, - { - name: "queue second message in same topic", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - {Topic: "topic1", Message: "message2"}, - }, - want: map[string]int{"topic1": 2}, - }, - { - name: "queue message in different topic", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - {Topic: "topic2", Message: "message3"}, - }, - want: map[string]int{"topic1": 1, "topic2": 1}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - q := NewQueue() - - for _, m := range tt.messages { - err := q.Queue(m.Topic, m.Message) - assert.NoError(t, err) - } - - for topic, count := range tt.want { - assert.Len(t, q.queue[topic], count) - } - }) - } -} - -func TestQueue_Messages(t *testing.T) { - tests := []struct { - name string - messages []graph.Message - wantCount int - }{ - { - name: "empty queue", - messages: nil, - wantCount: 0, - }, - { - name: "single message", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - }, - wantCount: 1, - }, - { - name: "multiple messages across topics", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - {Topic: "topic1", Message: "message2"}, - {Topic: "topic2", Message: "message3"}, - }, - wantCount: 3, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - q := NewQueue() - - for _, m := range tt.messages { - err := q.Queue(m.Topic, m.Message) - assert.NoError(t, err) - } - - msgs, err := q.Messages() - assert.NoError(t, err) - assert.Len(t, msgs, tt.wantCount) - }) - } -} - -func TestQueue_Dequeue(t *testing.T) { - tests := []struct { - name string - messages []graph.Message - dequeueIDs []string - wantRemaining int - }{ - { - name: "dequeue single message", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - }, - dequeueIDs: nil, // Will be populated during test with actual message ID - wantRemaining: 0, - }, - { - name: "dequeue multiple messages", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - {Topic: "topic2", Message: "message2"}, - }, - dequeueIDs: nil, // Will be populated during test with actual message IDs - wantRemaining: 0, - }, - { - name: "dequeue non-existent message", - messages: []graph.Message{ - {Topic: "topic1", Message: "message1"}, - }, - dequeueIDs: []string{"non-existent-id"}, - wantRemaining: 1, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - q := NewQueue() - - for _, m := range tt.messages { - err := q.Queue(m.Topic, m.Message) - assert.NoError(t, err) - } - - if tt.dequeueIDs == nil { - msgs, err := q.Messages() - assert.NoError(t, err) - - tt.dequeueIDs = make([]string, len(msgs)) - for i, msg := range msgs { - tt.dequeueIDs[i] = msg.ID - } - } - - err := q.Dequeue(tt.dequeueIDs) - assert.NoError(t, err) - - remaining, err := q.Messages() - assert.NoError(t, err) - assert.Len(t, remaining, tt.wantRemaining) - }) - } -} diff --git a/cmd/state-svc/internal/resolver/resolver.go b/cmd/state-svc/internal/resolver/resolver.go index f4611ebe77..1c4ebe9515 100644 --- a/cmd/state-svc/internal/resolver/resolver.go +++ b/cmd/state-svc/internal/resolver/resolver.go @@ -12,7 +12,6 @@ import ( "github.com/ActiveState/cli/cmd/state-svc/internal/graphqltypes" "github.com/ActiveState/cli/cmd/state-svc/internal/hash" - "github.com/ActiveState/cli/cmd/state-svc/internal/messages" "github.com/ActiveState/cli/cmd/state-svc/internal/notifications" "github.com/ActiveState/cli/cmd/state-svc/internal/rtwatcher" genserver "github.com/ActiveState/cli/cmd/state-svc/internal/server/generated" @@ -27,7 +26,7 @@ import ( "github.com/ActiveState/cli/internal/locale" "github.com/ActiveState/cli/internal/logging" configMediator "github.com/ActiveState/cli/internal/mediators/config" - msgs "github.com/ActiveState/cli/internal/messages" + "github.com/ActiveState/cli/internal/messages" "github.com/ActiveState/cli/internal/poller" "github.com/ActiveState/cli/internal/rtutils/ptr" "github.com/ActiveState/cli/internal/runbits/panics" @@ -37,10 +36,14 @@ import ( "github.com/patrickmn/go-cache" ) +type messageQueue struct { + messages []*graph.Message +} + type Resolver struct { cfg *config.Instance notifications *notifications.Notifications - messages *messages.Queue + messages *messageQueue updatePoller *poller.Poller authPoller *poller.Poller projectIDCache *projectcache.ID @@ -60,7 +63,7 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res return nil, errs.Wrap(err, "Could not initialize messages") } - msg := messages.NewQueue() + msg := &messageQueue{make([]*graph.Message, 0)} upchecker := updater.NewDefaultChecker(cfg, an) pollUpdate := poller.New(1*time.Hour, func() (interface{}, error) { defer func() { @@ -91,7 +94,10 @@ func New(cfg *config.Instance, an *sync.Client, auth *authentication.Auth) (*Res var invalidTokenErr *authentication.ErrInvalidToken if errors.As(err, &invalidTokenErr) { logging.Debug("Queuing invalid API token error") - msg.Queue(msgs.TopicErrorAuthToken, locale.Tl("err_invalid_token_try_again", "Invalid API token. Please check your API token and try again.")) + msg.messages = append(msg.messages, &graph.Message{ + Topic: messages.TopicErrorAuthToken, + Message: locale.Tl("err_invalid_token_try_again", "Invalid API token. Please check your API token and try again."), + }) } else { logging.Warning("Could not sync authenticated state: %s", err.Error()) } @@ -273,25 +279,10 @@ func (r *Resolver) CheckNotifications(ctx context.Context, command string, flags } func (r *Resolver) CheckMessages(ctx context.Context) ([]*graph.Message, error) { + defer func() { panics.LogAndPanic(recover(), debug.Stack()) }() logging.Debug("Check messages resolver") - var messages []*graph.Message - var err error - - defer func() { - var sentMessageIDs []string - for _, msg := range messages { - sentMessageIDs = append(sentMessageIDs, msg.ID) - } - if err := r.messages.Dequeue(sentMessageIDs); err != nil { - logging.Error("Could not dequeue messages: %s", errs.JoinMessage(err)) - } - panics.LogAndPanic(recover(), debug.Stack()) - }() - - messages, err = r.messages.Messages() - if err != nil { - return nil, errs.Wrap(err, "Could not get messages") - } + messages := r.messages.messages + r.messages.messages = r.messages.messages[:0] // clear queue return messages, nil } diff --git a/cmd/state-svc/internal/server/generated/generated.go b/cmd/state-svc/internal/server/generated/generated.go index d6c2bfac4c..21e5330c93 100644 --- a/cmd/state-svc/internal/server/generated/generated.go +++ b/cmd/state-svc/internal/server/generated/generated.go @@ -80,7 +80,6 @@ type ComplexityRoot struct { } Message struct { - ID func(childComplexity int) int Message func(childComplexity int) int Topic func(childComplexity int) int } @@ -291,13 +290,6 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return e.complexity.JWT.User(childComplexity), true - case "Message.id": - if e.complexity.Message.ID == nil { - break - } - - return e.complexity.Message.ID(childComplexity), true - case "Message.message": if e.complexity.Message.Message == nil { break @@ -799,7 +791,6 @@ type NotificationInfo { } type Message { - id: String! topic: String! message: String! } @@ -1800,50 +1791,6 @@ func (ec *executionContext) fieldContext_JWT_user(_ context.Context, field graph return fc, nil } -func (ec *executionContext) _Message_id(ctx context.Context, field graphql.CollectedField, obj *graph.Message) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Message_id(ctx, field) - if err != nil { - return graphql.Null - } - ctx = graphql.WithFieldContext(ctx, fc) - defer func() { - if r := recover(); r != nil { - ec.Error(ctx, ec.Recover(ctx, r)) - ret = graphql.Null - } - }() - resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { - ctx = rctx // use context from middleware stack in children - return obj.ID, nil - }) - if err != nil { - ec.Error(ctx, err) - return graphql.Null - } - if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } - return graphql.Null - } - res := resTmp.(string) - fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) -} - -func (ec *executionContext) fieldContext_Message_id(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { - fc = &graphql.FieldContext{ - Object: "Message", - Field: field, - IsMethod: false, - IsResolver: false, - Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { - return nil, errors.New("field of type String does not have child fields") - }, - } - return fc, nil -} - func (ec *executionContext) _Message_topic(ctx context.Context, field graphql.CollectedField, obj *graph.Message) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Message_topic(ctx, field) if err != nil { @@ -2983,8 +2930,6 @@ func (ec *executionContext) fieldContext_Query_checkMessages(_ context.Context, IsResolver: true, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { - case "id": - return ec.fieldContext_Message_id(ctx, field) case "topic": return ec.fieldContext_Message_topic(ctx, field) case "message": @@ -6017,11 +5962,6 @@ func (ec *executionContext) _Message(ctx context.Context, sel ast.SelectionSet, switch field.Name { case "__typename": out.Values[i] = graphql.MarshalString("Message") - case "id": - out.Values[i] = ec._Message_id(ctx, field, obj) - if out.Values[i] == graphql.Null { - out.Invalids++ - } case "topic": out.Values[i] = ec._Message_topic(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/cmd/state-svc/schema/schema.graphqls b/cmd/state-svc/schema/schema.graphqls index da0edcb343..1046a9e687 100644 --- a/cmd/state-svc/schema/schema.graphqls +++ b/cmd/state-svc/schema/schema.graphqls @@ -63,7 +63,6 @@ type NotificationInfo { } type Message { - id: String! topic: String! message: String! } diff --git a/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go b/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go index 049785f03b..0956722559 100644 --- a/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go +++ b/cmd/state/internal/cmdtree/exechandlers/cmdcall/cmdcall.go @@ -26,6 +26,7 @@ func (c *CmdCall) OnExecStart(cmd *captain.Command, _ []string) error { return errs.Wrap(err, "before-command event run failure") } return nil + } func (c *CmdCall) OnExecStop(cmd *captain.Command, _ []string) error { diff --git a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go index bb8e3b9565..ab5848d59d 100644 --- a/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go +++ b/cmd/state/internal/cmdtree/exechandlers/messages/messenger.go @@ -62,7 +62,7 @@ func (m *Messenger) OnExecStart(_ *captain.Command, _ []string) error { } func (m *Messenger) handleErrorMessages(message *graph.Message) { - logging.Error("State Service reported a %s error: %s", message.Topic, message.Message) + logging.Warning("State Service reported a %s error: %s", message.Topic, message.Message) err := locale.NewError("err_svc_message", "[WARNING]Warning:[/RESET] {{.V0}}", message.Message) m.out.Error(err) } diff --git a/internal/graph/generated.go b/internal/graph/generated.go index 7ab57470b1..7350c32b84 100644 --- a/internal/graph/generated.go +++ b/internal/graph/generated.go @@ -41,7 +41,6 @@ type Jwt struct { } type Message struct { - ID string `json:"id"` Topic string `json:"topic"` Message string `json:"message"` }