Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flytekit][3] [list, dict and nested cases] Binary IDL With MessagePack #2758

Closed
wants to merge 17 commits into from

Conversation

Future-Outlier
Copy link
Member

@Future-Outlier Future-Outlier commented Sep 19, 2024

Related PRs

#2757
some local execution and remote execution tests are in its description.

Tracking issue

flyteorg/flyte#5318

Why are the changes needed?

We want to support attribute access for list and dict in python.

  • List Transformer
@dataclass
class DC:
    list_int: typing.List[int] = field(default_factory=lambda: [1, 2, 3])
    list_float: typing.List[float] = field(default_factory=lambda: [1.0, 2.0, 3.0])
    list_str: typing.List[str] = field(default_factory=lambda: ["a", "b", "c"])
    list_bool: typing.List[bool] = field(default_factory=lambda: [True, False, True])
    e: typing.List[typing.List[int]] = field(default_factory=lambda: [[1, 2], [3, 4]])
    f: typing.List[typing.List[float]] = field(default_factory=lambda: [[1.0, 2.0], [3.0, 4.0]])
    g: typing.List[typing.List[str]] = field(default_factory=lambda: [["a", "b"], ["c", "d"]])
    h: typing.List[typing.List[bool]] = field(default_factory=lambda: [[True, False], [False, True]])
    i: typing.List[Dict[str, int]] = field(default_factory=lambda: [{"key1": 1, "key2": 2}])
    j: typing.List[Dict[str, float]] = field(default_factory=lambda: [{"key1": 1.0, "key2": 2.0}])
    k: typing.List[Dict[str, str]] = field(default_factory=lambda: [{"key1": "a", "key2": "b"}])
    l: typing.List[Dict[str, bool]] = field(default_factory=lambda: [{"key1": True, "key2": False}])

@workflow
def dataclass_wf(input: DC):
    t_list_int(list_int=input.list_int)
    t_list_float(list_float=input.list_float)
    t_list_str(list_str=input.list_str)
    t_list_bool(list_bool=input.list_bool)
    t_list_list_int(e=input.e)
    t_list_dict_str_int(i=input.i)
    t_list_dict_str_float(j=input.j)
    t_list_dict_str_str(k=input.k)
    t_list_dict_str_bool(l=input.l)
  • Dict Transformer
@dataclass
class DC:
    e: Dict[str, int] = field(default_factory=lambda: {"key1": 1, "key2": 2})
    f: Dict[str, float] = field(default_factory=lambda: {"key1": 1.0, "key2": 2.0})
    g: Dict[str, str] = field(default_factory=lambda: {"key1": "a", "key2": "b"})
    h: Dict[str, bool] = field(default_factory=lambda: {"key1": True, "key2": False})
    i: dict = field(default_factory=lambda: {"key1": 1, "key2": 2})
    j: Dict[str, List[int]] = field(default_factory=lambda: {"key1": [1, 2, 3]})
    k: Dict[str, Dict[str, int]] = field(default_factory=lambda: {"key1": {"subkey1": 1, "subkey2": 2}})
    l: Dict[str, Dict[str, List[int]]] = field(default_factory=lambda: {"key1": {"subkey1": [1, 2], "subkey2": [3, 4]}})
    m: Dict[str, List[Dict[str, int]]] = field(default_factory=lambda: {"key1": [{"subkey1": 1}, {"subkey2": 2}]})
    o: Dict[int, str] = field(default_factory=lambda: {1: "a", 2: "b"})

@workflow
def dataclass_wf(input: DC):
    t_dict_int(e=input.e)
    t_dict_float(f=input.f)
    t_dict_str(g=input.g)
    t_dict_bool(h=input.h)
    t_dict_any(i=input.i)
    t_dict_list_int(j=input.j)
    t_dict_dict_int(k=input.k)
    t_dict_dict_list_int(l=input.l)
    t_list_dict_int(m=input.m)

What changes were proposed in this pull request?

  1. Add logic in List Transformer's to_python_val method to handle Binary IDL Object cases.
  2. Unit tests for list attribute access and dict attribute access.

How was this patch tested?

