Skip to content

Commit 0cfbc03

Browse files
authored
[Model Monitoring] Test and Fix for Model-monitoring data-path: support LLM/deep-learning models (mlrun#5289)
1 parent 2af3d96 commit 0cfbc03

File tree

4 files changed

+228
-7
lines changed

4 files changed

+228
-7
lines changed

mlrun/model_monitoring/controller.py

-7
Original file line numberDiff line numberDiff line change
@@ -445,13 +445,6 @@ def model_endpoint_process(
445445
m_fs = fstore.get_feature_set(
446446
endpoint[mm_constants.EventFieldType.FEATURE_SET_URI]
447447
)
448-
labels = endpoint[mm_constants.EventFieldType.LABEL_NAMES]
449-
if labels:
450-
if isinstance(labels, str):
451-
labels = json.loads(labels)
452-
for label in labels:
453-
if label not in list(m_fs.spec.features.keys()):
454-
m_fs.add_feature(fstore.Feature(name=label, value_type="float"))
455448

456449
for application in applications_names:
457450
batch_window = batch_window_generator.get_batch_window(

mlrun/model_monitoring/stream_processing.py

+2
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,8 @@ def do(self, event):
587587
for key in [
588588
EventFieldType.FEATURES,
589589
EventFieldType.NAMED_FEATURES,
590+
EventFieldType.PREDICTION,
591+
EventFieldType.NAMED_PREDICTIONS,
590592
]:
591593
event.pop(key, None)
592594

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright 2024 Iguazio
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import mlrun.serving
15+
16+
17+
class OneToOne(mlrun.serving.V2ModelServer):
    """Stub model server whose ``predict`` returns exactly one output per input."""

    def load(self):
        # Nothing to load — this stub carries no real model artifact.
        pass

    def predict(self, body: dict) -> list:
        inputs = body.get("inputs")
        first = inputs[0]
        first_is_list = isinstance(first, list)
        if first_is_list and len(inputs) == 600:  # single image
            # One flattened image (600 pixel rows) -> a single scalar prediction.
            outputs = 3
        elif first_is_list and len(inputs) == 2 and len(first) == 600:
            # A batch of two flattened images -> one scalar per image.
            outputs = [2, 2]
        elif first_is_list or (isinstance(first, str) and isinstance(inputs, list)):
            # Generic batch (list/str elements): echo the first item of each input.
            outputs = [row[0] for row in inputs]
        else:
            # Single flat input: echo its first element.
            outputs = first
        return outputs
38+
39+
40+
class OneToMany(mlrun.serving.V2ModelServer):
    """Stub model server whose ``predict`` returns five outputs for each input."""

    def load(self):
        # No model artifact to load for this stub.
        pass

    def predict(self, body: dict) -> list:
        inputs = body.get("inputs")

        def five_wide(value):
            # Five-output row: the value twice plus fixed mixed-type fillers.
            return [value, value, 3.0, "a", 5]

        head = inputs[0]
        is_batch = isinstance(head, list) or (
            isinstance(head, str) and isinstance(inputs, list)
        )
        if is_batch:
            # One five-wide row per input element.
            outputs = [five_wide(row[0]) for row in inputs]
        else:
            # Single flat input -> a single five-wide row.
            outputs = five_wide(head)
        return outputs

tests/system/model_monitoring/test_app.py

+170
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import concurrent
16+
import concurrent.futures
1517
import json
1618
import pickle
1719
import time
@@ -32,7 +34,9 @@
3234
import mlrun
3335
import mlrun.common.schemas.model_monitoring.constants as mm_constants
3436
import mlrun.feature_store
37+
import mlrun.feature_store as fstore
3538
import mlrun.model_monitoring.api
39+
from mlrun.datastore.targets import ParquetTarget
3640
from mlrun.model_monitoring import TrackingPolicy
3741
from mlrun.model_monitoring.application import ModelMonitoringApplicationBase
3842
from mlrun.model_monitoring.applications.histogram_data_drift import (
@@ -450,3 +454,169 @@ def test_enable_model_monitoring(self) -> None:
450454
]
451455
== "1m"
452456
)
457+
458+
459+
@TestMLRunSystem.skip_test_if_env_not_configured
@pytest.mark.enterprise
class TestAllKindOfServing(TestMLRunSystem):
    """System test: deploy serving functions covering several model I/O shapes
    (ints, strings, images, one-to-many outputs) with tracking enabled, and
    verify the monitoring parquet holds the expected columns and all events.
    """

    project_name = "test-mm-serving"
    # Set image to "<repo>/mlrun:<tag>" for local testing
    image: typing.Optional[str] = None

    @classmethod
    def custom_setup_class(cls) -> None:
        # A 20x30 RGB image flattened into 600 pixel triplets (values 0-255).
        random_rgb_image_list = (
            np.random.randint(0, 256, (20, 30, 3), dtype=np.uint8)
            .reshape(-1, 3)
            .tolist()
        )
        # Per model: serving-function name, model class from assets/models.py,
        # a sample input, and the parquet columns expected after monitoring
        # (f<i> = input features, p<i> = predictions).
        cls.models = {
            "int_one_to_one": {
                "name": "serving_1",
                "model_name": "int_one_to_one",
                "class_name": "OneToOne",
                "data_point": [1, 2, 3],
                "schema": ["f0", "f1", "f2", "p0"],
            },
            "int_one_to_many": {
                "name": "serving_2",
                "model_name": "int_one_to_many",
                "class_name": "OneToMany",
                "data_point": [1, 2, 3],
                "schema": ["f0", "f1", "f2", "p0", "p1", "p2", "p3", "p4"],
            },
            "str_one_to_one": {
                "name": "serving_3",
                "model_name": "str_one_to_one",
                "class_name": "OneToOne",
                "data_point": "input_str",
                "schema": ["f0", "p0"],
            },
            "str_one_to_many": {
                "name": "serving_4",
                "model_name": "str_one_to_many",
                "class_name": "OneToMany",
                "data_point": "input_str",
                "schema": ["f0", "p0", "p1", "p2", "p3", "p4"],
            },
            "img_one_to_one": {
                "name": "serving_5",
                "model_name": "img_one_to_one",
                "class_name": "OneToOne",
                "data_point": random_rgb_image_list,
                "schema": [f"f{i}" for i in range(600)] + ["p0"],
            },
            "int_and_str_one_to_one": {
                "name": "serving_6",
                "model_name": "int_and_str_one_to_one",
                "class_name": "OneToOne",
                "data_point": [1, "a", 3],
                "schema": ["f0", "f1", "f2", "p0"],
            },
        }

    def _log_model(self, model_name) -> None:
        # Register the shared pickled model artifact under the given name.
        self.project.log_model(
            model_name,
            model_dir=str((Path(__file__).parent / "assets").absolute()),
            model_file="model.pkl",
        )

    @classmethod
    def _deploy_model_serving(
        cls, name: str, model_name: str, class_name: str, **kwargs
    ) -> mlrun.runtimes.nuclio.serving.ServingRuntime:
        # Build a serving function from assets/models.py, attach the logged
        # model, enable monitoring tracking, and deploy it.
        # NOTE: **kwargs absorbs the extra model-dict keys (data_point, schema).
        serving_fn = mlrun.code_to_function(
            project=cls.project_name,
            name=name,
            filename=f"{str((Path(__file__).parent / 'assets').absolute())}/models.py",
            kind="serving",
        )
        serving_fn.add_model(
            model_name,
            model_path=f"store://models/{cls.project_name}/{model_name}:latest",
            class_name=class_name,
        )
        serving_fn.set_tracking()
        if cls.image is not None:
            # Override both run and build images when testing a local image.
            serving_fn.spec.image = serving_fn.spec.build.image = cls.image

        serving_fn.deploy()
        return typing.cast(mlrun.runtimes.nuclio.serving.ServingRuntime, serving_fn)

    def _test_endpoint(self, model_name, feature_set_uri) -> dict[str, typing.Any]:
        """Invoke the endpoint (1 + 2 data points = 3 events), wait for the
        monitoring stream to flush to parquet, and report schema/event checks."""
        model_dict = self.models[model_name]
        serving_fn = self.project.get_function(model_dict.get("name"))
        data_point = model_dict.get("data_point")

        serving_fn.invoke(
            f"v2/models/{model_name}/infer",
            json.dumps(
                {"inputs": [data_point]},
            ),
        )
        serving_fn.invoke(
            f"v2/models/{model_name}/infer",
            json.dumps({"inputs": [data_point, data_point]}),
        )
        # Allow the parquet batching window to elapse (plus slack) so the
        # invoked events are persisted before reading them back.
        time.sleep(
            mlrun.mlconf.model_endpoint_monitoring.parquet_batching_timeout_secs + 10
        )

        # Read the monitoring feature set's first parquet target back as a df.
        offline_response_df = ParquetTarget(
            name="temp",
            path=fstore.get_feature_set(feature_set_uri).spec.targets[0].path,
        ).as_df()

        # All expected feature/prediction columns must appear in the parquet.
        is_schema_saved = set(model_dict.get("schema")).issubset(
            offline_response_df.columns
        )
        # 3 rows expected: one event from the first invoke, two from the second.
        has_all_the_events = offline_response_df.shape[0] == 3

        return {
            "model_name": model_name,
            "is_schema_saved": is_schema_saved,
            "has_all_the_events": has_all_the_events,
        }

    def test_all(self) -> None:
        # Enable the monitoring infrastructure; the histogram drift app is not
        # needed for this data-path test.
        self.project.enable_model_monitoring(
            image=self.image or "mlrun/mlrun",
            base_period=1,
            deploy_histogram_data_drift_app=False,
        )
        # Deploy all serving functions concurrently — each deploy is slow.
        futures = []
        with ThreadPoolExecutor() as executor:
            for model_name, model_dict in self.models.items():
                self._log_model(model_name)
                future = executor.submit(self._deploy_model_serving, **model_dict)
                futures.append(future)

            # Surface any deployment error by resolving every future.
            for future in concurrent.futures.as_completed(futures):
                future.result()

        # Exercise and verify each deployed endpoint concurrently.
        futures_2 = []
        with ThreadPoolExecutor() as executor:
            self.db = mlrun.model_monitoring.get_model_endpoint_store(
                project=self.project_name
            )
            endpoints = self.db.list_model_endpoints()
            for endpoint in endpoints:
                future = executor.submit(
                    self._test_endpoint,
                    # Endpoint model field looks like "<name>:<tag>" — keep the name.
                    model_name=endpoint[mm_constants.EventFieldType.MODEL].split(":")[
                        0
                    ],
                    feature_set_uri=endpoint[
                        mm_constants.EventFieldType.FEATURE_SET_URI
                    ],
                )
                futures_2.append(future)

            for future in concurrent.futures.as_completed(futures_2):
                res_dict = future.result()
                assert res_dict[
                    "is_schema_saved"
                ], f"For {res_dict['model_name']} the schema of parquet is missing columns"

                assert res_dict["has_all_the_events"], "Not all the events were saved"

0 commit comments

Comments
 (0)