Skip to content

feat: support propagation of OpenTelemetry context from clients #568

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
steampipe_postgres_fdw.h
# generated C imports
0_prebuild.go
prebuild.go

# Binaries for programs and plugins
*.exe
Expand All @@ -27,4 +28,4 @@ build-*/
# intermediate files from clang
*.bc
# work directory created by the standalone fdw building
/work
/work
40 changes: 40 additions & 0 deletions fdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net"
"net/http"
"os"
"strings"
"time"
"unsafe"

Expand Down Expand Up @@ -172,6 +173,30 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,

tableOpts := GetFTableOptions(types.Oid(state.foreigntableid))

// Extract trace context if available
var traceContext string
if state.trace_context_string != nil {
traceContext = C.GoString(state.trace_context_string)
log.Printf("[TRACE] Extracted trace context from session: %s", traceContext)

if len(traceContext) > 0 {
log.Printf("[DEBUG] Trace context length: %d characters", len(traceContext))
if strings.Contains(traceContext, "traceparent=") {
log.Printf("[DEBUG] Trace context contains traceparent field")
} else {
log.Printf("[WARN] Trace context missing traceparent field - may be malformed")
}
}
} else {
log.Printf("[DEBUG] No trace context found in session variables")
}

// Add trace context to options for hub layer
if traceContext != "" {
tableOpts["trace_context"] = traceContext
log.Printf("[DEBUG] Added trace context to table options")
}

// build columns
var columns []string
if state.target_list != nil {
Expand Down Expand Up @@ -296,6 +321,21 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) {
plan := (*C.ForeignScan)(unsafe.Pointer(node.ss.ps.plan))
var execState *C.FdwExecState = C.initializeExecState(unsafe.Pointer(plan.fdw_private))

// Extract trace context from session variables for scan operation
var traceContext string
if traceContextPtr := C.getTraceContextFromSession(); traceContextPtr != nil {
traceContext = C.GoString(traceContextPtr)
log.Printf("[TRACE] Extracted trace context from session for scan: %s", traceContext)
} else {
log.Printf("[DEBUG] No trace context found in session variables for scan")
}

// Add trace context to options for hub layer
if traceContext != "" {
opts["trace_context"] = traceContext
log.Printf("[DEBUG] Added trace context to scan options")
}

log.Printf("[INFO] goFdwBeginForeignScan, canPushdownAllSortFields %v", execState.canPushdownAllSortFields)
var columns []string
if execState.target_list != nil {
Expand Down
4 changes: 3 additions & 1 deletion fdw/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ typedef struct FdwPlanState
int width;
// the number of rows to return (limit+offset). -1 means no limit
int limit;
// OpenTelemetry trace context extracted from session variables
char *trace_context_string;

} FdwPlanState;

Expand Down Expand Up @@ -133,4 +135,4 @@ List *deserializeDeparsedSortGroup(List *items);
OpExpr *canonicalOpExpr(OpExpr *opExpr, Relids base_relids);
ScalarArrayOpExpr *canonicalScalarArrayOpExpr(ScalarArrayOpExpr *opExpr, Relids base_relids);
char *getOperatorString(Oid opoid);
#endif // FDW_COMMON_H
#endif // FDW_COMMON_H
46 changes: 46 additions & 0 deletions fdw/fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include "fdw_handlers.h"
#include "nodes/plannodes.h"
#include "access/xact.h"
#include "utils/guc.h"
#include "utils/builtins.h"

extern PGDLLEXPORT void _PG_init(void);

Expand Down Expand Up @@ -92,6 +94,40 @@ static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, Ran
return getenv("STEAMPIPE_FDW_PARALLEL_SAFE") != NULL;
}

