Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions test/deltares_testbench/src/suite/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import copy
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import ClassVar, Dict, List, Tuple

from src.config.test_case_config import TestCaseConfig
Expand All @@ -14,36 +16,53 @@
from src.utils.paths import Paths


# Test case handler (compare or reference)
@dataclass
class DirectoryState:
"""Snapshot of a directory's state used to detect added/changed files.

Attributes
----------
files : Dict[str, datetime]
Mapping of filename to last modification time (UTC).
size : int
Total size in bytes of all files in the directory at snapshot time.
"""

files: Dict[str, datetime] = field(default_factory=dict)
size: int = 0


Comment on lines +19 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the point of this class. I see the data gets collected, but I don't see it being used anywhere. Could you explain the thought process behind it?

class TestCase:
"""Test Case instance (compare or reference)."""

__test__: ClassVar[bool] = False

# constructor
# input: test case configuration
def __init__(self, config: TestCaseConfig, logger: ILogger) -> None:
self.__config = config
self.__logger = logger
self.__maxRunTime: float = self.__config.max_run_time
self.__max_run_time: float = self.__config.max_run_time
self.__programs: List[Tuple[int, Program]] = []
self.__errors: list[Exception] = []

logger.debug(f"Initializing test case ({self.__config.name}), max runtime : {str(self.__maxRunTime)}")
logger.debug(f"Initializing test case ({self.__config.name}), max runtime : {str(self.__max_run_time)}")

self.__config.run_file_name = os.path.join(self.__config.absolute_test_case_path, "_tb3_char.run")
refrunfile = os.path.join(config.absolute_test_case_reference_path, "_tb3_char.run")

if os.path.exists(refrunfile):
refruntime = self.__findCharacteristicsRunTime__(refrunfile)
refruntime = self.__find_characteristics_run_time(refrunfile)
if refruntime:
self.__config.ref_run_time = refruntime
if not self.__config.overrule_ref_max_run_time:
# set maxRunTime to 1.5 * reference runtime and add a few seconds (some systems start slow)
# The variation in runtimes vary a lot (different machines, other processes)
self.__maxRunTime = refruntime * 1.5 + 10.0
logger.info(f"Overwriting max run time via reference _tb3_char.run ({str(self.__maxRunTime)})")
self.__max_run_time = refruntime * 1.5 + 10.0
logger.info(f"Overwriting max run time via reference _tb3_char.run ({str(self.__max_run_time)})")

self.__maxRunTime = max(self.__maxRunTime, 120.0) * 5.0 + 300.0
logger.debug(f"maxRunTime increased to {str(self.__maxRunTime)}")
self.__max_run_time = max(self.__max_run_time, 120.0) * 5.0 + 300.0
logger.debug(f"maxRunTime increased to {str(self.__max_run_time)}")

