####################################################################
# #
# ensemble_md, #
# a python package for running GROMACS simulation ensembles #
# #
# Written by Wei-Tse Hsu <wehs7661@colorado.edu> #
# Copyright (c) 2022 University of Colorado Boulder #
# #
####################################################################
"""
The :code:`gmx_parser` module provides functions for parsing GROMACS files.
"""
import os
import re
import six
import warnings
from collections import OrderedDict as odict
from ensemble_md.utils import utils
from ensemble_md.utils.exceptions import ParseError
[docs]def parse_log(log_file):
"""
Parses a log file generated by a GROMACS expanded ensemble simulation and extracts
important information. This function is especially useful for extracting information from each iteration in a REXEE
simulation on the fly. There are three types of log files from an expanded ensemble simulation:
- **Case 1**: The weights are still updating in the simulation and have never been equilibrated.
- In this case, the output :code:`equil_time` should be -1.
- **Case 2**: The weights were equilibrated during the simulation.
- The output :code:`equil_time` is the time (in ps) it took to get the weights equilibrated.
- The final weights (:code:`weights`) will just be the equilibrated weights.
- **Case 3**: The weights were fixed in the simulation.
- In this case, the output :code:`equil_time` should be 0.
- The final weights (which never change during the simulation) and the final counts will
still be returned.
Parameters
----------
log_file : str
The file path of the input log file
Returns
-------
weights : list
In all cases, :code:`weights` should be a list of lists (of weights).
- In Case 1, a list of list of weights as a function of time since the last update of the Wang-Landau
incrementor will be returned.
- In Case 2, a list of list of weights as a function of time since the last update of the Wang-Landau
incrementor up to equilibration will be returned.
- In Case 3, the returned list will only have one list inside, which is the list of values at which
the weights were fixed.
That is, for all cases, :code:`weights[-1]` will be the final weights, which are important for seeding the
next iteration in a REXEE simulation.
counts : list
The final histogram counts.
wl_delta : float
The final Wang-Landau incementor. In Cases 2 and 3, :code:`None` will be returned.
equil_time : int or float
- In Case 1, -1 will be returned, which means that the weights have not been equilibrated.
- In Case 2, the equilibration time in ps will be returned.
- In Case 3, 0 will be returned, which means that the weights were fixed during the simulation.
"""
f = open(log_file, "r")
lines = f.readlines()
f.close()
case = None # itialized as None and should end up as 1 or 2 or 3.
# First parse the MD parameters to tell the type of the simulation.
for l in lines: # noqa: E741
if "n-lambdas" in l:
N_states = int(l.split("=")[1])
if "tinit" in l:
tinit = float(l.split("=")[1])
if "weight-equil-wl-delta" in l:
# For Case 1 and Case 2
cutoff = float(l.split("=")[1])
if "lmc-stats" in l:
if l.split("=")[1].split()[0] in ["no", "No"]: # Case 3
case = '3'
equil_time = 0
wl_delta = None
else:
pass # Either Case 1 or Case 2
if "dt " in l:
dt = float(l.split("=")[1])
# For all cases, we need to find weights and counts
weights = []
if case == '3': # could be either '3' or None
# We only need the info at the end of the simulation, so it's faster to search from the bottom of the file.
lines.reverse()
n = -1
counts = []
for l in lines: # noqa: E741
n += 1
if "Count G(in kT)" in l:
w = [] # the list of weights at this time frame
# The first occurrence would be the final weights. (We've reversed the lines!)
for i in range(1, N_states + 1):
if "<<" in lines[n - i]:
w.append(float(lines[n - i].split()[-3]))
counts.append(int(lines[n - i].split()[-4]))
else:
w.append(float(lines[n - i].split()[-2]))
counts.append(int(lines[n - i].split()[-3]))
weights.append(w)
break
else: # Case 1 and Case 2
# Here we search from the top, since we need weights as a function of time anyway.
n = -1
find_equil, append_equil = False, False
wl_delta_list = [None] # We use None so that the change in wl-delta will always get deteced at the beginning
for l in lines: # noqa: E741
n += 1
if "Count G(in kT)" in l: # this line is lines[n]
# We should first check if wl_delta is changed, set weights=[] if needed before we append new weights
if "Wang-Landau incrementor is" in lines[n - 1]: # Case 1 so far: wl_delta is still updating
case = '1'
equil_time = -1
current_wl_delta = float(lines[n - 1].split(":")[1])
if wl_delta_list[-1] != current_wl_delta and current_wl_delta > cutoff: # when wl-delta changes
# Note that if the time step the equilibration is reached happens to be a multiple of nstlog,
# a wl-delta right below the cutoff will be printed. In that case, we don't want to reset
# `weights` so we get the time series since the last time when we still have wl_delta > cutoff
weights = [] # we want only time series since the latest update of wl_delta
wl_delta_list.append(float(lines[n - 1].split(":")[1]))
w, counts = [], [] # the list of weights at this time frame
for i in range(1, N_states + 1):
# counts will be constantly updated by new values
if "<<" in lines[n + i]:
w.append(float(lines[n + i].split()[-3]))
counts.append(int(lines[n + i].split()[-4]))
else:
w.append(float(lines[n + i].split()[-2]))
counts.append(int(lines[n + i].split()[-3]))
if find_equil is False or append_equil is False:
weights.append(w)
if find_equil is True:
append_equil = True
if "Weights have equilibrated" in l:
case = '2' # we don't break the loop even if equil_time is found, as we need the final counts.
find_equil = True # After this, we will append weights one last time, which are equilibrated weights.
equil_step = int(l.split(":")[0].split("Step")[1])
equil_time = equil_step * dt + tinit # ps
if wl_delta is not None and wl_delta < cutoff:
# Should only happen when equil_time % nstlog == 0, where the weights should have been appended
# Note that we additionally have wl_delta is not None. Since wl_delta could be None if the
# weights get equilibrated right after the simulation stop (before nstlog).
append_equil = True
wl_delta = wl_delta_list[-1]
if case == '2':
wl_delta = None
return weights, counts, wl_delta, equil_time
[docs]class MDP(odict):
"""
A class that represents a GROMACS MDP file. Note that an MDP instance is an ordered dictionary,
with the i-th key corresponding to the i-th line in the MDP file. Comments and blank lines are
also preserved, e.g., with keys 'C0001' and 'B0001', respectively. The value corresponding to a
'C' key is the comment itself, while the value corresponding to a 'B' key is an empty string.
Comments after a parameter on the same line are discarded. Leading and trailing spaces
are always stripped.
Parameters
----------
input_mdp : str, Optional
The path of the input MDP file. The default is None.
**kwargs : Optional
Additional keyword arguments to be passed to add additional key-value pairs to the MDP instance.
Note that no sanity checks will be performed for the key-value pairs passed in this way. This
also does not work for keys that are not legal python variable names, such as anything that includes
a minus '-' sign or starts with a number.
Attributes
----------
COMMENT : :code:`re.Pattern` object
A compiled regular expression pattern for comments in MDP files.
PARAMETER : :code:`re.Pattern` object
A compiled regular expression pattern for parameters in MDP files.
input_mdp : str
The real path of the input MDP file returned by :code:`os.path.realpath(input_mdp)`,
which resolves any symbolic links in the path.
Example
-------
>>> from ensemble_md.utils import gmx_parser
>>> gmx_parser.MDP("em.mdp")
MDP([('C0001', 'em.mdp - used as input into grompp to generate em.tpr'), ('C0002', 'All unspecified parameters adopt their own default values.'), ('B0001', ''), ('C0003', 'Run Control'), ('integrator', 'steep'), ('nsteps', 500000), ('B0002', ''), ('C0004', 'Energy minnimization'), ('emtol', 100.0), ('emstep', 0.01), ('B0003', ''), ('C0005', 'Neighbor searching/Electrostatics/Van der Waals'), ('cutoff-scheme', 'Verlet'), ('nstlist', 10), ('ns_type', 'grid'), ('pbc', 'xyz'), ('coulombtype', 'PME'), ('rcoulomb', 1.0), ('rvdw', 1.0)]) # noqa: E501
"""
# Below are some class variables accessible to all functions.
COMMENT = re.compile("""\s*;\s*(?P<value>.*)""") # noqa: W605
PARAMETER = re.compile("""\s*(?P<parameter>[^=]+?)\s*=\s*(?P<value>[^;]*)(?P<comment>\s*;.*)?""", re.VERBOSE) # noqa: W605, E501
def __init__(self, input_mdp=None, **kwargs):
super(MDP, self).__init__(**kwargs) # can use kwargs to set dict! (but no sanity checks!)
if input_mdp is not None:
self.input_mdp = os.path.realpath(input_mdp)
self.read()
[docs] def read(self):
"""
Reads and parses the input MDP file.
"""
def BLANK(i):
return f"B{i:04d}"
def COMMENT(i):
return f"C{i:04d}"
data = odict()
iblank = icomment = 0
with open(self.input_mdp) as mdp:
for line in mdp:
line = line.strip()
if len(line) == 0:
iblank += 1
data[BLANK(iblank)] = ""
continue
m = self.COMMENT.match(line)
if m:
icomment += 1
data[COMMENT(icomment)] = m.group("value")
continue
m = self.PARAMETER.match(line)
if m:
parameter = m.group("parameter")
value = utils._convert_to_numeric(m.group("value"))
data[parameter] = value
else:
err_msg = f"{os.path.basename(self.input_mdp)!r}: unknown line in mdp file, {line!r}"
raise ParseError(err_msg)
super(MDP, self).update(data)
[docs] def write(self, output_mdp=None, skipempty=False):
"""
Writes the MDP instance (the ordered dictionary) to an output MDP file.
Parameters
----------
output_mdp : str, Optional
The file path of the output MDP file. The default is the filename the MDP instance was built from.
If that if :code:`output_mdp` is not specified, the input MDP file will be overwritten.
skipempty : bool, Optional
Whether to skip empty values when writing the MDP file. If :code:`True`, any parameter lines from
the output that contain empty values will be removed. The default is :code:`False`.
"""
# The line 'if skipempty and (v == "" or v is None):' below could possibly incur FutureWarning
warnings.simplefilter(action='ignore', category=FutureWarning)
if output_mdp is None:
output_mdp = self.input_mdp
with open(output_mdp, "w") as mdp:
for k, v in self.items():
if k[0] == "B": # blank line
mdp.write("\n")
elif k[0] == "C": # comment
mdp.write(f"; {v!s}\n")
else: # parameter = value
if skipempty and (v == "" or v is None):
continue
if isinstance(v, six.string_types) or not hasattr(v, "__iter__"):
mdp.write(f"{k!s} = {v!s}\n")
else:
mdp.write(f"{k} = {' '.join(map(str, v))}\n")
[docs]def compare_MDPs(mdp_list, print_diff=False):
"""
Identifies the parameters differeing between a given list of MDP files. Note that
this function is not aware of the default values of GROMACS parameters.
(Currently, this function is not used in the workflow adopted in :code:`run_REXEE.py`
but it might be useful in some places, so we decided to keep it.)
Parameters
----------
mdp_list : list
A list of MDP files.
print_diff : bool, Optional
Whether to print the parameters that are different among the MDP files in a more readable format.
The default is :code:`False`.
Returns
-------
diff_params : dict
A dictionary of parameters differing between MDP files. The keys are the parameter names and
the values is a list of values of the parameters in the MDP files.
Example
-------
>>> from ensemble_md.utils import gmx_parser
>>> mdp_list = ['A.mdp', 'B.mdp']
>>> diff_params = gmx_parser.compare_MDPs(mdp_list, print_diff=True)
The following parameters are different among the MDP files:
wl_scale
- A.mdp: None
- B.mdp: 0.8
...
>>> print(diff_params)
{'wl_scale': [None, 0.8], ...}
"""
diff_params = {}
for i in range(len(mdp_list)):
mdps = [MDP(mdp_list[i]) for i in range(len(mdp_list))]
params_dicts = [odict([(k.replace('-', '_'), v) if type(v) is not str else (k.replace('-', '_'), v.replace('-', '_')) for k, v in p.items()]) for p in mdps] # noqa: E501
# First figure out the union set of the parameters and exclude blanks and comments
all_params = set([key for d in params_dicts for key in d.keys()])
all_params = [p for p in all_params if not p.startswith(('B', 'C'))]
for p in all_params:
if p in diff_params:
pass # already in the dictionary, no need to compare again
else:
if not all(p in d for d in params_dicts):
# the parameter is not in all MDP files
diff_params[p] = [d[p] if p in d else None for d in params_dicts]
else:
if type(params_dicts[0][p]) is not type(params_dicts[1][p]):
# e.g., tau_t = 1.0 1.0 1.0 v.s. tau_t = 1.0 (list v.s. float)
diff_params[p] = [d[p] for d in params_dicts]
else:
# the parameter is in all MDP files (Note that "set([1, 1, 1]={1}.)")
if isinstance(params_dicts[0][p], list):
# the parameter is a list, which is unhashable
if len(set([tuple(d[p]) for d in params_dicts])) > 1:
diff_params[p] = [d[p] for d in params_dicts]
else:
if len(set([d[p] for d in params_dicts])) > 1:
diff_params[p] = [d[p] for d in params_dicts]
if print_diff:
print("The following parameters are different among the MDP files:")
for k, v in diff_params.items():
print(k)
for i in range(len(mdp_list)):
print(f' - {mdp_list[i]}: {v[i]}')
print()
return diff_params
[docs]def read_top(file_name, resname):
"""
Reads the topology to find the file containing the molecule of interest
Parameters
----------
file_name : str
Name for the topology file.
resname : str
Name of the residue of interest we are searching for.
Returns
-------
input_file : list
A list of strings containing the content of the topology file which contains the
residue of interest.
"""
input_file = open(file_name).readlines()
itp_files = []
atom_sect = False
for line in input_file:
if line == '[ atoms ]\n':
atom_sect = True
if atom_sect:
if line == '\n':
atom_sect = False
line_sep = line.split(' ')
while '' in line_sep:
line_sep.remove('')
if len(line_sep) > 4 and line_sep[3] == resname:
return input_file
if '#include' in line:
line_sep = line.split(' ')
while '' in line_sep:
line_sep.remove('')
itp_files.append(line_sep[-1].strip('\n""'))
file_dir, file_name = os.path.split(file_name)
if file_dir == '':
file_dir = '.'
for file in itp_files:
if os.path.exists(f'{file_dir}/{file}'):
input_file = open(f'{file_dir}/{file}').readlines()
atom_sect = False
for line in input_file:
if line == '[ atoms ]\n':
atom_sect = True
if atom_sect:
if line == '\n':
break
line_sep = line.split(' ')
while '' in line_sep:
line_sep.remove('')
if len(line_sep) > 4 and line_sep[3] == resname:
return input_file
raise Exception(f'Residue {resname} can not be found in {file_name}')
[docs]def deter_atom_order(mol_file, resname):
"""
Determine the order of atoms in the residue we will modify to ensure the output GRO is formatted properly
Parameters
----------
file_name : list of str
Contains the contents of the original GRO file
resname : str
Name of the residue of interest we are searching for.
Returns
-------
atom_order : list of str
List of the atom names in the order they appear in the GRO file
"""
from ensemble_md.utils import coordinate_swap
atom_order = []
for line in mol_file:
split_line = line.split(' ')
while '' in split_line:
split_line.remove('')
if resname in split_line[0]:
if len(split_line[1]) > 5:
split_line = coordinate_swap.sep_merge(split_line)
atom_order.append(split_line[1])
elif len(atom_order) != 0:
break
return atom_order