diff --git a/contact_map/__init__.py b/contact_map/__init__.py index b46f5b6..d3f3f44 100644 --- a/contact_map/__init__.py +++ b/contact_map/__init__.py @@ -11,6 +11,8 @@ from .contact_count import ContactCount +from .contact_trajectory import ContactTrajectory, RollingContactFrequency + from .min_dist import NearestAtoms, MinimumDistanceCounter from .concurrence import ( diff --git a/contact_map/contact_map.py b/contact_map/contact_map.py index 428b87e..6af9b6f 100644 --- a/contact_map/contact_map.py +++ b/contact_map/contact_map.py @@ -174,6 +174,24 @@ def __init__(self, topology, query, haystack, cutoff, n_neighbors_ignored): } self._atom_idx_to_residue_idx = self._set_atom_idx_to_residue_idx() + @classmethod + def from_contacts(cls, atom_contacts, residue_contacts, topology, + query=None, haystack=None, cutoff=0.45, + n_neighbors_ignored=2): + obj = cls.__new__(cls) + super(cls, obj).__init__(topology, query, haystack, cutoff, + n_neighbors_ignored) + + def get_contact_counter(contact): + if isinstance(contact, ContactCount): + return contact.counter + else: + return contact + + obj._atom_contacts = get_contact_counter(atom_contacts) + obj._residue_contacts = get_contact_counter(residue_contacts) + return obj + def _set_atom_slice(self): """ Set atom slice logic """ if (self._class_use_atom_slice is None and @@ -774,6 +792,17 @@ def __init__(self, trajectory, query=None, haystack=None, cutoff=0.45, contacts = self._build_contact_map(trajectory) (self._atom_contacts, self._residue_contacts) = contacts + @classmethod + def from_contacts(cls, atom_contacts, residue_contacts, n_frames, + topology, query=None, haystack=None, cutoff=0.45, + n_neighbors_ignored=2): + obj = super(ContactFrequency, cls).from_contacts( + atom_contacts, residue_contacts, topology, query, haystack, + cutoff, n_neighbors_ignored + ) + obj._n_frames = n_frames + return obj + def __hash__(self): return hash((super(ContactFrequency, self).__hash__(), tuple(self._atom_contacts.items()), @@ -951,6 +980,10 @@ def __sub__(self, other): def contact_map(self, *args, **kwargs): #pylint: disable=W0221 raise NotImplementedError + @classmethod + def from_contacts(self, *args, **kwargs): #pylint: disable=W0221 + raise NotImplementedError + @property def atom_contacts(self): n_x = self.topology.n_atoms diff --git a/contact_map/contact_trajectory.py b/contact_map/contact_trajectory.py new file mode 100644 index 0000000..3bda7c8 --- /dev/null +++ b/contact_map/contact_trajectory.py @@ -0,0 +1,368 @@ +from collections import abc, Counter + +from .contact_map import ContactFrequency, ContactObject +import json + +class ContactTrajectory(ContactObject, abc.Sequence): + """Track all the contacts over a trajectory, frame-by-frame. + + Internally, this has a single-frame :class:`.ContactFrequency` for each + frame of the trajectory. + + Parameters + ---------- + trajectory : mdtraj.Trajectory + the trajectory to calculate contacts for + query : list of int + Indices of the atoms to be included as query. Default ``None`` + means all heavy, non-water atoms. + haystack : list of int + Indices of the atoms to be included as haystack. Default ``None`` + means all heavy, non-water atoms. + cutoff : float + Cutoff distance for contacts, in nanometers. Default 0.45. + n_neighbors_ignored : int + Number of neighboring residues (in the same chain) to ignore. + Default 2. + """ + _class_use_atom_slice = None + def __init__(self, trajectory, query=None, haystack=None, cutoff=0.45, + n_neighbors_ignored=2): + super(ContactTrajectory, self).__init__(trajectory.topology, query, + haystack, cutoff, + n_neighbors_ignored) + contacts = self._build_contacts(trajectory) + self._contact_maps = [ + ContactFrequency.from_contacts( + topology=self.topology, + query=self.query, + haystack=self.haystack, + cutoff=self.cutoff, + n_neighbors_ignored=self.n_neighbors_ignored, + atom_contacts=atom_contacts, + residue_contacts=residue_contacts, + n_frames=1 + ) + for atom_contacts, residue_contacts in zip(*contacts) + ] + + def __getitem__(self, num): + return self._contact_maps[num] + + def __len__(self): + return len(self._contact_maps) + + def __hash__(self): + return hash((super(ContactTrajectory, self).__hash__(), + tuple([frozenset(frame.counter.items()) + for frame in self.atom_contacts]), + tuple([frozenset(frame.counter.items()) + for frame in self.residue_contacts]))) + + def __eq__(self, other): + return hash(self) == hash(other) + + @classmethod + def from_contacts(cls, atom_contacts, residue_contacts, topology, + query=None, haystack=None, cutoff=0.45, + n_neighbors_ignored=2): + contact_maps = [ + ContactFrequency.from_contacts( + atom_cs, + res_cs, + n_frames=1, + topology=topology, + query=query, + haystack=haystack, + cutoff=cutoff, + n_neighbors_ignored=n_neighbors_ignored + ) + for atom_cs, res_cs in zip(atom_contacts, residue_contacts) + ] + return cls.from_contact_maps(contact_maps) + + def _build_contacts(self, trajectory): + # atom_contacts, residue_contacts = self._empty_contacts() + atom_contacts = [] + residue_contacts = [] + + residue_ignore_atom_idxs = self.residue_ignore_atom_idxs + residue_query_atom_idxs = self.residue_query_atom_idxs + used_trajectory = self.slice_trajectory(trajectory) + + # range(len(trajectory)) avoids recopying topology, as would occur + # in `for frame in trajectory` + for frame_num in range(len(trajectory)): + frame_contacts = self.contact_map(used_trajectory, frame_num, + residue_query_atom_idxs, + residue_ignore_atom_idxs) + frame_atom_contacts, frame_residue_contacts = frame_contacts + frame_atom_contacts = \ + self.convert_atom_contacts(frame_atom_contacts) + # TODO unify contact building with something like this? + # atom_contacts, residue_contact = self._update_contacts(...) + atom_contacts.append(frame_atom_contacts) + residue_contacts.append(frame_residue_contacts) + return atom_contacts, residue_contacts + + def contact_frequency(self): + """Create a :class:`.ContactFrequency` from this contact trajectory + """ + freq = ContactFrequency.from_contacts( + atom_contacts=Counter(), + residue_contacts=Counter(), + n_frames=0, + topology=self.topology, + query=self.query, + haystack=self.haystack, + cutoff=self.cutoff, + n_neighbors_ignored=self.n_neighbors_ignored + ) + for cmap in self._contact_maps: + # TODO: skipping compatibility checks would help performance; we + # know that everything in here *should* be compatible + freq.add_contact_frequency(cmap) + + return freq + + def to_dict(self): + return { + 'contact_maps': [cmap.to_dict() for cmap in self._contact_maps] + } + + @classmethod + def from_dict(cls, dct): + contact_maps = [ContactFrequency.from_dict(cmap) + for cmap in dct['contact_maps']] + obj = cls.from_contact_maps(contact_maps) + return obj + + @property + def atom_contacts(self): + return [cmap.atom_contacts for cmap in self._contact_maps] + + @property + def residue_contacts(self): + return [cmap.residue_contacts for cmap in self._contact_maps] + + @classmethod + def from_contact_maps(cls, maps): + obj = cls.__new__(cls) + super(cls, obj).__init__(maps[0].topology, maps[0].query, + maps[0].haystack, maps[0].cutoff, + maps[0].n_neighbors_ignored) + + for cmap in maps: + obj._check_compatibility(cmap) + + obj._contact_maps = maps + return obj + + @classmethod + def join(cls, others): + """Concatenate ContactTrajectory instances + + Parameters + ---------- + others : List[:class:.ContactTrajectory] + contact trajectories to concatenate + + Returns + ------- + :class:`.ContactTrajectory` : + concatenated contact trajectory + """ + contact_maps = sum([o._contact_maps for o in others], []) + return cls.from_contact_maps(contact_maps) + + def rolling_frequency(self, window_size=1, step=1): + """:class:`.RollingContactFrequency` iterator for this trajectory + + Parameters + ---------- + window_size : int + the number of frames in the window + step : int + the number of frames between successive starting points of the + window (like the ``step`` parameter in a Python slice object) + + Returns + ------- + :class:`.RollingContactFrequency` : + windowed iterator for this trajectory + """ + return RollingContactFrequency(self, width=window_size, step=step) + + +class MutableContactTrajectory(ContactTrajectory, abc.MutableSequence): + """Mutable version of :class:`.ContactTrajectory` + + Parameters + ---------- + trajectory : mdtraj.Trajectory + the trajectory to calculate contacts for + query : list of int + Indices of the atoms to be included as query. Default ``None`` + means all heavy, non-water atoms. + haystack : list of int + Indices of the atoms to be included as haystack. Default ``None`` + means all heavy, non-water atoms. + cutoff : float + Cutoff distance for contacts, in nanometers. Default 0.45. + n_neighbors_ignored : int + Number of neighboring residues (in the same chain) to ignore. + Default 2. + + """ + def __setitem__(self, key, value): + self._contact_maps[key] = value + + def __delitem__(self, key): + del self._contact_maps[key] + + def insert(self, key, value): + self._contact_maps.insert(key, value) + + def __hash__(self): + # mutable objects must have unique hashes + return id(self) + + +class WindowedIterator(abc.Iterator): + """ + Helper for windowed ("rolling average") iterators. + + The idea is that this is an easy and reusable code for getting windowed + quantitiies such as needed for rolling averages. This iterator itself + just returns sets of indices/slices to add/remove from whatever counter + is being tracked. The idea is that it will be used inside of another + iterator. + + + Parameters + ---------- + length : int + the length of the list windowed over + width : int + the number of items in the window + step : int + the number of items skipped between successive windows (as with the + ``step`` parameter in slices) + slow_build : bool + if True, the iterator builds up the window "step" objects at a time. + Otherwise, the first value is the full width of the window. + + Attributes + ---------- + min : int + the index of the first object in the cached window + max : int + the index of the last object in the cached window (note that this is + included in the window, unlike Python slices) + """ + def __init__(self, length, width, step, slow_build): + self.length = length + self.width = width + self.step = step + self.slow_build = slow_build + self.min = -1 + self.max = -1 + + def _startup(self): + to_sub = slice(0, 0) + self.min = max(self.min, 0) + if self.slow_build: + to_add = slice(self.max + 1, self.max + self.step + 1) + self.max += self.step + else: + self.max = self.width - 1 + to_add = slice(self.min, self.max + 1) + return to_add, to_sub + + def _normal(self): + self.min = max(0, self.min) + new_max = self.max + self.step + + if not self.slow_build: + new_max = max(new_max, self.width - 1) + + new_min = max(self.min, new_max - self.width + 1) + + to_sub = slice(self.min, new_min) + to_add = slice(self.max + 1, new_max + 1) + self.min = new_min + self.max = new_max + return to_add, to_sub + + def __next__(self): + # if self.max + self.step < self.width: + # to_add, to_sub = self._startup() + if self.max + self.step < self.length: + to_add, to_sub = self._normal() + else: + raise StopIteration + + return to_add, to_sub + + +class RollingContactFrequency(abc.Iterator): + """Iterator for "rolling-average" contact frequencies over a trajectory + + Parameters + ---------- + contact_trajectory : :class:`.ContactTrajectory` + input trajectory + width : int + the number of frames in the window + step : int + the number of frames between successive starting points of the + window (like the ``step`` parameter in a Python slice object) + """ + + _slow_build_iter = False + + def __init__(self, contact_trajectory, width=1, step=1): + self.trajectory = contact_trajectory + self.width = width + self.step = step + self.slow_build_iter = self._slow_build_iter + self._window_iter = None + self._contact_map = None + + def __iter__(self): + self._window_iter = WindowedIterator(length=len(self.trajectory), + width=self.width, + step=self.step, + slow_build=self.slow_build_iter) + self._contact_map = ContactFrequency.from_contacts( + Counter(), Counter(), + topology=self.trajectory.topology, + query=self.trajectory.query, + haystack=self.trajectory.haystack, + cutoff=self.trajectory.cutoff, + n_neighbors_ignored=self.trajectory.n_neighbors_ignored, + n_frames=0 + ) + return self + + def __next__(self): + to_add, to_sub = next(self._window_iter) + for frame in self.trajectory[to_add]: + self._contact_map.add_contact_frequency(frame) + for frame in self.trajectory[to_sub]: + self._contact_map.subtract_contact_frequency(frame) + + # need to make a copy in case the user does list(rolling_freq), + # otherwise they get copies of only the last version! + cmap = self._contact_map + map_copy = ContactFrequency.from_contacts( + cmap._atom_contacts.copy(), + cmap._residue_contacts.copy(), + topology=cmap.topology, + query=cmap.query, + haystack=cmap.haystack, + cutoff=cmap.cutoff, + n_neighbors_ignored=cmap.n_neighbors_ignored, + n_frames=cmap.n_frames + ) + return map_copy diff --git a/contact_map/tests/test_contact_map.py b/contact_map/tests/test_contact_map.py index 2d02445..11a2c58 100644 --- a/contact_map/tests/test_contact_map.py +++ b/contact_map/tests/test_contact_map.py @@ -12,7 +12,7 @@ # stuff to be testing in this file from contact_map.contact_map import * -from contact_map.contact_count import HAS_MATPLOTLIB +from contact_map.contact_count import HAS_MATPLOTLIB, ContactCount traj = md.load(find_testfile("trajectory.pdb")) @@ -158,6 +158,25 @@ def test_counters(self, idx): assert m._residue_contacts == expected assert m.residue_contacts.counter == expected + @pytest.mark.parametrize('contactcount', [True, False]) + def test_from_contacts(self, idx, contactcount): + expected = self.maps[idx] + atom_contact_list = self.expected_atom_contacts[expected] + residue_contact_list = self.expected_residue_contacts[expected] + atom_contacts = counter_of_inner_list(atom_contact_list) + residue_contacts = counter_of_inner_list(residue_contact_list) + if contactcount: + atom_contacts = ContactCount(atom_contacts, self.topology.atom, + 10, 10) + residue_contacts = ContactCount(residue_contacts, + self.topology.residue, 5, 5) + + cmap = ContactMap.from_contacts(atom_contacts, residue_contacts, + topology=self.topology, + cutoff=0.075, + n_neighbors_ignored=0) + _contact_object_compare(cmap, expected) + def test_to_dict(self, idx): m = self.maps[idx] dct = m.to_dict() @@ -374,6 +393,25 @@ def test_counters(self): def test_contacts_dict(self): _check_contacts_dict_names(self.map) + @pytest.mark.parametrize('contactcount', [True, False]) + def test_from_contacts(self, contactcount): + atom_contacts = self.expected_atom_contact_count + residue_contacts = self.expected_residue_contact_count + top = traj.topology + if contactcount: + atom_contacts = ContactCount(atom_contacts, top.atom, + 10, 10) + residue_contacts = ContactCount(residue_contacts, top.residue, + 5, 5) + + cmap = ContactFrequency.from_contacts(atom_contacts, + residue_contacts, + n_frames=5, + topology=top, + cutoff=0.075, + n_neighbors_ignored=0) + _contact_object_compare(cmap, self.map) + def test_check_compatibility_true(self): map2 = ContactFrequency(trajectory=traj[0:2], cutoff=0.075, diff --git a/contact_map/tests/test_contact_trajectory.py b/contact_map/tests/test_contact_trajectory.py new file mode 100644 index 0000000..c9575a4 --- /dev/null +++ b/contact_map/tests/test_contact_trajectory.py @@ -0,0 +1,348 @@ +# pylint: disable=wildcard-import, missing-docstring, protected-access +# pylint: disable=attribute-defined-outside-init, invalid-name, no-self-use +# pylint: disable=wrong-import-order, unused-wildcard-import + +from .utils import * +from .test_contact_map import ( + counter_of_inner_list, _contact_object_compare, traj_atom_contact_count, + traj_residue_contact_count +) + +import mdtraj as md + +from contact_map.contact_trajectory import * +from contact_map.contact_count import ContactCount + +TRAJ_ATOM_CONTACTS = [ + [[1, 4], [4, 6], [5, 6]], + [[1, 5], [4, 6], [5, 6]], + [[1, 4], [4, 6], [5, 6]], + [[1, 4], [4, 6], [4, 7], [5, 6], [5, 7]], + [[0, 9], [0, 8], [1, 8], [1, 9], [1, 4], [8, 4], [8, 5], [4, 6], [4, 7], + [5, 6], [5, 7]] +] + +TRAJ_RES_CONTACTS = [ + [[0, 2], [2, 3]], + [[0, 2], [2, 3]], + [[0, 2], [2, 3]], + [[0, 2], [2, 3]], + [[0, 2], [2, 3], [0, 4], [2, 4]] +] + +class TestContactTrajectory(object): + def setup(self): + self.traj = md.load(find_testfile("trajectory.pdb")) + self.map = ContactTrajectory(self.traj, cutoff=0.075, + n_neighbors_ignored=0) + self.expected_atom_contacts = TRAJ_ATOM_CONTACTS + self.expected_residue_contacts = TRAJ_RES_CONTACTS + + @pytest.mark.parametrize('contact_type', ['atom', 'residue']) + def test_contacts(self, contact_type): + assert len(self.map) == 5 + contacts = {'atom': self.map.atom_contacts, + 'residue': self.map.residue_contacts}[contact_type] + expected = {'atom': self.expected_atom_contacts, + 'residue': self.expected_residue_contacts}[contact_type] + + for contact, expect in zip(contacts, expected): + expected_counter = counter_of_inner_list(expect) + assert contact.counter == expected_counter + + @pytest.mark.parametrize('contact_type', ['atom', 'residue']) + def test_contacts_sliced(self, contact_type): + selected_atoms = [2, 3, 4, 5, 6, 7, 8, 9] + cmap = ContactTrajectory(self.traj, query=selected_atoms, + haystack=selected_atoms, cutoff=0.075, + n_neighbors_ignored=0) + contacts = {'atom': cmap.atom_contacts, + 'residue': cmap.residue_contacts}[contact_type] + expected = { + 'atom': [ + [[4, 6], [5, 6]], + [[4, 6], [5, 6]], + [[4, 6], [5, 6]], + [[4, 6], [4, 7], [5, 6], [5, 7]], + [[8, 4], [8, 5], [4, 6], [4, 7], [5, 6], [5, 7]] + ], + 'residue': [ + [[2, 3]], + [[2, 3]], + [[2, 3]], + [[2, 3]], + [[2, 3], [2, 4]] + ] + }[contact_type] + + for contact, expect in zip(contacts, expected): + expected_counter = counter_of_inner_list(expect) + assert contact.counter == expected_counter + + + @pytest.mark.parametrize('contactcount', [True, False]) + def test_from_contacts(self, contactcount): + atom_contacts = [ + counter_of_inner_list(frame_contacts) + for frame_contacts in self.expected_atom_contacts + ] + residue_contacts = [ + counter_of_inner_list(frame_contacts) + for frame_contacts in self.expected_residue_contacts + ] + top = self.traj.topology + if contactcount: + atom_contacts = [ContactCount(contact, top.atom, 10, 10) + for contact in atom_contacts] + residue_contacts = [ContactCount(contact, top.residue, 5, 5) + for contact in residue_contacts] + + cmap = ContactTrajectory.from_contacts(atom_contacts, + residue_contacts, + topology=top, + cutoff=0.075, + n_neighbors_ignored=0) + for truth, beauty in zip(self.map, cmap): + _contact_object_compare(truth, beauty) + assert truth == beauty + _contact_object_compare(cmap, self.map) + assert cmap == self.map + + + def test_contact_frequency(self): + freq = self.map.contact_frequency() + expected_atom_count = { + key: val / 5.0 for key, val in traj_atom_contact_count.items() + } + expected_res_count = { + key: val / 5.0 + for key, val in traj_residue_contact_count.items() + } + assert freq.atom_contacts.counter == expected_atom_count + assert freq.residue_contacts.counter == expected_res_count + + @pytest.mark.parametrize("intermediate", ["dict", "json"]) + def test_serialization_cycle(self, intermediate): + # NOTE: this is identical to TestContactFrequency; can probably + # abstract it out + serializer, deserializer = { + 'json': (self.map.to_json, ContactTrajectory.from_json), + 'dict': (self.map.to_dict, ContactTrajectory.from_dict) + }[intermediate] + + serialized = serializer() + reloaded = deserializer(serialized) + _contact_object_compare(self.map, reloaded) + assert self.map == reloaded + + def test_from_contact_maps(self): + maps = [ContactFrequency(frame, cutoff=0.075, n_neighbors_ignored=0) + for frame in self.traj] + cmap = ContactTrajectory.from_contact_maps(maps) + _contact_object_compare(self.map, cmap) + assert self.map == cmap + + def test_from_contact_maps_incompatible(self): + map0 = ContactFrequency(self.traj[0], cutoff=0.075, + n_neighbors_ignored=0) + maps = [map0] + [ContactFrequency(frame) for frame in self.traj[1:]] + with pytest.raises(AssertionError): + _ = ContactTrajectory.from_contact_maps(maps) + + def test_join(self): + segments = self.traj[0], self.traj[1:3], self.traj[3:] + assert [len(s) for s in segments] == [1, 2, 2] + assert md.join(segments) == self.traj + + cmaps = [ContactTrajectory(segment, cutoff=0.075, + n_neighbors_ignored=0) + for segment in segments] + + cmap = ContactTrajectory.join(cmaps) + + assert len(cmap) == len(self.map) + for i, (truth, beauty) in enumerate(zip(self.map, cmap)): + _contact_object_compare(truth, beauty) + assert truth == beauty + + _contact_object_compare(self.map, cmap) + assert self.map == cmap + + def test_rolling_frequency(self): + # smoke test; correctness is tested in tests for + # RollingContactFrequency + assert len(list(self.map.rolling_frequency(window_size=2))) == 4 + + +class TestMutableContactTrajectory(object): + def setup(self): + self.traj = md.load(find_testfile("trajectory.pdb")) + self.map = MutableContactTrajectory(self.traj, cutoff=0.075, + n_neighbors_ignored=0) + self.expected_atom_contacts = TRAJ_ATOM_CONTACTS.copy() + self.expected_residue_contacts = TRAJ_RES_CONTACTS.copy() + + def _test_expected_contacts(self, traj_map, exp_atoms, exp_res): + for cmap, exp_a, exp_r in zip(traj_map, exp_atoms, exp_res): + atom_counter = cmap.atom_contacts.counter + res_counter = cmap.residue_contacts.counter + assert atom_counter == counter_of_inner_list(exp_a) + assert res_counter == counter_of_inner_list(exp_r) + + def test_setitem(self): + cmap4 = ContactFrequency(self.traj[4], cutoff=0.075, + n_neighbors_ignored=0) + self.map[1] = cmap4 + expected_atoms = self.expected_atom_contacts + expected_atoms[1] = expected_atoms[4] + expected_res = self.expected_residue_contacts + expected_res[1] = expected_res[4] + self._test_expected_contacts(self.map, expected_atoms, expected_res) + + def test_delitem(self): + del self.map[1] + assert len(self.map) == 4 + expected_atoms = (self.expected_atom_contacts[:1] + + self.expected_atom_contacts[2:]) + expected_res = (self.expected_residue_contacts[:1] + + self.expected_residue_contacts[2:]) + self._test_expected_contacts(self.map, expected_atoms, expected_res) + + def test_insert(self): + cmap4 = self.map[4] + self.map.insert(0, cmap4) + expected_atoms = [TRAJ_ATOM_CONTACTS[4]] + TRAJ_ATOM_CONTACTS + expected_res = [TRAJ_RES_CONTACTS[4]] + TRAJ_RES_CONTACTS + self._test_expected_contacts(self.map, expected_atoms, expected_res) + + def test_hash_eq(self): + cmap = MutableContactTrajectory(self.traj, cutoff=0.075, + n_neighbors_ignored=0) + assert hash(cmap) != hash(self.map) + assert cmap != self.map + + +class TestWindowedIterator(object): + def setup(self): + self.iter = WindowedIterator(length=10, width=3, step=2, + slow_build=False) + + def test_startup_normal(self): + to_add, to_sub = self.iter._startup() + assert to_sub == slice(0, 0) + assert to_add == slice(0, 3) + assert self.iter.min == 0 + assert self.iter.max == 2 + + def test_startup_slow_build_step1(self): + itr = WindowedIterator(length=10, width=3, step=1, slow_build=True) + to_add, to_sub = itr._startup() + assert to_sub == slice(0, 0) + assert to_add == slice(0, 1) + assert itr.min == 0 + assert itr.max == 0 + + to_add, to_sub = itr._startup() + assert to_sub == slice(0, 0) + assert to_add == slice(1, 2) + assert itr.min == 0 + assert itr.max == 1 + + def test_normal(self): + self.iter.min = 0 + self.iter.max = 2 + to_add, to_sub = self.iter._normal() + assert to_sub == slice(0, 2) + assert to_add == slice(3, 5) + assert self.iter.min == 2 + assert self.iter.max == 4 + + @pytest.mark.parametrize('length,width,step,slow_build,expected', [ + (5, 3, 2, False, [(slice(0, 0), slice(0, 3), 0, 2), + (slice(0, 2), slice(3, 5), 2, 4)]), + (5, 3, 1, True, [(slice(0, 0), slice(0, 1), 0, 0), + (slice(0, 0), slice(1, 2), 0, 1), + (slice(0, 0), slice(2, 3), 0, 2), + (slice(0, 1), slice(3, 4), 1, 3), + (slice(1, 2), slice(4, 5), 2, 4)]), + (5, 3, 2, True, [(slice(0, 0), slice(0, 2), 0, 1), + (slice(0, 1), slice(2, 4), 1, 3)]), + (6, 3, 3, False, [(slice(0, 0), slice(0, 3), 0, 2), + (slice(0, 3), slice(3, 6), 3, 5)]), + (6, 3, 3, True, [(slice(0, 0), slice(0, 3), 0, 2), + (slice(0, 3), slice(3, 6), 3, 5)]), + ]) + def test_next(self, length, width, step, slow_build, expected): + itr = WindowedIterator(length, width, step, slow_build) + for expect in expected: + exp_sub, exp_add, exp_min, exp_max = expect + to_add, to_sub = next(itr) + assert to_add == exp_add + assert to_sub == exp_sub + assert itr.min == exp_min + assert itr.max == exp_max + with pytest.raises(StopIteration): + next(itr) + + +class TestRollingContactFrequency(object): + def setup(self): + self.traj = md.load(find_testfile("trajectory.pdb")) + self.map = ContactTrajectory(self.traj, cutoff=0.075, + n_neighbors_ignored=0) + self.rolling_freq = RollingContactFrequency(self.map, width=2, + step=1) + self.expected_atoms = [ + {frozenset([1, 4]): 0.5, frozenset([4, 6]): 1.0, + frozenset([5, 6]): 1.0, frozenset([1, 5]): 0.5}, + {frozenset([1, 5]): 0.5, frozenset([4, 6]): 1.0, + frozenset([5, 6]): 1.0, frozenset([1, 4]): 0.5}, + {frozenset([1, 4]): 1.0, frozenset([4, 6]): 1.0, + frozenset([5, 6]): 1.0, frozenset([4, 7]): 0.5, + frozenset([5, 7]): 0.5}, + {frozenset([0, 9]): 0.5, frozenset([0, 8]): 0.5, + frozenset([1, 8]): 0.5, frozenset([1, 9]): 0.5, + frozenset([1, 4]): 1.0, frozenset([8, 4]): 0.5, + frozenset([8, 5]): 0.5, frozenset([4, 6]): 1.0, + frozenset([4, 7]): 1.0, frozenset([5, 6]): 1.0, + frozenset([5, 7]): 1.0} + ] + self.expected_residues = [ + {frozenset([0, 2]): 1.0, frozenset([2, 3]): 1.0}, + {frozenset([0, 2]): 1.0, frozenset([2, 3]): 1.0}, + {frozenset([0, 2]): 1.0, frozenset([2, 3]): 1.0}, + {frozenset([0, 2]): 1.0, frozenset([2, 3]): 1.0, + frozenset([0, 4]): 0.5, frozenset([2, 4]): 0.5} + ] + + def test_normal_iteration(self): + results = list(freq for freq in self.rolling_freq) + assert len(results) == 4 + + atom_contacts = [r.atom_contacts.counter for r in results] + for beauty, truth in zip(atom_contacts, self.expected_atoms): + assert beauty == truth + + residue_contacts = [r.residue_contacts.counter for r in results] + for beauty, truth in zip(residue_contacts, self.expected_residues): + assert beauty == truth + + def test_slow_build_iteration(self): + self.rolling_freq.slow_build_iter = True + results = list(freq for freq in self.rolling_freq) + assert len(results) == 5 + + expected_atoms = [{frozenset([1, 4]): 1.0, frozenset([4, 6]): 1.0, + frozenset([5, 6]): 1.0}] + self.expected_atoms + expected_residues = ( + [{frozenset([0, 2]): 1.0, frozenset([2, 3]): 1.0}] + + self.expected_residues + ) + + atom_contacts = [r.atom_contacts.counter for r in results] + for beauty, truth in zip(atom_contacts, expected_atoms): + assert beauty == truth + + residue_contacts = [r.residue_contacts.counter for r in results] + for beauty, truth in zip(residue_contacts, expected_residues): + assert beauty == truth diff --git a/docs/api.rst b/docs/api.rst index 57e8502..7d0884a 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -17,6 +17,8 @@ Contact maps ContactMap ContactFrequency ContactDifference + ContactTrajectory + RollingContactFrequency Contact Concurrences -------------------- diff --git a/examples/contact_trajectory.ipynb b/examples/contact_trajectory.ipynb new file mode 100644 index 0000000..93592f4 --- /dev/null +++ b/examples/contact_trajectory.ipynb @@ -0,0 +1,342 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Contact Trajectories\n", + "\n", + "Sometimes you're interested in how contacts evolve in a trajectory, frame-by-frame. Contact Map Explorer provides the `ContactTrajectory` class for this purpose.\n", + "\n", + "We'll look at this using a trajectory of a specific inhibitor during its binding process to GSK3B. This system is also studied in the notebook on contact concurrences (with very similar initial discussion)." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n" + ] + } + ], + "source": [ + "from __future__ import print_function\n", + "%matplotlib inline\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "\n", + "from contact_map import ContactTrajectory, RollingContactFrequency\n", + "import mdtraj as md\n", + "traj = md.load(\"data/gsk3b_example.h5\")\n", + "print(traj) # to see number of frames; size of system" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First, we'll use MDTraj's [atom selection language](http://mdtraj.org/latest/atom_selection.html) to split out the protein and the ligand, which has residue name YYG in the input files. We're only interested in contacts between the protein and the ligand (not contacts within the protein). We'll also only look at heavy atom contacts." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "topology = traj.topology\n", + "yyg = topology.select('resname YYG and element != \"H\"')\n", + "protein = topology.select('protein and element != \"H\"')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Making an accessing a contact trajectory\n", + "\n", + "Contact trajectories have the same keyword arguments as other contact objects" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "contacts = ContactTrajectory(traj, query=yyg, haystack=protein)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Once the `ContactTrajectory` has been made, contacts for individual frames can be accessed either by taking the index of the `ContactTrajectory` itself, or by getting the list of contact (e.g., all the residue contacts frame-by-frame) and selecting the frame of interest." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[([YYG351, SER32], 1.0), ([YYG351, GLY31], 1.0), ([ASN30, YYG351], 1.0)]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "contacts[0].residue_contacts.most_common()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[([YYG351, SER32], 1.0), ([YYG351, GLY31], 1.0), ([ASN30, YYG351], 1.0)]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "contacts.residue_contacts[0].most_common()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Advanced Python indexing is also allowed. In this example, note how the most common partners for YYG change! This is also what we see in the contact concurrences example." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[([VAL27, YYG351], 1.0), ([ARG107, YYG351], 1.0), ([ILE28, YYG351], 1.0)]\n", + "[([VAL27, YYG351], 1.0), ([ILE28, YYG351], 1.0), ([GLN151, YYG351], 1.0)]\n", + "[([VAL27, YYG351], 1.0), ([ASN30, YYG351], 1.0), ([GLY34, YYG351], 1.0)]\n", + "[([ASP166, YYG351], 1.0), ([PHE33, YYG351], 1.0), ([LYS149, YYG351], 1.0)]\n", + "[([YYG351, SER32], 1.0), ([VAL53, YYG351], 1.0), ([PHE33, YYG351], 1.0)]\n", + "[([GLU63, YYG351], 1.0), ([VAL53, YYG351], 1.0), ([PHE33, YYG351], 1.0)]\n", + "[([ASP166, YYG351], 1.0), ([VAL53, YYG351], 1.0), ([PHE33, YYG351], 1.0)]\n", + "[([YYG351, GLY168], 1.0), ([YYG351, SER32], 1.0), ([ASP166, YYG351], 1.0)]\n" + ] + } + ], + "source": [ + "for contact in contacts[50:80:4]:\n", + " print(contact.residue_contacts.most_common()[:3])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can easily turn the `ContactTrajectory` into `ContactFrequency`:" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "freq = contacts.contact_frequency()\n", + "\n", + "fig, ax = plt.subplots(figsize=(5.5,5))\n", + "freq.residue_contacts.plot_axes(ax=ax)\n", + "ax.set_xlim(*contacts.query_residue_range);" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Rolling Contact Frequencies\n", + "\n", + "A `ContactTrajectory` keeps all the time-dependent information about the contacts, whereas a `ContactFrequency`, as plotted above, loses all of it. What about something in between? For this, we have a `RollingContactFrequency`, which acts like a rolling average. It creates a contact frequency over a certain window of frames, with a certain step size between each window.\n", + "\n", + "This can be created either with the `RollingContactFrequency` object, or, more easily, with the `ContactTrajectory.rolling_frequency()` method." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "RollingContactFrequency(contacts, width=30, step=14)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rolling_frequencies = contacts.rolling_frequency(window_size=30, step=14)\n", + "rolling_frequencies" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we'll plot each windowed frequency, and we will see the transition as some contacts fade out and others grow in." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "\n", + "text/plain": [ + "
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "fig, axs = plt.subplots(3, 2, figsize=(12, 10))\n", + "for ax, freq in zip(axs.flatten(), rolling_frequencies):\n", + " freq.residue_contacts.plot_axes(ax=ax)\n", + " ax.set_xlim(*contacts.query_residue_range);" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.3" + }, + "toc": { + "base_numbering": 1, + "nav_menu": {}, + "number_sections": true, + "sideBar": true, + "skip_h1_title": true, + "title_cell": "Table of Contents", + "title_sidebar": "Contents", + "toc_cell": false, + "toc_position": {}, + "toc_section_display": true, + "toc_window_display": true + }, + "varInspector": { + "cols": { + "lenName": 16, + "lenType": 16, + "lenVar": 40 + }, + "kernels_config": { + "python": { + "delete_cmd_postfix": "", + "delete_cmd_prefix": "del ", + "library": "var_list.py", + "varRefreshCmd": "print(var_dic_list())" + }, + "r": { + "delete_cmd_postfix": ") ", + "delete_cmd_prefix": "rm(", + "library": "var_list.r", + "varRefreshCmd": "cat(var_dic_list()) " + } + }, + "types_to_exclude": [ + "module", + "function", + "builtin_function_or_method", + "instance", + "_Feature" + ], + "window_display": false + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}