diff --git a/pycgtool/__main__.py b/pycgtool/__main__.py index 102e94f3e0c1b68e19ec0fdb3d8f16fb1e44a84b..b4835973bed5283a972041d7074c0d9780c0ebdb 100755 --- a/pycgtool/__main__.py +++ b/pycgtool/__main__.py @@ -24,128 +24,127 @@ class ArgumentValidationError(ValueError): """Exception raised for invalid combinations of command line arguments.""" -def get_output_filepath(ext: str, config) -> pathlib.Path: - """Get file path for an output file by extension. +class PyCGTOOL: + def __init__(self, config): + self.config = config - :param ext: - :param config: Program arguments from argparse - """ - out_dir = pathlib.Path(config.out_dir) - return out_dir.joinpath(config.output_name + '.' + ext) + def get_output_filepath(self, ext: str) -> pathlib.Path: + """Get file path for an output file by extension. + :param ext: + """ + out_dir = pathlib.Path(self.config.out_dir) + return out_dir.joinpath(self.config.output_name + '.' + ext) -def measure_bonds(frame: Frame, mapping: typing.Optional[Mapping], - config) -> None: - """Measure bonds at the end of a run. - :param frame: - :param mapping: - :param config: Program arguments from argparse - """ - bonds = BondSet(config.bondset, config) - bonds.apply(frame) - - if config.mapping and config.trajectory: - # Only perform Boltzmann Inversion if we have a mapping and a trajectory. - # Otherwise we get infinite force constants. - logger.info('Starting Boltzmann Inversion') - bonds.boltzmann_invert() - logger.info('Finished Boltzmann Inversion') - - if config.output_forcefield: - logger.info("Writing GROMACS forcefield directory") - out_dir = pathlib.Path(config.out_dir) - forcefield = ForceField(config.output_name, dir_path=out_dir) - forcefield.write(config.output_name, mapping, bonds) - logger.info("Finished writing GROMACS forcefield directory") + def measure_bonds(self, frame: Frame, mapping: typing.Optional[Mapping]) -> None: + """Measure bonds at the end of a run. - else: - bonds.write_itp(get_output_filepath('itp', config), - mapping=mapping) + :param frame: + :param mapping: + """ + bonds = BondSet(self.config.bondset, self.config) + bonds.apply(frame) - if config.dump_measurements: - logger.info('Writing bond measurements to file') - bonds.dump_values(config.dump_n_values, config.out_dir) - logger.info('Finished writing bond measurements to file') + if self.config.mapping and self.config.trajectory: + # Only perform Boltzmann Inversion if we have a mapping and a trajectory. + # Otherwise we get infinite force constants. + logger.info('Starting Boltzmann Inversion') + bonds.boltzmann_invert() + logger.info('Finished Boltzmann Inversion') + if self.config.output_forcefield: + logger.info("Writing GROMACS forcefield directory") + out_dir = pathlib.Path(self.config.out_dir) + forcefield = ForceField(self.config.output_name, dir_path=out_dir) + forcefield.write(self.config.output_name, mapping, bonds) + logger.info("Finished writing GROMACS forcefield directory") -def mapping_loop(frame: Frame, config) -> typing.Tuple[Frame, Mapping]: - """Perform mapping loop over input trajectory. + else: + bonds.write_itp(self.get_output_filepath('itp'), + mapping=mapping) - :param frame: - :param config: Program arguments from argparse - """ - logger.info('Starting AA->CG mapping') - mapping = Mapping(config.mapping, config, itp_filename=config.itp) + if self.config.dump_measurements: + logger.info('Writing bond measurements to file') + bonds.dump_values(self.config.dump_n_values, self.config.out_dir) + logger.info('Finished writing bond measurements to file') - cg_frame = mapping.apply(frame) - cg_frame.save(get_output_filepath('gro', config), frame_number=0) - logging.info('Finished AA->CG mapping') - return cg_frame, mapping + def mapping_loop(self, frame: Frame) -> typing.Tuple[Frame, Mapping]: + """Perform mapping loop over input trajectory. + :param frame: + """ + logger.info('Starting AA->CG mapping') + mapping = Mapping(self.config.mapping, self.config, itp_filename=self.config.itp) -def get_coords(frame: Frame, resname: str) -> np.ndarray: - return np.concatenate([ - frame._trajectory.atom_slice([atom.index for atom in residue.atoms]).xyz - for residue in frame._trajectory.topology.residues - if residue.name == resname - ]) + cg_frame = mapping.apply(frame) + cg_frame.save(self.get_output_filepath('gro'), frame_number=0) + logging.info('Finished AA->CG mapping') + self.train_backmapper(frame, cg_frame) -def train_backmapper(aa_frame: Frame, cg_frame: Frame): - # resname = 'POPC' - # aa_coords = get_coords(aa_frame, resname) - # cg_coords = get_coords(cg_frame, resname) + return cg_frame, mapping - cg_subset_traj = cg_frame._trajectory.atom_slice(cg_frame._trajectory.topology.select('resid 1')) - aa_subset_traj = aa_frame._trajectory.atom_slice(aa_frame._trajectory.topology.select('resid 1')) - cg_subset_traj.save('cg_test.gro') - aa_subset_traj.save('aa_test.gro') + def get_coords(self, frame: Frame, resname: str) -> np.ndarray: + return np.concatenate([ + frame._trajectory.atom_slice([atom.index for atom in residue.atoms]).xyz + for residue in frame._trajectory.topology.residues + if residue.name == resname + ]) - logger.info('Training backmapper') - backmapper = GLIMPS() - backmapper.fit(cg_subset_traj.xyz, aa_subset_traj.xyz) - logger.info('Finished training backmapper') - logger.info('Testing backmapper') - backmapped = backmapper.transform(cg_subset_traj.xyz) - aa_subset_traj.xyz = backmapped - aa_subset_traj.save('backmapped.gro') - logger.info('Finished testing backmapper') + def train_backmapper(self, aa_frame: Frame, cg_frame: Frame): + # resname = 'POPC' + # aa_coords = get_coords(aa_frame, resname) + # cg_coords = get_coords(cg_frame, resname) - backmapper.save('backmapper.pkl') + cg_subset_traj = cg_frame._trajectory.atom_slice(cg_frame._trajectory.topology.select('resid 1')) + aa_subset_traj = aa_frame._trajectory.atom_slice(aa_frame._trajectory.topology.select('resid 1')) + cg_subset_traj.save('cg_test.gro') + aa_subset_traj.save('aa_test.gro') -def full_run(config): - """Main function of the program PyCGTOOL. + logger.info('Training backmapper') + backmapper = GLIMPS() + backmapper.fit(cg_subset_traj.xyz, aa_subset_traj.xyz) + logger.info('Finished training backmapper') - Performs the complete AA->CG mapping and outputs a files dependent on given input. + logger.info('Testing backmapper') + backmapped = backmapper.transform(cg_subset_traj.xyz) + aa_subset_traj.xyz = backmapped + aa_subset_traj.save('backmapped.gro') + logger.info('Finished testing backmapper') - :param config: Program arguments from argparse - """ - frame = Frame( - topology_file=config.topology, - trajectory_file=config.trajectory, # May be None - frame_start=config.begin, - frame_end=config.end) - frame._trajectory.make_molecules_whole(inplace=True) + backmapper.save(self.get_output_filepath('backmapper.pkl')) - if config.mapping: - cg_frame, mapping = mapping_loop(frame, config) - train_backmapper(frame, cg_frame) - else: - logger.info('Skipping AA->CG mapping') - mapping = None - cg_frame = frame + def full_run(self): + """Main function of the program PyCGTOOL. - if config.output_xtc: - cg_frame.save(get_output_filepath('xtc', config)) + Performs the complete AA->CG mapping and outputs a files dependent on given input. + """ + frame = Frame( + topology_file=self.config.topology, + trajectory_file=self.config.trajectory, # May be None + frame_start=self.config.begin, + frame_end=self.config.end) + frame._trajectory.make_molecules_whole(inplace=True) - if config.bondset: - measure_bonds(cg_frame, mapping, config) + if self.config.mapping: + cg_frame, mapping = self.mapping_loop(frame) + + else: + logger.info('Skipping AA->CG mapping') + mapping = None + cg_frame = frame + + if self.config.output_xtc: + cg_frame.save(self.get_output_filepath('xtc')) + + if self.config.bondset: + self.measure_bonds(cg_frame, mapping) class BooleanAction(argparse.Action): @@ -280,7 +279,7 @@ def main(): logging.basicConfig(level=args.log_level, format='%(message)s', datefmt='[%X]', - handlers=[RichHandler()]) + handlers=[RichHandler(rich_tracebacks=True)]) banner = """\ _____ _____ _____ _______ ____ ____ _ @@ -305,19 +304,21 @@ def main(): logger.info(30 * '-') try: + pycgtool = PyCGTOOL(args) + if args.profile: with cProfile.Profile() as profiler: - full_run(args) + pycgtool.full_run() profiler.dump_stats('gprof.out') else: - full_run(args) + pycgtool.full_run() logger.info('Finished processing - goodbye!') except Exception as exc: - logger.error(exc) + logger.exception(exc) if __name__ == "__main__":