diff --git a/.gitignore b/.gitignore index d83678b32..789004875 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # IDE and editor files .idea +.zed # Python-related files __pycache__ @@ -60,6 +61,7 @@ sqlite* # Miscellaneous openapi.yaml +shell.nix # Developer Cargo configs .cargo/ diff --git a/Cargo.lock b/Cargo.lock index 8beb792a3..5021461bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android_system_properties" version = "0.1.5" @@ -159,6 +165,8 @@ checksum = "a0149602eeaf915158e14029ba0c78dedb8c08d554b024d54c8f239aab46511d" dependencies = [ "aws-credential-types", "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", @@ -169,11 +177,14 @@ dependencies = [ "aws-types", "bytes", "fastrand", + "hex", "http 1.4.0", + "ring", "time", "tokio", "tracing", "url", + "zeroize", ] [[package]] @@ -219,6 +230,7 @@ dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", + "aws-smithy-eventstream", "aws-smithy-http", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -234,6 +246,40 @@ dependencies = [ "uuid", ] +[[package]] +name = "aws-sdk-s3" +version = "1.115.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdaa0053cbcbc384443dd24569bd5d1664f86427b9dc04677bd0ab853954baec" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "fastrand", + "hex", + "hmac", + "http 0.2.12", + "http 1.4.0", + "http-body 0.4.6", + "lru", + "percent-encoding", + "regex-lite", + "sha2", + "tracing", + "url", +] + [[package]] name = "aws-sdk-sqs" version = "1.90.0" @@ -256,6 +302,50 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-sso" +version = "1.90.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f18e53542c522459e757f81e274783a78f8c81acdfc8d1522ee8a18b5fb1c66" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.92.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "532f4d866012ffa724a4385c82e8dd0e59f0ca0e600f3f22d4c03b6824b34e4a" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.12", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sts" version = "1.94.0" @@ -286,19 +376,25 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c35452ec3f001e1f2f6db107b6373f1f48f05ec63ba2c5c9fa91f07dad32af11" dependencies = [ "aws-credential-types", + "aws-smithy-eventstream", "aws-smithy-http", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", + "crypto-bigint 0.5.5", "form_urlencoded", "hex", 
"hmac", "http 0.2.12", "http 1.4.0", + "p256", "percent-encoding", + "ring", "sha2", + "subtle", "time", "tracing", + "zeroize", ] [[package]] @@ -312,12 +408,44 @@ dependencies = [ "tokio", ] +[[package]] +name = "aws-smithy-checksums" +version = "0.63.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95bd108f7b3563598e4dc7b62e1388c9982324a2abd622442167012690184591" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc-fast", + "hex", + "http 0.2.12", + "http-body 0.4.6", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e29a304f8319781a39808847efb39561351b1bb76e933da7aa90232673638658" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + [[package]] name = "aws-smithy-http" version = "0.62.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "445d5d720c99eed0b4aa674ed00d835d9b1427dd73e04adaf2f94c6b2d6f9fca" dependencies = [ + "aws-smithy-eventstream", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", @@ -581,6 +709,12 @@ dependencies = [ "tracing-opentelemetry-instrumentation-sdk", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base64" version = "0.21.7" @@ -603,6 +737,12 @@ dependencies = [ "vsimd", ] +[[package]] +name = "base64ct" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" + [[package]] name = "bindgen" version = "0.72.1" @@ -780,7 +920,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -918,9 +1058,9 @@ dependencies = [ [[package]] name = "const-hex" -version = "1.17.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bb320cac8a0750d7f25280aa97b09c26edfe161164238ecbbb31092b079e735" +checksum = "b6407bff74dea37e0fa3dc1c1c974e5d46405f0c987bf9997a0762adce71eda6" dependencies = [ "cfg-if", "cpufeatures", @@ -928,6 +1068,12 @@ dependencies = [ "serde_core", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -963,6 +1109,34 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + +[[package]] +name = "crc-fast" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ddc2d09feefeee8bd78101665bd8645637828fa9317f9f292496dbbd8c65ff3" +dependencies = [ + "crc", + "digest", + "rand 0.9.2", + "regex", + "rustversion", +] + [[package]] name = "crc32fast" version = "1.5.0" @@ -1045,6 +1219,28 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core 0.6.4", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto-bigint" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "crypto-common" version = "0.1.7" @@ -1090,6 +1286,49 @@ dependencies = [ "syn", ] +[[package]] +name = "dataplane" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "aws-config", + "aws-sdk-s3", + "aws-sdk-sso", + "bytes", + "futures", + "gethostname", + "http-body-util", + "hyper 1.8.1", + "hyper-util", + "prometheus-client", + "prost", + "prost-types", + "serde", + "serde_json", + "serde_yaml", + "sha2", + "sysinfo 0.37.2", + "tokio", + "tonic", + "tonic-prost", + "tonic-prost-build", + "tracing", + "tracing-subscriber", + "url", + "vergen", +] + +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.5" @@ -1163,18 +1402,56 @@ dependencies = [ "syn", ] +[[package]] +name = "dtoa" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6add3b8cff394282be81f3fc1a0605db594ed69890078ca6e2cab1c408bcf04" + [[package]] name = "dunce" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der", + "elliptic-curve", + "rfc6979", + "signature", +] + [[package]] name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct", + "crypto-bigint 0.4.9", + "der", + "digest", + "ff", + "generic-array", + "group", + "pkcs8", + "rand_core 0.6.4", + "sec1", + "subtle", + "zeroize", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -1206,6 +1483,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "figment" version = "0.10.19" @@ -1248,6 +1535,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = 
"0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -1372,6 +1665,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "gethostname" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bd49230192a3797a9a4d6abe9b3eed6f7fa4c8a8a4947977c6f80025f92cbd8" +dependencies = [ + "rustix", + "windows-link 0.2.1", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1405,6 +1708,17 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff", + "rand_core 0.6.4", + "subtle", +] + [[package]] name = "h2" version = "0.3.27" @@ -1454,6 +1768,17 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -1679,9 +2004,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2 0.6.1", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -1902,7 +2229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", "serde", "serde_core", ] @@ -2007,7 +2334,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" dependencies = [ "cfg-if", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -2072,6 +2399,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -2270,6 +2606,16 @@ dependencies = [ "bitflags", ] +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "object_store" version = "0.12.4" @@ -2451,6 +2797,17 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa", + "elliptic-curve", + "sha2", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -2471,7 +2828,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - 
"windows-link", + "windows-link 0.2.1", ] [[package]] @@ -2522,6 +2879,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -2610,13 +2977,37 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-client" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4500adecd7af8e0e9f4dbce15cfee07ce913fbf6ad605cc468b83f2d531ee94" +dependencies = [ + "dtoa", + "itoa", + "parking_lot", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9adf1691c04c0a5ff46ff8f262b58beb07b0dbb61f96f9f54f6cbd82106ed87f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "proptest" -version = "1.9.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" +checksum = "2bb0be07becd10686a0bb407298fb425360a5c44a663774406340c59a22de4ce" dependencies = [ "bitflags", + "lazy_static", "num-traits", "rand 0.9.2", "rand_chacha 0.9.0", @@ -2963,6 +3354,17 @@ dependencies = [ "web-sys", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "ring" version = "0.17.14" @@ -3069,6 +3471,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", + "log", "once_cell", "ring", "rustls-pki-types", @@ -3188,6 +3591,20 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct", + "der", + "generic-array", + "pkcs8", + "subtle", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3326,6 +3743,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -3361,6 +3789,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + [[package]] name = "simd-adler32" version = "0.3.7" @@ -3415,6 +3853,16 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.6.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3495,7 +3943,42 @@ dependencies = [ "memchr", "ntapi", "objc2-core-foundation", - "windows", + "windows 0.57.0", +] + +[[package]] +name = "sysinfo" +version = "0.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.61.3", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", ] [[package]] @@ -3705,6 +4188,7 @@ dependencies = [ "socket2 0.6.1", "sync_wrapper", "tokio", + "tokio-rustls 0.26.4", "tokio-stream", "tower", "tower-layer", @@ -4099,7 +4583,7 @@ dependencies = [ "regex", "rustc_version", "rustversion", - "sysinfo", + "sysinfo 0.34.2", "time", "vergen-lib", ] @@ -4293,6 +4777,28 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core 0.61.2", + "windows-future", + "windows-link 0.1.3", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core 0.61.2", +] + [[package]] name = "windows-core" version = "0.57.0" @@ -4305,6 +4811,19 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement 0.60.2", + "windows-interface 0.59.3", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -4313,9 +4832,20 @@ checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement 0.60.2", "windows-interface 0.59.3", - "windows-link", + "windows-link 0.2.1", "windows-result 0.4.1", - "windows-strings", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", + "windows-threading", ] [[package]] @@ -4362,12 +4892,39 @@ dependencies = [ "syn", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", +] + +[[package]] +name = "windows-registry" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" +dependencies = [ + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-result" version = "0.1.2" @@ -4377,13 +4934,31 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows-result" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-link", + "windows-link 0.2.1", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link 0.1.3", ] [[package]] @@ -4392,7 +4967,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -4419,7 +4994,7 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -4444,7 +5019,7 @@ version = "0.53.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" dependencies = [ - "windows-link", + "windows-link 0.2.1", "windows_aarch64_gnullvm 0.53.1", "windows_aarch64_msvc 0.53.1", "windows_i686_gnu 0.53.1", @@ -4455,6 +5030,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -4594,18 +5178,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.30" +version = "0.8.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +checksum = "b7a46b351402ada3508ec7d8b210e5643085b30692a09e921b07c902f8aded21" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.30" +version = "0.8.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +checksum = "4e999748819ddfe1ad621c3c277f9e4814321defdc0e6a7dd31772eb31ad3d2b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index ad6843784..efdd45ee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ "server", + "dataplane" ] resolver = "2" diff --git a/README.md b/README.md index 99f68955e..4ae1b4d24 100644 --- a/README.md +++ b/README.md @@ -182,7 +182,7 @@ indexify-cli executor Set the environment variable - ```bash -export INDEXIFY_URL=http://localhost:8900 +export TENSORLAKE_API_URL=http://localhost:8900 ``` Change the code in the workflow to the following - diff --git a/dataplane/Cargo.toml b/dataplane/Cargo.toml new file mode 100644 index 000000000..c769f8c66 --- /dev/null +++ b/dataplane/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "dataplane" +version = "0.1.0" +edition = "2021" +authors = ["Tensorlake Inc. "] +resolver = "2" + +[dependencies] +async-trait = "0.1.81" +anyhow = "1.0.100" +aws-config = "1.8.11" +aws-sdk-s3 = "1.115.0" +aws-sdk-sso = "1.90.0" +bytes = { version = "1.10.1", features = ["serde"] } +futures = "0.3.31" +gethostname = "1.1.0" +http-body-util = "0.1.3" +hyper = "1.7.0" +hyper-util = { version = "0.1.18", features = ["full"] } +prometheus-client = "0.24.0" +prost = "0.14.1" +prost-types = "0.14.1" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" +serde_yaml = "0.9.34" +sha2 = "0.10.9" +sysinfo = "0.37.2" +tokio = { version = "1.48.0", features = ["full"] } +tonic = { version = "0.14.2", features = ["channel", "tls-aws-lc"] } +tonic-prost = "0.14.2" +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] } +url = "2.5.6" + +[build-dependencies] +anyhow = "1.0.98" +vergen = { version = "9.0.6", features = ["build", "cargo", "rustc", "si"] } +tonic-prost-build = "0.14.2" diff --git a/dataplane/build.rs b/dataplane/build.rs new file mode 100644 index 000000000..26985a913 --- /dev/null +++ b/dataplane/build.rs @@ -0,0 +1,37 @@ +use std::{env, path::PathBuf}; + +use anyhow::Result; +use vergen::{BuildBuilder, Emitter, SysinfoBuilder}; + +fn main() -> Result<()> { + let build = BuildBuilder::all_build()?; + let si = SysinfoBuilder::all_sysinfo()?; + + Emitter::default() + .add_instructions(&build)? + .add_instructions(&si)? 
diff --git a/dataplane/build.rs b/dataplane/build.rs
new file mode 100644
index 000000000..26985a913
--- /dev/null
+++ b/dataplane/build.rs
@@ -0,0 +1,37 @@
+use std::{env, path::PathBuf};
+
+use anyhow::Result;
+use vergen::{BuildBuilder, Emitter, SysinfoBuilder};
+
+fn main() -> Result<()> {
+    let build = BuildBuilder::all_build()?;
+    let si = SysinfoBuilder::all_sysinfo()?;
+
+    Emitter::default()
+        .add_instructions(&build)?
+        .add_instructions(&si)?
+        .emit()?;
+
+    let client_proto_files = ["./proto/executor_api.proto"];
+    let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap());
+
+    tonic_prost_build::configure()
+        .build_client(true)
+        .build_server(false)
+        .file_descriptor_set_path(out_dir.join("executor_api_descriptor.bin"))
+        .protoc_arg("--experimental_allow_proto3_optional") // Required for building on Ubuntu 22.04
+        .compile_protos(&client_proto_files, &["proto"])?;
+
+    let server_proto_files = ["./proto/function_executor.proto", "./proto/status.proto"];
+
+    tonic_prost_build::configure()
+        .build_client(true)
+        .build_server(false)
+        .compile_well_known_types(true)
+        .extern_path(".google.protobuf", "::prost_types")
+        .file_descriptor_set_path(out_dir.join("function_executor_descriptor.bin"))
+        .protoc_arg("--experimental_allow_proto3_optional") // Required for building on Ubuntu 22.04
+        .compile_protos(&server_proto_files, &["proto"])?;
+
+    Ok(())
+}
diff --git a/dataplane/proto/executor_api.proto b/dataplane/proto/executor_api.proto
new file mode 100644
index 000000000..809a60b80
--- /dev/null
+++ b/dataplane/proto/executor_api.proto
@@ -0,0 +1,399 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+// Rename with caution. The package name is part of the gRPC service name.
+// Existing clients won't find the service if the package name changes.
+package executor_api_pb;
+
+// ===== DataPayload =====
+enum DataPayloadEncoding {
+    DATA_PAYLOAD_ENCODING_UNKNOWN = 0;
+    DATA_PAYLOAD_ENCODING_UTF8_JSON = 1;
+    DATA_PAYLOAD_ENCODING_UTF8_TEXT = 2;
+    DATA_PAYLOAD_ENCODING_BINARY_PICKLE = 3;
+    DATA_PAYLOAD_ENCODING_BINARY_ZIP = 4;
+    DATA_PAYLOAD_ENCODING_RAW = 5;
+}
+
+message DataPayload {
+    // URI of the BLOB where the data is stored.
+    // S3 URI if the data is stored in S3.
+    // Starts with a "file://" prefix if the data is stored on a local file system.
+    optional string uri = 1;
+    optional DataPayloadEncoding encoding = 2;
+    // Not set and ignored by Server right now.
+    optional uint64 encoding_version = 3;
+    // MIME type. Only set when raw encoding is used.
+    optional string content_type = 4;
+
+    // DataPayload metadata is stored at the head of the binary
+    // blob. This metadata is optionally used to store additional
+    // metadata for the type of the object that is stored
+    // so that we can deserialize the object correctly.
+    optional uint64 metadata_size = 5;
+
+    optional uint64 offset = 6;
+    // Includes data and metadata.
+    optional uint64 size = 7;
+    optional string sha256_hash = 8;
+
+    // ID of the function call that returned this data payload as its return value.
+    // Payloads that are not a function call return value don't have this field set.
+    optional string source_function_call_id = 9;
+
+    // Opaque ID. Generated by Server if not set by Executor.
+    optional string id = 10;
+}
+
+enum GPUModel {
+    GPU_MODEL_UNKNOWN = 0;
+    GPU_MODEL_NVIDIA_A100_40GB = 1;
+    GPU_MODEL_NVIDIA_A100_80GB = 2;
+    GPU_MODEL_NVIDIA_H100_80GB = 3;
+    GPU_MODEL_NVIDIA_TESLA_T4 = 4;
+    GPU_MODEL_NVIDIA_A6000 = 5;
+    GPU_MODEL_NVIDIA_A10 = 6;
+}
+
+// Free GPUs available at the Executor.
+message GPUResources {
+    optional uint32 count = 1;
+    optional GPUModel model = 2;
+}
+
+// Resources that we're currently tracking and limiting on Executor.
+message HostResources {
+    optional uint32 cpu_count = 1;
+    optional uint64 memory_bytes = 2;
+    optional uint64 disk_bytes = 3;
+    // Not set if no GPUs are available.
+    optional GPUResources gpu = 4;
+}
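The `DataPayload` fields above describe a region of a BLOB in which the metadata sits at the head and `size` covers metadata plus data. As a reading aid, here is a minimal Rust sketch of slicing such a region apart; `split_payload` is a hypothetical helper for illustration, not code from this PR:

```rust
use bytes::Bytes;

/// Hypothetical helper illustrating the DataPayload layout described above:
/// `size` spans metadata + data, and the metadata occupies the first
/// `metadata_size` bytes of the region starting at `offset`.
fn split_payload(blob: &Bytes, offset: u64, size: u64, metadata_size: u64) -> (Bytes, Bytes) {
    let start = offset as usize;
    let meta_end = start + metadata_size as usize;
    let end = start + size as usize;
    // `Bytes::slice` panics on out-of-range input; real code would
    // validate the offsets against blob.len() first.
    (blob.slice(start..meta_end), blob.slice(meta_end..end))
}
```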
+// Specification of a single function that is allowed to run on the Executor.
+message AllowedFunction {
+    optional string namespace = 1;
+    optional string application_name = 2;
+    optional string function_name = 3;
+    // If not set, any version of the application is allowed to run on the Executor.
+    optional string application_version = 4;
+}
+
+enum FunctionExecutorStatus {
+    FUNCTION_EXECUTOR_STATUS_UNKNOWN = 0;
+    // FE is being created.
+    FUNCTION_EXECUTOR_STATUS_PENDING = 1;
+    // FE is running and ready to accept allocations.
+    FUNCTION_EXECUTOR_STATUS_RUNNING = 2;
+    // FE is terminated, all resources are freed.
+    FUNCTION_EXECUTOR_STATUS_TERMINATED = 3;
+}
+
+// The reasons why an Executor decided to terminate a Function Executor.
+enum FunctionExecutorTerminationReason {
+    FUNCTION_EXECUTOR_TERMINATION_REASON_UNKNOWN = 0;
+    // Internal error aka platform error on FE startup.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_STARTUP_FAILED_INTERNAL_ERROR = 1;
+    // A clear function constructor code error on FE startup.
+    // Typically an exception raised from the constructor.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_STARTUP_FAILED_FUNCTION_ERROR = 2;
+    // Timeout on FE startup while running the function constructor.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_STARTUP_FAILED_FUNCTION_TIMEOUT = 3;
+
+    // FE was terminated because it failed a health check.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_UNHEALTHY = 12;
+    // FE was terminated due to an unrecoverable internal error on Executor.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_INTERNAL_ERROR = 13;
+    // FE was terminated because the function code exceeded its configured timeout.
+    // FE termination is the only way to reliably stop the function code execution.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_FUNCTION_TIMEOUT = 14;
+    // FE was terminated because the function allocation currently running on it was
+    // removed from the Executor desired state.
+    // FE termination is the only way to reliably stop the function code execution.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_FUNCTION_CANCELLED = 15;
+
+    // FE was terminated because it ran out of memory.
+    FUNCTION_EXECUTOR_TERMINATION_REASON_OOM = 16;
+}
+
+// Immutable information that identifies and describes a Function Executor.
+message FunctionExecutorResources {
+    // 1000 CPU ms per sec is one full CPU core.
+    // 2000 CPU ms per sec is two full CPU cores.
+    optional uint32 cpu_ms_per_sec = 1;
+    optional uint64 memory_bytes = 2;
+    optional uint64 disk_bytes = 3;
+    optional GPUResources gpu = 4;
+}
+
+message FunctionRef {
+    optional string namespace = 1;
+    optional string application_name = 2;
+    optional string function_name = 3;
+    optional string application_version = 4;
+}
+
+message FunctionExecutorDescription {
+    optional string id = 1;
+    optional FunctionRef function = 2;
+    repeated string secret_names = 3;
+    // Timeout for running customer code during FE creation.
+    optional uint32 initialization_timeout_ms = 4;
+    optional DataPayload application = 5;
+    optional FunctionExecutorResources resources = 6;
+    optional uint32 max_concurrency = 8;
+    // Timeout for running allocations on the FE.
+    optional uint32 allocation_timeout_ms = 9;
+}
+
+message FunctionExecutorState {
+    optional FunctionExecutorDescription description = 1;
+    optional FunctionExecutorStatus status = 2;
+    optional FunctionExecutorTerminationReason termination_reason = 3;
+    repeated string allocation_ids_caused_termination = 4;
+}
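`FunctionExecutorResources.cpu_ms_per_sec` above uses a milliseconds-per-second unit, so fractional cores are representable as integers. A worked conversion under that convention (the helper names are illustrative, not from this PR):

```rust
/// 1000 CPU ms per sec is one full core and 2000 is two cores, per the
/// comment on FunctionExecutorResources; 500 therefore means half a core.
fn cores_to_cpu_ms_per_sec(cores: f64) -> u32 {
    (cores * 1000.0).round() as u32
}

fn cpu_ms_per_sec_to_cores(cpu_ms_per_sec: u32) -> f64 {
    cpu_ms_per_sec as f64 / 1000.0
}
```

For example, a 2.5-core reservation is encoded as `2500`.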
+enum ExecutorStatus {
+    EXECUTOR_STATUS_UNKNOWN = 0;
+    // Executor is starting up, not ready to accept allocations and FEs.
+    EXECUTOR_STATUS_STARTING_UP = 1;
+    // Executor is ready to accept allocations and FEs.
+    EXECUTOR_STATUS_RUNNING = 2;
+    // No new allocations or FEs should be placed on the Executor.
+    EXECUTOR_STATUS_DRAINED = 3;
+    // Executor is stopped, all resources are freed, all FEs are terminated.
+    EXECUTOR_STATUS_STOPPED = 4;
+}
+
+message ExecutorState {
+    optional string executor_id = 1;
+    optional string hostname = 3;
+    optional string version = 5;
+    optional ExecutorStatus status = 6;
+    // Total resources at the Executor.
+    optional HostResources total_resources = 13;
+    // Total resources usable by Function Executors.
+    optional HostResources total_function_executor_resources = 7;
+    // An empty allowed_functions list means that any function can run on the Executor.
+    repeated AllowedFunction allowed_functions = 8;
+    repeated FunctionExecutorState function_executor_states = 9;
+    map<string, string> labels = 10;
+    optional string state_hash = 11;
+    // Server-supplied clock value of the latest desired executor state that was
+    // reconciled by Executor. Not included in state_hash.
+    // The initial value on Executor startup is 0.
+    optional uint64 server_clock = 12;
+
+    // Catalog entry name that this executor is associated with.
+    optional string catalog_entry_name = 14;
+
+    repeated FunctionCallWatch function_call_watches = 15;
+}
+
+// Updates that Executor wants to report to Server. If the report_executor_state RPC
+// succeeds, the updates from it won't be included in the next RPC.
+message ExecutorUpdate {
+    optional string executor_id = 1;
+    repeated AllocationResult allocation_results = 2;
+}
+
+message FunctionCallWatch {
+    optional string namespace = 1;
+    optional string application = 2;
+    optional string request_id = 3;
+    optional string function_call_id = 4;
+}
+
+message ReportExecutorStateRequest {
+    optional ExecutorState executor_state = 1;
+    optional ExecutorUpdate executor_update = 2;
+}
+
+// A message sent by Server to Executor to acknowledge the receipt of ReportExecutorStateRequest.
+message ReportExecutorStateResponse {
+}
+
+message Allocation {
+    // Version is not set in the function ref.
+    optional FunctionRef function = 1;
+    optional string allocation_id = 2;
+    optional string function_call_id = 3;
+    optional string request_id = 4;
+    repeated DataPayload args = 5;
+    // URI prefix for DataPayloads/BLOBs generated by the Allocation request.
+    // S3 URI if the data is stored in S3.
+    // Starts with a "file://" prefix followed by an absolute directory path if the data is stored on a local file system.
+    optional string request_data_payload_uri_prefix = 6;
+    // BLOB URI prefix for the request error payloads.
+    optional string request_error_payload_uri_prefix = 7;
+    optional string function_executor_id = 8;
+
+    // Always set. An empty bytes array if there is no metadata, e.g. for an API function call.
+    optional bytes function_call_metadata = 9;
+}
+
+// A message sent by Executor to Server to open the stream of desired Executor States for the Executor.
+message GetDesiredExecutorStatesRequest {
+    optional string executor_id = 1;
+}
+
+message FunctionCallResult {
+    optional string namespace = 1;
+    optional string request_id = 2;
+    optional string function_call_id = 4;
+    optional AllocationOutcomeCode outcome_code = 5;
+    optional AllocationFailureReason failure_reason = 6;
+    optional DataPayload return_value = 7;
+    optional DataPayload request_error = 9;
+}
+
+// A message sent from Server to Executor that describes the desired state of the Executor at the moment.
+// Executor compares this state with its current state and makes the necessary changes to match the desired state.
+message DesiredExecutorState {
+    repeated FunctionExecutorDescription function_executors = 1;
+    repeated Allocation allocations = 2;
+    // Server-supplied clock value used to deduplicate messages. Executor records the max clock value
+    // it observed and ignores all messages with a clock value <= the max observed value.
+    optional uint64 clock = 3;
+    repeated FunctionCallResult function_call_results = 4;
+}
+
+enum AllocationOutcomeCode {
+    ALLOCATION_OUTCOME_CODE_UNKNOWN = 0;
+    ALLOCATION_OUTCOME_CODE_SUCCESS = 1;
+    ALLOCATION_OUTCOME_CODE_FAILURE = 2;
+}
+
+enum AllocationFailureReason {
+    ALLOCATION_FAILURE_REASON_UNKNOWN = 0;
+    // Internal error on Executor aka platform error.
+    ALLOCATION_FAILURE_REASON_INTERNAL_ERROR = 1;
+    // A clear function code failure, typically an exception raised from the function code.
+    // Also a grey failure where we can't determine the exact cause. We attribute these to
+    // functions to prevent service abuse, but intentionally failing functions are not billed.
+    ALLOCATION_FAILURE_REASON_FUNCTION_ERROR = 2;
+    // Function code run time exceeded its configured timeout.
+    ALLOCATION_FAILURE_REASON_FUNCTION_TIMEOUT = 3;
+    // Function code raised RequestException to mark the request as permanently failed.
+    ALLOCATION_FAILURE_REASON_REQUEST_ERROR = 4;
+    // Server removed the allocation from the Executor desired state.
+    // The allocation was either running or didn't start before the removal.
+    // If it started running then its execution duration is set.
+    ALLOCATION_FAILURE_REASON_ALLOCATION_CANCELLED = 5;
+    // The Function Executor terminated and can't run the allocation anymore.
+    // The allocation didn't run on the FE.
+    ALLOCATION_FAILURE_REASON_FUNCTION_EXECUTOR_TERMINATED = 6;
+    // Out of memory error.
+    ALLOCATION_FAILURE_REASON_OOM = 7;
+    // The failure reasons below are only set by Server when reporting function call results
+    // back to Executor.
+    //
+    // The function run cannot be scheduled given its constraints.
+    // This shouldn't be an allocation failure, but since we are reusing
+    // allocation failure reasons as function run failure reasons, we need to add it here.
+    ALLOCATION_FAILURE_REASON_CONSTRAINT_UNSATISFIABLE = 8;
+    // Another failure reason that doesn't quite belong here,
+    // but it is one of the function run reasons - again, it shouldn't happen in practice.
+    ALLOCATION_FAILURE_REASON_EXECUTOR_REMOVED = 9;
+}
+
+message ExecutionPlanUpdate {
+    oneof op {
+        FunctionCall function_call = 10;
+        ReduceOp reduce = 12;
+    }
+}
+
+message FunctionCall {
+    // The ID of the function call.
+    // When all the dependencies of the function call are resolved, the
+    // resulting DataPayload will have the same ID.
+    // This allows resolving data payloads of resolved function calls when their upstream is called again.
+    // The ID is unique within the scope of the request.
+    optional string id = 1;
+    optional FunctionRef target = 2;
+    repeated FunctionArg args = 3;
+
+    // This required metadata allows the SDK to restore the original function call from the arguments.
+    optional bytes call_metadata = 5;
+}
+
+message FunctionArg {
+    oneof source {
+        string function_call_id = 1;
+        DataPayload inline_data = 2;
+    }
+}
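The `DesiredExecutorState.clock` comment above defines the deduplication rule precisely; a minimal Executor-side sketch of that rule (the type and method names are illustrative, not from this PR):

```rust
/// Tracks the max observed `DesiredExecutorState.clock` and rejects
/// anything at or below it, as the field's comment specifies.
#[derive(Default)]
struct DesiredStateDedup {
    max_observed_clock: Option<u64>,
}

impl DesiredStateDedup {
    /// Returns true if a message with this clock value should be processed.
    fn should_process(&mut self, clock: u64) -> bool {
        match self.max_observed_clock {
            Some(max) if clock <= max => false, // duplicate or stale message
            _ => {
                self.max_observed_clock = Some(clock);
                true
            }
        }
    }
}
```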
+
+message ReduceOp {
+    // The DataPayload of the last function call in the reducer will have its function call id set
+    // to the id of the ReduceOp.
+    optional string id = 1;
+    // Contains at least two items.
+    repeated FunctionArg collection = 2;
+    optional FunctionRef reducer = 3;
+    // This required metadata allows the SDK to restore the original function call from the arguments.
+    optional bytes call_metadata = 4;
+}
+
+message ExecutionPlanUpdates {
+    repeated ExecutionPlanUpdate updates = 1;
+    optional string root_function_call_id = 2;
+    // Optional timestamp for when to start the plan execution.
+    optional google.protobuf.Timestamp start_at = 3;
+}
+
+message AllocationResult {
+    optional FunctionRef function = 1;
+    optional string allocation_id = 2;
+    optional string function_call_id = 3;
+    optional string request_id = 4;
+    optional AllocationOutcomeCode outcome_code = 5;
+    optional AllocationFailureReason failure_reason = 6;
+
+    oneof return_value {
+        DataPayload value = 7;
+        ExecutionPlanUpdates updates = 8;
+    }
+    // User payload for the request error if the allocation failed with a request error.
+    optional DataPayload request_error = 9;
+    optional uint64 execution_duration_ms = 10;
+}
+
+message FunctionCallRequest {
+    optional string namespace = 1;
+    optional string application = 2;
+    // The ID of the request in which the function call
+    // needs to run.
+    optional string request_id = 3;
+    // Function call tree.
+    // updates.root_function_call_id is the ID of this function call.
+    optional ExecutionPlanUpdates updates = 4;
+    // ID of the function call that initiated this function call.
+    optional string source_function_call_id = 6;
+}
+
+message FunctionCallResponse {
+}
+
+// Internal API for scheduling and running allocations on Executors. Executors act as clients of this API.
+// Server is responsible for scheduling allocations on Executors, and Executors are responsible for running the allocations.
+//
+// Rename with caution. Existing clients won't find the service if the service name changes. An HTTP2 ingress proxy
+// might use the service name in its HTTP2 path-based routing rules. See how gRPC uses service names in its HTTP2 paths
+// at https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md.
+service ExecutorAPI {
+    // Called by Executor every 5 seconds to report that it's still alive and provide its current state.
+    //
+    // Missing 3 reports will result in the Executor being deregistered by Server.
+    rpc report_executor_state(ReportExecutorStateRequest) returns (ReportExecutorStateResponse) {}
+
+    // Called by Executor to open a stream of its desired states. When Server wants Executor to change something,
+    // it puts a message on the stream with the new desired state of the Executor.
+    rpc get_desired_executor_states(GetDesiredExecutorStatesRequest) returns (stream DesiredExecutorState) {}
+
+    // Called by the user code to invoke a blocking function call.
+    rpc call_function(FunctionCallRequest) returns (FunctionCallResponse) {}
+}
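The `report_executor_state` contract above (report every 5 seconds, deregistration after 3 missed reports) implies a simple client loop. A sketch, assuming tonic's default codegen naming for this service (`executor_api_client::ExecutorApiClient`); this is not the PR's actual client code:

```rust
use std::time::Duration;

// Hypothetical module path for the generated code of the ExecutorAPI service.
use executor_api_pb::{executor_api_client::ExecutorApiClient, ReportExecutorStateRequest};

async fn heartbeat_loop(
    mut client: ExecutorApiClient<tonic::transport::Channel>,
) -> anyhow::Result<()> {
    // Report every 5 seconds; three missed reports get the Executor
    // deregistered by Server, so a failed report is logged and retried
    // on the next tick rather than treated as fatal.
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        ticker.tick().await;
        let request = ReportExecutorStateRequest {
            executor_state: None,  // real code fills in the current state snapshot
            executor_update: None, // ...and any pending allocation results
        };
        if let Err(status) = client.report_executor_state(request).await {
            tracing::warn!(error = %status, "failed to report executor state");
        }
    }
}
```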
diff --git a/dataplane/proto/function_executor.proto b/dataplane/proto/function_executor.proto
new file mode 100644
index 000000000..18635a8b1
--- /dev/null
+++ b/dataplane/proto/function_executor.proto
@@ -0,0 +1,392 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+import "status.proto";
+
+package function_executor_service;
+
+// The messages should not use any Python SDK objects. Only the Function Executor implemented
+// in Python is allowed to import the Python SDK to run customer functions. This ensures that
+// all the other components can be written in any language.
+
+// The standard Empty message.
+message Empty {}
+
+enum SerializedObjectEncoding {
+    SERIALIZED_OBJECT_ENCODING_UNKNOWN = 0;
+    SERIALIZED_OBJECT_ENCODING_UTF8_JSON = 1;
+    SERIALIZED_OBJECT_ENCODING_UTF8_TEXT = 2;
+    SERIALIZED_OBJECT_ENCODING_BINARY_PICKLE = 3;
+    SERIALIZED_OBJECT_ENCODING_BINARY_ZIP = 4;
+    SERIALIZED_OBJECT_ENCODING_RAW = 5;
+}
+
+message SerializedObjectManifest {
+    optional SerializedObjectEncoding encoding = 1;
+    optional uint64 encoding_version = 2;
+    // Includes data and metadata.
+    optional uint64 size = 3;
+    // Metadata is stored before the actual data.
+    optional uint64 metadata_size = 4;
+    // Includes data and metadata.
+    optional string sha256_hash = 5;
+    // MIME type. Only set when raw encoding is used.
+    optional string content_type = 6;
+    // ID of the function call that returned this SO as its return value.
+    // SOs that are not a function call return value don't have this field set.
+    optional string source_function_call_id = 7;
+}
+
+// Full serialized object with all its data.
+message SerializedObject {
+    optional SerializedObjectManifest manifest = 1;
+    optional bytes data = 2;
+}
+
+// A chunk of a BLOB in a BLOB store.
+message BLOBChunk {
+    // URI of the chunk.
+    // S3 URI if the data is stored in S3.
+    // Starts with a "file://" prefix if the data is stored on a local file system.
+    optional string uri = 1;
+    // Actual size of chunk data if the BLOB has data.
+    // Max chunk size if the BLOB has no data yet (can be used for data upload).
+    optional uint64 size = 2;
+    // None if the BLOB has data.
+    // ETag of the chunk data (generated by the BLOB store) if the BLOB chunk was used to upload data.
+    optional string etag = 3;
+}
+
+// A BLOB in a BLOB store.
+message BLOB {
+    // ID of the BLOB, unique per allocation.
+    optional string id = 1;
+    // Ordered chunks of the BLOB if the BLOB has data.
+    // Ordered chunks of the BLOB that can be used to upload data if the BLOB has no data yet.
+    repeated BLOBChunk chunks = 2;
+}
+
+// Serialized object stored inside a BLOB.
+// The BLOB is determined from the protocol context.
+message SerializedObjectInsideBLOB {
+    optional SerializedObjectManifest manifest = 1;
+    // Offset inside the BLOB where the serialized object data starts.
+    optional uint64 offset = 2;
+}
+
+message FunctionRef {
+    optional string namespace = 1;
+    optional string application_name = 2;
+    optional string function_name = 3;
+    optional string application_version = 4;
+}
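A `SerializedObjectInsideBLOB` is addressed by an offset into a BLOB whose ordered chunks each carry a `size`, so reading one means intersecting the object's byte range with the chunk extents. A hypothetical helper sketching that mapping (not code from this PR):

```rust
/// A (chunk index, byte range within that chunk) pair covering part of a read.
type ChunkSlice = (usize, std::ops::Range<u64>);

/// Hypothetical helper: map an object's [offset, offset + size) range onto
/// the BLOB's ordered chunks, using each chunk's `size` as its extent.
fn chunks_for_range(chunk_sizes: &[u64], offset: u64, size: u64) -> Vec<ChunkSlice> {
    let mut slices = Vec::new();
    let (start, end) = (offset, offset + size);
    let mut chunk_begin = 0u64;
    for (i, &len) in chunk_sizes.iter().enumerate() {
        let chunk_end = chunk_begin + len;
        if start < chunk_end && end > chunk_begin {
            // Intersect the requested range with this chunk's extent,
            // expressed relative to the chunk's own start.
            let lo = start.max(chunk_begin) - chunk_begin;
            let hi = end.min(chunk_end) - chunk_begin;
            slices.push((i, lo..hi));
        }
        chunk_begin = chunk_end;
        if chunk_begin >= end {
            break;
        }
    }
    slices
}
```

For instance, with two 4-byte chunks, a read of offset 2 and size 4 resolves to `(0, 2..4)` and `(1, 0..2)`.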
+
+// InitializeRequest contains information about the function
+// that Function Executor is going to run allocations for.
+message InitializeRequest {
+    optional FunctionRef function = 1;
+    optional SerializedObject application_code = 2;
+}
+
+enum InitializationOutcomeCode {
+    INITIALIZATION_OUTCOME_CODE_UNKNOWN = 0;
+    INITIALIZATION_OUTCOME_CODE_SUCCESS = 1;
+    INITIALIZATION_OUTCOME_CODE_FAILURE = 2;
+}
+
+enum InitializationFailureReason {
+    INITIALIZATION_FAILURE_REASON_UNKNOWN = 0;
+    INITIALIZATION_FAILURE_REASON_INTERNAL_ERROR = 1;
+    INITIALIZATION_FAILURE_REASON_FUNCTION_ERROR = 2;
+}
+
+message InitializeResponse {
+    optional InitializationOutcomeCode outcome_code = 1;
+    optional InitializationFailureReason failure_reason = 2;
+}
+
+message ListAllocationsRequest {}
+
+message ListAllocationsResponse {
+    repeated Allocation allocations = 1;
+}
+
+message Metrics {
+    map<string, float> timers = 1;
+    map<string, uint64> counters = 2;
+}
+
+enum AllocationOutcomeCode {
+    ALLOCATION_OUTCOME_CODE_UNKNOWN = 0;
+    ALLOCATION_OUTCOME_CODE_SUCCESS = 1;
+    ALLOCATION_OUTCOME_CODE_FAILURE = 2;
+}
+
+enum AllocationFailureReason {
+    ALLOCATION_FAILURE_REASON_UNKNOWN = 0;
+    ALLOCATION_FAILURE_REASON_INTERNAL_ERROR = 1;
+    ALLOCATION_FAILURE_REASON_FUNCTION_ERROR = 2;
+    ALLOCATION_FAILURE_REASON_REQUEST_ERROR = 3;
+}
+
+message AllocationProgress {
+    float current = 1;
+    float total = 2;
+}
+
+message AllocationOutputBLOBRequest {
+    // ID of the BLOB, unique per allocation.
+    optional string id = 1;
+    optional uint64 size = 2;
+}
+
+message AllocationFunctionCall {
+    // Call ID is the same as root_function_call_id in ExecutionPlanUpdates.
+    optional ExecutionPlanUpdates updates = 1;
+    // SerializedObjectInsideBLOB arguments from the execution
+    // plan are stored inside this BLOB.
+    // Optional; not set if there are no function arguments in updates.
+    optional BLOB args_blob = 2;
+}
+
+message AllocationFunctionCallWatcher {
+    optional string watcher_id = 1;
+    // Only the root_function_call_id of ExecutionPlanUpdates can be watched.
+    optional string function_call_id = 2;
+}
+
+message AllocationRequestStatePrepareReadOperation {}
+
+message AllocationRequestStatePrepareWriteOperation {
+    optional uint64 size = 1;
+}
+
+message AllocationRequestStateCommitWriteOperation {
+    // BLOB with the uploaded data.
+    optional BLOB blob = 1;
+}
+
+message AllocationRequestStateOperation {
+    optional string operation_id = 1;
+    optional string state_key = 2;
+    oneof operation {
+        AllocationRequestStatePrepareReadOperation prepare_read = 3;
+        AllocationRequestStatePrepareWriteOperation prepare_write = 4;
+        AllocationRequestStateCommitWriteOperation commit_write = 5;
+    }
+}
+
+message AllocationState {
+    // Optional current allocation progress.
+    optional AllocationProgress progress = 1;
+    // Requests for writable BLOBs so the allocation can upload data there.
+    repeated AllocationOutputBLOBRequest output_blob_requests = 2;
+    // Function calls started by the allocation.
+    // New function calls are detected by the client when it sees a new
+    // ExecutionPlanUpdates.root_function_call_id.
+    //
+    // If an unfinished function call gets removed from this list,
+    // it doesn't get cancelled by the client. ExecutionPlanUpdates are immutable.
+    repeated AllocationFunctionCall function_calls = 3;
+    // Watchers for function calls started by the allocation.
+    repeated AllocationFunctionCallWatcher function_call_watchers = 4;
+    // Request state operations.
+    repeated AllocationRequestStateOperation request_state_operations = 7;
+
+    // Optional allocation result. Set when the allocation finishes.
+    AllocationResult result = 5;
+    // Required state hash.
+    optional string sha256_hash = 6;
+}
+
+// FunctionInputs contains the input data for allocation execution.
+message FunctionInputs {
+    repeated SerializedObjectInsideBLOB args = 1;
+    // Each arg has a matching BLOB.
+    repeated BLOB arg_blobs = 2;
+    // BLOB where to upload the request error message if the allocation fails with a request error.
+    optional BLOB request_error_blob = 3;
+    // Always set. An empty bytes array if there is no metadata, e.g. for an API function call.
+    optional bytes function_call_metadata = 4;
+}
+
+message FunctionArg {
+    oneof source {
+        // Data dependency reference to another function call.
+        string function_call_id = 1;
+        // Stored inside uploaded_function_outputs_blob or args_blob.
+        SerializedObjectInsideBLOB value = 2;
+    }
+}
+
+message FunctionCall {
+    optional string id = 1;
+    optional FunctionRef target = 2;
+    repeated FunctionArg args = 3;
+    optional bytes call_metadata = 4;
+}
+
+message ReduceOp {
+    optional string id = 1;
+    // Contains at least two items.
+    repeated FunctionArg collection = 2;
+    optional FunctionRef reducer = 3;
+    optional bytes call_metadata = 4;
+}
+
+message ExecutionPlanUpdate {
+    oneof op {
+        FunctionCall function_call = 10;
+        ReduceOp reduce = 12;
+    }
+}
+
+message ExecutionPlanUpdates {
+    repeated ExecutionPlanUpdate updates = 1;
+    optional string root_function_call_id = 2;
+    // Optional timestamp for when to start the plan execution.
+    optional google.protobuf.Timestamp start_at = 3;
+}
+
+// AllocationResult contains the execution outcome and outputs of an allocation.
+message AllocationResult {
+    optional AllocationOutcomeCode outcome_code = 1;
+    optional AllocationFailureReason failure_reason = 2;
+    oneof outputs {
+        SerializedObjectInsideBLOB value = 3;
+        ExecutionPlanUpdates updates = 4;
+    }
+    // The function_outputs_blob with uploaded function outputs.
+    optional BLOB uploaded_function_outputs_blob = 5;
+    // User payload for the request error if the allocation failed with a request error.
+    // Stored inside uploaded_request_error_blob.
+    optional SerializedObjectInsideBLOB request_error_output = 6;
+    // The request_error_blob with the uploaded request error output, if any.
+    optional BLOB uploaded_request_error_blob = 7;
+    optional Metrics metrics = 8;
+}
+
+// Allocation represents an allocation with its metadata and execution result.
+message Allocation {
+    optional string request_id = 1;
+    optional string function_call_id = 2;
+    optional string allocation_id = 3;
+    // Original allocation inputs.
+    optional FunctionInputs inputs = 4;
+    // Set when the allocation finishes.
+    optional AllocationResult result = 5;
+}
+
+message CreateAllocationRequest {
+    optional Allocation allocation = 1;
+}
+
+message WatchAllocationStateRequest {
+    optional string allocation_id = 1;
+}
+
+message DeleteAllocationRequest {
+    optional string allocation_id = 1;
+}
+
+message AllocationOutputBLOB {
+    optional google.rpc.Status status = 1;
+    // Set if status is OK.
+    optional BLOB blob = 2;
+}
+
+message AllocationFunctionCallResult {
+    // From ExecutionPlanUpdates.root_function_call_id.
+    optional string function_call_id = 1;
+    optional AllocationOutcomeCode outcome_code = 2;
+    optional SerializedObjectInsideBLOB value_output = 3;
+    optional BLOB value_blob = 4;
+    optional SerializedObjectInsideBLOB request_error_output = 5;
+    optional BLOB request_error_blob = 6;
+}
+
+message AllocationRequestStatePrepareReadOperationResult {
+    // A readable BLOB with the value of the requested key.
+    // Reading the BLOB will fail if the state key doesn't exist.
+    optional BLOB blob = 1;
+}
+
+message AllocationRequestStatePrepareWriteOperationResult {
+    // A writable BLOB where the allocation can upload the value for the requested key.
+    optional BLOB blob = 1;
+}
+
+message AllocationRequestStateCommitWriteOperationResult {}
+
+message AllocationRequestStateOperationResult {
+    optional string operation_id = 1;
+    optional google.rpc.Status status = 3;
+    // result is set if status is OK.
+    oneof result {
+        AllocationRequestStatePrepareReadOperationResult prepare_read = 4;
+        AllocationRequestStatePrepareWriteOperationResult prepare_write = 5;
+        AllocationRequestStateCommitWriteOperationResult commit_write = 6;
+    }
+}
+
+message AllocationUpdate {
+    // Target allocation ID.
+    optional string allocation_id = 1;
+    oneof update {
+        // Function call result if the function call is watched by the allocation.
+        AllocationFunctionCallResult function_call_result = 2;
+        // TODO: Delete output_blob_deprecated once all FEs can use both
+        // output_blob_deprecated and output_blob.
+        BLOB output_blob_deprecated = 3;
+        AllocationOutputBLOB output_blob = 4;
+        AllocationRequestStateOperationResult request_state_operation_result = 5;
+    }
+}
+
+message HealthCheckRequest {}
+
+message HealthCheckResponse {
+    optional bool healthy = 1;
+    optional string status_message = 2;
+}
+
+message InfoRequest {}
+
+message InfoResponse {
+    // Internal version of this Function Executor.
+    // Semantic versioning schema is used with format 0.0.0.
+    // Used to support migrations.
+    optional string version = 1;
+    // The version of the SDK used in this Function Executor to run customer code.
+    optional string sdk_version = 2;
+    // The language of the SDK. Currently supported values:
+    // - "python"
+    optional string sdk_language = 3;
+    // The version of the SDK language. The language's versioning format is used.
+    optional string sdk_language_version = 4;
+}
+
+service FunctionExecutor {
+    // Initializes the Function Executor to run allocations
+    // for a particular function. This method is called only
+    // once per Function Executor as it can only run a single function.
+    // It should be called before calling create_allocation.
+    rpc initialize(InitializeRequest) returns (InitializeResponse);
+    // Lists the currently-running allocations.
+    rpc list_allocations(ListAllocationsRequest) returns (ListAllocationsResponse);
+    // Creates and starts the allocation specified in the request.
+    rpc create_allocation(CreateAllocationRequest) returns (Empty);
+    // Streams updates of the allocation state until the allocation finishes.
+    // If the allocation is finished, streams the final state and closes the stream.
+    rpc watch_allocation_state(WatchAllocationStateRequest) returns (stream AllocationState);
+    // Deletes the allocation. The allocation must be finished before calling this method.
+    rpc delete_allocation(DeleteAllocationRequest) returns (Empty);
+    // Sends an update to an allocation.
+    rpc send_allocation_update(AllocationUpdate) returns (Empty);
+    // Health check method to check if the FE is able to run allocations.
+    // The FE should be initialized before calling this method.
+    rpc check_health(HealthCheckRequest) returns (HealthCheckResponse);
+    // Information about this Function Executor.
+  rpc get_info(InfoRequest) returns (InfoResponse);
+}
diff --git a/dataplane/proto/status.proto b/dataplane/proto/status.proto
new file mode 100644
index 000000000..2d2acdb01
--- /dev/null
+++ b/dataplane/proto/status.proto
@@ -0,0 +1,42 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3";
+package google.rpc;
+import "google/protobuf/any.proto";
+option cc_enable_arenas = true;
+option go_package = "google.golang.org/genproto/googleapis/rpc/status;status";
+option java_multiple_files = true;
+option java_outer_classname = "StatusProto";
+option java_package = "com.google.rpc";
+option objc_class_prefix = "RPC";
+// The `Status` type defines a logical error model that is suitable for
+// different programming environments, including REST APIs and RPC APIs. It is
+// used by [gRPC](https://github.com/grpc). Each `Status` message contains
+// three pieces of data: error code, error message, and error details.
+//
+// You can find out more about this error model and how to work with it in the
+// [API Design Guide](https://cloud.google.com/apis/design/errors).
+message Status {
+  // The status code, which should be an enum value of
+  // [google.rpc.Code][google.rpc.Code].
+  int32 code = 1;
+  // A developer-facing error message, which should be in English. Any
+  // user-facing error message should be localized and sent in the
+  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
+  // by the client.
+  string message = 2;
+  // A list of messages that carry the error details. There is a common set of
+  // message types for APIs to use.
+  repeated google.protobuf.Any details = 3;
+}
diff --git a/dataplane/rust-toolchain.toml b/dataplane/rust-toolchain.toml
new file mode 100644
index 000000000..e88baf106
--- /dev/null
+++ b/dataplane/rust-toolchain.toml
@@ -0,0 +1,2 @@
+[toolchain]
+channel = "1.88.0"
diff --git a/dataplane/src/executor/README.md b/dataplane/src/executor/README.md
new file mode 100644
index 000000000..d357b6c97
--- /dev/null
+++ b/dataplane/src/executor/README.md
@@ -0,0 +1,18 @@
+# executor
+
+The Indexify Executor is the component in charge of spawning Tensorlake's Function Executor and allocating tasks to it. It coordinates between the Function Executor and the Indexify Server.
+
+## Components
+
+* Monitoring Server: Connects to the Indexify server and reports its health. It serves an HTTP server on a configurable port, port 7000 by default (a minimal sketch of these endpoints follows this README).
+  * `/monitoring/startup`: Tells whether the Executor is ready to receive tasks. It doesn't report the health of the connection to the server, so it doesn't indicate whether tasks will run successfully.
+  * `/monitoring/health`: Tells whether the Executor has a healthy connection to the Indexify server.
+  * `/monitoring/metrics`: Exposes the Executor's Prometheus metrics.
+  * `/state/reported`: Reports the metadata state of the Executor (host). This includes the available host resources, dev environment setup, OS, CPU architecture, state hash, system time, and executor id.
+  * `/state/desired`: Reports the desired state of the Executor (host) received from the Indexify server.
+* State Reconciler: Executes functions based on the Executor's initial state and updates the eventual state.
+
+## Design decisions
+
+* Metrics: We consolidate the metrics into a single module to simplify state sharing across the Executor modules and threads.
+* gRPC: The gRPC client API is likewise consolidated into a single module to simplify the implementation.
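The monitoring server's implementation is not part of this hunk, so purely as an illustration of the endpoint layout described in the README, here is a minimal sketch; the framework choice (axum), handler names, and response bodies are assumptions, not code from this patch:

```rust
use axum::{routing::get, Json, Router};
use serde_json::json;

// Startup probe: the process is up and can accept tasks; says nothing
// about connectivity to the Indexify server.
async fn startup() -> Json<serde_json::Value> {
    Json(json!({ "status": "ok" }))
}

// Health probe: reflects the health of the gRPC channel to the server.
async fn health() -> Json<serde_json::Value> {
    Json(json!({ "healthy": true }))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let app = Router::new()
        .route("/monitoring/startup", get(startup))
        .route("/monitoring/health", get(health));
    // Default monitoring port from the README.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:7000").await?;
    axum::serve(listener, app).await?;
    Ok(())
}
```

Splitting the startup probe from the health probe this way lets an orchestrator distinguish "process not yet up" from "up but disconnected from the server", which matters because only the latter should eventually trigger alerts rather than restarts.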
diff --git a/dataplane/src/executor/blob_store/local_fs.rs b/dataplane/src/executor/blob_store/local_fs.rs
new file mode 100644
index 000000000..d8a787ef0
--- /dev/null
+++ b/dataplane/src/executor/blob_store/local_fs.rs
@@ -0,0 +1,82 @@
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncWriteExt},
+};
+
+use std::path::PathBuf;
+
+use crate::executor::blob_store::{BlobMetadata, BlobStoreImpl};
+
+pub struct LocalFsBlobStore;
+
+impl LocalFsBlobStore {
+    pub async fn new() -> Self {
+        LocalFsBlobStore
+    }
+
+    fn strip_prefix(uri: &str) -> &str {
+        uri.strip_prefix("file://").unwrap_or(uri)
+    }
+}
+
+impl BlobStoreImpl for LocalFsBlobStore {
+    async fn get(&self, uri: &str) -> anyhow::Result<bytes::Bytes> {
+        let uri = Self::strip_prefix(uri);
+        let path = PathBuf::from(uri);
+        let mut file = File::open(&path).await?;
+        let metadata = file.metadata().await?;
+        let size = metadata.len();
+        let mut buffer = vec![0; size as usize];
+        file.read_exact(&mut buffer).await?;
+        Ok(buffer.into())
+    }
+
+    async fn get_metadata(&self, uri: &str) -> anyhow::Result<BlobMetadata> {
+        let uri = Self::strip_prefix(uri);
+        let path = PathBuf::from(uri);
+        let file = File::open(&path).await?;
+        let metadata = file.metadata().await?;
+        let size_bytes = metadata.len();
+
+        Ok(BlobMetadata { size_bytes })
+    }
+
+    async fn presign_get_uri(&self, uri: &str, _expires_in_sec: u64) -> anyhow::Result<String> {
+        Ok(uri.to_string())
+    }
+
+    async fn upload(&self, uri: &str, data: bytes::Bytes) -> anyhow::Result<()> {
+        let uri = Self::strip_prefix(uri);
+        let path = PathBuf::from(uri);
+        let mut file = File::create(&path).await?;
+        file.write_all(&data).await?;
+        Ok(())
+    }
+
+    async fn create_multipart_upload(&self, _uri: &str) -> anyhow::Result<String> {
+        Ok("no-id-for-local-file".to_string())
+    }
+
+    async fn complete_multipart_upload(
+        &self,
+        _uri: &str,
+        _upload_id: &str,
+        _parts: Vec<String>,
+    ) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    async fn abort_multipart_upload(&self, _uri: &str, _upload_id: &str) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    async fn presign_upload_part_uri(
+        &self,
+        uri: &str,
+        _part_number: i32,
+        _upload_id: &str,
+        _expires_in_sec: u64,
+    ) -> anyhow::Result<String> {
+        Ok(uri.to_string())
+    }
+}
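As a quick illustration of the local backend's contract (not part of the patch; the `file://` path and payload are invented, and the snippet assumes it lives in a module with access to these crate paths), an upload/read round trip looks like this:

```rust
use bytes::Bytes;

use crate::executor::blob_store::{local_fs::LocalFsBlobStore, BlobStoreImpl};

async fn local_round_trip() -> anyhow::Result<()> {
    let store = LocalFsBlobStore::new().await;
    // The "file://" scheme is stripped; the remainder is a filesystem path.
    store
        .upload("file:///tmp/demo.blob", Bytes::from_static(b"hello"))
        .await?;
    let meta = store.get_metadata("file:///tmp/demo.blob").await?;
    assert_eq!(meta.size_bytes, 5);
    let data = store.get("file:///tmp/demo.blob").await?;
    assert_eq!(&data[..], b"hello");
    Ok(())
}
```

Note that presigning is an identity operation for this backend: the URI itself is returned.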
diff --git a/dataplane/src/executor/blob_store/mod.rs b/dataplane/src/executor/blob_store/mod.rs
new file mode 100644
index 000000000..d6232ad73
--- /dev/null
+++ b/dataplane/src/executor/blob_store/mod.rs
@@ -0,0 +1,125 @@
+use anyhow::Result;
+use bytes::Bytes;
+
+use crate::executor::blob_store::{local_fs::LocalFsBlobStore, s3::S3BlobStore};
+
+pub mod local_fs;
+pub mod s3;
+
+pub struct BlobMetadata {
+    pub size_bytes: u64,
+}
+
+pub trait BlobStoreImpl {
+    async fn get(&self, uri: &str) -> Result<Bytes>;
+    async fn get_metadata(&self, uri: &str) -> Result<BlobMetadata>;
+    async fn presign_get_uri(&self, uri: &str, expires_in_sec: u64) -> Result<String>;
+    async fn upload(&self, uri: &str, data: Bytes) -> Result<()>;
+    async fn create_multipart_upload(&self, uri: &str) -> Result<String>;
+    async fn complete_multipart_upload(
+        &self,
+        uri: &str,
+        upload_id: &str,
+        parts: Vec<String>,
+    ) -> Result<()>;
+    async fn abort_multipart_upload(&self, uri: &str, upload_id: &str) -> Result<()>;
+    async fn presign_upload_part_uri(
+        &self,
+        uri: &str,
+        part_number: i32,
+        upload_id: &str,
+        expires_in_sec: u64,
+    ) -> Result<String>;
+}
+
+pub struct BlobStore {
+    local: LocalFsBlobStore,
+    s3: S3BlobStore,
+}
+
+impl BlobStore {
+    pub async fn new() -> Self {
+        let local = LocalFsBlobStore::new().await;
+        let s3 = S3BlobStore::new();
+        Self { local, s3 }
+    }
+}
+
+impl BlobStoreImpl for BlobStore {
+    async fn get(&self, uri: &str) -> Result<Bytes> {
+        if uri.starts_with("file://") {
+            return self.local.get(uri).await;
+        }
+        self.s3.get(uri).await
+    }
+
+    async fn get_metadata(&self, uri: &str) -> Result<BlobMetadata> {
+        if uri.starts_with("file://") {
+            return self.local.get_metadata(uri).await;
+        }
+        self.s3.get_metadata(uri).await
+    }
+
+    async fn presign_get_uri(&self, uri: &str, expires_in_sec: u64) -> Result<String> {
+        if uri.starts_with("file://") {
+            return self.local.presign_get_uri(uri, expires_in_sec).await;
+        }
+        self.s3.presign_get_uri(uri, expires_in_sec).await
+    }
+
+    async fn upload(&self, uri: &str, data: Bytes) -> Result<()> {
+        if uri.starts_with("file://") {
+            return self.local.upload(uri, data).await;
+        }
+        self.s3.upload(uri, data).await
+    }
+
+    async fn create_multipart_upload(&self, uri: &str) -> Result<String> {
+        if uri.starts_with("file://") {
+            return self.local.create_multipart_upload(uri).await;
+        }
+        self.s3.create_multipart_upload(uri).await
+    }
+
+    async fn complete_multipart_upload(
+        &self,
+        uri: &str,
+        upload_id: &str,
+        parts: Vec<String>,
+    ) -> Result<()> {
+        if uri.starts_with("file://") {
+            return self
+                .local
+                .complete_multipart_upload(uri, upload_id, parts)
+                .await;
+        }
+        self.s3
+            .complete_multipart_upload(uri, upload_id, parts)
+            .await
+    }
+
+    async fn abort_multipart_upload(&self, uri: &str, upload_id: &str) -> Result<()> {
+        if uri.starts_with("file://") {
+            return self.local.abort_multipart_upload(uri, upload_id).await;
+        }
+        self.s3.abort_multipart_upload(uri, upload_id).await
+    }
+
+    async fn presign_upload_part_uri(
+        &self,
+        uri: &str,
+        part_number: i32,
+        upload_id: &str,
+        expires_in_sec: u64,
+    ) -> Result<String> {
+        if uri.starts_with("file://") {
+            return self
+                .local
+                .presign_upload_part_uri(uri, part_number, upload_id, expires_in_sec)
+                .await;
+        }
+        self.s3
+            .presign_upload_part_uri(uri, part_number, upload_id, expires_in_sec)
+            .await
+    }
+}
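For context on how these trait methods compose, a multipart upload runs create → presign one URI per part → HTTP PUT each part → complete, passing the parts' ETags in order. The following sketch is inferred from the signatures above and is not code from the patch; the URI and ETag values are placeholders and the HTTP client doing the PUTs is omitted:

```rust
use crate::executor::blob_store::{BlobStore, BlobStoreImpl};

async fn multipart_upload_flow(store: &BlobStore) -> anyhow::Result<()> {
    let uri = "s3://my-bucket/outputs/blob-1"; // placeholder URI
    let upload_id = store.create_multipart_upload(uri).await?;

    // One presigned URI per part; parts are numbered starting at 1.
    let part_uri = store.presign_upload_part_uri(uri, 1, &upload_id, 3600).await?;
    // ... the uploader PUTs the part's bytes to `part_uri` and records the
    // ETag from the HTTP response ...
    let etags = vec!["etag-of-part-1".to_string()]; // placeholder ETag

    // `parts` is the list of ETags in part-number order.
    store.complete_multipart_upload(uri, &upload_id, etags).await?;
    Ok(())
}
```

The local-filesystem backend turns the multipart calls into no-ops, which keeps callers scheme-agnostic.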
uri.strip_prefix("s3://").unwrap_or(uri); + let parts: Vec<&str> = uri.splitn(2, '/').collect(); + if parts.len() != 2 { + Err(anyhow!(format!( + "Failed parsing bucket name from S3 URI '{uri}'" + ))) + } else { + Ok((parts[0].to_string(), parts[1].to_string())) + } + } +} + +impl BlobStoreImpl for S3BlobStore { + async fn get(&self, uri: &str) -> Result { + let (bucket, key) = Self::parse_uri(uri)?; + let client = self + .client + .as_ref() + .ok_or_else(|| anyhow!("Client not initialized"))?; + let response = client.get_object().bucket(&bucket).key(&key).send().await?; + let bytes = response.body.collect().await?.into_bytes(); + Ok(bytes) + } + + async fn get_metadata(&self, uri: &str) -> Result { + let (bucket, key) = Self::parse_uri(uri)?; + let client = self + .client + .as_ref() + .ok_or_else(|| anyhow!("Client not initialized"))?; + let response = client + .head_object() + .bucket(&bucket) + .key(&key) + .send() + .await?; + let size_bytes = response.content_length().unwrap_or(0) as u64; + Ok(BlobMetadata { size_bytes }) + } + + async fn presign_get_uri(&self, uri: &str, expires_in_sec: u64) -> Result { + let (bucket, key) = Self::parse_uri(uri)?; + let client = self + .client + .as_ref() + .ok_or_else(|| anyhow!("Client not initialized"))?; + let presigned_request = client + .get_object() + .bucket(bucket) + .key(key) + .presigned(PresigningConfig::expires_in(Duration::from_secs( + expires_in_sec, + ))?) + .await?; + Ok(presigned_request.uri().to_string()) + } + + async fn upload(&self, uri: &str, data: bytes::Bytes) -> Result<()> { + let (bucket, key) = Self::parse_uri(uri)?; + let client = self + .client + .as_ref() + .ok_or_else(|| anyhow!("Client not initialized"))?; + client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(data)) + .send() + .await?; + Ok(()) + } + + async fn create_multipart_upload(&self, uri: &str) -> Result { + let (bucket, key) = Self::parse_uri(uri)?; + let client = self + .client + .as_ref() + .ok_or_else(|| anyhow!("Client not initialized"))?; + let upload_id = match client + .create_multipart_upload() + .bucket(&bucket) + .key(&key) + .send() + .await? 
diff --git a/dataplane/src/executor/executor.rs b/dataplane/src/executor/executor.rs
new file mode 100644
index 000000000..22601d110
--- /dev/null
+++ b/dataplane/src/executor/executor.rs
@@ -0,0 +1,108 @@
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
+
+use prometheus_client::registry::Registry;
+use tokio::sync::Mutex;
+
+use crate::executor::{
+    blob_store::BlobStore,
+    executor_api::{
+        executor_api_pb::ExecutorStatus, ChannelManager, ExecutorStateReporter, FunctionUri,
+    },
+    function_executor::server_factory::SubprocessFunctionExecutorServerFactory,
+    host_resources::HostResourcesProvider,
+    monitoring::{
+        health_check_handler::HealthCheckHandler,
+        health_checker::generic_health_checker::GenericHealthChecker,
+        prometheus_metrics_handler::PrometheusMetricsHandler,
+        reported_state_handler::ReportedStateHandler, server::MonitoringServer,
+        startup_probe_handler::StartupProbeHandler,
+    },
+};
+
+pub struct Executor {
+    startup_probe_handler: StartupProbeHandler,
+    channel_manager: Arc<ChannelManager>,
+    state_reporter: ExecutorStateReporter,
+    state_reconciler: ExecutorStateReconciler,
+    monitoring_server: MonitoringServer,
+    registry: Arc<Registry>,
+}
+
+impl Executor {
+    pub async fn new(
+        id: String,
+        version: String,
+        mut labels: HashMap<String, String>,
+        cache_path: PathBuf,
+        // TODO: use trait here
+        health_checker: Arc<Mutex<GenericHealthChecker>>,
+        function_uris: Vec<String>,
+        // TODO: use trait here
+        function_executor_server_factory: SubprocessFunctionExecutorServerFactory,
+        server_addr: String,
+        grpc_server_addr: String,
+        config_path: Option<String>,
+        monitoring_server_host: String,
+        monitoring_server_port: u16,
+        blob_store: BlobStore,
+        host_resource_provider: HostResourcesProvider,
+        catalog_entry_name: Option<String>,
+        registry: Arc<Registry>,
+    ) -> Result<Self, Box<dyn std::error::Error>> {
+        let channel_manager =
+            Arc::new(ChannelManager::new(grpc_server_addr, config_path.clone()).await?);
+        let startup_probe_handler = StartupProbeHandler::new();
+        let mut state_reporter = ExecutorStateReporter::new(
+            id.clone(),
+            version.clone(),
+            &mut labels,
+            parse_function_uris(function_uris),
+            channel_manager.clone(),
+            host_resource_provider,
+            health_checker.clone(),
+            catalog_entry_name.clone(),
+        );
+        let state_reconciler = ExecutorStateReconciler::new(
+            id,
+            function_executor_server_factory,
+            server_addr,
+            config_path,
+            cache_path,
+            blob_store,
+            channel_manager.clone(),
+            state_reporter.clone(),
+        );
+        state_reporter
+            .update_executor_status(ExecutorStatus::StartingUp)
+            .await;
+        Ok(Executor {
+            startup_probe_handler,
+            channel_manager,
+            state_reporter: state_reporter.clone(),
+            state_reconciler,
+            monitoring_server: MonitoringServer::new(
+                monitoring_server_host,
+                monitoring_server_port,
+                startup_probe_handler,
+                HealthCheckHandler::new(health_checker.clone()),
+                PrometheusMetricsHandler::new(registry.clone()),
+                ReportedStateHandler::new(state_reporter),
+                // DesiredStateHandler::new(state_reconciler),
+            ),
+            registry,
+        })
+    }
+
+    pub fn run(&self) {}
+}
+
+fn parse_function_uris(vec: Vec<String>) -> Vec<FunctionUri> {
+    vec.into_iter()
+        .map(|uri| {
+            FunctionUri::new(
+                uri.split(':').nth(0).unwrap_or_default().to_string(),
+                uri.split(':').nth(1).unwrap_or_default().to_string(),
+                uri.split(':').nth(2).unwrap_or_default().to_string(),
+                uri.split(':').nth(3).map(|v| v.to_string()),
+            )
+        })
+        .collect()
+}
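To make the function URI format concrete, `parse_function_uris` above splits colon-separated `namespace:application:function[:version]` strings, where the version segment is optional. This same-module illustration is not part of the patch and the URI values are invented:

```rust
fn demo_function_uris() {
    let uris = parse_function_uris(vec![
        "my-namespace:my-app:my-fn:1.2.3".to_string(), // with version
        "my-namespace:my-app:my-fn".to_string(),       // version defaults to None
    ]);
    // uris[0] -> FunctionUri { namespace: "my-namespace", application: "my-app",
    //                          compute_fn: "my-fn", version: Some("1.2.3") }
    // uris[1] -> same fields, but version: None
    assert_eq!(uris.len(), 2);
}
```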
diff --git a/dataplane/src/executor/executor_api.rs b/dataplane/src/executor/executor_api.rs
new file mode 100644
index 000000000..01a19f5d8
--- /dev/null
+++ b/dataplane/src/executor/executor_api.rs
@@ -0,0 +1,542 @@
+use std::{collections::HashMap, error::Error, sync::Arc, time::Duration};
+
+use serde::Deserialize;
+use tokio::{
+    sync::{Mutex, Notify},
+    time::{sleep, timeout},
+};
+use tonic::{
+    Request,
+    transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity},
+};
+use tracing::{error, info, warn};
+
+use crate::executor::{
+    executor_api::executor_api_pb::{
+        AllocationResult, AllowedFunction, ExecutorState, ExecutorStatus, ExecutorUpdate,
+        FunctionCallWatch, FunctionExecutorState, ReportExecutorStateRequest,
+        executor_api_client::ExecutorApiClient,
+    },
+    host_resources::{HostResources, HostResourcesProvider},
+    monitoring::health_checker::{HealthChecker, generic_health_checker::GenericHealthChecker},
+};
+
+const REPORTING_INTERVAL_SEC: u64 = 5;
+const REPORTING_BACKOFF_SEC: u64 = 5;
+const REPORT_RPC_TIMEOUT_SEC: u64 = 5;
+
+#[allow(non_camel_case_types)] // The autogenerated code uses snake_case types in some cases
+pub mod executor_api_pb {
+    tonic::include_proto!("executor_api_pb");
+}
+
+pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
+
+#[derive(Debug, Clone)]
+pub struct FunctionCallWatchInfo {
+    watch: FunctionCallWatch,
+    ref_counter: u64,
+}
+
+fn function_call_watch_key(watch: FunctionCallWatch) -> String {
+    format!(
+        "{}.{}.{}.{}",
+        watch.namespace(),
+        watch.application(),
+        watch.request_id(),
+        watch.function_call_id()
+    )
+}
+
+#[derive(Debug)]
+struct ReporterState {
+    executor_status: ExecutorStatus,
+    last_server_clock: u64,
+    pending_allocation_results: Vec<AllocationResult>,
+    function_executor_states: HashMap<String, FunctionExecutorState>,
+    function_call_watches: HashMap<String, FunctionCallWatchInfo>,
+    last_state_report_request: Option<ReportExecutorStateRequest>,
+}
+
+impl ReporterState {
+    fn new() -> Self {
+        Self {
+            executor_status: ExecutorStatus::Unknown,
+            last_server_clock: 0,
+            pending_allocation_results: Vec::new(),
+            function_executor_states: HashMap::new(),
+            function_call_watches: HashMap::new(),
+            last_state_report_request: None,
+        }
+    }
+
+    fn
current_function_call_watches(&self) -> Vec { + self.function_call_watches + .values() + .map(|info| info.watch.clone()) + .collect() + } + + fn remove_pending_update(&mut self, executor_id: &str) -> ExecutorUpdate { + let alloc_results = std::mem::take(&mut self.pending_allocation_results); + ExecutorUpdate { + executor_id: Some(executor_id.to_string()), + allocation_results: alloc_results, + } + } + + fn add_to_pending_update(&mut self, update: ExecutorUpdate) { + self.pending_allocation_results + .extend(update.allocation_results); + } +} + +#[derive(Debug, Clone)] +pub struct ExecutorStateReporter { + id: String, + version: String, + labels: HashMap, + channel_manager: Arc, + total_host_resources: HostResources, + total_function_executor_resources: HostResources, + health_checker: Arc>, + catalog_entry_name: Option, + allowed_functions: Vec, + state: Arc>, + state_report_scheduled: Arc, + state_reported: Arc, +} + +impl ExecutorStateReporter { + pub fn new( + id: String, + version: String, + labels: &mut HashMap, + function_allowlist: Vec, + channel_manager: Arc, + host_resource_provider: HostResourcesProvider, + health_checker: Arc>, + catalog_entry_name: Option, + ) -> Self { + labels.extend(executor_labels()); + + ExecutorStateReporter { + id, + version, + labels: labels.to_owned(), + channel_manager, + total_host_resources: host_resource_provider.total_host_resources(), + total_function_executor_resources: host_resource_provider + .total_function_executor_resources(), + health_checker, + catalog_entry_name, + allowed_functions: to_allowed_function(function_allowlist), + state: Arc::new(Mutex::new(ReporterState::new())), + state_report_scheduled: Arc::new(Notify::new()), + state_reported: Arc::new(Notify::new()), + } + } + + pub async fn last_state_report_request(&self) -> Option { + self.state.lock().await.last_state_report_request.clone() + } + + pub async fn update_executor_status(&mut self, status: ExecutorStatus) { + self.state.lock().await.executor_status = status; + } + + pub async fn update_last_server_clock(&mut self, clock: u64) { + self.state.lock().await.last_server_clock = clock; + } + + pub async fn update_function_executor_state(&mut self, state: FunctionExecutorState) { + match state.clone().description { + Some(desc) => match desc.id { + Some(id) => { + self.state + .lock() + .await + .function_executor_states + .insert(id.to_string(), state); + } + None => {} + }, + None => {} + } + } + + pub async fn remove_function_executor_state(&mut self, id: &str) { + if self + .state + .lock() + .await + .function_executor_states + .remove(id) + .is_none() + { + warn!( + function_executor_id = id, + "attempted to remove non-existing function executor state" + ) + } + } + + pub async fn add_completed_allocation_result(&mut self, alloc_result: AllocationResult) { + self.state + .lock() + .await + .pending_allocation_results + .push(alloc_result); + } + + pub async fn add_function_call_watcher(&mut self, watch: FunctionCallWatch) { + let mut state = self.state.lock().await; + let content_derived_key = function_call_watch_key(watch.clone()); + if !state + .function_call_watches + .contains_key(&content_derived_key) + { + state.function_call_watches.insert( + content_derived_key.clone(), + FunctionCallWatchInfo { + watch, + ref_counter: 0, + }, + ); + } + match state.function_call_watches.get_mut(&content_derived_key) { + Some(info) => info.ref_counter += 1, + None => {} + } + } + + pub async fn remove_function_call_watcher(&mut self, watch: FunctionCallWatch) { + let 
content_derived_key = function_call_watch_key(watch.clone()); + match self + .state + .lock() + .await + .function_call_watches + .get_mut(&content_derived_key) + { + Some(info) => { + if info.ref_counter == 0 { + self.state + .lock() + .await + .function_call_watches + .remove(&content_derived_key); + return; + } + info.ref_counter -= 1; + } + None => {} + } + } + + pub async fn current_function_call_watches(&self) -> Vec { + return self + .state + .lock() + .await + .function_call_watches + .iter() + .map(|f| f.1.watch.clone()) + .collect(); + } + + pub fn schedule_state_report(&self) { + self.state_report_scheduled.notify_one(); + } + + pub async fn report_state_and_wait_for_completion(&self) { + let notified = self.state_reported.notified(); + self.schedule_state_report(); + notified.await; + } + + pub async fn run(&self) { + let notify = self.state_report_scheduled.clone(); + tokio::spawn(async move { + Self::periodic_scheduler_loop(notify).await; + }); + + let worker_self = self.clone(); + tokio::spawn(async move { + worker_self.state_report_worker_loop().await; + }); + + info!("executor state reporter started"); + } + + async fn current_executor_state(&self) -> ExecutorState { + let state = self.state.lock().await; + let function_executor_states: Vec = + state.function_executor_states.values().cloned().collect(); + + ExecutorState { + executor_id: Some(self.id.clone()), + hostname: Some(gethostname::gethostname().to_string_lossy().to_string()), + version: Some(self.version.clone()), + status: Some(state.executor_status as i32), + total_function_executor_resources: Some( + self.total_function_executor_resources.to_proto(), + ), + total_resources: Some(self.total_host_resources.to_proto()), + allowed_functions: self.allowed_functions.clone(), + function_executor_states, + labels: self.labels.clone(), + catalog_entry_name: self.catalog_entry_name.clone(), + function_call_watches: state.current_function_call_watches(), + server_clock: Some(state.last_server_clock), + state_hash: None, // TODO: implement state hashing + } + } + + async fn periodic_scheduler_loop(notify: Arc) { + loop { + notify.notify_one(); + sleep(Duration::from_secs(REPORTING_INTERVAL_SEC)).await; + } + } + + async fn state_report_worker_loop(self) { + loop { + let channel = match self.channel_manager.create_channel() { + Ok(ch) => ch, + Err(e) => { + error!(error = ?e, "failed to create gRPC channel"); + sleep(Duration::from_secs(REPORTING_BACKOFF_SEC)).await; + continue; + } + }; + + let mut client = ExecutorApiClient::new(channel); + + loop { + self.state_report_scheduled.notified().await; + + let executor_state = self.current_executor_state().await; + let executor_update = { + let mut state = self.state.lock().await; + state.remove_pending_update(&self.id) + }; + + let request = Request::new(ReportExecutorStateRequest { + executor_state: Some(executor_state), + executor_update: Some(executor_update.clone()), + }); + + self.log_reported_update(&executor_update); + + { + let mut state = self.state.lock().await; + state.last_state_report_request = Some(request.get_ref().clone()); + } + + match timeout( + Duration::from_secs(REPORT_RPC_TIMEOUT_SEC), + client.report_executor_state(request), + ) + .await + { + Ok(Ok(_response)) => { + self.state_reported.notify_waiters(); + self.health_checker + .lock() + .await + .server_connection_state_changed( + true, + "grpc server channel is healthy".to_string(), + ); + } + Ok(Err(status)) => { + error!( + status = ?status, + "failed to report state, backing off for {} sec", + 
REPORTING_BACKOFF_SEC + ); + self.handle_report_failure(executor_update).await; + break; + } + Err(_) => { + error!( + "state report timed out, backing off for {} sec", + REPORTING_BACKOFF_SEC + ); + self.handle_report_failure(executor_update).await; + break; + } + } + } + } + } + + async fn handle_report_failure(&self, executor_update: ExecutorUpdate) { + { + let mut state = self.state.lock().await; + state.add_to_pending_update(executor_update); + } + + self.health_checker + .lock() + .await + .server_connection_state_changed(false, "grpc server channel is unhealthy".to_string()); + + sleep(Duration::from_secs(REPORTING_BACKOFF_SEC)).await; + } + + fn log_reported_update(&self, update: &ExecutorUpdate) { + for alloc_result in &update.allocation_results { + info!( + allocation_id = ?alloc_result.allocation_id, + outcome_code = alloc_result.outcome_code, + "reporting allocation outcome" + ); + } + } +} + +fn executor_labels() -> HashMap { + let mut labels = HashMap::new(); + labels.insert("os".to_string(), std::env::consts::OS.to_string()); + labels.insert( + "architecture".to_string(), + std::env::consts::ARCH.to_string(), + ); + labels.insert( + "rust_major_version".to_string(), + env!("CARGO_PKG_VERSION_MAJOR").to_string(), + ); + labels.insert( + "rust_minor_version".to_string(), + env!("CARGO_PKG_VERSION_MINOR").to_string(), + ); + labels +} + +pub struct FunctionUri { + namespace: String, + application: String, + compute_fn: String, + version: Option, +} + +impl FunctionUri { + pub fn new( + namespace: String, + application: String, + compute_fn: String, + version: Option, + ) -> Self { + FunctionUri { + namespace, + application, + compute_fn, + version, + } + } +} + +impl From<&FunctionUri> for HashMap { + fn from(uri: &FunctionUri) -> Self { + let mut map = HashMap::new(); + map.insert("namespace".to_string(), uri.namespace.clone()); + map.insert("application".to_string(), uri.application.clone()); + map.insert("compute_fn".to_string(), uri.compute_fn.clone()); + if let Some(version) = &uri.version { + map.insert("version".to_string(), version.clone()); + } + map + } +} + +fn to_allowed_function(uris: Vec) -> Vec { + uris.into_iter() + .map(|uri| AllowedFunction { + namespace: Some(uri.namespace.clone()), + application_name: Some(uri.application.clone()), + function_name: Some(uri.compute_fn.clone()), + application_version: uri.version.clone(), + }) + .collect() +} + +#[derive(Debug, Deserialize)] +struct TlsFileConfig { + cert_path: Option, + key_path: Option, + ca_bundle_path: Option, +} + +#[derive(Debug, Deserialize)] +struct TlsConfig { + #[serde(default)] + use_tls: bool, + tls_config: Option, +} + +#[derive(Debug, Clone)] +pub struct ChannelManager { + server_address: String, + tls_config: Option, + shared_channel: Option>>, +} + +impl ChannelManager { + pub async fn new( + server_address: String, + config_path: Option, + ) -> Result> { + let tls_config = match config_path { + Some(path) => Self::load_tls_config(&path).await?, + None => None, + }; + Ok(ChannelManager { + server_address, + tls_config, + shared_channel: None, + }) + } + + pub async fn load_tls_config(path: &str) -> Result, Box> { + let config: TlsConfig = serde_yaml::from_str(&tokio::fs::read_to_string(path).await?)?; + if !config.use_tls { + return Ok(None); + } + + let tls = config.tls_config.ok_or("TLS config not found")?; + let mut tls_config = ClientTlsConfig::new(); + + if let (Some(cert_path), Some(key_path)) = (tls.cert_path, tls.key_path) { + let cert = tokio::fs::read(cert_path).await?; + let key = 
tokio::fs::read(key_path).await?; + tls_config = tls_config.identity(Identity::from_pem(cert, key)); + } + if let Some(ca_bundle_path) = tls.ca_bundle_path { + let ca_bundle = tokio::fs::read(ca_bundle_path).await?; + tls_config = tls_config.ca_certificate(Certificate::from_pem(ca_bundle)); + } + + Ok(Some(tls_config)) + } + + pub fn get_shared_channel(&mut self) -> Result>, tonic::transport::Error> { + if let Some(channel) = self.shared_channel.take() { + Ok(channel) + } else { + self.shared_channel = Some(Arc::new(Mutex::new(self.create_channel()?))); + Ok(self.shared_channel.clone().unwrap()) + } + } + + pub fn create_channel(&self) -> Result { + let mut endpoint = Endpoint::from_shared(self.server_address.clone()) + .expect("Invalid server address") + .connect_timeout(CONNECTION_TIMEOUT); + if let Some(tls_config) = &self.tls_config { + endpoint = endpoint.tls_config(tls_config.clone())?; + } + Ok(endpoint.connect_lazy()) + } +} diff --git a/dataplane/src/executor/function_executor/health_checker.rs b/dataplane/src/executor/function_executor/health_checker.rs new file mode 100644 index 000000000..f963d8ca6 --- /dev/null +++ b/dataplane/src/executor/function_executor/health_checker.rs @@ -0,0 +1,108 @@ +use std::{sync::Arc, time::Duration}; + +use anyhow::Result; +use futures::future::BoxFuture; +use tokio::{sync::Mutex, task::JoinHandle}; +use tonic::transport::Channel; +use tracing::error; + +use crate::executor::function_executor::function_executor_service::{ + function_executor_client::FunctionExecutorClient, HealthCheckRequest, +}; + +const HEALTH_CHECK_POLL_PERIOD_SEC: u32 = 5; + +type HealthCheckFailedCallback = + Box BoxFuture<'static, ()> + Send + Sync>; + +pub struct HealthCheckResult { + is_healthy: bool, + pub reason: String, +} + +impl HealthCheckResult { + pub fn new(is_healthy: bool, reason: String) -> Self { + Self { is_healthy, reason } + } + + pub fn healthy(&self) -> bool { + self.is_healthy + } +} + +pub struct HealthChecker { + channel: Channel, + task: Arc>>>, + callback: Arc>>, +} + +impl HealthChecker { + pub fn new(channel: Channel) -> Self { + Self { + channel, + task: Arc::new(Mutex::new(None)), + callback: Arc::new(Mutex::new(None)), + } + } + + pub async fn check(&self) -> Result { + if std::env::var("INDEXIFY_DISABLE_FUNCTION_EXECUTOR_HEALTH_CHECKS").unwrap_or_default() + == "1" + { + Ok(HealthCheckResult::new(true, "Function Executor health checks are disabled using INDEXIFY_DISABLE_FUNCTION_EXECUTOR_HEALTH_CHECKS env var.".to_string())) + } else { + let mut client = FunctionExecutorClient::new(self.channel.clone()); + let response = client + .check_health(HealthCheckRequest {}) + .await? 
+ .into_inner(); + // TODO: set labels + Ok(HealthCheckResult::new( + response.healthy(), + response.status_message().to_string(), + )) + } + } + + pub async fn start(self: Arc, callback: HealthCheckFailedCallback) -> Result<()> { + let task = self.task.lock().await; + if task.is_some() { + return Ok(()); + } + let mut cb = self.callback.lock().await; + *cb = Some(callback); + let _self = Arc::clone(&self); + let handle = tokio::spawn(async move { + loop { + match _self.check().await { + Ok(result) => { + let cb_opt = { + let mut cb_guard = _self.callback.lock().await; + cb_guard.take() + }; + + if let Some(cb) = cb_opt { + tokio::spawn(cb(result)); + } + break; + } + Err(err) => { + error!("Health check RPC failed, ignoring error: {err:?}") + } + } + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_POLL_PERIOD_SEC as u64)).await; + } + }); + let mut task = self.task.lock().await; + *task = Some(handle); + Ok(()) + } + + pub async fn stop(self: Arc) { + let mut task = self.task.lock().await; + if let Some(handle) = task.take() { + handle.abort(); + } + self.callback.lock().await.take(); + } +} diff --git a/dataplane/src/executor/function_executor/mod.rs b/dataplane/src/executor/function_executor/mod.rs new file mode 100644 index 000000000..a9387b5d1 --- /dev/null +++ b/dataplane/src/executor/function_executor/mod.rs @@ -0,0 +1,170 @@ +use std::{sync::Arc, time::Duration}; + +use tokio::sync::Mutex; +use tonic::transport::Channel; +use tracing::error; + +use crate::executor::{ + function_executor::{ + function_executor_service::{ + function_executor_client::FunctionExecutorClient, InfoRequest, + }, + health_checker::HealthChecker, + }, + metrics::Metrics, +}; + +pub mod function_executor_service { + tonic::include_proto!("function_executor_service"); +} + +pub mod health_checker; +pub mod server; +pub mod server_factory; +pub mod subprocess; + +pub use function_executor_service::{InitializeRequest, InitializeResponse}; +pub use server::{FunctionExecutorServer, FunctionExecutorServerStatus}; +pub use server_factory::{FunctionExecutorServerConfiguration, FunctionExecutorServerFactory}; +pub use subprocess::SubprocessFunctionExecutorServer; + +#[derive(Debug)] +pub struct FunctionExecutorInitializationResult { + pub is_timeout: bool, + pub is_oom: bool, + pub response: Option, +} + +pub struct FunctionExecutor { + server_factory: Arc, + server: Option>, + channel: Option, + metrics: Arc>, + health_checker: Option>, +} + +impl FunctionExecutor { + pub fn new( + server_factory: Arc, + metrics: Arc>, + ) -> Self { + Self { + server_factory, + server: None, + channel: None, + metrics, + health_checker: None, + } + } + + pub fn channel(&self) -> Option<&Channel> { + self.channel.as_ref() + } + + pub async fn server_status(&self) -> Option { + self.server + .as_ref() + .map(|_| Some(FunctionExecutorServerStatus::Running))? 
+ } + + pub async fn initialize( + &mut self, + config: FunctionExecutorServerConfiguration, + initialize_request: InitializeRequest, + customer_code_timeout_sec: f64, + ) -> Result> { + let server = self.server_factory.create(config).await?; + self.server = Some(server); + + self.channel = Some(self.establish_channel(customer_code_timeout_sec).await?); + if let Some(channel) = self.channel.as_ref() { + self.health_checker = Some(Arc::new(HealthChecker::new(channel.clone()))); + } + + match self.channel() { + Some(channel) => { + let mut client = FunctionExecutorClient::new(channel.clone()); + + match tokio::time::timeout( + Duration::from_secs_f64(customer_code_timeout_sec), + client.initialize(initialize_request), + ) + .await + { + Ok(Ok(response)) => Ok(FunctionExecutorInitializationResult { + is_timeout: false, + is_oom: false, + response: Some(response.into_inner()), + }), + Ok(Err(err)) => { + error!("Initialize RPC failed: {:?}", err); + if err.code() == tonic::Code::DeadlineExceeded { + Ok(FunctionExecutorInitializationResult { + is_timeout: true, + is_oom: false, + response: None, + }) + } else { + let is_oom = if let Some(server) = self.server.as_ref() { + server.status().await == FunctionExecutorServerStatus::OomKilled + } else { + false + }; + + Ok(FunctionExecutorInitializationResult { + is_timeout: false, + is_oom, + response: None, + }) + } + } + Err(_) => Ok(FunctionExecutorInitializationResult { + is_timeout: true, + is_oom: false, + response: None, + }), + } + } + None => Ok(FunctionExecutorInitializationResult { + is_timeout: true, + is_oom: false, + response: None, + }), + } + } + + pub async fn destroy(&mut self) -> Result<(), Box> { + if let Some(_channel) = self.channel.take() { + // Channel will be dropped + } + + if let Some(mut server) = self.server.take() { + server.kill().await?; + } + + Ok(()) + } + + async fn establish_channel( + &mut self, + customer_code_timeout_sec: f64, + ) -> Result> { + if let Some(server) = self.server.as_ref() { + let address = format!("http://{}", server.address().to_string()); + let channel = Channel::from_shared(address)? 
+ .connect_timeout(Duration::from_secs_f64(customer_code_timeout_sec)) + .connect() + .await?; + Ok(channel) + } else { + Err("Server not initialized".into()) + } + } + + async fn server_info(&self, channel: &Channel) -> Result<(), Box> { + let mut client = FunctionExecutorClient::new(channel.clone()); + let _info = client.get_info(InfoRequest {}).await?.into_inner(); + // TODO: set labels + Ok(()) + } +} diff --git a/dataplane/src/executor/function_executor/server.rs b/dataplane/src/executor/function_executor/server.rs new file mode 100644 index 000000000..8107e7d19 --- /dev/null +++ b/dataplane/src/executor/function_executor/server.rs @@ -0,0 +1,19 @@ +use anyhow::Result; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FunctionExecutorServerStatus { + Running, + Exited, + OomKilled, +} + +#[async_trait::async_trait] +pub trait FunctionExecutorServer: Send + Sync { + async fn status(&self) -> FunctionExecutorServerStatus; + + fn address(&self) -> &str; + + fn pid(&self) -> Option; + + async fn kill(&mut self) -> Result<()>; +} diff --git a/dataplane/src/executor/function_executor/server_factory.rs b/dataplane/src/executor/function_executor/server_factory.rs new file mode 100644 index 000000000..321b6655e --- /dev/null +++ b/dataplane/src/executor/function_executor/server_factory.rs @@ -0,0 +1,90 @@ +use std::net::TcpListener; +use std::process::Stdio; + +use tokio::process::Command; + +use crate::executor::function_executor::server::FunctionExecutorServer; + +#[derive(Debug, Clone)] +pub struct FunctionExecutorServerConfiguration { + pub executor_id: String, + pub function_executor_id: String, + pub namespace: String, + pub application_name: String, + pub application_version: String, + pub function_name: String, + pub secret_names: Vec, + pub cpu_ms_per_sec: u32, + pub memory_bytes: u64, + pub disk_bytes: u64, + pub gpu_count: u32, +} + +#[async_trait::async_trait] +pub trait FunctionExecutorServerFactory: Send + Sync { + async fn create( + &self, + config: FunctionExecutorServerConfiguration, + ) -> Result, Box>; + + async fn destroy(&self) -> Result<(), Box>; +} + +pub struct SubprocessFunctionExecutorServerFactory { + verbose_logs: bool, +} + +impl SubprocessFunctionExecutorServerFactory { + pub fn new(verbose_logs: bool) -> Self { + Self { verbose_logs } + } + + fn find_free_port() -> Result> { + let listener = TcpListener::bind("127.0.0.1:0")?; + let port = listener.local_addr()?.port(); + Ok(port) + } + + fn server_address(port: u16) -> String { + format!("localhost:{}", port) + } +} + +#[async_trait::async_trait] +impl FunctionExecutorServerFactory for SubprocessFunctionExecutorServerFactory { + async fn create( + &self, + config: FunctionExecutorServerConfiguration, + ) -> Result, Box> { + let port = Self::find_free_port()?; + let address = Self::server_address(port); + + let mut args = vec![ + format!("--executor-id={}", config.executor_id), + format!("--function-executor-id={}", config.function_executor_id), + "--address".to_string(), + address.clone(), + ]; + + if self.verbose_logs { + args.push("--dev".to_string()); + } + + let child = Command::new("function-executor") + .args(&args) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn()?; + + Ok(Box::new( + crate::executor::function_executor::subprocess::SubprocessFunctionExecutorServer::new( + child, address, + ), + )) + } + + async fn destroy(&self) -> Result<(), Box> { + // TODO: Implement server destruction + Ok(()) + } +} diff --git a/dataplane/src/executor/function_executor/subprocess.rs 
b/dataplane/src/executor/function_executor/subprocess.rs new file mode 100644 index 000000000..232896e25 --- /dev/null +++ b/dataplane/src/executor/function_executor/subprocess.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use tokio::process::Child; + +use super::server::{FunctionExecutorServer, FunctionExecutorServerStatus}; + +pub struct SubprocessFunctionExecutorServer { + process: Child, + address: String, +} + +impl SubprocessFunctionExecutorServer { + pub fn new(process: Child, address: String) -> Self { + Self { process, address } + } +} + +#[async_trait::async_trait] +impl FunctionExecutorServer for SubprocessFunctionExecutorServer { + async fn status(&self) -> FunctionExecutorServerStatus { + // TODO: implement proper status checking + FunctionExecutorServerStatus::Running + } + + fn address(&self) -> &str { + &self.address + } + + fn pid(&self) -> Option { + self.process.id() + } + + async fn kill(&mut self) -> Result<()> { + self.process.kill().await?; + self.process.wait().await?; + Ok(()) + } +} diff --git a/dataplane/src/executor/function_executor_controller/allocation.rs b/dataplane/src/executor/function_executor_controller/allocation.rs new file mode 100644 index 000000000..564de9728 --- /dev/null +++ b/dataplane/src/executor/function_executor_controller/allocation.rs @@ -0,0 +1,592 @@ +use std::{collections::HashMap, time::Duration}; + +use tokio::time::Instant; +use tracing::info; + +use crate::executor::{ + blob_store::{BlobStore, BlobStoreImpl}, + executor_api::{ + executor_api_pb::{ + execution_plan_update, function_arg::Source, Allocation, AllocationFailureReason, + AllocationOutcomeCode, DataPayload, ExecutionPlanUpdate, ExecutionPlanUpdates, + FunctionArg, FunctionCall, FunctionExecutorTerminationReason, FunctionRef, + }, + ChannelManager, ExecutorStateReporter, + }, + function_executor::{ + function_executor_service::{ + allocation_result, allocation_update::Update, + function_executor_client::FunctionExecutorClient, Allocation as FEAllocation, + AllocationFailureReason as FEAllocationFailureReason, AllocationFunctionCall, + AllocationOutcomeCode as FEAllocationOutcomeCode, AllocationOutputBlob, + AllocationOutputBlobRequest, AllocationProgress, AllocationResult, AllocationUpdate, + Blob, CreateAllocationRequest, DeleteAllocationRequest, + ExecutionPlanUpdates as FEExecutionPlanUpdates, FunctionCall as FEFunctionCall, + FunctionInputs, WatchAllocationStateRequest, + }, + FunctionExecutor, + }, + function_executor_controller::blob_utils::presign_write_only_blob, +}; + +const CREATE_ALLOCATION_TIMEOUT_SECS: u32 = 5; +const SEND_ALLOCATION_UPDATE_TIMEOUT_SECS: u32 = 5; +const DELETE_ALLOCATION_TIMEOUT_SECS: u32 = 5; + +const SERVER_CALL_FUNCTION_RPC_TIMEOUT_SECS: u32 = 5; +const SERVER_CALL_FUNCTION_RPC_BACKOFF_SECS: u32 = 2; +const SERVER_CALL_FUNCTION_RPC_MAX_RETRIES: u32 = 3; + +pub struct AllocationInput { + pub function_inputs: FunctionInputs, + pub request_error_blob_uri: String, + pub request_error_blob_upload_uri: String, +} + +impl AllocationInput { + pub fn new( + function_inputs: FunctionInputs, + request_error_blob_uri: String, + request_error_blob_upload_uri: String, + ) -> Self { + Self { + function_inputs, + request_error_blob_uri, + request_error_blob_upload_uri, + } + } +} + +pub struct AllocationOutput { + pub outcome_code: AllocationOutcomeCode, + pub failure_reason: Option, + pub fe_result: Option, + pub function_outputs_blob_uri: Option, + pub execution_duration_ms: Option, +} + +impl AllocationOutput { + pub fn new( + fe_result: Option, + 
function_outputs_blob_uri: Option, + execution_duration_ms: Option, + ) -> Self { + let outcome_code = if let Some(result) = fe_result { + Some(result.outcome_code()) + } else { + None + }; + let failure_reason = if let Some(result) = fe_result { + Some(result.failure_reason()) + } else { + None + }; + Self { + outcome_code, + failure_reason, + fe_result, + function_outputs_blob_uri, + execution_duration_ms, + } + } + + pub fn internal_error(&self, execution_duration_ms: Option) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::InternalError), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn function_timeout(&self, execution_duration_ms: Option) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::FunctionTimeout), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn function_error_with_healthy_function_executor( + &self, + execution_duration_ms: Option, + ) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::FunctionError), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn function_executor_is_in_undefined_state_after_running_allocation( + &self, + execution_duration_ms: Option, + ) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::FunctionError), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn allocation_cancelled(&self, execution_duration_ms: Option) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::AllocationCancelled), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn allocation_didn_run_because_function_executor_terminated(&self) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::FunctionExecutorTerminated), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms: None, + } + } + + pub fn allocation_ran_out_of_memory( + &self, + execution_duration_ms: Option, + ) -> AllocationOutput { + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason: Some(AllocationFailureReason::Oom), + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms, + } + } + + pub fn allocation_didn_run_because_function_executor_startup_failed( + &self, + fe_termination_reason: FunctionExecutorTerminationReason, + ) -> AllocationOutput { + let failure_reason = match fe_termination_reason { + FunctionExecutorTerminationReason::Unknown => Some(AllocationFailureReason::Unknown), + FunctionExecutorTerminationReason::StartupFailedInternalError => { + Some(AllocationFailureReason::InternalError) + } + FunctionExecutorTerminationReason::StartupFailedFunctionError => { + Some(AllocationFailureReason::FunctionError) + } + FunctionExecutorTerminationReason::StartupFailedFunctionTimeout => { + Some(AllocationFailureReason::FunctionTimeout) + } + FunctionExecutorTerminationReason::Unhealthy => { + Some(AllocationFailureReason::Unhealthy) + } + FunctionExecutorTerminationReason::FunctionCancelled => { + 
Some(AllocationFailureReason::FunctionCancelled) + } + FunctionExecutorTerminationReason::InternalError => { + Some(AllocationFailureReason::InternalError) + } + FunctionExecutorTerminationReason::FunctionTimeout => { + Some(AllocationFailureReason::FunctionTimeout) + } + FunctionExecutorTerminationReason::Oom => Some(AllocationFailureReason::Oom), + }; + Self { + outcome_code: Some(AllocationOutcomeCode::Failure), + failure_reason, + fe_result: None, + function_outputs_blob_uri: None, + execution_duration_ms: None, + } + } +} + +fn to_server_alloc_outcome(code: FEAllocationOutcomeCode) -> AllocationOutcomeCode { + match code { + FEAllocationOutcomeCode::Unknown => AllocationOutcomeCode::Unknown, + FEAllocationOutcomeCode::Success => AllocationOutcomeCode::Success, + FEAllocationOutcomeCode::Failure => AllocationOutcomeCode::Failure, + } +} + +fn to_server_alloc_failure_reason( + code: FEAllocationFailureReason, +) -> Option { + match code { + FEAllocationFailureReason::Unknown => Some(AllocationFailureReason::Unknown), + FEAllocationFailureReason::InternalError => Some(AllocationFailureReason::InternalError), + FEAllocationFailureReason::FunctionError => Some(AllocationFailureReason::FunctionError), + FEAllocationFailureReason::RequestError => Some(AllocationFailureReason::RequestError), + } +} + +pub struct AllocationInfo { + pub allocation: Allocation, + pub allocation_timeout_ms: u64, + pub start_time: f64, + pub prepared_time: f64, + pub is_cancelled: bool, + pub input: Option, + pub output: Option, + pub is_completed: bool, +} + +impl AllocationInfo { + pub fn new(allocation: Allocation, allocation_timeout_ms: u64, start_time: f64) -> Self { + Self { + allocation, + allocation_timeout_ms, + start_time, + prepared_time: 0.0, + is_cancelled: false, + input: None, + output: None, + is_completed: false, + } + } +} + +struct BlobInfo { + id: String, + uri: String, + upload_id: String, +} + +struct RunAllocationOnFunctionExecutorResult { + fe_result: AllocationResult, + execution_end_time: f64, + function_outputs_blob_uri: Option, +} + +pub struct AllocationRunner { + alloc_info: AllocationInfo, + function_executor: FunctionExecutor, + blob_store: BlobStore, + state_reporter: ExecutorStateReporter, + // state_reconciler: ExecutorStateReconciler, + channel_manager: ChannelManager, + pending_output_blobs: HashMap, + // append-only + // TODO: use a bitvec + started_function_call_ids: Vec, + // pending_function_call_watchers: HashMap, + pending_request_state_read_ops: Vec, + // pending_request_state_write_ops: HashMap, +} + +impl AllocationRunner { + pub fn new( + alloc_info: AllocationInfo, + function_executor: FunctionExecutor, + blob_store: BlobStore, + state_reporter: ExecutorStateReporter, + // state_reconciler: ExecutorStateReconciler, + channel_manager: ChannelManager, + ) -> Self { + Self { + alloc_info, + function_executor, + blob_store, + state_reporter, + // state_reconciler, + channel_manager, + pending_output_blobs: HashMap::new(), + // append-only + // TODO: use a bitvec + started_function_call_ids: Vec::new(), + // pending_function_call_watchers: HashMap::new(), + pending_request_state_read_ops: Vec::new(), + // pending_request_state_write_ops: HashMap::new(), + } + } + + pub async fn run(&mut self) -> Result { + Ok(self.alloc_info.clone()) + } + + async fn run_alloc_on_fe( + &mut self, + ) -> Result> { + let no_progress_timeout = Duration::from_millis(self.alloc_info.allocation_timeout_ms); + + let channel = self + .function_executor + .channel() + .ok_or("Channel is None")? 
+ .clone(); + + let fe_alloc = FEAllocation { + request_id: self.alloc_info.allocation.request_id.clone(), + function_call_id: self.alloc_info.allocation.function_call_id.clone(), + allocation_id: self.alloc_info.allocation.allocation_id.clone(), + inputs: self.alloc_info.input.function_inputs.clone(), + result: None, + }; + + let mut client = FunctionExecutorClient::new(channel); + + let create_request = CreateAllocationRequest { + allocation: Some(fe_alloc), + }; + + tokio::time::timeout( + Duration::from_secs(CREATE_ALLOCATION_TIMEOUT_SECS), + client.create_allocation(create_request), + ) + .await?; + + let watch_request = WatchAllocationStateRequest { + allocation_id: self.alloc_info.allocation.allocation_id.clone(), + }; + + let mut stream = client + .watch_allocation_state(watch_request) + .await? + .into_inner(); + + let mut previous_progress: Option = None; + let mut allocation_result: Option = None; + let mut deadline = Instant::now() + no_progress_timeout; + + loop { + let timeout_duration = deadline.saturating_duration_since(Instant::now()); + + let response = match tokio::time::timeout(timeout_duration, stream.message()).await { + Ok(Ok(Some(state))) => state, + Ok(Ok(None)) => break, // EOF + Ok(Err(e)) => { + // return Err(AllocationError::FailedLeavingFEInUndefinedState(e).into()); + } + Err(_) => { + // return Err(AllocationError::Timeout.into()); + } + }; + + if let Some(progress) = &response.progress { + if previous_progress.as_ref() != Some(progress) { + deadline = Instant::now() + no_progress_timeout; + } + previous_progress = Some(progress.clone()); + } + + let mut fe_output_blob_requests: HashMap = + HashMap::new(); + for output_blob_request in response.output_blob_requests { + // TODO: validate fe output blob req + fe_output_blob_requests + .insert(output_blob_request.id().to_string(), output_blob_request); + if !self + .pending_output_blobs + .contains_key(output_blob_request.id().to_string()) + { + let output_blob = self.create_output_blob(output_blob_request).await?; + let update = AllocationUpdate { + allocation_id: Some(self.alloc_info.allocation.allocation_id().to_string()), + update: Some(Update::OutputBlob(AllocationOutputBlob { + status: None, + blob: Some(output_blob), + })), + }; + tokio::time::timeout( + Duration::from_secs(SEND_ALLOCATION_UPDATE_TIMEOUT_SECS), + client.send_allocation_update(update), + ) + .await?; + } + } + let mut fe_function_calls: HashMap = HashMap::new(); + for function_call in response.function_calls { + if let Some(updates) = function_call.updates { + if let Some(root_function_call_id) = updates.root_function_call_id { + fe_function_calls.insert(root_function_call_id, function_call) + } + } + if let Some(args_blob) = function_call.args_blob { + let blob_info = self.pending_output_blobs.get(args_blob.id()); + + if let Some(blob_info) = blob_info { + let mut parts_etag: Vec = Vec::new(); + for blob_chunk in args_blob.chunks { + parts_etag.push(blob_chunk.etag().to_string()) + } + self.blob_store.complete_multipart_upload( + &blob_info.uri, + &blob_info.upload_id, + parts_etag, + ); + } + } + } + // TODO: self.reconcile_function_call_watchers(&mut client, &response.function_call_watchers) + // .await?; + + if response.request_state_operations.is_some() { + // TODO: self.reconcile_request_state_operations( + // &mut client, + // &response.request_state_operations, + // ) + // .await?; + } + + // if let Some(result) = response.result { + // allocation_result = Some(result); + // break; + // } + } + + let delete_request = 
DeleteAllocationRequest { + allocation_id: self.alloc_info.allocation.allocation_id.clone(), + }; + + tokio::time::timeout( + Duration::from_secs(DELETE_ALLOCATION_TIMEOUT_SECS), + client.delete_allocation(delete_request), + ) + .await?; + + let execution_end_time = Instant::now(); + + let mut function_outputs_blob_uri: Option = None; + + if let Some(result) = &allocation_result { + if let Some(uploaded_blob) = &result.uploaded_function_outputs_blob { + let blob_info = self + .pending_output_blobs + .get(&uploaded_blob.id) + .ok_or_else(|| { + info!("failing allocation because its outputs blob is not found"); + })?; + + function_outputs_blob_uri = Some(blob_info.uri.clone()); + + let mut parts_etag: Vec = Vec::new(); + for blob_chunk in uploaded_blob.chunks { + parts_etag.push(blob_chunk.etag().to_string()) + } + self.blob_store + .complete_multipart_upload(&blob_info.uri, &blob_info.upload_id, parts_etag) + .await?; + self.pending_output_blobs.remove(uploaded_blob.id()); + + if let Some(outputs) = &result.outputs { + if let Some(allocation_result::Outputs::Updates(updates)) = outputs { + to_server_execution_plan_updates( + updates, + function_outputs_blob_uri.as_ref(), + ) + .map_err(|e| { + info!( + "failing allocation because its FE execution plan updates are invalid", + ); + })?; + } + } + } + } + + Ok(RunAllocationOnFunctionExecutorResult { + fe_result: allocation_result.ok_or("No allocation result")?, + execution_end_time, + function_outputs_blob_uri, + }) + } + + async fn create_output_blob( + &self, + fe_output_blob_request: AllocationOutputBlobRequest, + ) -> Result> { + let blob_uri = format!( + "{}.{}.{}.output", + self.alloc_info.allocation.request_data_payload_uri_prefix(), + self.alloc_info.allocation.allocation_id(), + fe_output_blob_request.id() + ); + let function_outputs_blob_upload_id = + self.blob_store.create_multipart_upload(&blob_uri).await?; + let blob_info = BlobInfo { + id: fe_output_blob_request.id().to_string(), + uri: blob_uri, + upload_id: function_outputs_blob_upload_id, + }; + + self.pending_output_blobs + .insert(fe_output_blob_request.id().to_string(), blob_info); + + let blob = presign_write_only_blob( + &blob_info.id, + &blob_info.uri, + &blob_info.upload_id, + fe_output_blob_request.size(), + self.blob_store, + ) + .await?; + Ok(blob) + } +} + +fn to_server_execution_plan_updates( + fe_execution_plan_updates: FEExecutionPlanUpdates, + args_blob_uri: Option<&str>, +) -> ExecutionPlanUpdates { + let mut server_execution_plan_updates: Vec = Vec::new(); + + for fe_update in fe_execution_plan_updates.updates { + server_execution_plan_updates.push(ExecutionPlanUpdate { + op: execution_plan_update::Op::FunctionCall(to_server_function_call( + fe_update.op, + args_blob_uri, + )), + }); + } +} + +fn to_server_function_call( + fe_function_call: FEFunctionCall, + args_blob_uri: Option<&str>, +) -> FunctionCall { + let mut server_args: Vec = Vec::new(); + for fe_arg in fe_function_call.args { + if let Some(allocation_result::Outputs::Value(value)) = fe_arg { + if let Some(manifest) = value.manifest { + let arg = FunctionArg { + source: Some(Source::InlineData(DataPayload { + id: None, + uri: Some(args_blob_uri), + encoding: manifest.encoding, + encoding_version: manifest.encoding_version, + content_type: manifest.content_type, + metadata_size: manifest.metadata_size, + offset: value.offset, + size: manifest.size, + sha256_hash: manifest.sha256_hash, + source_function_call_id: manifest.source_function_call_id, + })), + }; + server_args.push(value); + } + } + } + let 
+    let target = fe_function_call.target.as_ref().map(|val| FunctionRef {
+        namespace: val.namespace.clone(),
+        application_name: val.application_name.clone(),
+        application_version: val.application_version.clone(),
+        function_name: val.function_name.clone(),
+    });
+    FunctionCall {
+        id: fe_function_call.id.clone(),
+        target,
+        args: server_args,
+        call_metadata: fe_function_call.call_metadata.clone(),
+    }
+}
diff --git a/dataplane/src/executor/function_executor_controller/blob_utils.rs b/dataplane/src/executor/function_executor_controller/blob_utils.rs
new file mode 100644
index 000000000..a437a5252
--- /dev/null
+++ b/dataplane/src/executor/function_executor_controller/blob_utils.rs
@@ -0,0 +1,81 @@
+use crate::executor::{
+    blob_store::BlobStore,
+    function_executor::function_executor_service::{Blob, BlobChunk},
+};
+
+const MAX_PRESIGNED_URI_EXPIRATION_SEC: u64 = 7 * 24 * 60 * 60;
+const BLOB_OPTIMAL_CHUNK_SIZE_BYTES: u64 = 100 * 1024 * 1024;
+const OUTPUT_BLOB_OPTIMAL_CHUNKS_COUNT: usize = 100;
+const OUTPUT_BLOB_SLOWER_CHUNK_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
+
+// TODO: presign_read_only_blob(blob_uri, size, blob_store). The read path
+// should mirror presign_write_only_blob below, but presign download URIs for
+// each chunk instead of multipart upload parts.
+
+/// Presigns one upload-part URI per chunk until the presigned chunks cover
+/// `size` bytes. The first OUTPUT_BLOB_OPTIMAL_CHUNKS_COUNT chunks use the
+/// optimal chunk size; later chunks are larger (and slower to upload) so very
+/// large blobs stay within multipart part-count limits.
+pub async fn presign_write_only_blob(
+    blob_id: &str,
+    blob_uri: &str,
+    upload_id: &str,
+    size: u64,
+    blob_store: BlobStore,
+) -> Result<Blob, Box<dyn std::error::Error>> {
+    let mut chunks: Vec<BlobChunk> = Vec::new();
+    let mut chunk_total_size: u64 = 0;
+
+    while chunk_total_size < size {
+        let upload_chunk_uri = blob_store
+            .presign_upload_part_uri(
+                blob_uri,
+                (chunks.len() + 1) as i32,
+                upload_id,
+                MAX_PRESIGNED_URI_EXPIRATION_SEC,
+            )
+            .await?;
+        let chunk_size = if chunks.len() < OUTPUT_BLOB_OPTIMAL_CHUNKS_COUNT {
+            BLOB_OPTIMAL_CHUNK_SIZE_BYTES
+        } else {
+            OUTPUT_BLOB_SLOWER_CHUNK_SIZE_BYTES
+        };
+        chunk_total_size += chunk_size;
+        chunks.push(BlobChunk {
+            uri: Some(upload_chunk_uri),
+            size: Some(chunk_size),
+            etag: None,
+        });
+    }
+
+    Ok(Blob {
+        id: Some(blob_id.to_string()),
+        chunks,
+    })
+}
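For context, a standalone sketch (not part of the patch) of how the chunk-sizing policy above plays out; the constants mirror blob_utils.rs and `chunk_count` is a made-up helper:

```rust
// Reproduces presign_write_only_blob's sizing policy to show how many
// presigned parts a blob of a given size needs.
const BLOB_OPTIMAL_CHUNK_SIZE_BYTES: u64 = 100 * 1024 * 1024; // 100 MiB
const OUTPUT_BLOB_OPTIMAL_CHUNKS_COUNT: usize = 100;
const OUTPUT_BLOB_SLOWER_CHUNK_SIZE_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB

fn chunk_count(size: u64) -> usize {
    let (mut covered, mut chunks) = (0u64, 0usize);
    while covered < size {
        covered += if chunks < OUTPUT_BLOB_OPTIMAL_CHUNKS_COUNT {
            BLOB_OPTIMAL_CHUNK_SIZE_BYTES
        } else {
            OUTPUT_BLOB_SLOWER_CHUNK_SIZE_BYTES
        };
        chunks += 1;
    }
    chunks
}

fn main() {
    // A 5 GiB output needs 52 optimal 100 MiB chunks.
    assert_eq!(chunk_count(5 * 1024 * 1024 * 1024), 52);
    // A 50 GiB output uses all 100 optimal chunks (~9.8 GiB),
    // then 41 slower 1 GiB chunks.
    assert_eq!(chunk_count(50 * 1024 * 1024 * 1024), 141);
}
```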
diff --git a/dataplane/src/executor/function_executor_controller/mod.rs b/dataplane/src/executor/function_executor_controller/mod.rs
new file mode 100644
index 000000000..0b50cb5f3
--- /dev/null
+++ b/dataplane/src/executor/function_executor_controller/mod.rs
@@ -0,0 +1,110 @@
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
+
+use crate::executor::{
+    blob_store::BlobStore,
+    executor_api::{
+        executor_api_pb::{
+            Allocation, FunctionExecutorDescription, FunctionExecutorState, FunctionExecutorStatus,
+        },
+        ChannelManager, ExecutorStateReconciler, ExecutorStateReporter,
+    },
+    function_executor::{
+        server_factory::SubprocessFunctionExecutorServerFactory, FunctionExecutor,
+    },
+    function_executor_controller::allocation::AllocationInfo,
+};
+
+pub mod allocation;
+pub mod blob_utils;
+
+pub enum FEControllerState {
+    NotStarted,
+    StartingUp,
+    Running,
+    Terminating,
+    Terminated,
+}
+
+pub struct FunctionExecutorController {
+    executor_id: String,
+    fe_description: FunctionExecutorDescription,
+    fe_server_factory: SubprocessFunctionExecutorServerFactory,
+    channel_manager: Arc<ChannelManager>,
+    state_reporter: ExecutorStateReporter,
+    state_reconciler: ExecutorStateReconciler,
+    blob_store: BlobStore,
+    cache_path: PathBuf,
+    fe: FunctionExecutor,
+    internal_state: FEControllerState,
+    reported_state: FunctionExecutorState,
+    // FEEvent is a placeholder name; the element type did not survive in the
+    // source diff.
+    events: Vec<FEEvent>,
+    allocations: HashMap<String, AllocationInfo>,
+    // Allocation ids, keyed into `allocations` (assumed).
+    runnable_allocations: Vec<String>,
+    running_allocations: Vec<String>,
+}
+
+impl FunctionExecutorController {
+    pub fn new(
+        executor_id: String,
+        fe_description: FunctionExecutorDescription,
+        fe_server_factory: SubprocessFunctionExecutorServerFactory,
+        channel_manager: Arc<ChannelManager>,
+        state_reporter: ExecutorStateReporter,
+        state_reconciler: ExecutorStateReconciler,
+        blob_store: BlobStore,
+        cache_path: PathBuf,
+    ) -> Self {
+        let fe = fe_server_factory.create_function_executor(
+            executor_id.clone(),
+            fe_description.clone(),
+            channel_manager.clone(),
+            state_reconciler.clone(),
+            blob_store.clone(),
+            cache_path.clone(),
+        );
+
+        // Built before Self so fe_description can still move into the struct.
+        let reported_state = FunctionExecutorState {
+            description: fe_description.clone(),
+            status: FunctionExecutorStatus::Unknown,
+            termination_reason: None,
+            allocation_ids_caused_termination: Vec::new(),
+        };
+
+        Self {
+            executor_id,
+            fe_description,
+            fe_server_factory,
+            channel_manager,
+            state_reporter,
+            state_reconciler,
+            blob_store,
+            cache_path,
+            fe,
+            internal_state: FEControllerState::NotStarted,
+            reported_state,
+            events: Vec::new(),
+            allocations: HashMap::new(),
+            runnable_allocations: Vec::new(),
+            running_allocations: Vec::new(),
+        }
+    }
+
+    pub fn function_executor_id(&self) -> &str {
+        self.fe_description.id()
+    }
+
+    pub fn add_allocation(&mut self, _allocation: Allocation) {
+        // TODO
+    }
+
+    pub fn has_allocation(&self, allocation_id: &str) -> bool {
+        self.allocations.contains_key(allocation_id)
+    }
+
+    pub fn remove_allocation(&mut self, allocation_id: &str) {
+        if let Some(alloc_info) = self.allocations.get_mut(allocation_id) {
+            if alloc_info.is_completed {
+                return;
+            }
+            alloc_info.is_cancelled = true;
+        }
+    }
+}
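A minimal sketch (not part of the patch) of the intended allocation lifecycle against the controller surface above; `track_allocation` and the `id` argument are hypothetical:

```rust
use crate::executor::{
    executor_api::executor_api_pb::Allocation,
    function_executor_controller::FunctionExecutorController,
};

// Drives one allocation through the controller API shown above.
fn track_allocation(
    controller: &mut FunctionExecutorController,
    allocation: Allocation,
    id: &str,
) {
    controller.add_allocation(allocation); // queuing is still a TODO in this change
    if controller.has_allocation(id) {
        // Marks the allocation cancelled unless it already completed.
        controller.remove_allocation(id);
    }
}
```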
diff --git a/dataplane/src/executor/host_resources/mod.rs b/dataplane/src/executor/host_resources/mod.rs
new file mode 100644
index 000000000..de5353b51
--- /dev/null
+++ b/dataplane/src/executor/host_resources/mod.rs
@@ -0,0 +1,101 @@
+use std::path::Path;
+
+use sysinfo::{Disks, System};
+
+use crate::executor::{
+    executor_api::executor_api_pb::{GpuResources, HostResources as HostResourcesProto},
+    host_resources::nvidia_gpu::{NvidiaGpuAllocator, NvidiaGpuInfo},
+};
+
+pub mod nvidia_gpu;
+
+#[derive(Debug, Clone)]
+pub struct HostResources {
+    cpu_count: u32,
+    memory_mb: u64,
+    disk_mb: u64,
+    gpus: Vec<NvidiaGpuInfo>,
+}
+
+impl HostResources {
+    pub fn to_proto(&self) -> HostResourcesProto {
+        let gpu = if !self.gpus.is_empty() {
+            Some(GpuResources {
+                count: Some(self.gpus.len() as u32),
+                model: Some(self.gpus[0].model as i32),
+            })
+        } else {
+            None
+        };
+        HostResourcesProto {
+            cpu_count: Some(self.cpu_count),
+            memory_bytes: Some(self.memory_mb * 1024 * 1024),
+            disk_bytes: Some(self.disk_mb * 1024 * 1024),
+            gpu,
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct HostResourcesProvider {
+    gpu_allocator: NvidiaGpuAllocator,
+    function_executors_ephemeral_disk_path: String,
+    host_overhead_cpus: u32,
+    host_overhead_memory_gb: u32,
+    host_overhead_function_executors_ephemeral_disks_gb: u32,
+}
+
+impl HostResourcesProvider {
+    pub fn new(
+        gpu_allocator: NvidiaGpuAllocator,
+        function_executors_ephemeral_disk_path: String,
+        host_overhead_cpus: u32,
+        host_overhead_memory_gb: u32,
+        host_overhead_function_executors_ephemeral_disks_gb: u32,
+    ) -> Self {
+        Self {
+            gpu_allocator,
+            function_executors_ephemeral_disk_path,
+            host_overhead_cpus,
+            host_overhead_memory_gb,
+            host_overhead_function_executors_ephemeral_disks_gb,
+        }
+    }
+
+    pub fn total_host_resources(&self) -> HostResources {
+        let mut sys = System::new_all();
+        sys.refresh_all();
+        let gpus = self.gpu_allocator.list_all();
+        let cpu_count = sys.cpus().len() as u32;
+        // Total memory as reported by the OS (sysinfo returns bytes).
+        let memory_mb = sys.total_memory() / 1024 / 1024;
+        let mut disk_mb: u64 = 0;
+        let disks = Disks::new_with_refreshed_list();
+        for disk in &disks {
+            if disk.mount_point() == Path::new(&self.function_executors_ephemeral_disk_path) {
+                disk_mb += disk.available_space() / 1024 / 1024;
+            }
+        }
+        HostResources {
+            cpu_count,
+            memory_mb,
+            disk_mb,
+            gpus,
+        }
+    }
+
+    pub fn total_function_executor_resources(&self) -> HostResources {
+        let total_resources = self.total_host_resources();
+        // saturating_sub: advertise zero rather than underflow when the
+        // configured overheads exceed what the host actually has.
+        HostResources {
+            cpu_count: total_resources.cpu_count.saturating_sub(self.host_overhead_cpus),
+            memory_mb: total_resources
+                .memory_mb
+                .saturating_sub(self.host_overhead_memory_gb as u64 * 1024),
+            disk_mb: total_resources.disk_mb.saturating_sub(
+                self.host_overhead_function_executors_ephemeral_disks_gb as u64 * 1024,
+            ),
+            gpus: total_resources.gpus.clone(),
+        }
+    }
+}
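A standalone sketch (not part of the patch) of the overhead subtraction used by total_function_executor_resources; `fe_available` is a made-up helper:

```rust
// saturating_sub keeps the advertised capacity at zero instead of
// panicking or wrapping when a small host's resources fall below the
// configured overheads.
fn fe_available(total_mb: u64, overhead_gb: u64) -> u64 {
    total_mb.saturating_sub(overhead_gb * 1024)
}

fn main() {
    // 64 GiB host with an 8 GiB overhead leaves 56 GiB for executors.
    assert_eq!(fe_available(64 * 1024, 8), 57344);
    // Overhead larger than the host clamps to zero.
    assert_eq!(fe_available(4 * 1024, 8), 0);
}
```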
diff --git a/dataplane/src/executor/host_resources/nvidia_gpu.rs b/dataplane/src/executor/host_resources/nvidia_gpu.rs
new file mode 100644
index 000000000..8ca101a71
--- /dev/null
+++ b/dataplane/src/executor/host_resources/nvidia_gpu.rs
@@ -0,0 +1,112 @@
+use tokio::process::Command;
+
+use crate::executor::executor_api::executor_api_pb::GpuModel;
+
+pub fn product_name_to_model(product_name: &str) -> GpuModel {
+    if product_name.starts_with("NVIDIA A100") && product_name.ends_with("40GB") {
+        GpuModel::NvidiaA10040gb
+    } else if product_name.starts_with("NVIDIA A100") && product_name.ends_with("80GB") {
+        GpuModel::NvidiaA10080gb
+    } else if product_name.starts_with("NVIDIA H100") && product_name.contains("80GB") {
+        GpuModel::NvidiaH10080gb
+    } else if product_name.starts_with("NVIDIA Tesla T4") {
+        GpuModel::NvidiaTeslaT4
+    } else if product_name.starts_with("NVIDIA A6000") {
+        GpuModel::NvidiaA6000
+    } else if product_name.starts_with("NVIDIA A10") {
+        GpuModel::NvidiaA10
+    } else {
+        GpuModel::Unknown
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct NvidiaGpuInfo {
+    pub index: String,
+    pub uuid: String,
+    pub product_name: String,
+    pub model: GpuModel,
+}
+
+pub async fn nvidia_gpus_are_available() -> bool {
+    // Require a successful exit, not merely that the binary could be spawned.
+    Command::new("nvidia-smi")
+        .output()
+        .await
+        .map(|output| output.status.success())
+        .unwrap_or(false)
+}
+
+pub async fn fetch_nvidia_gpu_info() -> Result<Vec<NvidiaGpuInfo>, Box<dyn std::error::Error>> {
+    let output = Command::new("nvidia-smi")
+        .args(["--query-gpu=index,name,uuid", "--format=csv,noheader"])
+        .output()
+        .await?;
+    let gpus = String::from_utf8(output.stdout)?
+        .lines()
+        .filter_map(|line| {
+            let parts: Vec<&str> = line.split(',').collect();
+            if parts.len() != 3 {
+                return None; // skip malformed lines
+            }
+            let model = product_name_to_model(parts[1].trim());
+            if model == GpuModel::Unknown {
+                return None;
+            }
+            Some(NvidiaGpuInfo {
+                index: parts[0].trim().to_string(),
+                uuid: parts[2].trim().to_string(),
+                product_name: parts[1].trim().to_string(),
+                model,
+            })
+        })
+        .collect();
+    Ok(gpus)
+}
+
+#[derive(Debug, Clone)]
+pub struct NvidiaGpuAllocator {
+    all_gpus: Vec<NvidiaGpuInfo>,
+    free_gpus: Vec<NvidiaGpuInfo>,
+}
+
+impl NvidiaGpuAllocator {
+    pub fn new(gpus: Vec<NvidiaGpuInfo>) -> Self {
+        let free_gpus = gpus.clone();
+        Self {
+            all_gpus: gpus,
+            free_gpus,
+        }
+    }
+
+    /// Returns an Error (rather than None) so callers can surface the reason.
+    pub fn allocate(&mut self, count: usize) -> Result<Vec<NvidiaGpuInfo>, Error> {
+        if count > self.free_gpus.len() {
+            Err(Error::Other(format!(
+                "Not enough free GPUs available, requested {count}, available={}",
+                self.free_gpus.len()
+            )))
+        } else {
+            let allocated_gpus = self.free_gpus.drain(..count).collect();
+            Ok(allocated_gpus)
+        }
+    }
+
+    pub fn deallocate(&mut self, gpus: Vec<NvidiaGpuInfo>) {
+        self.free_gpus.extend(gpus);
+    }
+
+    pub fn list_all(&self) -> Vec<NvidiaGpuInfo> {
+        self.all_gpus.clone()
+    }
+
+    pub fn list_free(&self) -> Vec<NvidiaGpuInfo> {
+        self.free_gpus.clone()
+    }
+}
+
+#[derive(Debug)]
+pub enum Error {
+    Other(String),
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::Other(msg) => write!(f, "{}", msg),
+        }
+    }
+}
+
+impl std::error::Error for Error {}
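A sketch (not part of the patch) of the allocator's intended contract, assuming the nvidia_gpu module above is in scope; `claim_two_gpus` is a made-up helper:

```rust
use crate::executor::host_resources::nvidia_gpu::{fetch_nvidia_gpu_info, NvidiaGpuAllocator};

// Typical allocate/deallocate round trip for a function executor launch.
async fn claim_two_gpus() -> Result<(), Box<dyn std::error::Error>> {
    let mut allocator = NvidiaGpuAllocator::new(fetch_nvidia_gpu_info().await?);
    let claimed = allocator.allocate(2)?; // Err if fewer than 2 GPUs are free
    // ... launch the function executor with `claimed` ...
    allocator.deallocate(claimed); // return the GPUs to the free list
    Ok(())
}
```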
diff --git a/dataplane/src/executor/metrics.rs b/dataplane/src/executor/metrics.rs
new file mode 100644
index 000000000..8da1bca46
--- /dev/null
+++ b/dataplane/src/executor/metrics.rs
@@ -0,0 +1,160 @@
+use prometheus_client::{
+    encoding::{EncodeLabelSet, EncodeLabelValue},
+    metrics::{
+        counter::Counter,
+        family::Family,
+        gauge::Gauge,
+        histogram::{exponential_buckets, Histogram},
+        info::Info,
+    },
+    registry::Registry,
+};
+
+#[derive(Debug)]
+pub struct Metrics {
+    pub(crate) healthy_gauge: Gauge,
+    pub(crate) grpc_server_channel_creations: Counter,
+    pub(crate) grpc_server_channel_creation_retries: Counter,
+    pub(crate) grpc_server_channel_creation_latency: Histogram,
+    pub(crate) executor_info: Info<Vec<(String, String)>>,
+    pub(crate) executor_state: Family<ExecutorStateLabel, Gauge>,
+    pub(crate) executor_events_pushed: Counter,
+    pub(crate) executor_event_push_errors: Counter,
+    pub(crate) desired_state_streams: Counter,
+    pub(crate) desired_state_stream_errors: Counter,
+    pub(crate) state_reconciliations: Counter,
+    pub(crate) state_reconciliation_errors: Counter,
+    pub(crate) state_reconciliation_latency: Histogram,
+    pub(crate) state_report_rpcs: Counter,
+    pub(crate) state_report_rpc_errors: Counter,
+    pub(crate) state_report_rpc_latency: Histogram,
+}
+
+impl Metrics {
+    pub fn new(registry: &mut Registry, executor_info_vec: Vec<(String, String)>) -> Self {
+        let healthy_gauge = Gauge::default();
+        registry.register(
+            "healthy",
+            "1 if the executor is healthy, 0 otherwise",
+            healthy_gauge.clone(),
+        );
+        let grpc_server_channel_creations = Counter::default();
+        registry.register(
+            "grpc_server_channel_creations",
+            "Number of times a channel to gRPC Server was created",
+            grpc_server_channel_creations.clone(),
+        );
+        let grpc_server_channel_creation_retries = Counter::default();
+        registry.register(
+            "grpc_server_channel_creation_retries",
+            "Number of retries during a channel creation to gRPC Server",
+            grpc_server_channel_creation_retries.clone(),
+        );
+        let grpc_server_channel_creation_latency =
+            Histogram::new(latency_metric_for_fast_operation());
+        registry.register(
+            "grpc_server_channel_creation_latency_seconds",
+            "Latency of gRPC server channel creation",
+            grpc_server_channel_creation_latency.clone(),
+        );
+        let executor_info = Info::new(executor_info_vec);
+        let executor_state = Family::<ExecutorStateLabel, Gauge>::default();
+        registry.register(
+            "executor_state",
+            "Current Executor state",
+            executor_state.clone(),
+        );
+        let executor_events_pushed = Counter::default();
+        registry.register(
+            "executor_events_pushed",
+            "Number of events pushed to collector",
+            executor_events_pushed.clone(),
+        );
+        let executor_event_push_errors = Counter::default();
+        registry.register(
+            "executor_event_push_errors",
+            "Number of errors while pushing events to collector",
+            executor_event_push_errors.clone(),
+        );
+        let desired_state_streams = Counter::default();
+        registry.register(
+            "desired_state_streams",
+            "Number of desired state streams created",
+            desired_state_streams.clone(),
+        );
+        let desired_state_stream_errors = Counter::default();
+        registry.register(
+            "desired_state_stream_errors",
+            "Number of desired state stream errors",
+            desired_state_stream_errors.clone(),
+        );
+        let state_reconciliations = Counter::default();
+        registry.register(
+            "state_reconciliations",
+            "Number of Executor state reconciliations",
+            state_reconciliations.clone(),
+        );
+        let state_reconciliation_errors = Counter::default();
+        registry.register(
+            "state_reconciliation_errors",
+            "Number of Executor state reconciliation errors after all retries",
+            state_reconciliation_errors.clone(),
+        );
+        let state_reconciliation_latency = Histogram::new(latency_metric_for_fast_operation());
+        registry.register(
+            "state_reconciliation_latency_seconds",
+            "Latency of Executor state reconciliation",
+            state_reconciliation_latency.clone(),
+        );
+        let state_report_rpcs = Counter::default();
+        registry.register(
+            "state_report_rpcs",
+            "Number of Executor state report RPCs to Server",
+            state_report_rpcs.clone(),
+        );
+        let state_report_rpc_errors = Counter::default();
+        registry.register(
+            "state_report_rpc_errors",
+            "Number of Executor state report RPC errors",
+            state_report_rpc_errors.clone(),
+        );
+        let state_report_rpc_latency = Histogram::new(latency_metric_for_fast_operation());
+        registry.register(
+            "state_report_rpc_latency_seconds",
+            "Latency of Executor state report RPC to Server",
+            state_report_rpc_latency.clone(),
+        );
+
+        Self {
+            healthy_gauge,
+            grpc_server_channel_creations,
+            grpc_server_channel_creation_retries,
+            grpc_server_channel_creation_latency,
+            executor_info,
+            executor_state,
+            executor_events_pushed,
+            executor_event_push_errors,
+            desired_state_streams,
+            desired_state_stream_errors,
+            state_reconciliations,
+            state_reconciliation_errors,
+            state_reconciliation_latency,
+            state_report_rpcs,
+            state_report_rpc_errors,
+            state_report_rpc_latency,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+struct ExecutorStateLabel {
+    states: ExecutorState,
+}
+
+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
+enum ExecutorState {
+    Starting,
+    Running,
+    ShuttingDown,
+}
+
+fn latency_metric_for_fast_operation() -> impl Iterator<Item = f64> {
+    // Assumed bucket layout (1 ms up to ~65 s); the original left this as a
+    // TODO, so revisit once real latency data is available.
+    exponential_buckets(0.001, 2.0, 17)
+}
diff --git a/dataplane/src/executor/mod.rs b/dataplane/src/executor/mod.rs
new file mode 100644
index 000000000..b28a69daa
--- /dev/null
+++ b/dataplane/src/executor/mod.rs
@@ -0,0 +1,8 @@
+pub mod blob_store;
+pub mod executor;
+pub mod executor_api;
+pub mod function_executor;
+pub mod function_executor_controller;
+pub mod host_resources;
+pub mod metrics;
+pub mod monitoring;
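A sketch (not part of the patch) of how the executor_state Family is meant to be flipped as the executor transitions, written as it might appear inside metrics.rs since the label types there are private; `set_executor_state` is a made-up method:

```rust
impl Metrics {
    /// Flip the state gauges so exactly one state series reads 1 at a time.
    pub fn set_executor_state(&self, state: ExecutorState) {
        for s in [
            ExecutorState::Starting,
            ExecutorState::Running,
            ExecutorState::ShuttingDown,
        ] {
            let value = if s == state { 1 } else { 0 };
            self.executor_state
                .get_or_create(&ExecutorStateLabel { states: s })
                .set(value);
        }
    }
}
```

PartialEq is already derived on ExecutorState above, so the comparison works without extra plumbing.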
diff --git a/dataplane/src/executor/monitoring/handler.rs b/dataplane/src/executor/monitoring/handler.rs
new file mode 100644
index 000000000..76ce33b20
--- /dev/null
+++ b/dataplane/src/executor/monitoring/handler.rs
@@ -0,0 +1,15 @@
+use std::convert::Infallible;
+
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    http::{Request, Response},
+};
+
+pub trait Handler {
+    async fn handle(
+        &self,
+        request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible>;
+}
diff --git a/dataplane/src/executor/monitoring/health_check_handler.rs b/dataplane/src/executor/monitoring/health_check_handler.rs
new file mode 100644
index 000000000..4e9a16290
--- /dev/null
+++ b/dataplane/src/executor/monitoring/health_check_handler.rs
@@ -0,0 +1,68 @@
+use std::{convert::Infallible, sync::Arc};
+
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    Request, Response,
+};
+use serde_json::json;
+use tokio::sync::Mutex;
+
+use crate::executor::monitoring::{
+    handler::Handler,
+    health_checker::{generic_health_checker::GenericHealthChecker, HealthChecker},
+    server::full,
+};
+
+#[derive(Debug, Clone)]
+pub struct HealthCheckHandler {
+    health_checker: Arc<Mutex<GenericHealthChecker>>,
+}
+
+impl HealthCheckHandler {
+    pub fn new(health_checker: Arc<Mutex<GenericHealthChecker>>) -> Self {
+        Self { health_checker }
+    }
+}
+
+impl Handler for HealthCheckHandler {
+    async fn handle(
+        &self,
+        _request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
+        let result = self.health_checker.lock().await.check();
+        let (status, body) = if result.is_success {
+            let message = result
+                .status_message
+                .unwrap_or_else(|| "Successful".to_string());
+            (
+                200,
+                json!({
+                    "status": "ok",
+                    "message": message,
+                    "checker": result.checker_name
+                }),
+            )
+        } else {
+            (
+                503,
+                json!({
+                    "status": "nok",
+                    "message": result.status_message.unwrap_or_default(),
+                    "checker": result.checker_name
+                }),
+            )
+        };
+        let response = Response::builder()
+            .header("Content-Type", "application/json")
+            .status(status)
+            .body(full(Bytes::from(body.to_string())))
+            .expect("Failed to build response.");
+        Ok(response)
+    }
+}
diff --git a/dataplane/src/executor/monitoring/health_checker/generic_health_checker.rs b/dataplane/src/executor/monitoring/health_checker/generic_health_checker.rs
new file mode 100644
index 000000000..981cf5060
--- /dev/null
+++ b/dataplane/src/executor/monitoring/health_checker/generic_health_checker.rs
@@ -0,0 +1,50 @@
+use std::sync::Arc;
+
+use crate::executor::{metrics::Metrics, monitoring::health_checker::HealthChecker};
+
+pub const HEALTH_CHECKER_NAME: &str = "GenericHealthChecker";
+
+#[derive(Debug, Clone)]
+pub struct GenericHealthChecker {
+    pub is_success: bool,
+    pub status_message: Option<String>,
+    pub checker_name: String,
+    pub metrics: Arc<Metrics>,
+}
+
+impl GenericHealthChecker {
+    pub fn new(
+        is_success: bool,
+        status_message: Option<String>,
+        checker_name: String,
+        metrics: Arc<Metrics>,
+    ) -> Self {
+        // Reflect the initial state instead of unconditionally reporting healthy.
+        metrics.healthy_gauge.set(if is_success { 1 } else { 0 });
+        GenericHealthChecker {
+            is_success,
+            status_message,
+            checker_name,
+            metrics,
+        }
+    }
+}
+
+impl HealthChecker for GenericHealthChecker {
+    fn server_connection_state_changed(&mut self, is_healthy: bool, status_message: String) {
+        if is_healthy {
+            self.status_message = None;
+            self.is_success = true;
+            self.checker_name = HEALTH_CHECKER_NAME.to_string();
+            self.metrics.healthy_gauge.set(1);
+        } else {
+            self.status_message = Some(status_message);
+            self.is_success = false;
+            self.checker_name = HEALTH_CHECKER_NAME.to_string();
+            self.metrics.healthy_gauge.set(0);
+        }
+    }
+
+    fn check(&self) -> Self {
+        self.clone()
+    }
+}
diff --git a/dataplane/src/executor/monitoring/health_checker/mod.rs b/dataplane/src/executor/monitoring/health_checker/mod.rs
new file mode 100644
index 000000000..6a9d8f124
--- /dev/null
+++ b/dataplane/src/executor/monitoring/health_checker/mod.rs
@@ -0,0 +1,9 @@
+pub mod generic_health_checker;
+
+/// Abstract trait for health checkers.
+pub trait HealthChecker {
+    /// Handle changes in server connection state.
+    fn server_connection_state_changed(&mut self, is_healthy: bool, status_message: String);
+
+    /// Returns a snapshot of the current health check result.
+    fn check(&self) -> Self;
+}
diff --git a/dataplane/src/executor/monitoring/mod.rs b/dataplane/src/executor/monitoring/mod.rs
new file mode 100644
index 000000000..643d8190e
--- /dev/null
+++ b/dataplane/src/executor/monitoring/mod.rs
@@ -0,0 +1,7 @@
+pub mod handler;
+pub mod health_check_handler;
+pub mod health_checker;
+pub mod prometheus_metrics_handler;
+pub mod reported_state_handler;
+pub mod server;
+pub mod startup_probe_handler;
diff --git a/dataplane/src/executor/monitoring/prometheus_metrics_handler.rs b/dataplane/src/executor/monitoring/prometheus_metrics_handler.rs
new file mode 100644
index 000000000..b51c20f55
--- /dev/null
+++ b/dataplane/src/executor/monitoring/prometheus_metrics_handler.rs
@@ -0,0 +1,38 @@
+use std::{convert::Infallible, sync::Arc};
+
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    Request, Response,
+};
+use prometheus_client::{encoding::text::encode, registry::Registry};
+
+use crate::executor::monitoring::{handler::Handler, server::full};
+
+#[derive(Debug, Clone)]
+pub struct PrometheusMetricsHandler {
+    registry: Arc<Registry>,
+}
+
+impl PrometheusMetricsHandler {
+    pub fn new(registry: Arc<Registry>) -> Self {
+        PrometheusMetricsHandler { registry }
+    }
+}
+
+impl Handler for PrometheusMetricsHandler {
+    async fn handle(
+        &self,
+        _request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
+        let mut buf = String::new();
+        encode(&mut buf, &self.registry).expect("Failed to encode metrics.");
+        let response = Response::builder()
+            .header(
+                hyper::header::CONTENT_TYPE,
+                "application/openmetrics-text; version=1.0.0; charset=utf-8",
+            )
+            .status(200)
+            .body(full(Bytes::from(buf)))
+            .expect("Failed to build response.");
+        Ok(response)
+    }
+}
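A sketch (not part of the patch) of how connection callbacks drive the health endpoint, assuming the types above and an Arc&lt;Metrics&gt; from the registry setup; `simulate_connection_flap` is a made-up helper:

```rust
use std::sync::Arc;

use crate::executor::{
    metrics::Metrics,
    monitoring::health_checker::{
        generic_health_checker::{GenericHealthChecker, HEALTH_CHECKER_NAME},
        HealthChecker,
    },
};

// Loses and regains the server connection, observing what
// /monitoring/health would report via check().
fn simulate_connection_flap(metrics: Arc<Metrics>) {
    let mut checker =
        GenericHealthChecker::new(true, None, HEALTH_CHECKER_NAME.to_string(), metrics);
    checker.server_connection_state_changed(false, "lost gRPC channel to server".to_string());
    assert!(!checker.check().is_success); // health endpoint now returns 503
    checker.server_connection_state_changed(true, String::new());
    assert!(checker.check().is_success); // back to 200
}
```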
diff --git a/dataplane/src/executor/monitoring/reported_state_handler.rs b/dataplane/src/executor/monitoring/reported_state_handler.rs
new file mode 100644
index 000000000..a114112b1
--- /dev/null
+++ b/dataplane/src/executor/monitoring/reported_state_handler.rs
@@ -0,0 +1,49 @@
+use std::convert::Infallible;
+
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    Request, Response,
+};
+
+use crate::executor::{
+    executor_api::ExecutorStateReporter,
+    monitoring::{handler::Handler, server::full},
+};
+
+#[derive(Debug, Clone)]
+pub struct ReportedStateHandler {
+    state_reporter: ExecutorStateReporter,
+}
+
+impl ReportedStateHandler {
+    pub fn new(state_reporter: ExecutorStateReporter) -> Self {
+        ReportedStateHandler { state_reporter }
+    }
+}
+
+impl Handler for ReportedStateHandler {
+    async fn handle(
+        &self,
+        _request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
+        let body = match self.state_reporter.last_state_report_request().await {
+            Some(request) => full(Bytes::from(format!("{request:?}"))),
+            None => full(Bytes::from("No state reported so far")),
+        };
+        let response = Response::builder()
+            .header(hyper::header::CONTENT_TYPE, "text/plain; charset=utf-8")
+            .status(200)
+            .body(body)
+            .expect("Failed to build response.");
+        Ok(response)
+    }
+}
diff --git a/dataplane/src/executor/monitoring/server.rs b/dataplane/src/executor/monitoring/server.rs
new file mode 100644
index 000000000..8e6e4b8f1
--- /dev/null
+++ b/dataplane/src/executor/monitoring/server.rs
@@ -0,0 +1,105 @@
+use std::convert::Infallible;
+
+use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
+use hyper::{
+    body::{Bytes, Incoming},
+    server::conn::http1,
+    service::service_fn,
+    Method, Request, Response, StatusCode,
+};
+use hyper_util::rt::TokioIo;
+use tokio::net::TcpListener;
+
+use crate::executor::monitoring::{
+    handler::Handler, health_check_handler::HealthCheckHandler,
+    prometheus_metrics_handler::PrometheusMetricsHandler,
+    reported_state_handler::ReportedStateHandler, startup_probe_handler::StartupProbeHandler,
+};
+
+#[derive(Clone)]
+pub struct MonitoringServer {
+    host: String,
+    port: u16,
+    startup_probe_handler: StartupProbeHandler,
+    health_probe_handler: HealthCheckHandler,
+    metrics_handler: PrometheusMetricsHandler,
+    reported_state_handler: ReportedStateHandler,
+}
+
+impl MonitoringServer {
+    pub fn new(
+        host: String,
+        port: u16,
+        startup_probe_handler: StartupProbeHandler,
+        health_probe_handler: HealthCheckHandler,
+        metrics_handler: PrometheusMetricsHandler,
+        reported_state_handler: ReportedStateHandler,
+    ) -> Self {
+        MonitoringServer {
+            host,
+            port,
+            startup_probe_handler,
+            health_probe_handler,
+            metrics_handler,
+            reported_state_handler,
+        }
+    }
+
+    // Send + Sync on the error type so run() can be driven from tokio::spawn.
+    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        let listener = TcpListener::bind(format!("{}:{}", self.host, self.port)).await?;
+        loop {
+            let (stream, _) = listener.accept().await?;
+
+            let io = TokioIo::new(stream);
+            let server = self.clone();
+
+            tokio::task::spawn(async move {
+                let service = service_fn(move |request: Request<Incoming>| {
+                    let server = server.clone();
+                    async move { server.routes(request).await }
+                });
+                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
+                    eprintln!("Error serving connection: {err:?}");
+                }
+            });
+        }
+    }
+
+    async fn routes(
+        &self,
+        request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
+        match (request.method(), request.uri().path()) {
+            (&Method::POST, "/monitoring/startup") => {
+                self.startup_probe_handler.handle(request).await
+            }
+            (&Method::POST, "/monitoring/health") => {
+                self.health_probe_handler.handle(request).await
+            }
+            (&Method::POST, "/monitoring/metrics") => self.metrics_handler.handle(request).await,
+            (&Method::POST, "/state/reported") => self.reported_state_handler.handle(request).await,
+
+            _ => {
+                let mut not_found = Response::new(empty());
+                *not_found.status_mut() = StatusCode::NOT_FOUND;
+                Ok(not_found)
+            }
+        }
+    }
+}
+
+pub fn empty() -> BoxBody<Bytes, hyper::Error> {
+    Empty::<Bytes>::new()
+        .map_err(|never| match never {})
+        .boxed()
+}
+
+pub fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
+    Full::new(chunk.into())
+        .map_err(|never| match never {})
+        .boxed()
+}
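A sketch (not part of the patch) of wiring the monitoring server at executor startup; `start_monitoring`, the bind address, and the port are assumptions:

```rust
use std::sync::Arc;

use prometheus_client::registry::Registry;
use tokio::sync::Mutex;

use crate::executor::{
    executor_api::ExecutorStateReporter,
    monitoring::{
        health_check_handler::HealthCheckHandler,
        health_checker::generic_health_checker::GenericHealthChecker,
        prometheus_metrics_handler::PrometheusMetricsHandler,
        reported_state_handler::ReportedStateHandler,
        server::MonitoringServer,
        startup_probe_handler::StartupProbeHandler,
    },
};

// Builds the server from the handlers defined above and runs it in the
// background; run()'s Send + Sync error type makes the spawn legal.
async fn start_monitoring(
    registry: Arc<Registry>,
    health_checker: Arc<Mutex<GenericHealthChecker>>,
    state_reporter: ExecutorStateReporter,
) {
    let server = MonitoringServer::new(
        "0.0.0.0".to_string(),
        7000, // placeholder port
        StartupProbeHandler::new(),
        HealthCheckHandler::new(health_checker),
        PrometheusMetricsHandler::new(registry),
        ReportedStateHandler::new(state_reporter),
    );
    tokio::spawn(async move {
        if let Err(err) = server.run().await {
            eprintln!("monitoring server exited: {err}");
        }
    });
}
```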
diff --git a/dataplane/src/executor/monitoring/startup_probe_handler.rs b/dataplane/src/executor/monitoring/startup_probe_handler.rs
new file mode 100644
index 000000000..a73d72718
--- /dev/null
+++ b/dataplane/src/executor/monitoring/startup_probe_handler.rs
@@ -0,0 +1,48 @@
+use std::{
+    convert::Infallible,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+};
+
+use http_body_util::combinators::BoxBody;
+use hyper::{
+    body::{Bytes, Incoming},
+    Request, Response,
+};
+use serde_json::json;
+
+use crate::executor::monitoring::{handler::Handler, server::full};
+
+#[derive(Debug, Clone)]
+pub struct StartupProbeHandler {
+    // Shared flag: the server clones handlers per connection, so readiness
+    // needs interior mutability to be visible across clones.
+    ready: Arc<AtomicBool>,
+}
+
+impl StartupProbeHandler {
+    pub fn new() -> Self {
+        StartupProbeHandler {
+            ready: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    pub(crate) fn set_ready(&self, ready: bool) {
+        self.ready.store(ready, Ordering::Release);
+    }
+}
+
+impl Handler for StartupProbeHandler {
+    #[inline]
+    async fn handle(
+        &self,
+        _request: Request<Incoming>,
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
+        let (status, body) = if self.ready.load(Ordering::Acquire) {
+            (200, json!({"status": "ok"}))
+        } else {
+            (503, json!({"status": "nok"}))
+        };
+        let response = Response::builder()
+            .header("Content-Type", "application/json")
+            .status(status)
+            .body(full(Bytes::from(body.to_string())))
+            .expect("Failed to build response");
+        Ok(response)
+    }
+}
diff --git a/dataplane/src/main.rs b/dataplane/src/main.rs
new file mode 100644
index 000000000..3344c89c8
--- /dev/null
+++ b/dataplane/src/main.rs
@@ -0,0 +1,3 @@
+pub mod executor;
+
+fn main() {}