Desmond
Usage
from mdscribe.desmond import Multisim
# read template .msj and .cfg
md_msj = Multisim(template="desmond-md.msj")
md_cfg = Multisim(template="desmond-md.cfg")
with open(msj_file,"w") as msj:
# modify desmond msj template
md_msj.dot.simulate[-1].cfg_file = cfg_file_basename
# Setting up restraints using the restraints keyword:
# https://www.schrodinger.com/kb/332119
if args.posres_force > 0.0:
# print the restraints in the multisim log file
md_msj.dot.simulate[-1].print_restraint = 'true'
# add the new terms defined in "restraints.new" to existing restraints.
# The default is restraints.existing = ignore which will
# delete existing terms before adding any new ones.
# md_msj.dot.simulate[-1].restraints.existing = 'retain'
md_msj.dot.simulate[-1].restraints.new = [
{
'name' : 'posre_harm',
'atoms' : [ f'"{args.posres}"' ],
'force_constants' : [ args.posres_force, ] * 3,
}
]
# force constants in the x, y, and z direction
# write to a .msj file
md_msj.write(msj)
with open(cfg_file,"w") as cfg:
# modify desmond cfg template
md_cfg.dot.randomize_velocity.seed = random.randint(1000, 9999)
md_cfg.dot.time = total_simulation_time
md_cfg.dot.temperature = t_schedule
md_cfg.dot.trajectory.interval = args.interval
# write to a .cfg file
md_cfg.write(cfg)
Metadynamics
Collective variable types
| type | Description | Default Width | Wall | Floor | Atom sites |
|---|---|---|---|---|---|
| dist | Distance | 0.05 Å | yes | yes | 2 |
| angle | Angle | 2.5° | yes | yes | 3 |
| dihedral | Dihedral | 5.0° | no | no | 4 |
| rgyr | Radius of gyration | 0.1 Å | no | no | 1 |
| rgyr_mass | Mass-weighted radius of gyration | 0.1 Å | no | no | 1 |
| rmsd | RMSD from aligned starting structure | 0.1 Å | no | no | 1 |
| rmsd_symm | Symmetry aware RMSD | 0.1 Å | no | no | 1 |
| zdist | Distance along the z axis | 0.05 Å | yes | yes | 1 |
| zdist0 | Absolute distance along the z axis | 0.1 Å | yes | yes | 1 |
| whim1 | WHIM1 - first principal moment [35] | 0.5 Å2 | no | no | 1 |
| whim2 | WHIM2 - second principal moment [35] | 0.25 Å2 | no | no | 1 |
Choice of the bias factor (kTemp)
In well-tempered metadynamics, the height of the deployed Gaussians are rescaled (decreased) during simulation time by:
omega_0 * exp(-V/(kB * ΔT))
Where omega_0 is the initial hill height, V is the bias potential and the denominator in the exponential, kB * ΔT,
is the bias factor (kTemp). During well-tempered metadynamics, the dynamics of the system are effectively accelerated
(without heating up the system) up to T + ΔT, where T is the chosen MD simulation temperature.
The choice of the bias factor value is guided by the highest barrier in the simulation system
which the well-tempered metadynamics run should overcome. Here are some suggestions,
assuming that the initial hill height ω0 (height parameter in the Metadynamics panel) has been set to 0.3 kcal/mol:
| Max. barrier height (kcal/mol) | kTemp (kcal/mol) |
|---|---|
| 3 | 1.7 |
| 6 | 3.4 |
| 10 | 5.6 |
| 15 | 8.4 |
| 20 | 11.2 |
Examples
distance:
cv = [
{atom = ["res. UNK" "res. 849" ]
type = dist
wall = 40
floor = 10
width = 0.05
}
]
dihedral:
cv = [
{atom = [404 406 407 415 ]
type = dihedral
width = 5.0
}
{atom = [406 407 415 417 ]
type = dihedral
width = 5.0
}
]
zdist(membrane):
cv = [
{atom = ["res. UNK"]
type = zdist
width = 0.05
wall = 20
floor = 5
}
]
rmsd:
cv = [
{atom = ["res. UNK" "res. 849" ]
type = rmsd
width = 0.1
}
]
Extending Simulations
See https://www.schrodinger.com/kb/788642 for extending metadynamics simulation.
Command-Line Interface (CLI)
desmond.Multisim class are used to build below CLI.
| Command-line interface | Description |
|---|---|
mdinfo |
Display a running Desmond MD simulation information in the Amber-style |
batch-desmond-setup |
Setup/parametrize Desmond MD simulations |
batch-desmond-min |
Run Desmond energy minimizations |
batch-desmond-md |
Run Desmond MD simulations |
batch-desmond-metad |
Run metadynamics MD simulations |
batch-desmond-pli |
Analyze protein-ligand interactions |
batch-desmond-extend |
Extend Desmond MD simulations |
batch-desmond-report |
Generate reports for Desmond MD simulations |
batch-desmond-dihedral |
Analyze dihedral angles from Desmond MD trajectories |
batch-desmond-ligrmsd |
Analyze ligand rmsd from Desmond MD trajectories |
batch-desmond-rg |
Analyze radius of gyration from Desmond MD trajectories |
For more helps, $ command --help.
mdscribe.desmond.cli
SCHRODINGER = os.environ['SCHRODINGER']
module-attribute
USER = os.environ['USER']
module-attribute
multisim = '{}/utilities/multisim'.format(SCHRODINGER)
module-attribute
schrodinger_hosts = '{}/schrodinger.hosts'.format(SCHRODINGER)
module-attribute
schrodinger_run = '{}/run'.format(SCHRODINGER)
module-attribute
script_path = files('mdscribe.desmond.script')
module-attribute
batch_dihedral()
batch_distance()
batch_extend()
batch_ligrmsd()
batch_md()
Source code in mdscribe/desmond/cli.py
def batch_md():
md_msj = desmond.Multisim(template="desmond-md.msj")
md_cfg = desmond.Multisim(template="desmond-md.cfg")
parser = argparse.ArgumentParser(description="batch gdesmond md jobs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-g', dest="gpu_device", type=int, default=0,
metavar="gpu_device", help="gpu device id")
parser.add_argument('-T', dest="temp", nargs="+", type=float, default=[300.0,],
metavar="temperature", help="temperature in K")
parser.add_argument('-R', dest="ramp", nargs="+", type=float, default=[10.0,],
metavar="tempramp", help="heat and cool ramps in ps/K")
parser.add_argument('-t', dest="simulation_time", nargs="+", type=float, default=[100.0,],
metavar="simulation_time", help="simulation time in ns")
parser.add_argument('-i', dest="interval", type=float, default=100.0,
metavar="interval", help="frame interval in ps")
parser.add_argument('--posres', dest="posres", default="res. UNK",
metavar="posres", help="ASL for positional restraints during prod. sim.")
parser.add_argument('--posres-force', dest="posres_force", type=float, default=0.0,
metavar="posres_force", help="positional restraints force constant (kcal/mol/A**2)")
parser.add_argument('-p', dest="prefix", default="r", help="directory prefix")
parser.add_argument('-s', dest="start", type=int, default=1, help="directory start")
parser.add_argument('-r', dest="repeat", type=int, default=1, help="number of repeats")
parser.add_argument('-j', dest="job_file", default="desmond_md_job_1.sh", help="job filename")
parser.add_argument('cms', nargs="+", help="desmond cms file")
args = parser.parse_args()
try:
cms_files = [os.path.abspath(f) for f in args.cms]
assert(len(cms_files) > 0)
except:
print(".cms file(s) not found")
sys.exit(0)
opt = '-HOST localhost -maxjob 1 -cpu 1 -mode umbrella '
opt += '-set stage[1].set_family.md.jlaunch_opt=["-gpu"] -lic "DESMOND_GPGPU:16"'
job_file = args.job_file
while os.path.exists(job_file):
splited = job_file.replace(".sh","").split("_")
splited[-1] = str(int(splited[-1]) + 1)
job_file = "_".join(splited) + ".sh"
if len(args.temp) > 1:
try:
assert len(args.temp) == len(args.simulation_time)
assert len(args.temp) == (len(args.ramp) + 1)
except:
print("For a variable temperature simulaton, the number of temperatures and simulation times ")
print("should match and temperature ramp(s) (ps/K) should be given between temperatures.")
print("Please check -T, -t and -R options")
sys.exit(0)
else:
args.ramp = [] # not used
# Note: if not specified,
# all times are in the unit of ps and
# energy is in the unit of kcal/mol.
with open("README","a") as readme, open(job_file,"w") as job:
print(f"Job file = {job_file}")
dirs = [ f"{args.prefix}{n:02d}" for n in range(args.start, args.start+args.repeat) ]
t_schedule = [ [ args.temp[0], 0.0 ], ]
if len(args.temp) > 1 :
elapsed = 0
prev_temp = args.temp[0]
idx = 0
for (temp,simulation_time) in zip(args.temp, args.simulation_time):
deltaT = abs(temp - prev_temp)
if deltaT > 0.001: # ramp
elapsed += args.ramp[idx] * deltaT # ramp: ps/K
t_schedule.append([temp, elapsed])
idx += 1
elapsed += simulation_time * 1000. # ns -> ps
t_schedule.append([temp, elapsed]) # ns -> ps
prev_temp = temp
total_simulation_time = elapsed # ps
else:
total_simulation_time = sum(args.simulation_time) * 1000.0 # ps
if not args.interval:
# args.simulation_time in ns and args.interval in ps
# default: make 1000 frames
args.interval = total_simulation_time / 1000.
cmd_echo = ""
for argv in sys.argv:
if cmd_echo:
cmd_echo += " "
if " " in argv:
cmd_echo += f'"{argv}"'
else:
cmd_echo += f'{argv}'
readme.write(f"{cmd_echo}\n\n")
readme.write(f"GPU device = {args.gpu_device}\n")
readme.write(f"Temperature (K) = {args.temp}\n")
readme.write(f"Temperature Ramp (ps/K) = {args.ramp}\n")
readme.write(f"Simulation Time (ns) = {args.simulation_time}\n")
readme.write(f"Temperature schedule = {str(t_schedule)}\n")
readme.write(f"Total Sim. Time (ns) = {total_simulation_time/1000.0}\n")
readme.write(f"Trajectory interval(ps) = {args.interval}\n")
readme.write(f"Repeat = {args.repeat}\n")
readme.write( "Directory = %s\n" % " ".join(dirs))
readme.write(f"Jobfile = {job_file}\n\n")
for i, infile in enumerate(cms_files):
info = f"[{i+1:02d}] {infile}"
print(info)
readme.write(info+"\n")
readme.write("\n\n")
job.write(f'export CUDA_VISIBLE_DEVICES="{args.gpu_device}"\n\n')
for n in range(args.start, args.start+args.repeat):
outdir = f"{args.prefix}{n:02d}"
outdir_abspath = os.path.abspath(outdir)
job.write(f"cd {outdir_abspath}/\n\n")
if not os.path.exists(outdir):
os.makedirs(outdir)
msj_file = f"{outdir}/desmond_md_job_{n:02d}.msj"
cfg_file = f"{outdir}/desmond_md_job_{n:02d}.cfg"
cfg_file_basename= os.path.basename(cfg_file)
with open(msj_file,"w") as msj:
# modify desmond msj template
md_msj.dot.simulate[-1].cfg_file = cfg_file_basename
# Setting up restraints using the restraints keyword:
# https://www.schrodinger.com/kb/332119
if args.posres_force > 0.0:
# print the restraints in the multisim log file
md_msj.dot.simulate[-1].print_restraint = 'true'
# add the new terms defined in "restraints.new" to existing restraints.
# The default is restraints.existing = ignore which will
# delete existing terms before adding any new ones.
# md_msj.dot.simulate[-1].restraints.existing = 'retain'
md_msj.dot.simulate[-1].restraints.new = [
{
'name' : 'posre_harm',
'atoms' : [ f'"{args.posres}"' ],
'force_constants' : [ args.posres_force, ] * 3,
}
]
# force constants in the x, y, and z direction
md_msj.write(msj)
with open(cfg_file,"w") as cfg:
# modify desmond cfg template
md_cfg.dot.randomize_velocity.seed = random.randint(1000, 9999)
md_cfg.dot.time = total_simulation_time
md_cfg.dot.temperature = t_schedule
md_cfg.dot.trajectory.interval = args.interval
md_cfg.write(cfg)
for infile in cms_files:
prefix_ = re.sub(r'desmond_setup[-_]','', os.path.basename(infile))
prefix = prefix_.replace("-out.cms","")
job_name = f"desmond_md_job_{n:02d}_{prefix}"
job.write('if [ ! -f {}/{} ]\n'.format( outdir_abspath, f"{job_name}-out.cms",))
job.write('then\n')
job.write('{} -JOBNAME {} -m {} -c {} -description "{}" {} {} -o {} -WAIT\n'.format(
multisim,
job_name,
os.path.basename(msj_file),
os.path.basename(cfg_file),
"GPU desmond MD",
opt,
os.path.join("..",infile),
f"{job_name}-out.cms",
))
job.write('fi\n\n')
os.chmod(job_file, 0o777)
batch_metad()
Metadynamics Simulations.
Source code in mdscribe/desmond/cli.py
def batch_metad():
"""Metadynamics Simulations.
"""
metad_msj = desmond.Multisim(template="desmond-metad.msj")
metad_cfg = desmond.Multisim(template="desmond-metad.cfg")
opt = '-HOST localhost -maxjob 1 -cpu 1 -mode umbrella -lic "DESMOND_GPGPU:16" '
opt += '-description "metadynamics"'
parser = argparse.ArgumentParser(description="batch gdesmond metadynamics jobs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--meta-height', dest="meta_height", type=float, default=0.03, help="Height of the repulsive Gaussian potential in kcal/mol (0.03)")
parser.add_argument('--meta-interval', dest="meta_interval", type=float, default=0.09, help="Interval in ps at which Gaussian potentials are added (0.09)")
parser.add_argument('--meta-first', dest="meta_first", type=float, default=0.0, help="Time in ps at which the Gaussian potentials are first added (0)")
parser.add_argument('--meta-last', dest="meta_last", type=float, default=-1.0, help="Time in ps at which the Gaussian potentials are last added (simulation time)")
parser.add_argument('--meta-kTemp', dest="meta_kTemp", type=float, default=2.4, help="Perform the well-tempered metadynamics (2.4)")
parser.add_argument('--cv-dist', dest='cv_dist', nargs=4, action='append', default=[], help="atom1 atom2 width(0.05) wall")
parser.add_argument('--cv-angle', dest='cv_angle', nargs=4, action='append', default=[], help="atom1 atom2 atom3 width(2.5)")
parser.add_argument('--cv-dihedral', dest='cv_dihedral', nargs=5, action='append', default=[], help="atom1 atom2 atom3 atom4 width(5.0)")
parser.add_argument('--cv-rgyr', dest='cv_rgyr', nargs=2, action='append', default=[], help="atom width(0.1)")
parser.add_argument('--cv-rgyr-mass', dest='cv_rgyr_mass', nargs=2, action='append', default=[], help="atom width(0.1)")
parser.add_argument('--cv-rmsd', dest='cv_rmsd', nargs=2, action='append', default=[], help="atom width(0.1)")
parser.add_argument('--cv-rmsd-symm', dest='cv_rmsd_symm', nargs=2, action='append', default=[], help="atom width(0.1)")
parser.add_argument('--cv-zdist', dest='cv_zdist', nargs=3, action='append', default=[], help="atom width wall(0.05)")
parser.add_argument('--cv-zdist0', dest='cv_zdist0', nargs=3, action='append', default=[], help="atom width wall(0.1)")
parser.add_argument('-g', dest="gpu_device", type=int, default=0, help="gpu device id")
parser.add_argument('-T', dest="temperature", type=float, default=300.0, help="temperature in Kelvin")
parser.add_argument('-t', dest="simulation_time", type=float, default=40.0, help="simulation time in ns")
parser.add_argument('-i', dest="interval", type=float, default=40.0, help="frame interval in ps")
parser.add_argument('-p', dest="prefix", type=str, default="m", help="directory prefix")
parser.add_argument('-s', dest="start", type=int, default=1, help="directory start")
parser.add_argument('-r', dest="repeat", type=int, default=1, help="number of repeats")
parser.add_argument('-j', dest="job_file", type=str, default="desmond_metadynamics_job_1.sh", help="job filename")
parser.add_argument('cms', nargs="+", help="desmond cms file")
args = parser.parse_args()
try:
cms_files = [os.path.abspath(f) for f in args.cms]
assert(len(cms_files) > 0)
except:
print(".cms file(s) not found")
sys.exit(0)
if not args.interval:
args.interval = args.simulation_time # it will save 1000 frames
job_file = args.job_file
while os.path.exists(job_file):
splited = job_file.replace(".sh","").split("_")
splited[-1] = str(int(splited[-1]) + 1)
job_file = "_".join(splited) + ".sh"
with open("README","a") as readme, open(job_file,"w") as job:
print("\n" + job_file + "\n")
outdir_nums = list(range(args.start, args.start+args.repeat))
outdirs = [f"{args.prefix}{num:02d}" for num in outdir_nums]
cmd_echo = ""
for argv in sys.argv:
if cmd_echo:
cmd_echo += " "
if " " in argv:
cmd_echo += f'"{argv}"'
else:
cmd_echo += f'{argv}'
readme.write(f"{cmd_echo}\n\n")
readme.write(f"GPU device = {args.gpu_device}\n")
readme.write(f"Temperature (K) = {args.temperature}\n")
readme.write(f"Simulation Time (ns) = {args.simulation_time}\n")
readme.write(f"Trajectory interval(ps) = {args.interval}\n")
readme.write(f"Repeat = {args.repeat}\n")
readme.write( "Directory = %s\n" % " ".join(outdirs))
readme.write(f"Jobfile = {job_file}\n\n")
job.write(f'export CUDA_VISIBLE_DEVICES="{args.gpu_device}"\n\n')
for i, infile in enumerate(cms_files):
info = f"[{i+1}] {infile}"
print(info)
readme.write(info+"\n")
print()
readme.write("\n")
for (num, outdir) in zip(outdir_nums, outdirs):
outdir_abspath = os.path.abspath(outdir)
job.write(f"cd {outdir_abspath}/\n\n")
if not os.path.exists(outdir):
os.makedirs(outdir)
cfg_file = f"{outdir}/desmond_metadynamics_job_{num:02d}.cfg"
msj_file = f"{outdir}/desmond_metadynamics_job_{num:02d}.msj"
cfg_file_basename= os.path.basename(cfg_file)
with open(cfg_file,"w") as cfg, open(msj_file,"w") as msj:
metad_cfg.dot.randomize_velocity.seed = str(random.randint(1000,9999))
metad_cfg.dot.time = str(args.simulation_time*1000.0)
metad_cfg.dot.trajectory.interval = str(args.interval)
metad_cfg.dot.temperature = f"[ [{args.temperature} 0] ]"
metad_cfg.write(cfg)
metad_msj.dot.simulate[-1].cfg_file = cfg_file_basename
# metadynamics
cv_list = []
for atom1, atom2, width, wall in args.cv_dist:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
atom2 = index_or_asl(atom2)
cv.atom = f'[{atom1} {atom2}]'
cv.type = 'dist'
cv.width = str(width)
if not wall.startswith('-'):
cv.wall = str(wall)
cv_list.append(cv)
for atom1, atom2, atom3, width in args.cv_angle:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
atom2 = index_or_asl(atom2)
atom3 = index_or_asl(atom3)
cv.atom = f'[{atom1} {atom2} {atom3}]'
cv.type = 'angle'
cv.width = str(width)
cv_list.append(cv)
for atom1, atom2, atom3, atom4, width in args.cv_dihedral:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
atom2 = index_or_asl(atom2)
atom3 = index_or_asl(atom3)
atom4 = index_or_asl(atom4)
cv.atom = f'[{atom1} {atom2} {atom3} {atom4}]'
cv.type = 'dihedral'
cv.width = str(width)
cv_list.append(cv)
for atom1, width in args.cv_rmsd:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'rmsd'
cv.width = str(width)
cv_list.append(cv)
for atom1, width in args.cv_rmsd_symm:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'rmsd_symm'
cv.width = str(width)
cv_list.append(cv)
for atom1, width in args.cv_rgyr:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'rgyr'
cv.width = str(width)
cv_list.append(cv)
for atom1, width in args.cv_rgyr_mass:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'rgyr_mass'
cv.width = str(width)
cv_list.append(cv)
for atom1, width, wall in args.cv_zdist:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'zdist'
cv.width = str(width)
if not wall.startswith('-'):
cv.wall = str(wall)
cv_list.append(cv)
for atom1, width, wall in args.cv_zdist0:
cv = copy.deepcopy(metad_msj.dot.simulate[-1].meta.cv[0])
atom1 = index_or_asl(atom1)
cv.atom = f'[{atom1}]'
cv.type = 'zdist0'
cv.width = str(width)
if not wall.startswith('-'):
cv.wall = str(wall)
cv_list.append(cv)
metad_msj.dot.simulate[-1].meta.cv = cv_list
metad_msj.dot.simulate[-1].meta.height = str(args.meta_height)
metad_msj.dot.simulate[-1].meta.interval = str(args.meta_interval)
metad_msj.dot.simulate[-1].meta.first = str(args.meta_first)
if args.meta_last > 0.0:
metad_msj.dot.simulate[-1].meta.last = str(args.meta_last)
# well-tempered metadynamics
if args.meta_kTemp > 0.0:
metad_msj.dot.simulate[-1].meta.kTemp = str(args.meta_kTemp)
metad_msj.write(msj)
# append to job file
for infile in cms_files:
prefix_ = re.sub(r'desmond_setup[-_]','', os.path.basename(infile))
prefix = prefix_.replace("-out.cms","")
job_name = f"desmond_metadynamics_job_{num:02d}_{prefix}"
job.write('{} -JOBNAME {} -m {} -c {} {} {} -o {} -WAIT\n\n'.format(
multisim,
job_name,
os.path.basename(msj_file),
os.path.basename(cfg_file),
opt,
infile,
f"{job_name}-out.cms",
))
os.chmod(job_file, 0o777)
batch_min()
Source code in mdscribe/desmond/cli.py
def batch_min():
min_msj = desmond.Multisim(template="desmond-min.msj")
min_cfg = desmond.Multisim(template="desmond-min.cfg")
opt = '-HOST localhost -maxjob 1 -cpu 1 -mode umbrella '
opt += '-lic "DESMOND_GPGPU:16" '
opt += '-description "minimization"'
parser = argparse.ArgumentParser(description="batch gdesmond minimization jobs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-g', dest="gpu_device", type=int, default=0,
help="gpu device id")
parser.add_argument('-t', dest="simulation_time", type=float, default=100.0,
help="simulation time in ps")
parser.add_argument('-p', dest="prefix", type=str, default="r",
help="directory prefix")
parser.add_argument('-s', dest="start", type=int, default=1,
help="directory start")
parser.add_argument('-r', dest="repeat", type=int, default=1,
help="number of repeats")
parser.add_argument('-j', dest="job_file", type=str, default="desmond_min_job_1.sh",
help="job filename")
parser.add_argument('cms', nargs="+", help="desmond cms file")
args = parser.parse_args()
try:
cms_files = [os.path.abspath(f) for f in args.cms]
assert(len(cms_files) > 0)
except:
print(".cms file(s) not found")
sys.exit(0)
job_file = args.job_file
while os.path.exists(job_file):
splited = job_file.replace(".sh","").split("_")
splited[-1] = str(int(splited[-1]) + 1)
job_file = "_".join(splited) + ".sh"
with open("README","a") as readme, open(job_file,"w") as job:
print("\n" + job_file + "\n")
outdir_nums = list(range(args.start, args.start+args.repeat))
outdirs = [f"{args.prefix}{num:02d}" for num in outdir_nums]
cmd_echo = ""
for argv in sys.argv:
if cmd_echo:
cmd_echo += " "
if " " in argv:
cmd_echo += f'"{argv}"'
else:
cmd_echo += f'{argv}'
readme.write(f"{cmd_echo}\n\n")
readme.write(f"GPU device = {args.gpu_device}\n")
readme.write(f"Simulation Time (ns) = {args.simulation_time}\n")
readme.write(f"Repeat = {args.repeat}\n")
readme.write( "Directory = %s\n" % " ".join(outdirs))
readme.write(f"Jobfile = {job_file}\n\n")
job.write(f'export CUDA_VISIBLE_DEVICES="{args.gpu_device}"\n\n')
for i, infile in enumerate(cms_files):
info = f"[{i+1}] {infile}"
print(info)
readme.write(info+"\n")
print()
readme.write("\n")
for (num, outdir) in zip(outdir_nums, outdirs):
outdir_abspath = os.path.abspath(outdir)
job.write(f"cd {outdir_abspath}/\n\n")
if not os.path.exists(outdir):
os.makedirs(outdir)
cfg_file = f"{outdir}/desmond_min_job_{num:02d}.cfg"
msj_file = f"{outdir}/desmond_min_job_{num:02d}.msj"
cfg_file_basename= os.path.basename(cfg_file)
with open(cfg_file,"w") as cfg, open(msj_file,"w") as msj:
min_cfg.dot.randomize_velocity.seed = str(random.randint(1000,9999))
min_cfg.dot.time = str(args.simulation_time) # (ps)
min_cfg.write(cfg)
min_msj.dot.simulate.cfg_file = cfg_file_basename
min_msj.write(msj)
# append to job file
for infile in cms_files:
prefix_ = re.sub(r'desmond_setup[-_]','', os.path.basename(infile))
prefix = prefix_.replace("-out.cms","")
job_name = f"desmond_min_job_{num:02d}_{prefix}"
job.write('{} -JOBNAME {} -m {} -c {} {} {} -o {} -WAIT\n\n'.format(
multisim,
job_name,
os.path.basename(msj_file),
os.path.basename(cfg_file),
opt,
infile,
f"{job_name}-out.cms",
))
os.chmod(job_file, 0o777)
batch_pli()
Source code in mdscribe/desmond/cli.py
def batch_pli():
teststring ="""Keywords = [
{RMSD = {
ASL = "((mol. 1 and backbone) and not (atom.ele H) and not (mol. 2))"
Frame = 0
Panel = pl_interact_survey
Result = [0.0 1.161 1.286 1.331 1.176 1.195 ]
SelectionType = Backbone
Tab = pl_rmsd_tab
Type = ASL
Unit = Angstrom
}
}
{RMSD = {
ASL = "(mol. 1 and sidechain and not (mol. 2))"
Frame = 0
Panel = pl_interact_survey
Result = [0.0 1.161 1.286 1.331 1.176 1.195 ]
SelectionType = "Side chains"
Tab = pl_rmsd_tab
Type = ASL
Unit = Angstrom
}
}
]"""
# result = expr.parse_string(teststring)
# d = result.as_dict()
# traverse_dict(d)
# dot = DotMap(d)
# try:
# assert dot.Keywords[1].RMSD.SelectionType == '"Side chains"'
# print("ok")
# except:
# print("error")
parser = argparse.ArgumentParser(description="Average Protein-Ligand Interactions",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--out', dest='out', default='mean-PLIF', help="output basename")
parser.add_argument('eaf', nargs='+', default=[], help='input -out.eaf filename(s)')
args = parser.parse_args()
if len(args.eaf) == 0:
argparse.print_help()
sys.exit(0)
total_num_frames = 0
data = {}
for filename in args.eaf:
(num_frames, HBond, Hydrophobic, Polar, WaterBridge,
HalogenBond, LigWat, Metal, PiCat, PiPi) = read_eaf(filename, verbose=False)
total_num_frames += num_frames
print(f"{filename} {num_frames} frames")
for resSeq in HBond:
if resSeq in data:
data[resSeq]['hbond'] += HBond[resSeq]['count']
else:
data[resSeq] = {'resName': HBond[resSeq]['resName'],
'hbond': HBond[resSeq]['count'],
'hydrophobic': 0,
'polar': 0,
'waterbridge': 0,
}
for resSeq in Hydrophobic:
if resSeq in data:
data[resSeq]['hydrophobic'] += Hydrophobic[resSeq]['count']
else:
data[resSeq] = {'resName': Hydrophobic[resSeq]['resName'],
'hbond': 0,
'hydrophobic': Hydrophobic[resSeq]['count'],
'polar': 0,
'waterbridge': 0,
}
for resSeq in Polar:
if resSeq in data:
data[resSeq]['polar'] += Polar[resSeq]['count']
else:
data[resSeq] = {'resName': Polar[resSeq]['resName'],
'hbond': 0,
'hydrophobic': 0,
'polar': Polar[resSeq]['count'],
'waterbridge': 0,
}
for resSeq in WaterBridge:
if resSeq in data:
data[resSeq]['waterbridge'] += WaterBridge[resSeq]['count']
else:
data[resSeq] = {'resName': WaterBridge[resSeq]['resName'],
'hbond': 0,
'hydrophobic' : 0,
'polar': 0,
'waterbridge': WaterBridge[resSeq]['count'],
}
csvdata = {'resid':[],
'resSeq':[],
'resName':[],
'hbond':[],
'hydrophobic':[],
'polar': [],
'waterbridge': [],
}
for resSeq in sorted(data):
csvdata['resSeq'].append(resSeq)
csvdata['resName'].append(data[resSeq]['resName'])
csvdata['resid'].append(f"{data[resSeq]['resName']}_{resSeq}")
csvdata['hbond'].append(float(data[resSeq]['hbond'])/total_num_frames)
csvdata['hydrophobic'].append(float(data[resSeq]['hydrophobic'])/total_num_frames)
csvdata['polar'].append(float(data[resSeq]['polar'])/total_num_frames)
csvdata['waterbridge'].append(float(data[resSeq]['waterbridge'])/total_num_frames)
df = pd.DataFrame(csvdata)
df.to_csv(args.out + '.csv', index=False, float_format='%.4f')
g = df.loc[:, ~df.columns.isin(['resSeq','resName'])].plot.bar(
x="resid",
stacked=True,
title="Protein-Ligand Interactions",
xlabel='Residue',
ylabel='Fraction of MD trajectory',
figsize=(8,3),
fontsize=8,
)
fig = g.get_figure()
fig.savefig(args.out + '.pdf', bbox_inches="tight", pad_inches=0.2, dpi=150)
batch_report()
batch_rg()
batch_setup()
Source code in mdscribe/desmond/cli.py
def batch_setup():
setup_msj = desmond.Multisim(template="desmond-setup.msj")
parser = argparse.ArgumentParser(description="batch gdesmond md system setup",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-c','--conc', dest="conc", type=float, default=0.15, help="salt concentration in M")
parser.add_argument('-d','--dist', dest="dist", type=float, default=10.0, help="buffer distance in A")
parser.add_argument('-l','--lipid', dest="lipid", default="", help="lipid bilayer")
parser.add_argument('--cpp', dest="cpp", default=False, action="store_true", help="CPP simulation setup")
parser.add_argument('-s','--solvent', dest="solvent", default="TIP3P", help="solvent model")
parser.add_argument('-i','--counterion', dest="counterion", default="Na", help="neutralizing ion")
parser.add_argument('-n','--negative', dest="neg", default="Cl", help="negative salt ion")
parser.add_argument('-p','--positive', dest="pos", default="Na", help="positive salt ion")
parser.add_argument('-f','--forcefield', dest="forcefield", default="S-OPLS", help="forcefield")
parser.add_argument('-j','--jobfile', dest="job_file", default="desmond_setup_1.sh", help="job filename")
parser.add_argument('-m','--msjfile', dest="msj_file", default="desmond_setup_1.msj", help="msj filename")
parser.add_argument('-a','--appendix', dest="appendix", default="", help="job name appendix")
parser.add_argument('mae', nargs="+", help="desmond mae file")
args = parser.parse_args()
if args.appendix:
msj_file = args.msj_file[:-4] + "_" + args.appendix + ".msj"
else:
msj_file = args.msj_file
if args.appendix:
job_file = args.job_file[:-3] + "_" + args.appendix + ".sh"
else:
job_file = args.job_file
# job file (.sh) and msj file (.msj) should match
while os.path.exists(job_file):
splited = job_file.replace(".sh","").split("_")
splited[-1] = str(int(splited[-1]) + 1)
job_file = "_".join(splited) + ".sh"
while os.path.exists(msj_file):
splited = msj_file.replace(".msj","").split("_")
splited[-1] = str(int(splited[-1]) + 1)
msj_file = "_".join(splited) + ".msj"
with open(msj_file, "w") as msj:
setup_msj.dot.build_geometry.solvent = str(args.solvent)
setup_msj.dot.build_geometry.add_counterion.ion = str(args.counterion)
setup_msj.dot.build_geometry.salt.concentration = str(args.conc)
setup_msj.dot.build_geometry.box.size = f"[ {args.dist} {args.dist} {args.dist} ]"
setup_msj.dot.build_geometry.salt.negative_ion = str(args.neg)
setup_msj.dot.build_geometry.salt.positive_ion = str(args.pos)
if args.cpp:
setup_msj.dot.build_geometry.rezero_system = "False"
args.lipid = "POPC"
if args.lipid:
setup_msj.dot.build_geometry.membrane_box.lipid = str(args.lipid)
else:
# remove membrane_box block
setup_msj.dot.build_geometry.pop("membrane_box")
setup_msj.dot.build_geometry.override_forcefield = str(args.forcefield)
setup_msj.dot.assign_forcefield.forcefield = str(args.forcefield)
setup_msj.dot.assign_forcefield.water = str(args.solvent)
setup_msj.write(msj)
with open("README","a") as readme, open(job_file,"w") as job:
cmd_echo = ""
for argv in sys.argv:
if cmd_echo:
cmd_echo += " "
if " " in argv:
cmd_echo += f'"{argv}"'
else:
cmd_echo += f'{argv}'
readme.write(f"{cmd_echo}\n\n")
readme.write(f"Force Field = {args.forcefield}\n")
readme.write(f"Solvent = {args.solvent}\n")
readme.write(f"Counter Ion = {args.counterion}\n")
readme.write(f"Positive Ion = {args.pos}\n")
readme.write(f"Negative Ion = {args.neg}\n")
readme.write(f"Concentration = {args.conc} (M)\n")
readme.write(f"Size = {args.dist} (A)\n")
readme.write(f"Lipid = {args.lipid}\n")
readme.write(f"msjfile = {msj_file}\n")
readme.write(f"Jobfile = {job_file}\n")
readme.write(f"Input structure(s):\n")
for i, infile in enumerate(args.mae):
prefix = os.path.basename(infile).split(".")[0]
job_name = f"desmond_setup-{prefix}"
if args.appendix:
job_name += f"-{args.appendix}"
cms_file = f"{job_name}-out.cms"
job.write(f"if [ ! -f {cms_file} ]\n")
job.write(f"then\n")
job.write(f"{multisim} \\\n")
job.write(f" -JOBNAME {job_name} \\\n")
job.write(f" -m {msj_file} {os.path.abspath(infile)} \\\n")
job.write(f" -o {cms_file} \\\n")
job.write(f" -HOST localhost:20 -maxjob 20 -WAIT\n")
job.write(f"fi\n")
readme.write(f"[{i+1:02d}] {infile}\n")
readme.write("\n\n")
os.chmod(job_file, 0o777)
index_or_asl(expr)
mdinfo()
Source code in mdscribe/desmond/cli.py
def mdinfo():
simulation_time = re.compile(r'last_time = "(?P<t>[.0-9]+)"')
# last_time = "100000.0"
progress = re.compile(r'Chemical time:\s+(?P<finished>[.0-9]+) ps, Step:\s+[0-9]+, ns/day:\s+(?P<rate>[.0-9]+)')
#Chemical time: 33507.6000 ps, Step: 5584600, ns/day: 296.231
parser = argparse.ArgumentParser(description="desmond MD info.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-t', '--tmpdir', dest='tmpdir', default=None, help='desmond temporary directory')
parser.add_argument('-l', '--logfile', dest='logfile', default=None)
args = parser.parse_args()
if args.tmpdir is None:
with open(schrodinger_hosts, "r") as f:
for line in f:
if line.strip().startswith("#"):
continue
c = line.strip().split(":")
if len(c) != 2:
continue
k = c[0].strip()
v = c[1].strip()
if k == 'tmpdir':
args.tmpdir = os.path.join(v, USER)
if not os.path.exists(args.tmpdir):
sys.stderr.write("Use -t or --tmpdir to give temporary directory\n")
sys.exit(1)
if args.logfile is None:
args.logfile = args.tmpdir+"/desmond_*/*.log"
logfile = None
for f in glob.glob(args.logfile):
if not "multisim" in f:
logfile = f
break
if not logfile:
sys.stderr.write(f"desmond log not found in {args.logfile}\n")
sys.stderr.write("Use -l or --logfile to give log file path\n")
sys.stderr.write("Use -t or --tmpdir to give temporary directory\n")
sys.exit(2)
total = None
rate = []
with open(logfile,"r") as f:
for line in f:
m = simulation_time.search(line)
n = progress.search(line)
if not total and m and m.group("t"):
total = float(m.group("t"))*0.001
if n and n.group("finished") and n.group("rate"):
rate.append(float(n.group("rate")))
finished = float(n.group("finished"))*0.001
n = len(rate)
print("-"*80)
print(f"| SCHRODINGER= {SCHRODINGER}")
print(f"| tmpdir: {args.tmpdir}")
print(f"|")
print(f"| Desmond MD Timing Info")
print(f"| ----------------------")
print(f"|")
print(f"| {os.path.dirname(logfile)}/")
print(f"| {os.path.basename(logfile)}")
print(f"|")
if n == 0:
if total:
print(f"| Total {total:9.2f} ns")
print(f"| Timing data not available yet")
print("-"*80)
sys.exit(0)
remaining = total-finished
avg_rate = sum(rate[-n:])/n
eta = 24.0*remaining/avg_rate # hours
eta_time = datetime.now() + timedelta(hours=eta)
print(f"| Total {total:9.2f} ns")
print(f"| Completed {finished:9.2f} ns ({100.0*finished/total:5.1f}%)")
print(f"| Remaining {remaining:9.2f} ns")
print(f"| ")
print(f"| Average timings")
print(f"| ns/day = {avg_rate:9.2f}")
print(f"| ")
print(f"| Estimated time")
print(f"| remaining = {eta:.2f} hours")
print(f"| completed = {eta_time.ctime()}")
print("-"*80)
read_eaf(filename, verbose=False)
Source code in mdscribe/desmond/cli.py
def read_eaf(filename, verbose=False):
HBond = {}
Hydrophobic = {}
WaterBridge = {}
Polar = {}
HalogenBond = {}
LigWat = {}
Metal = {}
PiCat = {}
PiPi = {}
result = desmond.expr.parse_file(filename)
d = result.as_dict()
desmond.traverse_dict(d)
dot = desmond.DotMap(d)
for section in dot.Keywords:
try:
assert section.ProtLigInter.HBondResult
num_frames = len(section.ProtLigInter.HBondResult)
for frame in section.ProtLigInter.HBondResult:
# [[3 "_:ARG_143:HH22" d-s "L-FRAG_0:N6" ]]
for (frameno, prot, hbond_type, lig) in frame:
prot = prot.strip('\"')
(_, resid, atom) = prot.split(":")
(resName, resSeq) = resid.split("_")
resSeq = int(resSeq)
if resSeq in HBond:
HBond[resSeq]['count'] += 1
else:
HBond[resSeq] = {'resName': resName, 'count':1 }
for resSeq in sorted(HBond):
fraction = float(HBond[resSeq]['count'])/num_frames
if verbose:
print(f"HBond {HBond[resSeq]['resName']}_{resSeq} {fraction:5.3f} {num_frames}")
except:
pass
try:
assert section.ProtLigInter.HydrophobicResult
num_frames = len(section.ProtLigInter.HydrophobicResult)
for frame in section.ProtLigInter.HydrophobicResult:
# [[0 "_:PHE_223" L-FRAG_0 ] [0 "_:ALA_241" L-FRAG_0 ]]
for (frameno, prot, lig) in frame:
prot = prot.strip('\"')
(_, resid) = prot.split(":")
(resName, resSeq) = resid.split("_")
resSeq = int(resSeq)
if resSeq in Hydrophobic:
Hydrophobic[resSeq]['count'] += 1
else:
Hydrophobic[resSeq] = {'resName': resName, 'count':1 }
for resSeq in sorted(Hydrophobic):
fraction = float(Hydrophobic[resSeq]['count'])/num_frames
if verbose:
print(f"Hydrophobic {Hydrophobic[resSeq]['resName']}_{resSeq} {fraction:5.3f} {num_frames}")
except:
pass
try:
assert section.ProtLigInter.PolarResult
num_frames = len(section.ProtLigInter.PolarResult)
for frame in section.ProtLigInter.PolarResult:
# [[1 "_:GLU_216:OE2" b "L-FRAG_1:N3" 4.45 ]]
for (frameno, prot, _, lig, _) in frame:
prot = prot.strip('\"')
(_, resid, atom) = prot.split(":")
(resName, resSeq) = resid.split("_")
resSeq = int(resSeq)
if resSeq in Polar:
Polar[resSeq]['count'] += 1
else:
Polar[resSeq] = {'resName': resName, 'count':1 }
for resSeq in sorted(Polar):
fraction = float(Polar[resSeq]['count'])/num_frames
if verbose:
print(f"Polar {Polar[resSeq]['resName']}_{resSeq} {fraction:5.3f} {num_frames}")
except:
pass
try:
assert section.ProtLigInter.WaterBridgeResult
num_frames = len(section.ProtLigInter.WaterBridgeResult)
for frame in section.ProtLigInter.WaterBridgeResult:
# [[3 "_:GLU_216:OE2" a "L-FRAG_0:N2" a 2431 ]]
for (frameno, prot, _, lig, _, _) in frame:
prot = prot.strip('\"')
(_, resid, atom) = prot.split(":")
(resName, resSeq) = resid.split("_")
resSeq = int(resSeq)
if resSeq in WaterBridge:
WaterBridge[resSeq]['count'] += 1
else:
WaterBridge[resSeq] = {'resName': resName, 'count':1 }
for resSeq in sorted(WaterBridge):
fraction = float(WaterBridge[resSeq]['count'])/num_frames
if verbose:
print(f"WaterBridge {WaterBridge[resSeq]['resName']}_{resSeq} {fraction:5.3f} {num_frames}")
except:
pass
return num_frames, HBond, Hydrophobic, Polar, WaterBridge, HalogenBond, LigWat, Metal, PiCat, PiPi
mdscribe.desmond
Multisim
Parsing Desmond multisim .cfg and .msj expressions
Source code in mdscribe/desmond/multisim.py
class Multisim:
"""Parsing Desmond multisim .cfg and .msj expressions"""
# variable, value, array, expr
# pyparsing module’s default behavior is to ignore whitespace.
# +: AND, |: MatchFirst, left-to-right, ^: Or(longest match)
# Group: --> list
# Dict: --> dict
# Forward: --> recursive
EQ = pp.Suppress('=')
LBRACKET, RBRACKET, LBRACE, RBRACE = map(pp.Literal, "[]{}")
variable = (pp.Word(pp.alphanums + "._/?-@") +
pp.Opt("." + pp.Word(pp.alphanums))).set_parse_action(''.join)
_string1 = pp.Word(pp.alphanums + "._/?-@*")
_string2 = pp.quoted_string()
_number = ppc.number()
value = (_string1 | _string2 | _number)
array = pp.Forward()
array <<= pp.Group(LBRACKET + (pp.ZeroOrMore(value | array)) + RBRACKET)
expr = pp.Forward()
_expr_0 = (variable + EQ + value)
_expr_1 = (variable + EQ + array)
_expr_2 = (variable + EQ + pp.Group(
LBRACE + pp.ZeroOrMore(expr) + RBRACE ))
_expr_3 = (variable + EQ + pp.Group(
LBRACKET + pp.ZeroOrMore(pp.Group(LBRACE + expr + RBRACE)) + RBRACKET))
_expr_4 = pp.Group(variable + pp.Group(
LBRACE + pp.ZeroOrMore( expr ) + RBRACE))
expr <<= pp.OneOrMore(pp.Dict(pp.Group(
_expr_0 | _expr_1 | _expr_2 | _expr_3 | _expr_4)))
expr.ignore("#" + pp.restOfLine)
def __init__(self, **kwargs):
self.template_path = None
self.ParseResults = None
self.dict = {}
self.dot = DotMap()
self.output = sys.stdout # do not attempt to close
self.indent = 4
if 'template' in kwargs:
template = kwargs['template']
template_path = pathlib.Path(template)
if template_path.is_file():
self.template_path = template_path
else:
with importlib.resources.files('mdscribe.desmond') as template_path:
self.template_path = template_path / template
if self.template_path is None:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), template)
self.ParseResults = Multisim.expr.parse_file(self.template_path)
self.decode()
elif 'string' in kwargs:
string = kwargs['string']
try:
self.ParseResults = Multisim.expr.parse_string(string)
self.decode()
except:
raise RuntimeError("Multisim: cannot parse the input string.")
else:
raise RuntimeError("Multisim: template filename or string is required.")
@staticmethod
def unfold_dict_key(d) -> None:
"""Unfold '.' in the keys of a dictionary
Args:
d (dict): dictionary (to be modified)
"""
ks = [ k for k in d ]
for k in ks:
v = d[k]
if '.' in k:
# d[k] = v --> d[kk[0]] = { kk[1] : v }
_u = None
_k = k.split('.')[0]
for kk in k.split('.')[-1:0:-1]:
if _u is None:
_u = {kk : v }
else:
_u = {kk : _u}
d[_k] = _u
del d[k]
else:
pass
if isinstance(v, dict):
Multisim.unfold_dict_key(v)
@staticmethod
def traverse_dict(d) -> None:
"""Recursively traverse a nested dictionary/list
Args:
d (dict): dictionary (to be modified)
"""
if isinstance(d, dict):
for k,v in d.items():
if isinstance(v, dict):
Multisim.traverse_dict(v)
elif isinstance(v, list):
if v == ['{','}']:
d[k] = {}
elif v == ['[',']']:
d[k] = []
elif v[0] == '[' and v[-1] == ']':
d[k] = v[1:-1]
Multisim.traverse_dict(d[k])
elif v[0] == '{' and v[-1] == '}':
d[k] = dict(v[1:-1])
Multisim.traverse_dict(d[k])
elif isinstance(d, list):
for k,v in enumerate(d):
if isinstance(v, dict):
Multisim.traverse_dict(v)
elif isinstance(v, list):
if v == ['{','}']:
d[k] = {}
elif v == ['[',']']:
d[k] = []
elif v[0] == '[' and v[-1] == ']':
d[k] = v[1:-1]
Multisim.traverse_dict(d[k])
elif v[0] == '{' and v[-1] == '}':
d[k] = dict(v[1:-1])
Multisim.traverse_dict(d[k])
def decode(self) -> None:
"""decode the parsed results into a dictionary and its dotmap"""
# create .dict
if isinstance(self.ParseResults.as_list()[0][0], str): # key
self.dict = self.ParseResults.as_dict()
Multisim.traverse_dict(self.dict)
# handle the case where key has '.'
Multisim.unfold_dict_key(self.dict)
elif isinstance(self.ParseResults.as_list()[0][0], list):
self.dict = [] # now self.dict is a list of dictionary
for section in self.ParseResults:
dict_ = dict(section.as_list())
Multisim.traverse_dict(dict_)
# handle the case where key has '.'
Multisim.unfold_dict_key(dict_)
self.dict.append(dict_)
# create .dot
if isinstance(self.dict, list):
self.dot = {}
for d in self.dict:
for k,v in d.items():
if k in self.dot:
if isinstance(self.dot[k], list):
self.dot[k].append(v)
else:
self.dot[k] = [self.dot[k], v]
else:
self.dot[k] = v
self.dot = DotMap(self.dot)
else:
self.dot = DotMap(self.dict)
def write(self, output=None):
"""Writes DOT object"""
if isinstance(output, io.IOBase):
self.output = output
elif isinstance(output, str):
self.output = open(output, "w")
if isinstance(self.dict, list):
blocks = []
for k, v in self.dot.items():
if isinstance(v, list):
for vv in v:
blocks.append({k:vv})
else:
blocks.append({k:v})
for block in blocks:
self._write_dict(block, block=True)
self.output.write("\n")
else:
self._write_dict(self.dot)
def _write_dict(self, d, block=False, depth=0):
"""subroutine of .write() method"""
spc = ' ' * self.indent * depth
if isinstance(d, dict) or isinstance(d, DotMap):
for k, v in d.items():
k = str(k)
if v:
if isinstance(v, dict) or isinstance(v, DotMap):
if depth == 0 and block:
self.output.write(spc + k + " {\n")
else:
self.output.write(spc + k + " = {\n")
self._write_dict(v, depth=depth+1)
self.output.write(spc + "}\n")
elif isinstance(v, list):
self.output.write(spc + k + " = [")
for vv in v:
if isinstance(vv, dict) or isinstance(vv, DotMap):
self.output.write("{\n")
elif isinstance(vv, list):
self.output.write("[")
self._write_dict(vv, depth=depth+1)
if isinstance(vv, dict) or isinstance(vv, DotMap):
self.output.write(spc + "}")
elif isinstance(vv, list):
self.output.write("]")
self.output.write("]\n")
else:
self.output.write(spc + k + " = " + str(v) +"\n")
else:
if isinstance(v, list) and (not bool(v)):
self.output.write(spc + k + " = []\n")
elif (isinstance(v, dict) or isinstance(v, DotMap))and (not bool(v)):
self.output.write(spc + k + " = {\n}\n")
else:
self.output.write(spc + k + " = \n")
elif isinstance(d, list):
for v in d:
self._write_dict(v, depth=depth+1)
else:
self.output.write(" " + str(d) + " ")
def to_dot(self) -> DotMap:
"""Returns parsed results as a DotMap object
Returns:
DotMap : DotMap object
"""
return self.dot
def to_list(self) -> list:
"""Returns parsed results as a list
Returns:
list : list
"""
return self.ParseResults.as_list()
def to_dict(self) -> dict:
"""Returns parsed results as a dictionary
Returns:
dict : dictionary
"""
return self.dict
EQ = pp.Suppress('=')
class-attribute
instance-attribute
ParseResults = Multisim.expr.parse_string(string)
instance-attribute
_expr_0 = variable + EQ + value
class-attribute
instance-attribute
_expr_1 = variable + EQ + array
class-attribute
instance-attribute
_expr_2 = variable + EQ + pp.Group(LBRACE + pp.ZeroOrMore(expr) + RBRACE)
class-attribute
instance-attribute
_expr_3 = variable + EQ + pp.Group(LBRACKET + pp.ZeroOrMore(pp.Group(LBRACE + expr + RBRACE)) + RBRACKET)
class-attribute
instance-attribute
_expr_4 = pp.Group(variable + pp.Group(LBRACE + pp.ZeroOrMore(expr) + RBRACE))
class-attribute
instance-attribute
_number = ppc.number()
class-attribute
instance-attribute
_string1 = pp.Word(pp.alphanums + '._/?-@*')
class-attribute
instance-attribute
_string2 = pp.quoted_string()
class-attribute
instance-attribute
array = pp.Forward()
class-attribute
instance-attribute
dict = {}
instance-attribute
dot = DotMap()
instance-attribute
expr = pp.Forward()
class-attribute
instance-attribute
indent = 4
instance-attribute
output = sys.stdout
instance-attribute
template_path = template_path / template
instance-attribute
value = _string1 | _string2 | _number
class-attribute
instance-attribute
variable = pp.Word(pp.alphanums + '._/?-@') + pp.Opt('.' + pp.Word(pp.alphanums)).set_parse_action(''.join)
class-attribute
instance-attribute
__init__(**kwargs)
Source code in mdscribe/desmond/multisim.py
def __init__(self, **kwargs):
self.template_path = None
self.ParseResults = None
self.dict = {}
self.dot = DotMap()
self.output = sys.stdout # do not attempt to close
self.indent = 4
if 'template' in kwargs:
template = kwargs['template']
template_path = pathlib.Path(template)
if template_path.is_file():
self.template_path = template_path
else:
with importlib.resources.files('mdscribe.desmond') as template_path:
self.template_path = template_path / template
if self.template_path is None:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), template)
self.ParseResults = Multisim.expr.parse_file(self.template_path)
self.decode()
elif 'string' in kwargs:
string = kwargs['string']
try:
self.ParseResults = Multisim.expr.parse_string(string)
self.decode()
except:
raise RuntimeError("Multisim: cannot parse the input string.")
else:
raise RuntimeError("Multisim: template filename or string is required.")
_write_dict(d, block=False, depth=0)
subroutine of .write() method
Source code in mdscribe/desmond/multisim.py
def _write_dict(self, d, block=False, depth=0):
"""subroutine of .write() method"""
spc = ' ' * self.indent * depth
if isinstance(d, dict) or isinstance(d, DotMap):
for k, v in d.items():
k = str(k)
if v:
if isinstance(v, dict) or isinstance(v, DotMap):
if depth == 0 and block:
self.output.write(spc + k + " {\n")
else:
self.output.write(spc + k + " = {\n")
self._write_dict(v, depth=depth+1)
self.output.write(spc + "}\n")
elif isinstance(v, list):
self.output.write(spc + k + " = [")
for vv in v:
if isinstance(vv, dict) or isinstance(vv, DotMap):
self.output.write("{\n")
elif isinstance(vv, list):
self.output.write("[")
self._write_dict(vv, depth=depth+1)
if isinstance(vv, dict) or isinstance(vv, DotMap):
self.output.write(spc + "}")
elif isinstance(vv, list):
self.output.write("]")
self.output.write("]\n")
else:
self.output.write(spc + k + " = " + str(v) +"\n")
else:
if isinstance(v, list) and (not bool(v)):
self.output.write(spc + k + " = []\n")
elif (isinstance(v, dict) or isinstance(v, DotMap))and (not bool(v)):
self.output.write(spc + k + " = {\n}\n")
else:
self.output.write(spc + k + " = \n")
elif isinstance(d, list):
for v in d:
self._write_dict(v, depth=depth+1)
else:
self.output.write(" " + str(d) + " ")
decode()
decode the parsed results into a dictionary and its dotmap
Source code in mdscribe/desmond/multisim.py
def decode(self) -> None:
"""decode the parsed results into a dictionary and its dotmap"""
# create .dict
if isinstance(self.ParseResults.as_list()[0][0], str): # key
self.dict = self.ParseResults.as_dict()
Multisim.traverse_dict(self.dict)
# handle the case where key has '.'
Multisim.unfold_dict_key(self.dict)
elif isinstance(self.ParseResults.as_list()[0][0], list):
self.dict = [] # now self.dict is a list of dictionary
for section in self.ParseResults:
dict_ = dict(section.as_list())
Multisim.traverse_dict(dict_)
# handle the case where key has '.'
Multisim.unfold_dict_key(dict_)
self.dict.append(dict_)
# create .dot
if isinstance(self.dict, list):
self.dot = {}
for d in self.dict:
for k,v in d.items():
if k in self.dot:
if isinstance(self.dot[k], list):
self.dot[k].append(v)
else:
self.dot[k] = [self.dot[k], v]
else:
self.dot[k] = v
self.dot = DotMap(self.dot)
else:
self.dot = DotMap(self.dict)
to_dict()
to_dot()
to_list()
traverse_dict(d)
staticmethod
Recursively traverse a nested dictionary/list
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
d
|
dict
|
dictionary (to be modified) |
required |
Source code in mdscribe/desmond/multisim.py
@staticmethod
def traverse_dict(d) -> None:
"""Recursively traverse a nested dictionary/list
Args:
d (dict): dictionary (to be modified)
"""
if isinstance(d, dict):
for k,v in d.items():
if isinstance(v, dict):
Multisim.traverse_dict(v)
elif isinstance(v, list):
if v == ['{','}']:
d[k] = {}
elif v == ['[',']']:
d[k] = []
elif v[0] == '[' and v[-1] == ']':
d[k] = v[1:-1]
Multisim.traverse_dict(d[k])
elif v[0] == '{' and v[-1] == '}':
d[k] = dict(v[1:-1])
Multisim.traverse_dict(d[k])
elif isinstance(d, list):
for k,v in enumerate(d):
if isinstance(v, dict):
Multisim.traverse_dict(v)
elif isinstance(v, list):
if v == ['{','}']:
d[k] = {}
elif v == ['[',']']:
d[k] = []
elif v[0] == '[' and v[-1] == ']':
d[k] = v[1:-1]
Multisim.traverse_dict(d[k])
elif v[0] == '{' and v[-1] == '}':
d[k] = dict(v[1:-1])
Multisim.traverse_dict(d[k])
unfold_dict_key(d)
staticmethod
Unfold '.' in the keys of a dictionary
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
d
|
dict
|
dictionary (to be modified) |
required |
Source code in mdscribe/desmond/multisim.py
@staticmethod
def unfold_dict_key(d) -> None:
"""Unfold '.' in the keys of a dictionary
Args:
d (dict): dictionary (to be modified)
"""
ks = [ k for k in d ]
for k in ks:
v = d[k]
if '.' in k:
# d[k] = v --> d[kk[0]] = { kk[1] : v }
_u = None
_k = k.split('.')[0]
for kk in k.split('.')[-1:0:-1]:
if _u is None:
_u = {kk : v }
else:
_u = {kk : _u}
d[_k] = _u
del d[k]
else:
pass
if isinstance(v, dict):
Multisim.unfold_dict_key(v)
write(output=None)
Writes DOT object
Source code in mdscribe/desmond/multisim.py
def write(self, output=None):
"""Writes DOT object"""
if isinstance(output, io.IOBase):
self.output = output
elif isinstance(output, str):
self.output = open(output, "w")
if isinstance(self.dict, list):
blocks = []
for k, v in self.dot.items():
if isinstance(v, list):
for vv in v:
blocks.append({k:vv})
else:
blocks.append({k:v})
for block in blocks:
self._write_dict(block, block=True)
self.output.write("\n")
else:
self._write_dict(self.dot)