diff --git a/pycgtool.py b/pycgtool.py index ea626df64ef0fc08b90b0da7b9c02625e100a1fa..e779d8600178b9fc1d506f87ea63cb55723cd354 100755 --- a/pycgtool.py +++ b/pycgtool.py @@ -33,6 +33,8 @@ def main(args, config): for _ in Progress(numframes, postwhile=frame.next_frame): if args.map: cgframe = mapping.apply(frame, exclude={"SOL"}) + if config.output_xtc: + cgframe.write_xtc("out.xtc") else: cgframe = frame @@ -59,11 +61,18 @@ def map_only(args, config): :param args: Program arguments :param config: Object containing run options """ - frame = Frame(gro=args.gro) + frame = Frame(gro=args.gro, xtc=args.xtc) mapping = Mapping(args.map, config) cgframe = mapping.apply(frame, exclude={"SOL"}) cgframe.output("out.gro", format=config.output) + if args.xtc and config.output_xtc: + numframes = frame.numframes - args.begin if args.end == -1 else args.end - args.begin + for _ in Progress(numframes, postwhile=frame.next_frame): + cgframe = mapping.apply(frame, exclude={"SOL"}) + cgframe.write_xtc("out.xtc") + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Perform coarse-grain mapping of atomistic trajectory") @@ -90,7 +99,8 @@ if __name__ == "__main__": ("temperature", 310), ("angle_default_fc", True), ("generate_angles", True), - ("generate_dihedrals", False)], + ("generate_dihedrals", False), + ("output_xtc", False)], args) if not args.bnd: config.set("map_only", True) diff --git a/pycgtool/frame.py b/pycgtool/frame.py index 69c28476aa4042f85ba258ae6cca0d3a4c5a6e05..e6f2d9e9fce9b006b14312a74ad58365d897bc7d 100644 --- a/pycgtool/frame.py +++ b/pycgtool/frame.py @@ -11,12 +11,15 @@ import numpy as np from simpletraj import trajectory +from mdtraj.formats import XTCTrajectoryFile + from .util import backup_file from .parsers.cfg import CFG np.seterr(all="raise") +# Create FileNotFoundError if using older version of Python try: try: raise FileNotFoundError @@ -105,7 +108,7 @@ class Frame: Hold Atom data separated into Residues """ - def __init__(self, gro=None, xtc=None, itp=None, frame_start=0): + def __init__(self, gro=None, xtc=None, itp=None, frame_start=0, xtc_reader="simpletraj"): """ Return Frame instance having read Residues and Atoms from GRO if provided @@ -119,26 +122,66 @@ class Frame: self.numframes = 0 self.box = np.zeros(3, dtype=np.float32) + self._out_xtc = None + if gro is not None: self._parse_gro(gro) self.numframes += 1 if xtc is not None: + self._xtc_reader = xtc_reader + open_xtc = {"simpletraj": self._open_xtc_simpletraj, + "mdtraj": self._open_xtc_mdtraj} try: - self.xtc = trajectory.XtcTrajectory(xtc) - except OSError as e: - if not os.path.isfile(xtc): - raise FileNotFoundError(xtc) from e - e.args = ("Error opening file '{0}'".format(xtc),) + open_xtc[self._xtc_reader](xtc) + except KeyError as e: + e.args = ("XTC reader {0} is not a valid option.".format(self._xtc_reader)) raise - else: - if self.xtc.numatoms != self.natoms: - raise AssertionError("Number of atoms does not match between gro and xtc files.") - self.numframes += self.xtc.numframes if itp is not None: self._parse_itp(itp) + def _open_xtc_simpletraj(self, xtc): + try: + self.xtc = trajectory.XtcTrajectory(xtc) + except OSError as e: + if not os.path.isfile(xtc): + raise FileNotFoundError(xtc) from e + e.args = ("Error opening file '{0}'".format(xtc),) + raise + else: + if self.xtc.numatoms != self.natoms: + raise AssertionError("Number of atoms does not match between gro and xtc files.") + self.numframes += self.xtc.numframes + + def _open_xtc_mdtraj(self, xtc): + try: + self.xtc = XTCTrajectoryFile(xtc) + except OSError as e: + if not os.path.isfile(xtc): + raise FileNotFoundError(xtc) from e + e.args = ("Error opening file '{0}'".format(xtc),) + raise + else: + xyz, time, step, box = self.xtc.read(n_frames=1) + natoms = len(xyz[0]) + if natoms != self.natoms: + print(xyz[0]) + print(natoms, self.natoms) + raise AssertionError("Number of atoms does not match between gro and xtc files.") + + # Seek to end to count frames + # self.xtc.seek(0, whence=2) + self.numframes += 1 + self.xtc.seek(0) + while True: + try: + self.xtc.seek(1, whence=1) + self.numframes += 1 + except IndexError: + break + self.xtc.seek(0) + def __len__(self): return len(self.residues) @@ -163,6 +206,36 @@ class Frame: :return: True if successful else False """ + xtc_read = {"simpletraj": self._next_frame_simpletraj, + "mdtraj": self._next_frame_mdtraj} + try: + return xtc_read[self._xtc_reader](exclude) + except KeyError as e: + e.args = ("XTC reader {0} is not a valid option.".format(self._xtc_reader)) + raise + + def _next_frame_mdtraj(self, exclude=None): + try: + # self.xtc.seek(self.number) + i = 0 + xyz, time, step, box = self.xtc.read(n_frames=1) + xyz = xyz[0] + for res in self.residues: + if exclude is not None and res.name in exclude: + continue + for atom in res: + atom.coords = xyz[i] + i += 1 + + self.number += 1 + self.box = np.diag(box[0]) / 10. + return True + # IndexError - run out of xtc frames + # AttributeError - we didn't provide an xtc + except (IndexError, AttributeError): + return False + + def _next_frame_simpletraj(self, exclude=None): try: self.xtc.get_frame(self.number) i = 0 @@ -184,6 +257,22 @@ class Frame: except (IndexError, AttributeError): return False + def write_xtc(self, filename=None): + if self._out_xtc is None: + self._out_xtc = XTCTrajectoryFile(filename, mode="w") + + xyz = np.ndarray((1, self.natoms, 3), dtype=np.float32) + i = 0 + for residue in self.residues: + for atom in residue.atoms: + xyz[0][i] = atom.coords + i += 1 + step = np.array([self.number], dtype=np.int32) + box = np.zeros((1, 3, 3), dtype=np.float32) + for i in range(3): + box[0][i][i] = self.box[i] + self._out_xtc.write(xyz, step=step, box=box) + def _parse_gro(self, filename): """ Parse a GROMACS GRO file and create Residues/Atoms diff --git a/test/test_frame.py b/test/test_frame.py index e94ce2a85640e58e1ea672d574c96db063fe4720..f46d87b84fd34db68c4afd099d210d87fff2748c 100644 --- a/test/test_frame.py +++ b/test/test_frame.py @@ -65,6 +65,10 @@ class FrameTest(unittest.TestCase): self.assertTrue(filecmp.cmp("test/data/water.gro", "water-out.gro")) os.remove("water-out.gro") + def test_frame_read_xtc_simpletraj_numframes(self): + frame = Frame(gro="test/data/water.gro", xtc="test/data/water.xtc") + self.assertEqual(12, frame.numframes) + def test_frame_read_xtc(self): frame = Frame(gro="test/data/water.gro", xtc="test/data/water.xtc") # These are the coordinates from the gro file @@ -78,6 +82,31 @@ class FrameTest(unittest.TestCase): np.testing.assert_allclose(np.array([1.122, 1.130, 1.534]), frame.residues[0].atoms[0].coords) + def test_frame_read_xtc_mdtraj_numframes(self): + frame = Frame(gro="test/data/water.gro", xtc="test/data/water.xtc", + xtc_reader="mdtraj") + self.assertEqual(12, frame.numframes) + + def test_frame_read_xtc_mdtraj(self): + frame = Frame(gro="test/data/water.gro", xtc="test/data/water.xtc", + xtc_reader="mdtraj") + # These are the coordinates from the gro file + np.testing.assert_allclose(np.array([0.696, 1.33, 1.211]), + frame.residues[0].atoms[0].coords) + frame.next_frame() + # These coordinates are from the xtc file + np.testing.assert_allclose(np.array([1.176, 1.152, 1.586]), + frame.residues[0].atoms[0].coords) + frame.next_frame() + np.testing.assert_allclose(np.array([1.122, 1.130, 1.534]), + frame.residues[0].atoms[0].coords) + + def test_frame_write_xtc_mdtraj(self): + frame = Frame(gro="test/data/water.gro", xtc="test/data/water.xtc", + xtc_reader="mdtraj") + while frame.next_frame(): + frame.write_xtc("test.xtc") + if __name__ == '__main__': unittest.main()