-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathtest_runtime.py
93 lines (77 loc) · 3.05 KB
/
test_runtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# Copyright 2023 The Crossplane Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import dataclasses
import os
import signal
import unittest
import grpc
import crossplane.function.proto.v1.run_function_pb2 as fnv1
import crossplane.function.proto.v1.run_function_pb2_grpc as grpcv1
import crossplane.function.proto.v1beta1.run_function_pb2 as fnv1beta1
from crossplane.function import logging, runtime
class TestRuntime(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
logging.configure(level=logging.Level.DISABLED)
async def test_run_function(self) -> None:
@dataclasses.dataclass
class TestCase:
reason: str
runner: grpcv1.FunctionRunnerService
req: fnv1beta1.RunFunctionRequest
want: fnv1beta1.RunFunctionResponse
cases = [
TestCase(
reason="The v1 response should be returned as a v1beta1 response.",
runner=EchoRunner(),
req=fnv1beta1.RunFunctionRequest(meta=fnv1beta1.RequestMeta(tag="hi")),
want=fnv1beta1.RunFunctionResponse(
meta=fnv1beta1.ResponseMeta(tag="hi")
),
),
]
for case in cases:
beta_runner = runtime.BetaFunctionRunner(wrapped=case.runner)
rsp = await beta_runner.RunFunction(case.req, None)
self.assertEqual(rsp, case.want, "-want, +got")
async def test_sigterm_handling(self) -> None:
async def mock_server():
await server.start()
await asyncio.sleep(1)
self.assertTrue(server._server.is_running(), "Server should be running")
os.kill(os.getpid(), signal.SIGTERM)
await server.wait_for_termination()
self.assertFalse(
server._server.is_running(),
"Server should have been stopped on SIGTERM",
)
server = grpc.aio.server()
signal.signal(
signal.SIGTERM,
lambda _, __: asyncio.create_task(runtime._stop(server)),
)
await mock_server()
class EchoRunner(grpcv1.FunctionRunnerService):
def __init__(self):
self.log = logging.get_logger()
async def RunFunction( # noqa: N802 # gRPC requires this name.
self,
req: fnv1.RunFunctionRequest,
_: grpc.aio.ServicerContext,
) -> fnv1beta1.RunFunctionResponse:
return fnv1beta1.RunFunctionResponse(
meta=fnv1beta1.ResponseMeta(tag=req.meta.tag)
)
if __name__ == "__main__":
unittest.main()