Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions cylc/flow/main_loop/log_tracemalloc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Profile Cylc with tracemalloc.

This takes tracemalloc snapshots periodically.

Snapshots are written into "~/cylc-run/<workflow>/tracemalloc/", to load them
for analysis, run:

tracemalloc.Snapshot.load('.../path/to/x.tracemalloc')

The memory diffs are written to stdout.
"""

from pathlib import Path
import tracemalloc

from cylc.flow import LOG
from cylc.flow.main_loop import periodic, shutdown, startup


@startup
async def init(scheduler, state):
"""Create the state object on startup."""
tracemalloc.start()
state['out_dir'] = Path(scheduler.workflow_run_dir, 'tracemalloc')
state['out_dir'].mkdir()
logfile = state['out_dir'] / 'log'
state['log'] = logfile.open('w+')
state['itt'] = 0
LOG.warning(f'Writing tracemalloc output to {logfile}')


@periodic
async def take_snapshot(scheduler, state, diff_filter='cylc/', max_length=20):
"""Take a memory snapshot and compare it to the previous one.

Args:
scheduler:
Unused in this plugin.
state:
The state object initialised in "init".
diff_filter:
If supplied, only changes containing this string will be displayed.
Used to restrict reporting to items which contain Cylc file paths.
max_length:
The top "max_length" items will be displayed with each summary.

"""
# take a snapshot
new = tracemalloc.take_snapshot()

# dump the snapshot to the filesystem
new.dump(state['out_dir'] / f'{state["itt"]}.tracemalloc')

# compare this snapshot to the previous one
if state.get('prev'):
# generate a list of the things which have changed
cmp = [
item
for item in new.compare_to(state['prev'], 'lineno')
# filter for the libraries we are interested in
if not diff_filter or diff_filter in str(item)
]

# print a summary of the memory change
print('+/-', sum(stat.size_diff for stat in cmp), file=state['log'])

# report the individual changes
for stat in sorted(cmp, key=lambda x: x.size_diff, reverse=True)[
:max_length
]:
if stat.size_diff != 0:
print(f' {stat}', file=state['log'])
print('', file=state['log'])

state['prev'] = new
state['itt'] += 1
state['log'].flush()


@shutdown
async def close_log(scheduler, state):
"""Close the log file on shutdown."""
state['log'].close()
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ cylc.main_loop =
log_main_loop = cylc.flow.main_loop.log_main_loop [main_loop-log_main_loop]
log_memory = cylc.flow.main_loop.log_memory [main_loop-log_memory]
reset_bad_hosts = cylc.flow.main_loop.reset_bad_hosts
log_tracemalloc = cylc.flow.main_loop.log_tracemalloc
# NOTE: all entry points should be listed here even if Cylc Flow does not
# provide any implementations, to make entry point scraping easier
cylc.pre_configure =
Expand Down
73 changes: 73 additions & 0 deletions tests/unit/main_loop/test_log_tracemalloc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from types import SimpleNamespace
from cylc.flow.main_loop.log_tracemalloc import take_snapshot, init, close_log

import pytest


# find the number of the MARKER line in this file
MARKER_LINE = None
with open(__file__, 'r') as this_file:
for line_number, line in enumerate(this_file):
print('$', line.strip())
print('#', line[-7:].strip())
if line[-7:] == 'MARKER\n':
MARKER_LINE = line_number + 1
break


@pytest.fixture
async def state(tmp_path):
"""A clean state object for this plugin."""
state = {}
await init(SimpleNamespace(workflow_run_dir=tmp_path), state)
return state


async def test_tracemalloc(tmp_path, state):
"""Test the tracemalloc plugin functionality."""
out_dir = tmp_path / 'tracemalloc'

# test the empty state object
assert state['itt'] == 0
assert len(list(out_dir.iterdir())) == 1 # the tracemalloc folder
assert state['log'].closed is False # the log file is open

# take a snapshot
await take_snapshot(None, state, diff_filter=None)
assert state['itt'] == 1, 'the iteration has been incremented'
assert len(list(out_dir.iterdir())) == 2, 'dump file has been written'

# allocate some memory
_memory = [x for x in range(100)] # MARKER

# take another snapshot
await take_snapshot(None, state, diff_filter=None)
assert state['itt'] == 2, 'the iteration has been incremented'
assert len(list(out_dir.iterdir())) == 3, 'dump file has been written'

# close the log file
await close_log(None, state)
assert state['log'].closed is True, 'log file has been closed'

# ensure the allocated memory appears in the log file
with open(out_dir / 'log', 'r') as tracemalloc_file:
tracemalloc_log = tracemalloc_file.read()
assert f'{__file__}:{MARKER_LINE}' in tracemalloc_log

del _memory # make linters happy
Loading