RSDK-9870: Check for schema changes all the time. Allow for FTDC to consume maps. #4756

Merged: 13 commits, Jan 31, 2025
266 changes: 130 additions & 136 deletions ftdc/custom_format.go
@@ -9,6 +9,7 @@ import (
"io"
"math"
"reflect"
"sort"
"strings"
"time"

@@ -124,8 +125,6 @@ func writeDatum(time int64, prev, curr []float32, output io.Writer) error {
return nil
}

var errNotStruct = errors.New("stats object is not a struct")

func isNumeric(kind reflect.Kind) bool {
return kind == reflect.Bool ||
kind == reflect.Int ||
@@ -135,23 +134,122 @@ func isNumeric(kind reflect.Kind) bool {
kind == reflect.Float32 || kind == reflect.Float64
}

func flattenStruct(item reflect.Value) ([]float32, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
func flattenPtr(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}

inp = inp.Elem()
}
return inp
}
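As an aside on the helper above: the pointer/interface unwrapping that flattenPtr performs can be sketched as a self-contained program (hypothetical example names, not code from this PR):

package main

import (
    "fmt"
    "reflect"
)

// unwrap mirrors the flattenPtr idea: follow pointers and interfaces until a
// concrete value (or a nil) is reached.
func unwrap(v reflect.Value) reflect.Value {
    for v.Kind() == reflect.Pointer || v.Kind() == reflect.Interface {
        if v.IsNil() {
            return v
        }
        v = v.Elem()
    }
    return v
}

func main() {
    x := 7
    var anyVal any = &x // an interface wrapping a *int

    v := unwrap(reflect.ValueOf(&anyVal)) // *interface{} -> interface{} -> *int -> int
    fmt.Println(v.Kind(), v.Int())        // prints: int 7
}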

func flatten(value reflect.Value) ([]string, []float32, error) {
value = flattenPtr(value)

// why is the default case not sufficient to be considered exhaustive?
//nolint:exhaustive
switch value.Kind() {
case reflect.Struct:
return flattenStruct(value)
case reflect.Map:
return flattenMap(value)
default:
// We can get here, for example, if a struct member is typed as an `any`, but the value is
// nil. More antagonistically, this also catches weird types such as channels.
return []string{}, []float32{}, nil
}
}

type mapSorter struct {
fields []string
values []float32
}

func (ms mapSorter) Len() int {
return len(ms.fields)
}

func (ms mapSorter) Less(left, right int) bool {
return ms.fields[left] < ms.fields[right]
}

func (ms mapSorter) Swap(left, right int) {
ms.fields[left], ms.fields[right] = ms.fields[right], ms.fields[left]
ms.values[left], ms.values[right] = ms.values[right], ms.values[left]
}

// flattenMap must be passed in a map where the keys are explicitly typed as strings. The values can
// be any terminal type (e.g: numbers) or more maps of strings.
func flattenMap(mValue reflect.Value) ([]string, []float32, error) {
if mValue.Type().Key().Kind() != reflect.String {
// We ignore types we refuse to serialize into ftdc.
return []string{}, []float32{}, nil
}

fields := make([]string, 0)
numbers := make([]float32, 0)

// Map iteration order is not predictable. This means that consecutive calls to a `Statser` that
// returns a map may yield: {"X": 1, "Y": 2} for one stat followed by {"Y": 2, "X": 1}. That
// sequence would result in us rewriting out the schema. We will build up results in map
// iteration order here and sort them later.
for iter := mValue.MapRange(); iter.Next(); {
key := iter.Key()
value := flattenPtr(iter.Value())

switch {
case value.CanUint():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Uint()))
case value.CanInt():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Int()))
case value.CanFloat():
fields = append(fields, key.String())
numbers = append(numbers, float32(value.Float()))
case value.Kind() == reflect.Bool:
fields = append(fields, key.String())
if value.Bool() {
numbers = append(numbers, 1)
} else {
numbers = append(numbers, 0)
}
case value.Kind() == reflect.Struct ||
value.Kind() == reflect.Pointer ||
value.Kind() == reflect.Interface ||
value.Kind() == reflect.Map:
subFields, subNumbers, err := flatten(value)
if err != nil {
return nil, nil, err
}

for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", key.String(), subField))
}
numbers = append(numbers, subNumbers...)
case isNumeric(value.Kind()):
//nolint:stylecheck
return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", value.Kind())
default:
// Getting the keys for a structure will ignore these types. Such as the antagonistic
// `channel`, or `string`. We follow suit in ignoring these types.
}
}

// Sort `fields` in-place to ascending order to combat random map iteration order. This will
// also make the corresponding swaps on the `numbers` slice.
sort.Sort(mapSorter{fields, numbers})

return fields, numbers, nil
}
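For context on why mapSorter exists: sorting the field names while keeping the values slice aligned gives a deterministic order regardless of map iteration. A minimal standalone sketch of that technique (hypothetical names, not part of this diff):

package main

import (
    "fmt"
    "sort"
)

// pairSorter mirrors the mapSorter idea: sort field names while keeping the
// parallel values slice aligned index-for-index.
type pairSorter struct {
    fields []string
    values []float32
}

func (p pairSorter) Len() int           { return len(p.fields) }
func (p pairSorter) Less(i, j int) bool { return p.fields[i] < p.fields[j] }
func (p pairSorter) Swap(i, j int) {
    p.fields[i], p.fields[j] = p.fields[j], p.fields[i]
    p.values[i], p.values[j] = p.values[j], p.values[i]
}

func main() {
    // Map iteration order is randomized in Go, so collect first, then sort.
    stats := map[string]float32{"Y": 2, "X": 1}

    var fields []string
    var values []float32
    for k, v := range stats {
        fields = append(fields, k)
        values = append(values, v)
    }

    sort.Sort(pairSorter{fields, values})
    fmt.Println(fields, values) // always prints: [X Y] [1 2]
}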

func flattenStruct(value reflect.Value) ([]string, []float32, error) {
value = flattenPtr(value)
rType := value.Type()

var fields []string
var numbers []float32
// Use reflection to walk the member fields of an individual set of metric readings. We rely
// on reflection always walking fields in the same order.
@@ -160,99 +258,51 @@ func flattenStruct(item reflect.Value) ([]float32, error) {
// function calls and allocations than some more raw alternatives. For example, we can have
// the "schema" keep a (field, offset, type) index and we instead access get a single unsafe
// pointer to each structure and walk out index to pull out the relevant numbers.
for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ {
rField := flattenPtr(rVal.Field(memberIdx))
for memberIdx := 0; memberIdx < value.NumField(); memberIdx++ {
rField := flattenPtr(value.Field(memberIdx))
switch {
case rField.CanUint():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Uint()))
case rField.CanInt():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Int()))
case rField.CanFloat():
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, float32(rField.Float()))
case rField.Kind() == reflect.Bool:
if rField.Bool() {
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, 1)
} else {
fields = append(fields, rType.Field(memberIdx).Name)
numbers = append(numbers, 0)
}
case rField.Kind() == reflect.Struct ||
rField.Kind() == reflect.Pointer ||
rField.Kind() == reflect.Interface:
subNumbers, err := flattenStruct(rField)
rField.Kind() == reflect.Interface ||
rField.Kind() == reflect.Map:
subFields, subNumbers, err := flatten(rField)
if err != nil {
return nil, err
return nil, nil, err
}

thisFieldName := rType.Field(memberIdx).Name
for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", thisFieldName, subField))
}

numbers = append(numbers, subNumbers...)
case isNumeric(rField.Kind()):
//nolint:stylecheck
return nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind())
return nil, nil, fmt.Errorf("A numeric type was forgotten to be included. Kind: %v", rField.Kind())
default:
// Getting the keys for a structure will ignore these types. Such as the antagonistic
// `channel`, or `string`. We follow suit in ignoring these types.
}
}

return numbers, nil
}

// getFieldsForStruct returns the (flattened) list of strings for a metric structure. For example the
// following type:
//
// type Foo {
// PowerPct float64
// Pos int
// }
//
// Will return `["PowerPct", "Pos"]`.
//
// Nested structures will walk and return a "dot delimited" name. E.g:
//
// type ParentFoo {
// Healthy Bool
// FooField Foo
// }
//
// Will return `["Healthy", "FooField.PowerPct", "FooField.Pos"]`.
func getFieldsForStruct(item reflect.Value) ([]string, error) {
flattenPtr := func(inp reflect.Value) reflect.Value {
for inp.Kind() == reflect.Pointer || inp.Kind() == reflect.Interface {
if inp.IsNil() {
return inp
}
inp = inp.Elem()
}
return inp
}

rVal := flattenPtr(item)
if rVal.Kind() != reflect.Struct {
return nil, errNotStruct
}

rType := rVal.Type()
var fields []string
for memberIdx := 0; memberIdx < rVal.NumField(); memberIdx++ {
structField := rType.Field(memberIdx)
fieldVal := rVal.Field(memberIdx)
derefedVal := flattenPtr(fieldVal)
if isNumeric(derefedVal.Kind()) {
fields = append(fields, structField.Name)
continue
}

if derefedVal.Kind() == reflect.Struct {
subFields, err := getFieldsForStruct(derefedVal)
if err != nil {
return nil, err
}

for _, subField := range subFields {
fields = append(fields, fmt.Sprintf("%v.%v", structField.Name, subField))
}
}
}

return fields, nil
return fields, numbers, nil
}
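To make the dot-delimited naming concrete, here is a hypothetical, self-contained sketch (not this package's API) that walks the Foo/ParentFoo example types from the comment above with reflection:

package main

import (
    "fmt"
    "reflect"
)

type Foo struct {
    PowerPct float64
    Pos      int
}

type ParentFoo struct {
    Healthy  bool
    FooField Foo
}

// fieldNames is an illustrative recursive walk that yields dot-delimited names
// for numeric/bool fields, in declaration order. Strings, channels, etc. are
// skipped, mirroring the behavior described above.
func fieldNames(prefix string, v reflect.Value) []string {
    var out []string
    t := v.Type()
    for i := 0; i < v.NumField(); i++ {
        name := t.Field(i).Name
        if prefix != "" {
            name = prefix + "." + name
        }
        f := v.Field(i)
        switch f.Kind() {
        case reflect.Struct:
            out = append(out, fieldNames(name, f)...)
        case reflect.Bool, reflect.Int, reflect.Int64, reflect.Float32, reflect.Float64:
            out = append(out, name)
        default:
        }
    }
    return out
}

func main() {
    fmt.Println(fieldNames("", reflect.ValueOf(ParentFoo{})))
    // prints: [Healthy FooField.PowerPct FooField.Pos]
}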

type schemaError struct {
@@ -264,62 +314,6 @@ func (err *schemaError) Error() string {
return fmt.Sprintf("SchemaError: %s StatserName: %s", err.err.Error(), err.statserName)
}

// getSchema returns a schema for a full FTDC datum. It immortalizes two properties:
// - mapOrder: The order to iterate future input `map[string]any` data.
// - fieldOrder: The order diff bits and values are to be written in.
//
// For correctness, it must be the case that the `mapOrder` and `fieldOrder` are consistent. I.e: if
// the `mapOrder` is `A` then `B`, the `fieldOrder` must list all of the fields of `A` first,
// followed by all the fields of `B`.
func getSchema(data map[string]any) (*schema, *schemaError) {
var mapOrder []string
var fields []string

for name, stats := range data {
mapOrder = append(mapOrder, name)
fieldsForItem, err := getFieldsForStruct(reflect.ValueOf(stats))
if err != nil {
return nil, &schemaError{name, err}
}

for _, field := range fieldsForItem {
// We insert a `.` into every metric/field name we get a recording for. This property is
// assumed elsewhere.
fields = append(fields, fmt.Sprintf("%v.%v", name, field))
}
}

return &schema{
mapOrder: mapOrder,
fieldOrder: fields,
}, nil
}
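A hypothetical, self-contained sketch of the ordering invariant getSchema documents, with invented statser names, showing fieldOrder derived from mapOrder by dot-prefixing each statser's fields:

package main

import "fmt"

// fieldOrderFor builds a field order consistent with the given map order by
// prefixing every field with its statser name (so each metric name contains a `.`).
func fieldOrderFor(mapOrder []string, fieldsByName map[string][]string) []string {
    var fieldOrder []string
    for _, name := range mapOrder {
        for _, field := range fieldsByName[name] {
            fieldOrder = append(fieldOrder, fmt.Sprintf("%v.%v", name, field))
        }
    }
    return fieldOrder
}

func main() {
    mapOrder := []string{"motor", "gps"}
    fields := map[string][]string{
        "motor": {"PowerPct", "Pos"},
        "gps":   {"Healthy"},
    }
    fmt.Println(fieldOrderFor(mapOrder, fields))
    // prints: [motor.PowerPct motor.Pos gps.Healthy]
}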

// flatten takes an input `Datum` and a `mapOrder` from the current `Schema` and returns a list of
// `float32`s representing the readings. Similar to `getFieldsForItem`, there are constraints on
// input data shape that this code currently does not validate.
func flatten(datum datum, schema *schema) ([]float32, error) {
ret := make([]float32, 0, len(schema.fieldOrder))

for _, key := range schema.mapOrder {
// Walk over the datum in `mapOrder` to ensure we gather values in the order consistent with
// the current schema.
stats, exists := datum.Data[key]
if !exists {
//nolint
return nil, fmt.Errorf("Missing statser name. Name: %v", key)
}

numbers, err := flattenStruct(reflect.ValueOf(stats))
if err != nil {
return nil, err
}
ret = append(ret, numbers...)
}

return ret, nil
}

// FlatDatum has the same information as a `datum`, but without the arbitrarily nested `Data`
// map. Using dots to join keys as per the disk format. So where a `Data` map might be:
//