Skip to content

Commit

Permalink
[UOD-2196] Handle string values for Union (#54)
Browse files Browse the repository at this point in the history
Co-authored-by: Romain Gilles <[email protected]>
  • Loading branch information
1 parent 968e578 commit 6179b0b
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ STREAM_ACT_START_BLOCK ?= 49608000
## CDC TABLES
# CDC_TABLES_ACCOUNT ?= 'eosio.token'
# CDC_TABLES_TABLE_NAMES ?= 'accounts:s+k'
CDC_START_BLOCK ?= 135283216
CDC_START_BLOCK ?= 135283642
CDC_ACCOUNT ?= eosio.nft.ft

CDC_TABLES_START_BLOCK ?= $(CDC_START_BLOCK)
Expand Down
88 changes: 88 additions & 0 deletions adapt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,91 @@ func TestCdCAdapter_Adapt_pb(t *testing.T) {
})
}
}

func TestCdCAdapter_Action_pb(t *testing.T) {
eos.NativeType = true

tests := []struct {
name string
file string
abi string
action string
nbMessages int
}{
{
"eosio.nft.ft",
"testdata/block-135283642.pb.json",
"testdata/eosio.nft.ft-4.0.6-snapshot.abi",
"*",
1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
block := &pbcodec.Block{}
err := jsonpb.UnmarshalString(string(readFileFromTestdata(t, tt.file)), block)
if err != nil {
t.Fatalf("jsonpb.UnmarshalString(): %v", err)
}

abiAccount := strings.TrimRight(path.Base(tt.abi), ".abi")
var localABIFiles = map[string]string{
abiAccount: tt.abi,
}
abiFiles, err := LoadABIFiles(localABIFiles)
if err != nil {
t.Fatalf("LoadABIFiles() error: %v", err)
}
abiDecoder := NewABIDecoder(abiFiles, nil, context.Background())
msg := MessageSchemaGenerator{
Namespace: "test.dkafka",
Version: "1.2.3",
Account: abiAccount,
}
// abi, _ := abiDecoder.abi(abiAccount, 0, false)
// schema, _ := msg.getTableSchema("accounts", abi)
// jsonSchema, err := json.Marshal(schema)
// fmt.Println(string(jsonSchema))

expression := fmt.Sprintf(`{"%s":"transaction_id"}`, tt.action)
actionKeyExpressions, err := createCdcKeyExpressions(expression)
if err != nil {
t.Fatalf("createCdcKeyExpressions() error: %v", err)
}
g := ActionGenerator2{
keyExtractors: actionKeyExpressions,
abiCodec: NewStreamedAbiCodec(&DfuseAbiRepository{
overrides: abiDecoder.overrides,
abiCodecCli: abiDecoder.abiCodecCli,
context: abiDecoder.context,
}, msg.getActionSchema, srclient.CreateMockSchemaRegistryClient("mock://bench-adapter"), abiAccount, "mock://bench-adapter"),
}
a := &CdCAdapter{
topic: "test.topic",
saveBlock: saveBlockNoop,
generator: transaction2ActionsGenerator{
actionLevelGenerator: g,
topic: "test.topic",
headers: default_headers,
},
headers: default_headers,
}
blockStep := BlockStep{
blk: block,
step: pbbstream.ForkStep_STEP_NEW,
cursor: "123",
}
messages, err := a.Adapt(blockStep)
if err != nil {
t.Fatalf("Adapt() error: %v", err)
}
assert.Equal(t, len(messages), tt.nbMessages)
for _, m := range messages {
assert.Equal(t, findHeader("content-type", m.Headers), "application/avro")
assert.Equal(t, findHeader("ce_datacontenttype", m.Headers), "application/avro")
assert.Assert(t, findHeader("ce_dataschema", m.Headers) != "")
}
})
}
}
57 changes: 35 additions & 22 deletions doc/capture-block.md
Original file line number Diff line number Diff line change
@@ -1,34 +1,47 @@
# Capture block to create test
# Capturing a Block to Create a Test