/*
* Extract OpenTelemetry trace context from PostgreSQL session variables
* Returns a formatted string containing traceparent and tracestate, or NULL if not set
*/
static char *extractTraceContextFromSession(void)
{
const char *traceparent = GetConfigOption("steampipe.traceparent", true, false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to fix indentation...

const char *tracestate = GetConfigOption("steampipe.tracestate", true, false);
char *result = NULL;

// Format the result string for Go layer consumption
if (traceparent != NULL) {
if (tracestate != NULL) {
result = psprintf("traceparent=%s;tracestate=%s", traceparent, tracestate);
} else {
result = psprintf("traceparent=%s", traceparent);
}

elog(DEBUG1, "extracted trace context: %s", result);
} else {
elog(DEBUG2, "no trace context found in session variables");
}

return result;
}

/*
* Public wrapper for extractTraceContextFromSession - callable from Go
*/
char *getTraceContextFromSession(void)
{
return extractTraceContextFromSession();
}

static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
{
FdwPlanState *planstate;
Expand All @@ -111,6 +147,16 @@ static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid for
// Save plan state information
baserel->fdw_private = planstate;
planstate->foreigntableid = foreigntableid;

// Extract trace context from session variables
char *traceContext = extractTraceContextFromSession();
if (traceContext != NULL) {
planstate->trace_context_string = pstrdup(traceContext);
pfree(traceContext);
elog(DEBUG1, "stored trace context in plan state");
} else {
planstate->trace_context_string = NULL;
}

// Initialize the conversion info array
{
Expand Down
5 changes: 4 additions & 1 deletion fdw/fdw_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ static inline char *nameStr(Name n) { return NameStr(*n); }


// logging
char *tagTypeToString(NodeTag type);
char *tagTypeToString(NodeTag type);

// trace context
char *getTraceContextFromSession(void);
89 changes: 87 additions & 2 deletions hub/hub_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

type hubBase struct {
Expand Down Expand Up @@ -372,8 +374,31 @@ func (h *hubBase) executeCommandScan(connectionName, table string, queryTimestam
}
}

func (h *hubBase) traceContextForScan(table string, columns []string, limit int64, qualMap map[string]*proto.Quals, connectionName string) *telemetry.TraceCtx {
ctx, span := telemetry.StartSpan(context.Background(), FdwName, "RemoteHub.Scan (%s)", table)
func (h *hubBase) traceContextForScan(table string, columns []string, limit int64, qualMap map[string]*proto.Quals, connectionName string, opts types.Options) *telemetry.TraceCtx {
var baseCtx context.Context = context.Background()

// Check if we have trace context from session variables
if traceContextStr, exists := opts["trace_context"]; exists && traceContextStr != "" {
log.Printf("[DEBUG] traceContextForScan received trace context: %s", traceContextStr)
if parentCtx := h.parseTraceContext(traceContextStr); parentCtx != nil {
baseCtx = parentCtx
log.Printf("[TRACE] Using parent trace context for scan of table: %s", table)

// Verify the parent context has the expected trace ID
parentSpanCtx := trace.SpanContextFromContext(parentCtx)
if parentSpanCtx.IsValid() {
log.Printf("[DEBUG] Parent context TraceID: %s, SpanID: %s",
parentSpanCtx.TraceID().String(), parentSpanCtx.SpanID().String())
}
} else {
log.Printf("[WARN] Failed to parse trace context for table: %s", table)
}
} else {
log.Printf("[DEBUG] No trace context found in options for table: %s", table)
}

// Create span with potentially propagated context
ctx, span := telemetry.StartSpan(baseCtx, FdwName, "RemoteHub.Scan (%s)", table)
span.SetAttributes(
attribute.StringSlice("columns", columns),
attribute.String("table", table),
Expand All @@ -383,9 +408,69 @@ func (h *hubBase) traceContextForScan(table string, columns []string, limit int6
if limit != -1 {
span.SetAttributes(attribute.Int64("limit", limit))
}

spanCtx := span.SpanContext()
if spanCtx.IsValid() {
log.Printf("[DEBUG] Created span for table %s - TraceID: %s, SpanID: %s",
table, spanCtx.TraceID().String(), spanCtx.SpanID().String())
}

return &telemetry.TraceCtx{Ctx: ctx, Span: span}
}

// parseTraceContext parses trace context string from session variables
// Format: "traceparent=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01;tracestate=rojo=00f067aa0ba902b7"
func (h *hubBase) parseTraceContext(traceContextString string) context.Context {
log.Printf("[DEBUG] parseTraceContext called with: %s", traceContextString)

if traceContextString == "" {
log.Printf("[DEBUG] Empty trace context string")
return nil
}

carrier := propagation.MapCarrier{}

// Parse the trace context string format: "traceparent=..;tracestate=.."
parts := strings.Split(traceContextString, ";")
log.Printf("[DEBUG] Split trace context into %d parts: %v", len(parts), parts)

for _, part := range parts {
if kv := strings.SplitN(part, "=", 2); len(kv) == 2 {
key := strings.TrimSpace(kv[0])
value := strings.TrimSpace(kv[1])
carrier[key] = value
log.Printf("[DEBUG] Added to carrier: %s = %s", key, value)
} else {
log.Printf("[DEBUG] Skipping invalid part: %s", part)
}
}

log.Printf("[DEBUG] Final carrier contents: %v", carrier)

if len(carrier) == 0 {
log.Printf("[WARN] No valid trace context found in: %s", traceContextString)
return nil
}

// Use OpenTelemetry propagator to extract context
propagator := propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)
extractedCtx := propagator.Extract(context.Background(), carrier)

// Verify we actually got a valid span context
spanCtx := trace.SpanContextFromContext(extractedCtx)
if spanCtx.IsValid() {
log.Printf("[TRACE] Successfully extracted trace context - TraceID: %s, SpanID: %s",
spanCtx.TraceID().String(), spanCtx.SpanID().String())
return extractedCtx
}

log.Printf("[WARN] Extracted trace context is not valid - carrier was: %v", carrier)
return nil
}

// determine whether to include the limit, based on the quals
// we ONLY pushdown the limit if all quals have corresponding key columns,
// and if the qual operator is supported by the key column
Expand Down
2 changes: 1 addition & 1 deletion hub/hub_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRe
}

// create a span for this scan
scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName)
scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName, opts)
iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion hub/hub_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (h *RemoteHub) GetIterator(columns []string, quals *proto.Quals, unhandledR
}

// create a span for this scan
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName)
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName, opts)
iterator, err := h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion hub/scan_iterator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/turbot/steampipe-plugin-sdk/v5/telemetry"
"github.com/turbot/steampipe-postgres-fdw/v2/types"
"github.com/turbot/steampipe/v2/pkg/query/queryresult"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/reflect/protoreflect"
)

