Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions multiparser/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def _exception_callback(
_exceptions: dict[str, Exception | None] = self._exceptions,
user_defined=user_callback,
abort_on_fail=self._shutdown_on_thread_failure,
abort_func=self.terminate,
abort_func=(lambda: self._monitor_termination_trigger.set()),
) -> None:
if user_defined:
user_defined(f"{type(exception).__name__}: '{exception.args[0]}'")
Expand Down Expand Up @@ -247,6 +247,7 @@ def _log_file_monitor_func(
self._interval,
self._flatten_data,
),
daemon=True,
)

self._log_monitor_thread = threading.Thread(
Expand All @@ -259,6 +260,7 @@ def _log_file_monitor_func(
self._interval,
self._flatten_data,
),
daemon=True,
)

def _check_custom_log_parser(
Expand Down Expand Up @@ -291,15 +293,20 @@ def _check_custom_log_parser(
try:
# Parsers are expected to have the keyword argument 'file_content'
_out = parser(
file_content=_test_str, __input_file=__file__, __read_bytes=None, **parser_kwargs
file_content=_test_str,
__input_file=__file__,
__read_bytes=None,
**parser_kwargs,
)

# If the custom parser returns a list of entries, not just one
if isinstance(_out, list):
_out = _out[0]

except Exception as e:
raise AssertionError(f"Custom parser testing failed for '{parser.__name__}' with exception:\n{e}")
raise AssertionError(
f"Custom parser testing failed for '{parser.__name__}' with exception:\n{e}"
)

if (
len(_out) != 2
Expand Down Expand Up @@ -430,8 +437,7 @@ def track(
raise AssertionError("Globular expression must be of type AnyStr")
glob.glob(_glob_ex)


def tail( # noqa: C901
def tail( # noqa: C901
self,
*,
path_glob_exprs: typing.List[str] | str,
Expand Down Expand Up @@ -522,8 +528,7 @@ def tail( # noqa: C901

if parser_func and labels:
raise AssertionError(
"Cannot specify both labels and custom parser for monitor "
"method 'tail'"
"Cannot specify both labels and custom parser for monitor method 'tail'"
)

_tracked_values: typing.List[str | re.Pattern]
Expand Down
11 changes: 6 additions & 5 deletions multiparser/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ def _append_thread(
parser_func: typing.Callable | None = None,
file_type: str | None = None,
glob_expr: str | None = None,
**parser_kwargs
**parser_kwargs,
) -> None:
"""Create a new thread for a monitored file

Expand Down Expand Up @@ -355,9 +355,11 @@ def _read_loop(
"""Thread target function for parsing of detected file"""

_cached_metadata: typing.Dict[str, str | int] = {}

_terminated = False
try:
while not termination_trigger.is_set():
while not _terminated:
_terminated = termination_trigger.is_set()

time.sleep(interval)

# If the file does not exist yet then continue
Expand Down Expand Up @@ -398,8 +400,7 @@ def _read_loop(
exception_callback(exception=e)

self._file_threads[file_name] = threading.Thread(
target=_read_loop,
args=(self._records,),
target=_read_loop, args=(self._records,), daemon=True
)

@handle_monitor_thread_exception
Expand Down
49 changes: 34 additions & 15 deletions tests/test_data_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
import importlib.util

import pytest
from conftest import fake_csv, fake_feather, fake_nml, fake_toml
from conftest import fake_csv, fake_feather, fake_nml, fake_parquet, fake_toml

import multiparser.parsing as mp_parse
from multiparser.parsing.file import (
file_parser,
record_csv as file_record_csv,
record_fortran_nml,
record_feather,
record_parquet,
record_toml,
)
from multiparser.parsing.tail import (
Expand All @@ -35,7 +35,9 @@ def test_parse_f90nml() -> None:
with tempfile.TemporaryDirectory() as temp_d:
_data_file = fake_nml(temp_d)
_meta, _data = record_fortran_nml(input_file=_data_file)
_, _data2 = mp_parse.record_file(_data_file, tracked_values=None, parser_func=None, file_type=None)
_, _data2 = mp_parse.record_file(
_data_file, tracked_values=None, parser_func=None, file_type=None
)
assert "timestamp" in _meta
assert list(sorted(_data.items())) == sorted(_data2.items())

Expand All @@ -45,7 +47,9 @@ def test_parse_csv() -> None:
with tempfile.TemporaryDirectory() as temp_d:
_data_file = fake_csv(temp_d)
_meta, _data = file_record_csv(input_file=_data_file)
_, _data2 = mp_parse.record_file(_data_file, tracked_values=None, parser_func=None, file_type=None)
_, _data2 = mp_parse.record_file(
_data_file, tracked_values=None, parser_func=None, file_type=None
)
assert "timestamp" in _meta
assert sorted([i.items() for i in _data]) == sorted([i.items() for i in _data2])

Expand All @@ -61,12 +65,25 @@ def test_parse_feather() -> None:
assert "timestamp" in _meta


@pytest.mark.parsing
@pytest.mark.skipif(
importlib.util.find_spec("pyarrow") is None, reason="Module 'pyarrow' not installed"
)
def test_parse_parquet() -> None:
with tempfile.TemporaryDirectory() as temp_d:
_data_file = fake_parquet(temp_d)
_meta, _ = record_parquet(input_file=_data_file)
assert "timestamp" in _meta


@pytest.mark.parsing
def test_parse_toml() -> None:
with tempfile.TemporaryDirectory() as temp_d:
_data_file = fake_toml(temp_d)
_meta, _data = record_toml(input_file=_data_file)
_, _data2 = mp_parse.record_file(_data_file, tracked_values=None, parser_func=None, file_type=None)
_, _data2 = mp_parse.record_file(
_data_file, tracked_values=None, parser_func=None, file_type=None
)
assert "timestamp" in _meta
assert list(sorted(_data.items())) == sorted(_data2.items())

Expand All @@ -77,7 +94,9 @@ def test_unrecognised_file_type() -> None:
with open(temp_f.name, "w") as out_f:
out_f.write("...")
with pytest.raises(TypeError):
mp_parse.record_file(temp_f.name, tracked_values=None, parser_func=None, file_type=None)
mp_parse.record_file(
temp_f.name, tracked_values=None, parser_func=None, file_type=None
)


@pytest.mark.parsing
Expand Down Expand Up @@ -151,7 +170,7 @@ def test_parse_delimited(fake_delimited_log, request, header) -> None:

for _ in range(10):
time.sleep(0.1)
_, _parsed_data = mp_parse.record_log(
_, _parsed_data = mp_parse.record_log(
input_file=_file,
tracked_values=None,
parser_func=record_with_delimiter,
Expand All @@ -175,9 +194,7 @@ def test_parse_delimited(fake_delimited_log, request, header) -> None:
(["1231.235", "3455.223", "45632.234", "34536.23"], None),
ids=("header", "no_header"),
)
@pytest.mark.parametrize(
"convert", (True, False)
)
@pytest.mark.parametrize("convert", (True, False))
def test_tail_csv(fake_delimited_log, header, convert) -> None:
_file = fake_delimited_log

Expand All @@ -189,7 +206,7 @@ def test_tail_csv(fake_delimited_log, header, convert) -> None:

for _ in range(10):
time.sleep(0.1)
_, _parsed_data = mp_parse.record_log(
_, _parsed_data = mp_parse.record_log(
input_file=_file,
convert=convert,
tracked_values=None,
Expand All @@ -209,12 +226,12 @@ def test_flattening() -> None:

@pytest.mark.parsing
@pytest.mark.parametrize(
"regex_result", ("value", ("value",), ("value", "label"), ("value", "label", "surplus")),
ids=("string_result", "single_result", "two_results", "three_results")
"regex_result",
("value", ("value",), ("value", "label"), ("value", "label", "surplus")),
ids=("string_result", "single_result", "two_results", "three_results"),
)
@pytest.mark.parametrize("label", (None, "label"), ids=("no_label", "label"))
def test_label_value_extraction(regex_result: tuple, label: str | None) -> None:

if isinstance(regex_result, str) and not label:
with pytest.raises(ValueError) as e:
_extract_label_value_pair(
Expand All @@ -234,7 +251,9 @@ def test_label_value_extraction(regex_result: tuple, label: str | None) -> None:
tracked_val=re.compile("undefined"),
type_descriptor="NoType",
)
assert "Expected label for regex with only single matching entry" in str(e.value)
assert "Expected label for regex with only single matching entry" in str(
e.value
)
else:
_extract_label_value_pair(
regex_result,
Expand Down
Loading
Loading