unit tests, local execution, and remote execution.

for local execution and remote execution

Setup process

  • List Transformer
import typing
from dataclasses import dataclass, fields, field
from typing import Dict
from flytekit import task, workflow, ImageSpec


flytekit_hash = "c24077bce6e63bf8df0d80dbc2c5e2ff3322bca8"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit],
    apt_packages=["git"],
    registry="localhost:30000",
)

@dataclass
class DC:
    a: int = 1
    b: float = 2.0
    c: str = "string"
    d: bool = True
    list_int: typing.List[int] = field(default_factory=lambda: [1, 2, 3])
    list_float: typing.List[float] = field(default_factory=lambda: [1.0, 2.0, 3.0])
    list_str: typing.List[str] = field(default_factory=lambda: ["a", "b", "c"])
    list_bool: typing.List[bool] = field(default_factory=lambda: [True, False, True])
    e: typing.List[typing.List[int]] = field(default_factory=lambda: [[1, 2], [3, 4]])
    f: typing.List[typing.List[float]] = field(default_factory=lambda: [[1.0, 2.0], [3.0, 4.0]])
    g: typing.List[typing.List[str]] = field(default_factory=lambda: [["a", "b"], ["c", "d"]])
    h: typing.List[typing.List[bool]] = field(default_factory=lambda: [[True, False], [False, True]])
    i: typing.List[Dict[str, int]] = field(default_factory=lambda: [{"key1": 1, "key2": 2}])
    j: typing.List[Dict[str, float]] = field(default_factory=lambda: [{"key1": 1.0, "key2": 2.0}])
    k: typing.List[Dict[str, str]] = field(default_factory=lambda: [{"key1": "a", "key2": "b"}])
    l: typing.List[Dict[str, bool]] = field(default_factory=lambda: [{"key1": True, "key2": False}])

@task(container_image=image)
def t_int(a: int):
    assert isinstance(a, int), f"Expected type int, but got {type(a)}"
    print("t_int:", a, type(a))

@task(container_image=image)
def t_float(b: float):
    assert isinstance(b, float), f"Expected type float, but got {type(b)}"
    print("t_float:", b, type(b))

@task(container_image=image)
def t_str(c: str):
    assert isinstance(c, str), f"Expected type str, but got {type(c)}"
    print("t_str:", c, type(c))

@task(container_image=image)
def t_bool(d: bool):
    assert isinstance(d, bool), f"Expected type bool, but got {type(d)}"
    print("t_bool:", d, type(d))

# Tasks to handle List[int], List[float], List[str], and List[bool]
@task(container_image=image)
def t_list_int(list_int: typing.List[int]):
    assert isinstance(list_int, list), f"Expected type list[int], but got {type(list_int)}"
    for idx, val in enumerate(list_int):
        assert isinstance(val, int), f"Expected type int in list, but got {type(val)} at index {idx}"
    print("t_list_int:", list_int)

@task(container_image=image)
def t_list_float(list_float: typing.List[float]):
    assert isinstance(list_float, list), f"Expected type list[float], but got {type(list_float)}"
    for idx, val in enumerate(list_float):
        assert isinstance(val, float), f"Expected type float in list, but got {type(val)} at index {idx}"
    print("t_list_float:", list_float)

@task(container_image=image)
def t_list_str(list_str: typing.List[str]):
    assert isinstance(list_str, list), f"Expected type list[str], but got {type(list_str)}"
    for idx, val in enumerate(list_str):
        assert isinstance(val, str), f"Expected type str in list, but got {type(val)} at index {idx}"
    print("t_list_str:", list_str)

@task(container_image=image)
def t_list_bool(list_bool: typing.List[bool]):
    assert isinstance(list_bool, list), f"Expected type list[bool], but got {type(list_bool)}"
    for idx, val in enumerate(list_bool):
        assert isinstance(val, bool), f"Expected type bool in list, but got {type(val)} at index {idx}"
    print("t_list_bool:", list_bool)

