Skip to content
93 changes: 49 additions & 44 deletions multiparser/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@


def _default_callback(
_: typing.Dict[str, typing.Any], meta: typing.Dict[str, typing.Any]
_: dict[str, typing.Any], meta: dict[str, typing.Any]
) -> None:
"""Default per file callback if none set globally or per file"""
loguru.logger.warning(
Expand Down Expand Up @@ -77,7 +77,7 @@ def __init__(
exception_callback: typing.Callable | None = None,
notification_callback: typing.Callable | None = None,
termination_trigger: Event | None = None,
subprocess_triggers: typing.List[Event] | None = None,
subprocess_triggers: list[Event] | None = None,
timeout: int | None = None,
lock_callbacks: bool = True,
interval: float = 0.1,
Expand All @@ -91,7 +91,7 @@ def __init__(

Parameters
----------
threads_recorder : typing.Dict
threads_recorder : dict
Dictionary for caching values read from the file parsing threads
per_thread_callback : typing.Callable, optional
function to be executed whenever a monitored file is modified
Expand All @@ -101,7 +101,7 @@ def __init__(
notification_callback : typing.Callable | None, optional
function to be called when a new file is found, default is
a print statement
subprocess_triggers : typing.List[Event], optional
subprocess_triggers : list[Event], optional
if provided, events which will be set if monitor terminates
timeout : int, optional
time after which to terminate, default is None
Expand All @@ -127,19 +127,19 @@ def __init__(
self._per_thread_callback = per_thread_callback or _default_callback
self._notification_callback = notification_callback
self._shutdown_on_thread_failure: bool = terminate_all_on_fail
self._exceptions: typing.Dict[str, Exception | None] = {}
self._exceptions: dict[str, Exception | None] = {}
self._exception_callback = self._generate_exception_callback(exception_callback)
self._file_threads_mutex: "threading.Lock | None" = (
threading.Lock() if lock_callbacks else None
)
self._subprocess_triggers: typing.List[Event] | None = subprocess_triggers
self._subprocess_triggers: list[Event] | None = subprocess_triggers
self._monitor_termination_trigger = (
termination_trigger or multiprocessing.Event()
)
self._known_files: typing.List[str] = []
self._file_trackables: typing.List[FullFileTrackable] = []
self._log_trackables: typing.List[LogFileTrackable] = []
self._excluded_patterns: typing.List[str] = []
self._known_files: list[str] = []
self._file_trackables: list[FullFileTrackable] = []
self._log_trackables: list[LogFileTrackable] = []
self._excluded_patterns: list[str] = []
self._file_monitor_thread: threading.Thread | None = None
self._log_monitor_thread: threading.Thread | None = None
self._timer_process: multiprocessing.Process | None = None
Expand Down Expand Up @@ -168,7 +168,7 @@ def _exception_callback(
_exceptions: dict[str, Exception | None] = self._exceptions,
user_defined=user_callback,
abort_on_fail=self._shutdown_on_thread_failure,
abort_func=self.terminate,
abort_func=(lambda: self._monitor_termination_trigger.set()),
) -> None:
if user_defined:
user_defined(f"{type(exception).__name__}: '{exception.args[0]}'")
Expand All @@ -188,9 +188,9 @@ def _create_monitor_threads(self) -> None:
"""Create threads for the log file and full file monitors"""

def _full_file_monitor_func(
ff_trackables: typing.List[FullFileTrackable],
exc_glob_exprs: typing.List[str],
file_list: typing.List[str],
ff_trackables: list[FullFileTrackable],
exc_glob_exprs: list[str],
file_list: list[str],
termination_trigger: threading.Event,
interval: float,
flatten_data: bool,
Expand All @@ -213,9 +213,9 @@ def _full_file_monitor_func(
_full_file_threads.run()

def _log_file_monitor_func(
lf_trackables: typing.List[LogFileTrackable],
exc_glob_exprs: typing.List[str],
file_list: typing.List[str],
lf_trackables: list[LogFileTrackable],
exc_glob_exprs: list[str],
file_list: list[str],
termination_trigger: threading.Event,
interval: float,
flatten_data: bool,
Expand Down Expand Up @@ -247,6 +247,7 @@ def _log_file_monitor_func(
self._interval,
self._flatten_data,
),
daemon=True,
)

self._log_monitor_thread = threading.Thread(
Expand All @@ -259,6 +260,7 @@ def _log_file_monitor_func(
self._interval,
self._flatten_data,
),
daemon=True,
)

def _check_custom_log_parser(
Expand Down Expand Up @@ -291,15 +293,20 @@ def _check_custom_log_parser(
try:
# Parsers are expected to have the keyword argument 'file_content'
_out = parser(
file_content=_test_str, __input_file=__file__, __read_bytes=None, **parser_kwargs
file_content=_test_str,
__input_file=__file__,
__read_bytes=None,
**parser_kwargs,
)

# If the custom parser returns a list of entries, not just one
if isinstance(_out, list):
_out = _out[0]

except Exception as e:
raise AssertionError(f"Custom parser testing failed for '{parser.__name__}' with exception:\n{e}")
raise AssertionError(
f"Custom parser testing failed for '{parser.__name__}' with exception:\n{e}"
)

if (
len(_out) != 2
Expand All @@ -319,12 +326,12 @@ def _check_custom_log_parser(
"multiparser.log_parser decorator"
)

def exclude(self, path_glob_exprs: typing.List[str] | str) -> None:
def exclude(self, path_glob_exprs: list[str] | str) -> None:
"""Exclude a set of files from monitoring.

Parameters
----------
path_glob_exprs : typing.List[str] | str
path_glob_exprs : list[str] | str
a list or string defining globular expressions for files
to exclude from tracking
"""
Expand All @@ -340,11 +347,11 @@ def exclude(self, path_glob_exprs: typing.List[str] | str) -> None:
def track(
self,
*,
path_glob_exprs: typing.List[str] | str,
path_glob_exprs: list[str] | str,
tracked_values: TrackedValues | None = None,
callback: typing.Callable | None = None,
parser_func: typing.Callable | None = None,
parser_kwargs: typing.Dict | None = None,
parser_kwargs: dict | None = None,
static: bool = False,
file_type: str | None = None,
) -> None:
Expand All @@ -357,17 +364,17 @@ def track(

Parameters
----------
path_glob_exprs : typing.List[str] | str
path_glob_exprs : list[str] | str
set of or single globular expression(s) defining files
to monitor
tracked_values : typing.List[str] | None, optional
tracked_values : list[str] | None, optional
a list of regular expressions defining variables to track
within the file, by default None
callback : typing.Callable | None, optional
override the global per file callback for this instance
parser_func : typing.Callable | None, optional
provide a custom parsing function
parser_kwargs : typing.Dict | None, optional
parser_kwargs : dict | None, optional
arguments to include when running the specified custom parser
static : bool, optional
(if known) whether the given file(s) are written only once
Expand Down Expand Up @@ -400,7 +407,7 @@ def track(
)

if isinstance(path_glob_exprs, str):
_parsing_dict: typing.Dict[str, typing.Any] = {
_parsing_dict: dict[str, typing.Any] = {
"glob_expr": path_glob_exprs,
"tracked_values": tracked_values,
"static": static,
Expand Down Expand Up @@ -430,17 +437,16 @@ def track(
raise AssertionError("Globular expression must be of type AnyStr")
glob.glob(_glob_ex)


def tail( # noqa: C901
def tail( # noqa: C901
self,
*,
path_glob_exprs: typing.List[str] | str,
path_glob_exprs: list[str] | str,
tracked_values: TrackedValues | None = None,
skip_lines_w_pattern: typing.List[re.Pattern | str] | None = None,
labels: str | typing.List[str | None] | None = None,
skip_lines_w_pattern: list[re.Pattern | str] | None = None,
labels: str | list[str | None] | None = None,
callback: typing.Callable | None = None,
parser_func: typing.Callable | None = None,
parser_kwargs: typing.Dict | None = None,
parser_kwargs: dict | None = None,
) -> None:
"""Tail a set of files.

Expand Down Expand Up @@ -490,25 +496,25 @@ def tail( # noqa: C901

Parameters
----------
path_glob_exprs : typing.List[str] | str
path_glob_exprs : list[str] | str
set of or single globular expression(s) defining files
to monitor
tracked_values : typing.List[Pattern | str], optional
tracked_values : list[Pattern | str], optional
a set of regular expressions or strings defining variables to track.
Where one capture group is defined the user must provide
an associative label. Where two are defined, the first capture
group is taken to be the label, the second the value.
skip_lines_w_pattern : typing.List[Pattern | str], optional
skip_lines_w_pattern : list[Pattern | str], optional
specify patterns defining lines which should be skipped
labels : typing.List[str], optional
labels : list[str], optional
define the label to assign to each value, if an element in the
list is None, then a capture group is used. If labels itself is
None, it is assumed all matches have a label capture group.
callback : typing.Callable | None, optional
override the global per file callback for this instance
parser_func : typing.Callable | None, optional
provide a custom parsing function
parser_kwargs : typing.Dict | None, optional
parser_kwargs : dict | None, optional
arguments to include when running the specified custom parser
"""
if parser_func:
Expand All @@ -522,12 +528,11 @@ def tail( # noqa: C901

if parser_func and labels:
raise AssertionError(
"Cannot specify both labels and custom parser for monitor "
"method 'tail'"
"Cannot specify both labels and custom parser for monitor method 'tail'"
)

_tracked_values: typing.List[str | re.Pattern]
_labels: typing.List[str | None]
_tracked_values: list[str | re.Pattern]
_labels: list[str | None]

if tracked_values is None:
_tracked_values = []
Expand All @@ -550,7 +555,7 @@ def tail( # noqa: C901

if not _tracked_values or parser_func:
_reg_lab_expr_pairing: (
typing.List[typing.Tuple[str | None, re.Pattern[str] | str]] | None
list[typing.Tuple[str | None, re.Pattern[str] | str]] | None
) = None
else:
_labels = _labels or [None] * len(_tracked_values)
Expand All @@ -563,7 +568,7 @@ def tail( # noqa: C901
parser_kwargs["ignore_lines"] = skip_lines_w_pattern

if isinstance(path_glob_exprs, (str, re.Pattern)):
_parsing_dict: typing.Dict[str, typing.Any] = {
_parsing_dict: dict[str, typing.Any] = {
"glob_expr": path_glob_exprs,
"tracked_values": _reg_lab_expr_pairing,
"static": False,
Expand Down
18 changes: 9 additions & 9 deletions multiparser/parsing/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class methods.
def _wrapper(*args, input_file: str, **kwargs) -> TimeStampedData:
"""Full file parser decorator"""
_data: TimeStampedData = parser(*args, input_file=input_file, **kwargs)
_meta_data: typing.Dict[str, str] = {
_meta_data: dict[str, str] = {
"timestamp": datetime.datetime.fromtimestamp(
os.path.getmtime(input_file)
).strftime("%Y-%m-%d %H:%M:%S.%f"),
Expand Down Expand Up @@ -151,7 +151,7 @@ def record_toml(input_file: str) -> TimeStampedData:
return {}, toml.load(input_file)


SUFFIX_PARSERS: typing.Dict[typing.Tuple[str, ...], typing.Callable] = {
SUFFIX_PARSERS: dict[typing.Tuple[str, ...], typing.Callable] = {
("csv",): record_csv,
("pkl", "pickle", "pckl"): record_pickle,
("pqt", "parquet"): record_parquet,
Expand All @@ -165,8 +165,8 @@ def record_toml(input_file: str) -> TimeStampedData:

def _full_file_parse(parse_func, in_file, tracked_values, **parser_kwargs) -> TimeStampedData:
"""Apply specific parser to a file"""
_data: typing.List[typing.Dict[str, typing.Any]]
_meta: typing.Dict[str, typing.Any]
_data: list[dict[str, typing.Any]]
_meta: dict[str, typing.Any]
_parsed = parse_func(input_file=in_file, **parser_kwargs)
_meta, _data = _parsed

Expand All @@ -180,10 +180,10 @@ def _full_file_parse(parse_func, in_file, tracked_values, **parser_kwargs) -> Ti
return _parsed

# Filter by key through each set of values
_out_data: typing.List[typing.Dict[str, typing.Any]] = []
_out_data: list[dict[str, typing.Any]] = []

for entry in _data:
_out_data_entry: typing.Dict[str, typing.Any] = {}
_out_data_entry: dict[str, typing.Any] = {}
for tracked_val in tracked_values or []:
_out_data_entry |= {
k: v
Expand All @@ -199,7 +199,7 @@ def _full_file_parse(parse_func, in_file, tracked_values, **parser_kwargs) -> Ti
def record_file(
input_file: str,
*,
tracked_values: typing.List[re.Pattern[str]] | None,
tracked_values: list[re.Pattern[str]] | None,
parser_func: typing.Callable | None,
file_type: str | None,
**parser_kwargs
Expand All @@ -213,7 +213,7 @@ def record_file(
----------
input_file : str
the file to parse
tracked_values : typing.List[re.Pattern[str]] | None
tracked_values : list[re.Pattern[str]] | None
regular expressions defining the values to be monitored, by default None
parser_func : typing.Callable | None
a custom parser to use for the given file
Expand All @@ -235,7 +235,7 @@ def record_file(
if the given file type is not recognised
"""
_extension: str = file_type or os.path.splitext(input_file)[1].replace(".", "")
_tracked_vals: typing.List[re.Pattern[str]] | None = tracked_values or []
_tracked_vals: list[re.Pattern[str]] | None = tracked_values or []

if parser_func:
return _full_file_parse(parser_func, input_file, _tracked_vals, **parser_kwargs)
Expand Down
Loading