def run(self, programs: List[Program]) -> None:
"""Execute a Test Case.
Expand All @@ -63,12 +82,14 @@ def run(self, programs: List[Program]) -> None:
# prepare the programs for running

logger = self.__logger
self.__initializeProgramList__(programs)
self.__initialize_program_list(programs)

logger.debug("Starting test case")

# prepare presets for testbench run file
input_files, size = self.__get_initial_state()
pre_run_state = self.__get_state_directory(self.__config.absolute_test_case_path)
pre_run_files = pre_run_state.files
size = pre_run_state.size

start_time = time.time()
logger.debug(f"Test case start time {str(time.ctime(int(start_time)))}")
Expand All @@ -91,34 +112,39 @@ def run(self, programs: List[Program]) -> None:
with open(self.__config.run_file_name, "w") as runfile:
runfile.write("Start_size:" + str(size) + "\n")
runfile.write("Runtime:" + str(elapsed_time) + "\n")
for allfile in os.listdir(self.__config.absolute_test_case_path):
post_run_state = self.__get_state_directory(self.__config.absolute_test_case_path)
for post_file in post_run_state.files:
# collect all added and changed files in the working directory (after running, compare to initial list)
if allfile not in {}.fromkeys(input_files, 0):
runfile.write("Output_added:" + str(allfile) + "\n")
size = size + os.path.getsize(os.path.join(self.__config.absolute_test_case_path, allfile))
if post_file not in {}.fromkeys(pre_run_files, 0):
runfile.write("Output_added:" + str(post_file) + "\n")
size = size + os.path.getsize(os.path.join(self.__config.absolute_test_case_path, post_file))
else:
ftime = os.path.getmtime(os.path.join(self.__config.absolute_test_case_path, allfile))
if ftime != input_files[allfile]:
runfile.write("Output_changed:" + str(allfile) + "\n")
ftime = post_run_state.files[post_file]
if ftime != pre_run_files[post_file]:
runfile.write("Output_changed:" + str(post_file) + "\n")
runfile.write("End_size:" + str(size) + "\n")

def __get_initial_state(self) -> Tuple[Dict[str, float], int]:
inputfiles: Dict[str, float] = {}
def __get_state_directory(self, directory: str) -> DirectoryState:
files: Dict[str, datetime] = {}
size: int = 0

# collect all initial files in the working directory before running
for infile in os.listdir(self.__config.absolute_test_case_path):
inputfiles[infile] = os.path.getmtime(os.path.join(self.__config.absolute_test_case_path, infile))
size = size + os.path.getsize(os.path.join(self.__config.absolute_test_case_path, infile))
for infile in os.listdir(directory):
files[infile] = datetime.fromtimestamp(
os.path.getmtime(os.path.join(directory, infile)),
tz=timezone.utc,
)
size = size + os.path.getsize(os.path.join(directory, infile))

return inputfiles, size
return DirectoryState(files=files, size=size)

# get errors from Test Case
# output: list of Errors (type), can be None
def getErrors(self):
# output: list of Errors (type)
def get_errors(self) -> list[Exception]:
"""Retrieve errors encountered during Test Case execution."""
return self.__errors

def __initializeProgramList__(self, programs: List[Program]):
def __initialize_program_list(self, programs: List[Program]) -> None:
"""Prepare programs from configuration."""
# programs are loaded by the manager
shell_arguments = " ".join(self.__config.shell_arguments)
Expand Down Expand Up @@ -150,7 +176,7 @@ def __initializeProgramList__(self, programs: List[Program]):
# retrieve runtime or none from _tb3_char.run file
# input: path to _tb3_char.run file
# output: actual runtime value (float)
def __findCharacteristicsRunTime__(self, filename):
def __find_characteristics_run_time(self, filename: str) -> float | None:
with open(filename) as f:
retval = None
for line in f:
Expand Down
31 changes: 27 additions & 4 deletions test/deltares_testbench/src/suite/test_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import multiprocessing
import os
import shutil
import sys
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
Expand All @@ -23,7 +24,7 @@
from src.suite.test_bench_settings import TestBenchSettings
from src.suite.test_case import TestCase
from src.suite.test_case_result import TestCaseResult
from src.utils.common import log_header, log_separator, log_sub_header
from src.utils.common import delete_directory, log_header, log_separator, log_sub_header
from src.utils.errors.test_bench_error import TestBenchError
from src.utils.handlers.handler_factory import HandlerFactory
from src.utils.handlers.resolve_handler import ResolveHandler
Expand Down Expand Up @@ -240,8 +241,8 @@ def run_test_case(
logger.info("Testcase not executed (ignored)...\n")

# Check for errors during execution of testcase
if len(testcase.getErrors()) > 0:
errstr = ",".join(str(e) for e in testcase.getErrors())
if len(testcase.get_errors()) > 0:
errstr = ",".join(str(e) for e in testcase.get_errors())
logger.error("Errors during testcase: " + errstr)
raise TestCaseFailure("Errors during testcase: " + errstr)

Expand Down Expand Up @@ -559,6 +560,9 @@ def __process_test_case_locations(self, config: TestCaseConfig, logger: ILogger)
remote_path = self.__build_remote_path(config, location)
local_path = self.__build_local_path(config, location)
self.__download_location_with_retries(config, location, remote_path, local_path, logger)
if location.type == PathType.INPUT:
self.__copy_to_work_folder(local_path, logger)

self.__set_absolute_paths(config, location.type, local_path)

def __validate_location(self, config: TestCaseConfig, location: Location) -> None:
Expand Down Expand Up @@ -637,6 +641,25 @@ def __download_location_with_retries(
error_message = f"Unable to download testcase: {error}"
raise TestBenchError(error_message) from e

def __copy_to_work_folder(self, local_path: str, logger: ILogger) -> None:
"""Copy downloaded files to work folder if needed."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that local_path should be a Path instead of a str. What if there's a trailing slash in the string in some config files or something. Then you get <path>/_work, or maybe <path>\_work depending on the platform. The os.path functionality can all be replaced by Path methods. Path implements the PathLike protocol, which shutil.copytree accepts as arguments. So you don't even need to convert the Paths to str before passing them to shutil.copytree

copy_path = local_path + "_work"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
copy_path = local_path + "_work"
copy_path = Path(local_path) / "_work"

if not os.path.exists(local_path):
logger.warning(f"Work path does not exist, cannot create work copy: {local_path}")
return

# Clean work directory if it exists
if os.path.exists(copy_path):
delete_directory(copy_path, logger)

# Copy input to work directory
logger.debug(f"Copying input from {local_path} to {copy_path}")
if os.path.isdir(local_path):
shutil.copytree(local_path, copy_path, symlinks=False, ignore_dangling_symlinks=True)
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can rule out that local_path is not a directory, right? The input of a case should always be a directory. Maybe do a check at the start of this method: local_path.is_dir() and raise the builtin NotADirectoryError. Then you can skip this edgecase.

os.makedirs(os.path.dirname(copy_path) or ".", exist_ok=True)
shutil.copy2(local_path, copy_path)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why copy2? what's the difference with copytree?


def __download_single_location(
self, config: TestCaseConfig, location: Location, remote_path: str, local_path: str, logger: ILogger
) -> None:
Expand All @@ -655,7 +678,7 @@ def __get_destination_directory(self, location_type: PathType) -> Optional[str]:
def __set_absolute_paths(self, config: TestCaseConfig, location_type: PathType, local_path: str) -> None:
"""Set absolute paths on the config based on location type."""
if location_type == PathType.INPUT:
config.absolute_test_case_path = local_path
config.absolute_test_case_path = local_path + "_work"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd advie to use Path here as well if possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.absolute_test_case_path is a str for historical reasons, right? I suggest something like this to avoid problems with path separators.

local = Path(local_path)
config.absolute_test_case_path = str(local.parent / (local.name + "_work"))

elif location_type == PathType.REFERENCE:
config.absolute_test_case_reference_path = local_path

Expand Down
Loading