# Tasks to handle nested list of int
@task(container_image=image)
def t_list_list_int(e: typing.List[typing.List[int]]):
    assert isinstance(e, list), f"Expected type list[list[int]], but got {type(e)}"
    for idx, sublist in enumerate(e):
        assert isinstance(sublist, list), f"Expected type list at index {idx}, but got {type(sublist)}"
        for i, val in enumerate(sublist):
            assert isinstance(val, int), f"Expected type int in sublist, but got {type(val)} at index {i}"
    print("t_list_list_int:", e)

# Tasks to handle list of dicts with various types
@task(container_image=image)
def t_list_dict_str_int(i: typing.List[Dict[str, int]]):
    assert isinstance(i, list), f"Expected type list[dict[str, int]], but got {type(i)}"
    for idx, d in enumerate(i):
        assert isinstance(d, dict), f"Expected type dict at index {idx}, but got {type(d)}"
        for k, v in d.items():
            assert isinstance(k, str), f"Expected key type str, but got {type(k)}"
            assert isinstance(v, int), f"Expected value type int, but got {type(v)}"
    print("t_list_dict_str_int:", i)

@task(container_image=image)
def t_list_dict_str_float(j: typing.List[Dict[str, float]]):
    assert isinstance(j, list), f"Expected type list[dict[str, float]], but got {type(j)}"
    for idx, d in enumerate(j):
        assert isinstance(d, dict), f"Expected type dict at index {idx}, but got {type(d)}"
        for k, v in d.items():
            assert isinstance(k, str), f"Expected key type str, but got {type(k)}"
            assert isinstance(v, float), f"Expected value type float, but got {type(v)}"
    print("t_list_dict_str_float:", j)

@task(container_image=image)
def t_list_dict_str_str(k: typing.List[Dict[str, str]]):
    assert isinstance(k, list), f"Expected type list[dict[str, str]], but got {type(k)}"
    for idx, d in enumerate(k):
        assert isinstance(d, dict), f"Expected type dict at index {idx}, but got {type(d)}"
        for k, v in d.items():
            assert isinstance(k, str), f"Expected key type str, but got {type(k)}"
            assert isinstance(v, str), f"Expected value type str, but got {type(v)}"
    print("t_list_dict_str_str:", k)

@task(container_image=image)
def t_list_dict_str_bool(l: typing.List[Dict[str, bool]]):
    assert isinstance(l, list), f"Expected type list[dict[str, bool]], but got {type(l)}"
    for idx, d in enumerate(l):
        assert isinstance(d, dict), f"Expected type dict at index {idx}, but got {type(d)}"
        for k, v in d.items():
            assert isinstance(k, str), f"Expected key type str, but got {type(k)}"
            assert isinstance(v, bool), f"Expected value type bool, but got {type(v)}"
    print("t_list_dict_str_bool:", l)


@workflow
def dataclass_wf(input: DC) -> DC:
    # t_int(a=input.a)
    # t_float(b=input.b)
    # t_str(c=input.c)
    # t_bool(d=input.list_bool[1])
    t_list_int(list_int=input.list_int)
    t_list_float(list_float=input.list_float)
    t_list_str(list_str=input.list_str)
    t_list_bool(list_bool=input.list_bool)
    t_list_list_int(e=input.e)
    t_list_dict_str_int(i=input.i)
    t_list_dict_str_float(j=input.j)
    t_list_dict_str_str(k=input.k)
    t_list_dict_str_bool(l=input.l)

    return input

if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/demo/dataclass_simple_list_1_and_2_level.py"
    result = runner.invoke(pyflyte.main, ["run", path, "dataclass_wf", "--input", '{"a": 1}'])

    print("Local Execution: ", result.output)
    #
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "dataclass_wf", "--input", '{"a": 1}'])
    print("Remote Execution: ", result.output)
  • Dict Transformer
import typing
from dataclasses import dataclass, fields, field
from typing import Dict, List
from flytekit import task, workflow, ImageSpec


flytekit_hash = "c24077bce6e63bf8df0d80dbc2c5e2ff3322bca8"

flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit],
    apt_packages=["git"],
    registry="localhost:30000",
)