In case of issue with some block like serialization it's easy to capture the corresponding block to build a test case that you can play locally.
In the case of an issue with a specific block (e.g., serialization problems), you can easily capture the corresponding block to build a local test case.

## Step 1: Identify the Block Number
You can find the corresponding block number in the error log. It will look something like this:

## Identify the block number
Your supposed to get the corresponding block number on the error log like:
```
Error: transform to kafka message at block_num: 135283216, cursor: W5ph5hsUAYaSABpDgP3vVKWwLpcyB1lqVwPmKBBHj4v-8XLB35ymAzQjYB2Bwvqi2hHqHV3-2d-bF3cqoMhYv9jjkbFtvig-F3IklNvqqrTmeqGmbQgcJLwxDO7dZNHRWj_SYAL4e7cJ6tXvO_PdZhczYMFyLmPmi24C9NFWeKIT7HVgxDWsdprW1f-WpNYVrrdzEbL0xyvyA2F4fh1eNcvXY6OWuT52ZiE=, , cannot encode binary record "io.dkafka.eosio.nft.ft.v6.tables.FactoryBTableNotification" field "db_op": value does not match its schema: cannot encode binary record "io.dkafka.eosio.nft.ft.v6.tables.FactoryBTableOpInfo" field "new_json": value does not match its schema: cannot encode binary record "io.dkafka.eosio.nft.ft.v6.tables.FactoryBTableOp" field "keys": value does not match its schema: cannot encode binary record "io.dkafka.eosio.nft.ft.v6.tables.FactoryKeys" field "key_defs": value does not match its schema: cannot encode binary array item 1: map[default_value:[uint32 0] edit_rights:7 editors:[ultra.prop1] name:Gold type_index:6]: cannot encode binary record "io.dkafka.eosio.nft.ft.v6.tables.KeyDefTable" field "default_value": value does not match its schema: cannot encode binary union: non-nil Union values ought to be specified with Go map[string]interface{}, with single key equal to type name, and value equal to datum value: [null int long float double string bytes boolean array]; received: []interface {}
```
Here: `135283216`.

## Identify the corresponding smart contract
You can find the information by looking at the topic name that usually contains the smart contract or by searching some field names in our abi files.
In this case, the block number is `135283216`.

## Step 2: Identify the Corresponding Smart Contract
You can identify the relevant smart contract by checking the topic name, which usually contains the contract name, or by searching for certain field names in your ABI files.

In this example, you can determine the smart contract is `eosio.nft.ft` based on the topic name:

Here it's `eosio.nft.ft` thanks to the topic name:
`io.dkafka.eosio.nft.ft.v6.tables`
```
io.dkafka.eosio.nft.ft.v6.tables
```

## Start dkafka locally
To start dkafka locally you need four things:
1. Setup your KUBECONFIG and ENV env vars to access the corresponding environement that provide dfuse firehose and abicodec (at Ultra it's in our Dfuse Kubernetes environement).
2. Start the local kafka cluster by running `make up`
3. Start the dfuse port forward by running `make forward`
4. Start the cdc capture on the given block and smart contract `make cdc-tables CDC_START_BLOCK=135283216 CDC_ACCOUNT=eosio.nft.ft`
## Step 3: Start dkafka Locally
To start `dkafka` locally, you need to complete the following steps:

It will produce a file at the root of the project with the blocknum name: `block-135283216.pb.json`.
1. Set up your `KUBECONFIG` and `ENV` environment variables to access the corresponding environment that provides the `dfuse` firehose and `abicodec`. (At Ultra, this is available in the Dfuse Kubernetes environment.)
2. Start the local Kafka cluster by running `make up`.
3. Forward the dfuse port by running `make forward`.
4. Start the CDC capture for the given block and smart contract by running:

```
make cdc-tables CDC_START_BLOCK=135283216 CDC_ACCOUNT=eosio.nft.ft
```

Move it into `testdata/` folder with the corresponding abi file.
This will generate a file in the project root with a name like `block-135283216.pb.json`.

## Stop dkafka
To stop everything just run:
- `make down forward-stop`
Move this file into the `testdata/` folder along with the corresponding ABI file.

## Step 4: Stop dkafka
To stop the local setup, run the following commands:

```
make down forward-stop
```

## Build your test
Looks for existing test that use block to add your own test scenario.
## Step 5: Build Your Test
Look for existing tests that use blocks and add your test scenario accordingly.
12 changes: 12 additions & 0 deletions fork/goavro/boolean.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
)