Expand Down Expand Up @@ -199,13 +200,25 @@ func (i *scanIteratorBase) GetScanMetadata() []queryresult.ScanMetadataRow {
}

func (i *scanIteratorBase) newExecuteRequest() *proto.ExecuteRequest {
traceCarrier := grpc.CreateCarrierFromContext(i.traceCtx.Ctx)
log.Printf("[DEBUG] newExecuteRequest creating trace carrier for table %s: %v", i.table, traceCarrier)

// Validate span context from the trace context
spanCtx := trace.SpanContextFromContext(i.traceCtx.Ctx)
if spanCtx.IsValid() {
log.Printf("[DEBUG] newExecuteRequest has valid span context - TraceID: %s, SpanID: %s",
spanCtx.TraceID().String(), spanCtx.SpanID().String())
} else {
log.Printf("[WARN] newExecuteRequest has invalid span context for table %s", i.table)
}

req := &proto.ExecuteRequest{
Table: i.table,
QueryContext: i.queryContext,
CallId: i.callId,
// pass connection name - used for aggregators
Connection: i.connectionName,
TraceContext: grpc.CreateCarrierFromContext(i.traceCtx.Ctx),
TraceContext: traceCarrier,
ExecuteConnectionData: make(map[string]*proto.ExecuteConnectionData),
}

Expand Down
Loading
Loading