Source code for ensemble_md.utils.gmx_parser

####################################################################
#                                                                  #
#    ensemble_md,                                                  #
#    a python package for running GROMACS simulation ensembles     #
#                                                                  #
#    Written by Wei-Tse Hsu <wehs7661@colorado.edu>                #
#    Copyright (c) 2022 University of Colorado Boulder             #
#                                                                  #
####################################################################
"""
The :code:`gmx_parser` module provides functions for parsing GROMACS files.
"""
import os
import re
import six
import warnings
from collections import OrderedDict as odict

from ensemble_md.utils import utils
from ensemble_md.utils.exceptions import ParseError


[docs]def parse_log(log_file): """ Parses a log file generated by a GROMACS expanded ensemble simulation and extracts important information. This function is especially useful for extracting information from each iteration in a REXEE simulation on the fly. There are three types of log files from an expanded ensemble simulation: - **Case 1**: The weights are still updating in the simulation and have never been equilibrated. - In this case, the output :code:`equil_time` should be -1. - **Case 2**: The weights were equilibrated during the simulation. - The output :code:`equil_time` is the time (in ps) it took to get the weights equilibrated. - The final weights (:code:`weights`) will just be the equilibrated weights. - **Case 3**: The weights were fixed in the simulation. - In this case, the output :code:`equil_time` should be 0. - The final weights (which never change during the simulation) and the final counts will still be returned. Parameters ---------- log_file : str The file path of the input log file Returns ------- weights : list In all cases, :code:`weights` should be a list of lists (of weights). - In Case 1, a list of list of weights as a function of time since the last update of the Wang-Landau incrementor will be returned. - In Case 2, a list of list of weights as a function of time since the last update of the Wang-Landau incrementor up to equilibration will be returned. - In Case 3, the returned list will only have one list inside, which is the list of values at which the weights were fixed. That is, for all cases, :code:`weights[-1]` will be the final weights, which are important for seeding the next iteration in a REXEE simulation. counts : list The final histogram counts. wl_delta : float The final Wang-Landau incementor. In Cases 2 and 3, :code:`None` will be returned. equil_time : int or float - In Case 1, -1 will be returned, which means that the weights have not been equilibrated. - In Case 2, the equilibration time in ps will be returned. - In Case 3, 0 will be returned, which means that the weights were fixed during the simulation. """ f = open(log_file, "r") lines = f.readlines() f.close() case = None # itialized as None and should end up as 1 or 2 or 3. # First parse the MD parameters to tell the type of the simulation. for l in lines: # noqa: E741 if "n-lambdas" in l: N_states = int(l.split("=")[1]) if "tinit" in l: tinit = float(l.split("=")[1]) if "weight-equil-wl-delta" in l: # For Case 1 and Case 2 cutoff = float(l.split("=")[1]) if "lmc-stats" in l: if l.split("=")[1].split()[0] in ["no", "No"]: # Case 3 case = '3' equil_time = 0 wl_delta = None else: pass # Either Case 1 or Case 2 if "dt " in l: dt = float(l.split("=")[1]) # For all cases, we need to find weights and counts weights = [] if case == '3': # could be either '3' or None # We only need the info at the end of the simulation, so it's faster to search from the bottom of the file. lines.reverse() n = -1 counts = [] for l in lines: # noqa: E741 n += 1 if "Count G(in kT)" in l: w = [] # the list of weights at this time frame # The first occurrence would be the final weights. (We've reversed the lines!) for i in range(1, N_states + 1): if "<<" in lines[n - i]: w.append(float(lines[n - i].split()[-3])) counts.append(int(lines[n - i].split()[-4])) else: w.append(float(lines[n - i].split()[-2])) counts.append(int(lines[n - i].split()[-3])) weights.append(w) break else: # Case 1 and Case 2 # Here we search from the top, since we need weights as a function of time anyway. n = -1 find_equil, append_equil = False, False wl_delta_list = [None] # We use None so that the change in wl-delta will always get deteced at the beginning for l in lines: # noqa: E741 n += 1 if "Count G(in kT)" in l: # this line is lines[n] # We should first check if wl_delta is changed, set weights=[] if needed before we append new weights if "Wang-Landau incrementor is" in lines[n - 1]: # Case 1 so far: wl_delta is still updating case = '1' equil_time = -1 current_wl_delta = float(lines[n - 1].split(":")[1]) if wl_delta_list[-1] != current_wl_delta and current_wl_delta > cutoff: # when wl-delta changes # Note that if the time step the equilibration is reached happens to be a multiple of nstlog, # a wl-delta right below the cutoff will be printed. In that case, we don't want to reset # `weights` so we get the time series since the last time when we still have wl_delta > cutoff weights = [] # we want only time series since the latest update of wl_delta wl_delta_list.append(float(lines[n - 1].split(":")[1])) w, counts = [], [] # the list of weights at this time frame for i in range(1, N_states + 1): # counts will be constantly updated by new values if "<<" in lines[n + i]: w.append(float(lines[n + i].split()[-3])) counts.append(int(lines[n + i].split()[-4])) else: w.append(float(lines[n + i].split()[-2])) counts.append(int(lines[n + i].split()[-3])) if find_equil is False or append_equil is False: weights.append(w) if find_equil is True: append_equil = True if "Weights have equilibrated" in l: case = '2' # we don't break the loop even if equil_time is found, as we need the final counts. find_equil = True # After this, we will append weights one last time, which are equilibrated weights. equil_step = int(l.split(":")[0].split("Step")[1]) equil_time = equil_step * dt + tinit # ps if wl_delta is not None and wl_delta < cutoff: # Should only happen when equil_time % nstlog == 0, where the weights should have been appended # Note that we additionally have wl_delta is not None. Since wl_delta could be None if the # weights get equilibrated right after the simulation stop (before nstlog). append_equil = True wl_delta = wl_delta_list[-1] if case == '2': wl_delta = None return weights, counts, wl_delta, equil_time
[docs]class MDP(odict): """ A class that represents a GROMACS MDP file. Note that an MDP instance is an ordered dictionary, with the i-th key corresponding to the i-th line in the MDP file. Comments and blank lines are also preserved, e.g., with keys 'C0001' and 'B0001', respectively. The value corresponding to a 'C' key is the comment itself, while the value corresponding to a 'B' key is an empty string. Comments after a parameter on the same line are discarded. Leading and trailing spaces are always stripped. Parameters ---------- input_mdp : str, Optional The path of the input MDP file. The default is None. **kwargs : Optional Additional keyword arguments to be passed to add additional key-value pairs to the MDP instance. Note that no sanity checks will be performed for the key-value pairs passed in this way. This also does not work for keys that are not legal python variable names, such as anything that includes a minus '-' sign or starts with a number. Attributes ---------- COMMENT : :code:`re.Pattern` object A compiled regular expression pattern for comments in MDP files. PARAMETER : :code:`re.Pattern` object A compiled regular expression pattern for parameters in MDP files. input_mdp : str The real path of the input MDP file returned by :code:`os.path.realpath(input_mdp)`, which resolves any symbolic links in the path. Example ------- >>> from ensemble_md.utils import gmx_parser >>> gmx_parser.MDP("em.mdp") MDP([('C0001', 'em.mdp - used as input into grompp to generate em.tpr'), ('C0002', 'All unspecified parameters adopt their own default values.'), ('B0001', ''), ('C0003', 'Run Control'), ('integrator', 'steep'), ('nsteps', 500000), ('B0002', ''), ('C0004', 'Energy minnimization'), ('emtol', 100.0), ('emstep', 0.01), ('B0003', ''), ('C0005', 'Neighbor searching/Electrostatics/Van der Waals'), ('cutoff-scheme', 'Verlet'), ('nstlist', 10), ('ns_type', 'grid'), ('pbc', 'xyz'), ('coulombtype', 'PME'), ('rcoulomb', 1.0), ('rvdw', 1.0)]) # noqa: E501 """ # Below are some class variables accessible to all functions. COMMENT = re.compile("""\s*;\s*(?P<value>.*)""") # noqa: W605 PARAMETER = re.compile("""\s*(?P<parameter>[^=]+?)\s*=\s*(?P<value>[^;]*)(?P<comment>\s*;.*)?""", re.VERBOSE) # noqa: W605, E501 def __init__(self, input_mdp=None, **kwargs): super(MDP, self).__init__(**kwargs) # can use kwargs to set dict! (but no sanity checks!) if input_mdp is not None: self.input_mdp = os.path.realpath(input_mdp) self.read()
[docs] def read(self): """ Reads and parses the input MDP file. """ def BLANK(i): return f"B{i:04d}" def COMMENT(i): return f"C{i:04d}" data = odict() iblank = icomment = 0 with open(self.input_mdp) as mdp: for line in mdp: line = line.strip() if len(line) == 0: iblank += 1 data[BLANK(iblank)] = "" continue m = self.COMMENT.match(line) if m: icomment += 1 data[COMMENT(icomment)] = m.group("value") continue m = self.PARAMETER.match(line) if m: parameter = m.group("parameter") value = utils._convert_to_numeric(m.group("value")) data[parameter] = value else: err_msg = f"{os.path.basename(self.input_mdp)!r}: unknown line in mdp file, {line!r}" raise ParseError(err_msg) super(MDP, self).update(data)
[docs] def write(self, output_mdp=None, skipempty=False): """ Writes the MDP instance (the ordered dictionary) to an output MDP file. Parameters ---------- output_mdp : str, Optional The file path of the output MDP file. The default is the filename the MDP instance was built from. If that if :code:`output_mdp` is not specified, the input MDP file will be overwritten. skipempty : bool, Optional Whether to skip empty values when writing the MDP file. If :code:`True`, any parameter lines from the output that contain empty values will be removed. The default is :code:`False`. """ # The line 'if skipempty and (v == "" or v is None):' below could possibly incur FutureWarning warnings.simplefilter(action='ignore', category=FutureWarning) if output_mdp is None: output_mdp = self.input_mdp with open(output_mdp, "w") as mdp: for k, v in self.items(): if k[0] == "B": # blank line mdp.write("\n") elif k[0] == "C": # comment mdp.write(f"; {v!s}\n") else: # parameter = value if skipempty and (v == "" or v is None): continue if isinstance(v, six.string_types) or not hasattr(v, "__iter__"): mdp.write(f"{k!s} = {v!s}\n") else: mdp.write(f"{k} = {' '.join(map(str, v))}\n")
[docs]def compare_MDPs(mdp_list, print_diff=False): """ Identifies the parameters differeing between a given list of MDP files. Note that this function is not aware of the default values of GROMACS parameters. (Currently, this function is not used in the workflow adopted in :code:`run_REXEE.py` but it might be useful in some places, so we decided to keep it.) Parameters ---------- mdp_list : list A list of MDP files. print_diff : bool, Optional Whether to print the parameters that are different among the MDP files in a more readable format. The default is :code:`False`. Returns ------- diff_params : dict A dictionary of parameters differing between MDP files. The keys are the parameter names and the values is a list of values of the parameters in the MDP files. Example ------- >>> from ensemble_md.utils import gmx_parser >>> mdp_list = ['A.mdp', 'B.mdp'] >>> diff_params = gmx_parser.compare_MDPs(mdp_list, print_diff=True) The following parameters are different among the MDP files: wl_scale - A.mdp: None - B.mdp: 0.8 ... >>> print(diff_params) {'wl_scale': [None, 0.8], ...} """ diff_params = {} for i in range(len(mdp_list)): mdps = [MDP(mdp_list[i]) for i in range(len(mdp_list))] params_dicts = [odict([(k.replace('-', '_'), v) if type(v) is not str else (k.replace('-', '_'), v.replace('-', '_')) for k, v in p.items()]) for p in mdps] # noqa: E501 # First figure out the union set of the parameters and exclude blanks and comments all_params = set([key for d in params_dicts for key in d.keys()]) all_params = [p for p in all_params if not p.startswith(('B', 'C'))] for p in all_params: if p in diff_params: pass # already in the dictionary, no need to compare again else: if not all(p in d for d in params_dicts): # the parameter is not in all MDP files diff_params[p] = [d[p] if p in d else None for d in params_dicts] else: if type(params_dicts[0][p]) is not type(params_dicts[1][p]): # e.g., tau_t = 1.0 1.0 1.0 v.s. tau_t = 1.0 (list v.s. float) diff_params[p] = [d[p] for d in params_dicts] else: # the parameter is in all MDP files (Note that "set([1, 1, 1]={1}.)") if isinstance(params_dicts[0][p], list): # the parameter is a list, which is unhashable if len(set([tuple(d[p]) for d in params_dicts])) > 1: diff_params[p] = [d[p] for d in params_dicts] else: if len(set([d[p] for d in params_dicts])) > 1: diff_params[p] = [d[p] for d in params_dicts] if print_diff: print("The following parameters are different among the MDP files:") for k, v in diff_params.items(): print(k) for i in range(len(mdp_list)): print(f' - {mdp_list[i]}: {v[i]}') print() return diff_params
[docs]def read_top(file_name, resname): """ Reads the topology to find the file containing the molecule of interest Parameters ---------- file_name : str Name for the topology file. resname : str Name of the residue of interest we are searching for. Returns ------- input_file : list A list of strings containing the content of the topology file which contains the residue of interest. """ input_file = open(file_name).readlines() itp_files = [] atom_sect = False for line in input_file: if line == '[ atoms ]\n': atom_sect = True if atom_sect: if line == '\n': atom_sect = False line_sep = line.split(' ') while '' in line_sep: line_sep.remove('') if len(line_sep) > 4 and line_sep[3] == resname: return input_file if '#include' in line: line_sep = line.split(' ') while '' in line_sep: line_sep.remove('') itp_files.append(line_sep[-1].strip('\n""')) file_dir, file_name = os.path.split(file_name) if file_dir == '': file_dir = '.' for file in itp_files: if os.path.exists(f'{file_dir}/{file}'): input_file = open(f'{file_dir}/{file}').readlines() atom_sect = False for line in input_file: if line == '[ atoms ]\n': atom_sect = True if atom_sect: if line == '\n': break line_sep = line.split(' ') while '' in line_sep: line_sep.remove('') if len(line_sep) > 4 and line_sep[3] == resname: return input_file raise Exception(f'Residue {resname} can not be found in {file_name}')
[docs]def deter_atom_order(mol_file, resname): """ Determine the order of atoms in the residue we will modify to ensure the output GRO is formatted properly Parameters ---------- file_name : list of str Contains the contents of the original GRO file resname : str Name of the residue of interest we are searching for. Returns ------- atom_order : list of str List of the atom names in the order they appear in the GRO file """ from ensemble_md.utils import coordinate_swap atom_order = [] for line in mol_file: split_line = line.split(' ') while '' in split_line: split_line.remove('') if resname in split_line[0]: if len(split_line[1]) > 5: split_line = coordinate_swap.sep_merge(split_line) atom_order.append(split_line[1]) elif len(atom_order) != 0: break return atom_order