@dataclass
class DC:
    a: int = 1
    b: float = 2.0
    c: str = "string"
    d: bool = True
    e: Dict[str, int] = field(default_factory=lambda: {"key1": 1, "key2": 2})
    f: Dict[str, float] = field(default_factory=lambda: {"key1": 1.0, "key2": 2.0})
    g: Dict[str, str] = field(default_factory=lambda: {"key1": "a", "key2": "b"})
    h: Dict[str, bool] = field(default_factory=lambda: {"key1": True, "key2": False})
    i: dict = field(default_factory=lambda: {"key1": 1, "key2": 2})
    j: Dict[str, List[int]] = field(default_factory=lambda: {"key1": [1, 2, 3]})
    k: Dict[str, Dict[str, int]] = field(default_factory=lambda: {"key1": {"subkey1": 1, "subkey2": 2}})
    l: Dict[str, Dict[str, List[int]]] = field(default_factory=lambda: {"key1": {"subkey1": [1, 2], "subkey2": [3, 4]}})
    m: Dict[str, List[Dict[str, int]]] = field(default_factory=lambda: {"key1": [{"subkey1": 1}, {"subkey2": 2}]})
    o: Dict[int, str] = field(default_factory=lambda: {1: "a", 2: "b"})

@task(container_image=image)
def t_int(a: int):
    assert isinstance(a, int), f"Expected type int, but got {type(a)}"
    print("t_int:", a, type(a))

@task(container_image=image)
def t_float(b: float):
    assert isinstance(b, float), f"Expected type float, but got {type(b)}"
    print("t_float:", b, type(b))

@task(container_image=image)
def t_str(c: str):
    assert isinstance(c, str), f"Expected type str, but got {type(c)}"
    print("t_str:", c, type(c))

@task(container_image=image)
def t_bool(d: bool):
    assert isinstance(d, bool), f"Expected type bool, but got {type(d)}"
    print("t_bool:", d, type(d))

# Tasks to handle Dict[str, int], Dict[str, float], Dict[str, str], and Dict[str, bool]
@task(container_image=image)
def t_dict_int(e: Dict[str, int]):
    assert isinstance(e, dict), f"Expected type dict, but got {type(e)}"
    print("t_dict_int:", e, type(e))

@task(container_image=image)
def t_dict_float(f: Dict[str, float]):
    assert isinstance(f, dict), f"Expected type dict, but got {type(f)}"
    print("t_dict_float:", f, type(f))

@task(container_image=image)
def t_dict_str(g: Dict[str, str]):
    assert isinstance(g, dict), f"Expected type dict, but got {type(g)}"
    print("t_dict_str:", g, type(g))

@task(container_image=image)
def t_dict_bool(h: Dict[str, bool]):
    assert isinstance(h, dict), f"Expected type dict, but got {type(h)}"
    print("t_dict_bool:", h, type(h))

@task(container_image=image)
def t_dict_any(i: dict):
    assert isinstance(i, dict), f"Expected type dict, but got {type(i)}"
    print("t_dict_any:", i, type(i))

# Tasks to handle more complex cases
@task(container_image=image)
def t_dict_list_int(j: Dict[str, List[int]]):
    assert isinstance(j, dict), f"Expected type dict, but got {type(j)}"
    for key, value in j.items():
        assert isinstance(key, str), f"Expected key of type str, but got {type(key)}"
        assert isinstance(value, list), f"Expected value of type list, but got {type(value)}"
        for item in value:
            assert isinstance(item, int), f"Expected list item of type int, but got {type(item)}"
    print("t_dict_list_int:", j, type(j))


@task(container_image=image)
def t_dict_dict_int(k: Dict[str, Dict[str, int]]):
    assert isinstance(k, dict), f"Expected type dict, but got {type(k)}"
    for key, value in k.items():
        assert isinstance(key, str), f"Expected key of type str, but got {type(key)}"
        assert isinstance(value, dict), f"Expected value of type dict, but got {type(value)}"
        for subkey, subvalue in value.items():
            assert isinstance(subkey, str), f"Expected subkey of type str, but got {type(subkey)}"
            assert isinstance(subvalue, int), f"Expected subvalue of type int, but got {type(subvalue)}"
    print("t_dict_dict_int:", k, type(k))