func booleanNativeFromBinary(buf []byte) (interface{}, []byte, error) {
Expand Down Expand Up @@ -63,6 +64,17 @@ func booleanBinaryFromNative(buf []byte, datum interface{}) ([]byte, error) {
i = int(v)
case float64:
i = int(v)
case string:
res, err := strconv.ParseBool(v)
if err != nil {
return nil, fmt.Errorf("cannot encode string boolean %v: because off: %v", v, err)
} else {
if res {
i = 1
} else {
i = 0
}
}
default:
return nil, fmt.Errorf("cannot encode binary boolean: expected: Go bool; received: %T", datum)
}
Expand Down
14 changes: 14 additions & 0 deletions fork/goavro/floatingPoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ func doubleBinaryFromNative(buf []byte, datum interface{}) ([]byte, error) {
if value = float64(v); int32(value) != v {
return nil, fmt.Errorf("cannot encode binary double: provided Go int32 would lose precision: %d", v)
}
case string:
valuetmp, err := strconv.ParseFloat(v, 32)
if err != nil {
return nil, fmt.Errorf("cannot encode binary float: provided Go string would lose precision: %s", v)
} else {
value = valuetmp
}
default:
return nil, fmt.Errorf("cannot encode binary double: expected: Go numeric; received: %T", datum)
}
Expand Down Expand Up @@ -94,6 +101,13 @@ func floatBinaryFromNative(buf []byte, datum interface{}) ([]byte, error) {
if value = float32(v); int32(value) != v {
return nil, fmt.Errorf("cannot encode binary float: provided Go int32 would lose precision: %d", v)
}
case string:
valuetmp, err := strconv.ParseFloat(v, 32)
if err != nil {
return nil, fmt.Errorf("cannot encode binary float: provided Go string would lose precision: %s", v)
} else {
value = float32(valuetmp)
}
default:
return nil, fmt.Errorf("cannot encode binary float: expected: Go numeric; received: %T", datum)
}
Expand Down
60 changes: 58 additions & 2 deletions fork/goavro/union_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,65 @@ func TestUnionDynamicEncode(t *testing.T) {

func TestUnionEosEncode(t *testing.T) {
// output first parameter is type position, second is value
testBinaryEncodePass(t, `["null","int","long"]`, []interface{}{"uint32", uint32(3)}, []byte("\x04\x06"))
testBinaryEncodePass(t, `["null","int","long"]`, []interface{}{"int32", int32(3)}, []byte("\x02\x06"))

// Boolean Type
testBinaryEncodePass(t, `["null","boolean"]`, []interface{}{"bool", true}, []byte("\x02\x01"))
testBinaryEncodePass(t, `["null","boolean"]`, []interface{}{"bool", 1}, []byte("\x02\x01"))
testBinaryEncodePass(t, `["null","boolean"]`, []interface{}{"bool", "true"}, []byte("\x02\x01"))
testBinaryEncodePass(t, `["null","boolean"]`, []interface{}{"bool", false}, []byte("\x02\x00"))
testBinaryEncodePass(t, `["null","boolean"]`, []interface{}{"bool", 0}, []byte("\x02\x00"))

// Integer Types (int, uint mapped to int or long as per mapping)
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int8", int8(5)}, []byte("\x02\x0a"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int8", "5"}, []byte("\x02\x0a"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"uint8", uint8(5)}, []byte("\x02\x0a"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"uint8", "5"}, []byte("\x02\x0a"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int16", int16(300)}, []byte("\x02\xd8\x04"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int16", "300"}, []byte("\x02\xd8\x04"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"uint16", uint16(300)}, []byte("\x02\xd8\x04"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"uint16", "300"}, []byte("\x02\xd8\x04"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int32", int32(3)}, []byte("\x02\x06"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"int32", "3"}, []byte("\x02\x06"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"uint32", uint32(3)}, []byte("\x02\x06"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"uint32", "3"}, []byte("\x02\x06"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"int64", int64(10)}, []byte("\x02\x14"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"int64", "10"}, []byte("\x02\x14"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"uint64", uint64(10)}, []byte("\x02\x14"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"uint64", "10"}, []byte("\x02\x14"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"varint32", int32(20)}, []byte("\x02\x28"))
testBinaryEncodePass(t, `["null","int"]`, []interface{}{"varint32", "20"}, []byte("\x02\x28"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"varuint32", uint32(20)}, []byte("\x02\x28"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"varuint32", "20"}, []byte("\x02\x28"))

// Floating Point Types
testBinaryEncodePass(t, `["null","float"]`, []interface{}{"float32", "0.12300000339746475"}, []byte("\x02\x6d\xe7\xfb\x3d"))
testBinaryEncodePass(t, `["null","float"]`, []interface{}{"float32", float32(0.123)}, []byte("\x02\x6d\xe7\xfb\x3d"))
testBinaryEncodePass(t, `["null","double"]`, []interface{}{"float64", "0.12300000339746475"}, []byte("\x02\x00\x00\x00\xa0\xed\x7c\xbf\x3f"))
testBinaryEncodePass(t, `["null","double"]`, []interface{}{"float64", float64(0.123)}, []byte("\x02\xb0\x72\x68\x91\xed\x7c\xbf\x3f"))

// Time and Timestamp Types
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"time_point", int64(1609459200)}, []byte("\x02\x80\x98\xf3\xfe\x0b"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"time_point", "1609459200"}, []byte("\x02\x80\x98\xf3\xfe\x0b"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"time_point_sec", int64(1609459200)}, []byte("\x02\x80\x98\xf3\xfe\x0b"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"time_point_sec", "1609459200"}, []byte("\x02\x80\x98\xf3\xfe\x0b"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"block_timestamp_type", int64(1609459200)}, []byte("\x02\x80\x98\xf3\xfe\x0b"))
testBinaryEncodePass(t, `["null","long"]`, []interface{}{"block_timestamp_type", "1609459200"}, []byte("\x02\x80\x98\xf3\xfe\x0b"))

// String Types
testBinaryEncodePass(t, `["null","string"]`, []interface{}{"name", "ultra"}, []byte("\x02\x0a\x75\x6c\x74\x72\x61"))
testBinaryEncodePass(t, `["null","string"]`, []interface{}{"string", "hello"}, []byte("\x02\x0a\x68\x65\x6c\x6c\x6f"))
testBinaryEncodePass(t, `["null","string"]`, []interface{}{"symbol", "USD"}, []byte("\x02\x06\x55\x53\x44"))
testBinaryEncodePass(t, `["null","string"]`, []interface{}{"symbol_code", "EUR"}, []byte("\x02\x06\x45\x55\x52"))

// Byte/Bytes Types
// If it happends; then GLHF
// testBinaryEncodePass(t, `["null","bytes"]`, []interface{}{"bytes", []byte{0x01, 0x02, 0x03}}, []byte("\x00\x06\x01\x02\x03"))
// testBinaryEncodePass(t, `["null","bytes"]`, []interface{}{"checksum160", []byte{0x01, 0x02, 0x03, 0x04, 0x05}}, []byte("\x00\x0a\x01\x02\x03\x04\x05"))
// testBinaryEncodePass(t, `["null","bytes"]`, []interface{}{"checksum256", []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}}, []byte("\x00\x10\x01\x02\x03\x04\x05\x06\x07\x08"))
// testBinaryEncodePass(t, `["null","bytes"]`, []interface{}{"checksum512", []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a}}, []byte("\x00\x14\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a"))

testBinaryEncodePass(t, `["string","int","long"]`, []interface{}{"name", "ultra"}, []byte("\x00\x0a\x75\x6c\x74\x72\x61"))

}

func TestUnionMapRecordFitsInRecord(t *testing.T) {
Expand Down
Loading

0 comments on commit 6179b0b

Please sign in to comment.