Skip to content

Commit c86b785

Browse files
Backport of Add the -capture flag to proxy log command. into release/1.9.x (#4956)
backport of commit 80f852e Co-authored-by: Abhishek Yadav <[email protected]>
1 parent a8450b0 commit c86b785

File tree

4 files changed

+402
-21
lines changed

4 files changed

+402
-21
lines changed

.changelog/4788.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:feature
2+
cli: added new -capture flag to proxy loglevel command, enabling users to capture logs for a certain duration.
3+
```

cli/cmd/proxy/loglevel/command.go

Lines changed: 223 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,17 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"io"
11+
"os"
12+
"path/filepath"
1013
"strings"
1114
"sync"
15+
"time"
1216

1317
"github.com/posener/complete"
18+
"golang.org/x/sync/errgroup"
1419
helmCLI "helm.sh/helm/v3/pkg/cli"
20+
corev1 "k8s.io/api/core/v1"
1521
"k8s.io/apimachinery/pkg/api/validation"
1622
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1723
"k8s.io/client-go/kubernetes"
@@ -21,6 +27,7 @@ import (
2127
"github.com/hashicorp/consul-k8s/cli/common/envoy"
2228
"github.com/hashicorp/consul-k8s/cli/common/flag"
2329
"github.com/hashicorp/consul-k8s/cli/common/terminal"
30+
"github.com/hashicorp/go-multierror"
2431
)
2532

2633
const (
@@ -30,6 +37,14 @@ const (
3037
flagNameReset = "reset"
3138
flagNameKubeConfig = "kubeconfig"
3239
flagNameKubeContext = "context"
40+
flagNameCapture = "capture"
41+
42+
// minimum duration for log capture should be at least 10 seconds
43+
minimumCaptureDuration = 10 * time.Second
44+
45+
// permission to be used when creating files and directories
46+
filePermission = 0644
47+
dirPermission = 0755
3348
)
3449

3550
var ErrIncorrectArgFormat = errors.New("Exactly one positional argument is required: <pod-name>")
@@ -57,13 +72,15 @@ type LogLevelCommand struct {
5772
namespace string
5873
level string
5974
reset bool
75+
capture time.Duration
6076
kubeConfig string
6177
kubeContext string
6278

6379
once sync.Once
6480
help string
6581
restConfig *rest.Config
6682
envoyLoggingCaller func(context.Context, common.PortForwarder, *envoy.LoggerParams) (map[string]string, error)
83+
getLogFunc func(context.Context, *corev1.Pod, *corev1.PodLogOptions) ([]byte, error)
6784
}
6885

6986
func (l *LogLevelCommand) init() {
@@ -83,6 +100,12 @@ func (l *LogLevelCommand) init() {
83100
Usage: "Update the level for the logger. Can be either `-update-level warning` to change all loggers to warning, or a comma delineated list of loggers with level can be passed like `-update-level grpc:warning,http:info` to only modify specific loggers.",
84101
Aliases: []string{"u"},
85102
})
103+
f.DurationVar(&flag.DurationVar{
104+
Name: flagNameCapture,
105+
Target: &l.capture,
106+
Default: 0,
107+
Usage: "Captures pod log for the given duration according to existing/new update-level. It can be used with -update-level <any> flag to capture logs at that level or with -reset flag to capture logs at default info level",
108+
})
86109

87110
f.BoolVar(&flag.BoolVar{
88111
Name: flagNameReset,
@@ -128,6 +151,9 @@ func (l *LogLevelCommand) Run(args []string) int {
128151
if l.envoyLoggingCaller == nil {
129152
l.envoyLoggingCaller = envoy.CallLoggingEndpoint
130153
}
154+
if l.getLogFunc == nil {
155+
l.getLogFunc = l.getLogs
156+
}
131157

132158
err = l.initKubernetes()
133159
if err != nil {
@@ -139,11 +165,19 @@ func (l *LogLevelCommand) Run(args []string) int {
139165
return l.logOutputAndDie(err)
140166
}
141167

142-
err = l.fetchOrSetLogLevels(adminPorts)
143-
if err != nil {
144-
return l.logOutputAndDie(err)
168+
if l.capture == 0 {
169+
loggers, err := l.fetchOrSetLogLevels(adminPorts, l.level)
170+
if err != nil {
171+
return l.logOutputAndDie(err)
172+
}
173+
l.outputLevels(loggers)
174+
return 0
145175
}
146176

177+
err = l.captureLogsAndResetLogLevels(adminPorts, l.level)
178+
if err != nil {
179+
return 1
180+
}
147181
return 0
148182
}
149183

@@ -180,13 +214,14 @@ func (l *LogLevelCommand) validateFlags() error {
180214
if l.level != "" && l.reset {
181215
return fmt.Errorf("cannot set log level to %q and reset to 'info' at the same time", l.level)
182216
}
183-
if l.namespace == "" {
184-
return nil
217+
if l.namespace != "" {
218+
errs := validation.ValidateNamespaceName(l.namespace, false)
219+
if len(errs) > 0 {
220+
return fmt.Errorf("invalid namespace name passed for -namespace/-n: %v", strings.Join(errs, "; "))
221+
}
185222
}
186-
187-
errs := validation.ValidateNamespaceName(l.namespace, false)
188-
if len(errs) > 0 {
189-
return fmt.Errorf("invalid namespace name passed for -namespace/-n: %v", strings.Join(errs, "; "))
223+
if l.capture != 0 && l.capture < minimumCaptureDuration {
224+
return fmt.Errorf("capture duration must be at least %s", minimumCaptureDuration)
190225
}
191226

192227
return nil
@@ -248,7 +283,10 @@ func (l *LogLevelCommand) fetchAdminPorts() (map[string]int, error) {
248283
return adminPorts, nil
249284
}
250285

251-
func (l *LogLevelCommand) fetchOrSetLogLevels(adminPorts map[string]int) error {
286+
// fetchOrSetLogLevels - fetches or sets the log levels for all admin ports depending on the logLevel parameter
287+
// - if logLevel is empty, it fetches the existing log levels
288+
// - if logLevel is non-empty, it sets the new log levels
289+
func (l *LogLevelCommand) fetchOrSetLogLevels(adminPorts map[string]int, logLevel string) (map[string]LoggerConfig, error) {
252290
loggers := make(map[string]LoggerConfig, 0)
253291

254292
for name, port := range adminPorts {
@@ -259,21 +297,189 @@ func (l *LogLevelCommand) fetchOrSetLogLevels(adminPorts map[string]int) error {
259297
KubeClient: l.kubernetes,
260298
RestConfig: l.restConfig,
261299
}
262-
params, err := parseParams(l.level)
300+
params, err := parseParams(logLevel)
263301
if err != nil {
264-
return err
302+
return nil, err
265303
}
266304
logLevels, err := l.envoyLoggingCaller(l.Ctx, &pf, params)
267305
if err != nil {
268-
return err
306+
return nil, err
269307
}
270308
loggers[name] = logLevels
271309
}
310+
return loggers, nil
311+
}
312+
313+
// captureLogsAndResetLogLevels - captures the logs from the given pod at given logLevels for the given duration and writes it to a file
314+
func (l *LogLevelCommand) captureLogsAndResetLogLevels(adminPorts map[string]int, logLevels string) error {
315+
// if no new log level is provided, just capture logs at existing log levels.
316+
if logLevels == "" {
317+
return l.captureLogs()
318+
}
319+
320+
// NEW LOG LEVELS provided via -update-level flag,
321+
// 1. Fetch existing log levels before setting NEW log levels (for reset after log capture)
322+
// 2. Set NEW log levels
323+
// 3. Capture logs at NEW log levels for the given duration
324+
// 4. Reset back to existing log levels
325+
326+
// cleanup is required to ensure that if new log level set,
327+
// should be reset back to existing log level after log capture
328+
// even if user interrupts the command during log capture.
329+
select {
330+
case <-l.CleanupReqAndCompleted:
331+
default:
332+
}
272333

273-
l.outputLevels(loggers)
334+
// fetch log levels
335+
l.UI.Output(fmt.Sprintf("Fetching existing log levels..."))
336+
existingLoggers, err := l.fetchOrSetLogLevels(adminPorts, "")
337+
if err != nil {
338+
return fmt.Errorf("error fetching existing log levels: %w", err)
339+
}
340+
341+
// defer reset of log levels
342+
defer func() {
343+
l.UI.Output("Resetting log levels back to existing levels...")
344+
if err := l.resetLogLevels(existingLoggers, adminPorts); err != nil {
345+
l.UI.Output(err.Error(), terminal.WithErrorStyle())
346+
} else {
347+
l.UI.Output("Reset completed successfully!")
348+
}
349+
l.CleanupReqAndCompleted <- false
350+
}()
351+
352+
// set new log levels for log capture
353+
l.UI.Output(fmt.Sprintf("Setting new log levels..."))
354+
newLogger, err := l.fetchOrSetLogLevels(adminPorts, logLevels)
355+
if err != nil {
356+
return fmt.Errorf("error setting new log levels: %w", err)
357+
}
358+
l.outputLevels(newLogger)
359+
360+
// capture logs at new log levels
361+
err = l.captureLogs()
362+
if err != nil {
363+
l.UI.Output(fmt.Sprintf("error capturing logs: %v", err), terminal.WithErrorStyle())
364+
return err
365+
}
274366
return nil
275367
}
276368

369+
// resetLogLevels - converts the 'existing logger map' to logLevel parameter string
370+
// and reset the log levels back for EACH admin ports
371+
func (l *LogLevelCommand) resetLogLevels(existingLogger map[string]LoggerConfig, adminPorts map[string]int) error {
372+
// Use a fresh context for resetting log levels as
373+
// l.Ctx might be cancelled during log capture DUE TO user interrupt
374+
originalCtx := l.Ctx
375+
l.Ctx = context.Background()
376+
defer func() {
377+
l.Ctx = originalCtx
378+
}()
379+
380+
var errs error
381+
for adminPortName, loggers := range existingLogger {
382+
var logLevelParams []string
383+
for loggerName, logLevel := range loggers {
384+
// EnvoyLoggers is a map of valid logger for consul and
385+
// fetchLogLevels return ALL the envoy logger (not the one specific of consul)
386+
// so below check is needed to filter out unspecified loggers.
387+
// It can be removed once the above is fixed.
388+
if _, ok := envoy.EnvoyLoggers[loggerName]; ok {
389+
logLevelParams = append(logLevelParams, fmt.Sprintf("%s:%s", loggerName, logLevel))
390+
}
391+
}
392+
var logLevelParamsString string
393+
if len(logLevelParams) > 0 {
394+
logLevelParamsString = strings.Join(logLevelParams, ",")
395+
} else {
396+
logLevelParamsString = "info"
397+
}
398+
_, err := l.fetchOrSetLogLevels(map[string]int{adminPortName: adminPorts[adminPortName]}, logLevelParamsString)
399+
if err != nil {
400+
errs = multierror.Append(errs, fmt.Errorf("error resetting log level for %s: %w", adminPortName, err))
401+
}
402+
}
403+
return errs
404+
}
405+
406+
func (l *LogLevelCommand) captureLogs() error {
407+
l.UI.Output("Starting log capture...")
408+
g := new(errgroup.Group)
409+
g.Go(func() error {
410+
return l.fetchPodLogs()
411+
})
412+
err := g.Wait()
413+
if err != nil {
414+
return err
415+
}
416+
return nil
417+
}
418+
419+
// fetchPodLogs - captures the logs from the given pod for the given duration and writes it to a file
420+
func (l *LogLevelCommand) fetchPodLogs() error {
421+
sinceSeconds := int64(l.capture.Seconds())
422+
pod, err := l.kubernetes.CoreV1().Pods(l.namespace).Get(l.Ctx, l.podName, metav1.GetOptions{})
423+
if err != nil {
424+
return fmt.Errorf("error getting pod object from k8s: %w", err)
425+
}
426+
427+
var podLogOptions *corev1.PodLogOptions
428+
for _, container := range pod.Spec.Containers {
429+
if container.Name == "consul-dataplane" {
430+
podLogOptions = &corev1.PodLogOptions{
431+
Container: container.Name,
432+
SinceSeconds: &sinceSeconds,
433+
Timestamps: true,
434+
}
435+
}
436+
}
437+
proxyLogFilePath := filepath.Join("proxy", fmt.Sprintf("proxy-log-%s.log", l.podName))
438+
439+
// metadata of log capture
440+
l.UI.Output("Pod Name: %s", pod.Name)
441+
l.UI.Output("Container Name: %s", podLogOptions.Container)
442+
l.UI.Output("Namespace: %s", pod.Namespace)
443+
l.UI.Output("Log Capture Duration: %s", l.capture)
444+
l.UI.Output("Log File Path: %s", proxyLogFilePath)
445+
446+
durationChn := time.After(l.capture)
447+
select {
448+
case <-durationChn:
449+
logs, err := l.getLogFunc(l.Ctx, pod, podLogOptions)
450+
if err != nil {
451+
return err
452+
}
453+
// Create file path and directory for storing logs
454+
// NOTE: currently it is writing log file in cwd /proxy only. Also, log file contents will be overwritten if
455+
// the command is run multiple times for the same pod name or if file already exists.
456+
if err := os.MkdirAll(filepath.Dir(proxyLogFilePath), dirPermission); err != nil {
457+
return fmt.Errorf("error creating directory for log file: %w", err)
458+
}
459+
if err := os.WriteFile(proxyLogFilePath, logs, filePermission); err != nil {
460+
return fmt.Errorf("error writing log to file: %v", err)
461+
}
462+
l.UI.Output("Logs saved to '%s'", proxyLogFilePath, terminal.WithSuccessStyle())
463+
return nil
464+
case <-l.Ctx.Done():
465+
return fmt.Errorf("stopping collection due to shutdown signal received")
466+
}
467+
}
468+
func (l *LogLevelCommand) getLogs(ctx context.Context, pod *corev1.Pod, podLogOptions *corev1.PodLogOptions) ([]byte, error) {
469+
podLogRequest := l.kubernetes.CoreV1().Pods(l.namespace).GetLogs(pod.Name, podLogOptions)
470+
podLogStream, err := podLogRequest.Stream(ctx)
471+
if err != nil {
472+
return nil, fmt.Errorf("error getting logs: %v\n", err)
473+
}
474+
defer podLogStream.Close()
475+
476+
logs, err := io.ReadAll(podLogStream)
477+
if err != nil {
478+
return nil, fmt.Errorf("error reading log streams: %w", err)
479+
}
480+
return logs, nil
481+
}
482+
277483
func parseParams(params string) (*envoy.LoggerParams, error) {
278484
loggerParams := envoy.NewLoggerParams()
279485
if len(params) == 0 {
@@ -330,6 +536,9 @@ func (l *LogLevelCommand) Synopsis() string {
330536
func (l *LogLevelCommand) AutocompleteFlags() complete.Flags {
331537
return complete.Flags{
332538
fmt.Sprintf("-%s", flagNameNamespace): complete.PredictNothing,
539+
fmt.Sprintf("-%s", flagNameCapture): complete.PredictAnything,
540+
fmt.Sprintf("-%s", flagNameUpdateLevel): complete.PredictAnything,
541+
fmt.Sprintf("-%s", flagNameReset): complete.PredictNothing,
333542
fmt.Sprintf("-%s", flagNameKubeConfig): complete.PredictFiles("*"),
334543
fmt.Sprintf("-%s", flagNameKubeContext): complete.PredictNothing,
335544
}

0 commit comments

Comments
 (0)