@task(container_image=image)
def t_dict_dict_list_int(l: Dict[str, Dict[str, List[int]]]):
    assert isinstance(l, dict), f"Expected type dict, but got {type(l)}"
    for key, value in l.items():
        assert isinstance(key, str), f"Expected key of type str, but got {type(key)}"
        assert isinstance(value, dict), f"Expected value of type dict, but got {type(value)}"
        for subkey, subvalue in value.items():
            assert isinstance(subkey, str), f"Expected subkey of type str, but got {type(subkey)}"
            assert isinstance(subvalue, list), f"Expected subvalue of type list, but got {type(subvalue)}"
            for item in subvalue:
                assert isinstance(item, int), f"Expected list item of type int, but got {type(item)}"
    print("t_dict_dict_list_int:", l, type(l))


@task(container_image=image)
def t_list_dict_int(m: Dict[str, List[Dict[str, int]]]):
    assert isinstance(m, dict), f"Expected type dict, but got {type(m)}"
    for key, value in m.items():
        assert isinstance(key, str), f"Expected key of type str, but got {type(key)}"
        assert isinstance(value, list), f"Expected value of type list, but got {type(value)}"
        for item in value:
            assert isinstance(item, dict), f"Expected list item of type dict, but got {type(item)}"
            for subkey, subvalue in item.items():
                assert isinstance(subkey, str), f"Expected subkey of type str, but got {type(subkey)}"
                assert isinstance(subvalue, int), f"Expected subvalue of type int, but got {type(subvalue)}"
    print("t_list_dict_int:", m, type(m))


@workflow
def dataclass_wf(input: DC) -> DC:
    # t_int(a=input.a)
    # t_float(b=input.b)
    # t_str(c=input.c)
    # t_bool(d=input.d)
    t_dict_int(e=input.e)
    t_dict_float(f=input.f)
    t_dict_str(g=input.g)
    t_dict_bool(h=input.h)
    t_dict_any(i=input.i)
    t_dict_list_int(j=input.j)
    t_dict_dict_int(k=input.k)
    t_dict_dict_list_int(l=input.l)
    t_list_dict_int(m=input.m)

    return input

if __name__ == "__main__":
    from flytekit.clis.sdk_in_container import pyflyte
    from click.testing import CliRunner

    runner = CliRunner()
    path = "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/demo/dataclass_simple_dict_1_and_2_level.py"

    result = runner.invoke(pyflyte.main, ["run", path, "dataclass_wf", "--input", '{"a": 1}'])

    print("Local Execution: ", result.output)
    #
    result = runner.invoke(pyflyte.main, ["run", "--remote", path, "dataclass_wf", "--input", '{"a": 1}'])
    print("Remote Execution: ", result.output)

Screenshots

list transformer

  • local execution
image image
  • remote execution
image

dict transformer

  • local execution
image image
  • remote execution
image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

@Future-Outlier Future-Outlier marked this pull request as ready for review September 19, 2024 05:40
@Future-Outlier Future-Outlier changed the title [Flyte][3][Attribute Access] Binary IDL With MessagePack [flytekit][3] [list, dict and nested cases] Binary IDL With MessagePack Sep 19, 2024
@Future-Outlier Future-Outlier enabled auto-merge (squash) September 19, 2024 05:41
Copy link

codecov bot commented Sep 19, 2024

Codecov Report

Attention: Patch coverage is 42.42424% with 19 lines in your changes missing coverage. Please review.

Project coverage is 44.46%. Comparing base (11c3a18) to head (684f31d).
Report is 13 commits behind head on master.

Files with missing lines Patch % Lines
flytekit/core/type_engine.py 36.66% 19 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff             @@
##           master    #2758       +/-   ##
===========================================
- Coverage   66.44%   44.46%   -21.98%     
===========================================
  Files           9      232      +223     
  Lines         453    21235    +20782     
  Branches        0     2871     +2871     
===========================================
+ Hits          301     9443     +9142     
- Misses        152    11677    +11525     
- Partials        0      115      +115     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant