import numpy as np
from copy import deepcopy
from ase.data import atomic_numbers
from ase.calculators.calculator import Parameters
from ..utilities import Data, EphemeralDatabase, FileDatabase, Logger, importer
from .cutoffs import Cosine, dict2cutoff
NeighborList = importer('NeighborList')
try:
from .. import fmodules
except ImportError:
fmodules = None
[docs]
class Gaussian(object):
"""Class that calculates Gaussian fingerprints (i.e., Behler-style).
Parameters
----------
cutoff : object or float
Cutoff function, typically from amp.descriptor.cutoffs. Can be also
fed as a float representing the radius above which neighbor
interactions are ignored; in this case a cosine cutoff function will be
employed. Default is a 6.5-Angstrom cosine cutoff.
Gs : dict or list
Dictionary of symbols and lists of dictionaries for making symmetry
functions. Either auto-genetrated, or given in the following form, for
example:
>>> Gs = {"O": [{"type":"G2", "element":"O", "eta":10.,
... "offset": 2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":5., "gamma":1., "zeta":1.0}],
... "Au": [{"type":"G2", "element":"O", "eta":2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":2., "gamma":1., "zeta":5.0}]}
You can use amp.model.gaussian.make_symmetry_functions to help create
these lists of dictionaries. If you supply a list instead of a
dictionary, it will assume you want identical symmetry functions for
each element.
dblabel : str, None, or False
Prefix/location for fingerprint database files (neighborlists,
fingerprints, fingerprint derivatives). If None, defaults to the
parent Amp label. Set to False to disable all disk writes and keep
fingerprints only in memory, discarding each image's data as soon as
it is used; recommended for inference and MD runs where images are
never revisited. A shared string path can be supplied to let multiple
calculator instances reuse the same on-disk cache.
elements : list
List of allowed elements present in the system. If not provided, will
be found automatically.
version : str
Version of fingerprints.
fortran : bool
If True, will use fortran modules (if compiled), if False, will not.
mode : str
Can be either 'atom-centered' or 'image-centered'.
Raises
------
RuntimeError
"""
def __init__(self, cutoff=Cosine(6.5), Gs=None, dblabel=None,
elements=None, version=None, fortran=True,
mode='atom-centered'):
# Check of the version of descriptor, particularly if restarting.
compatibleversions = ['2015.12', ]
if (version is not None) and version not in compatibleversions:
raise RuntimeError('Error: Trying to use Gaussian fingerprints'
' version %s, but this module only supports'
' versions %s. You may need an older or '
' newer version of Amp.' %
(version, compatibleversions))
else:
version = compatibleversions[-1]
# Check that the mode is atom-centered.
if mode != 'atom-centered':
raise RuntimeError('Gaussian scheme only works '
'in atom-centered mode. %s '
'specified.' % mode)
# If the cutoff is provided as a number, Cosine function will be used
# by default.
if isinstance(cutoff, int) or isinstance(cutoff, float):
cutoff = Cosine(cutoff)
# If the cutoff is provided as a dictionary, assume we need to load it
# with dict2cutoff.
if type(cutoff) is dict:
cutoff = dict2cutoff(cutoff)
# The parameters dictionary contains the minimum information
# to produce a compatible descriptor; that is, one that gives
# an identical fingerprint when fed an ASE image.
p = self.parameters = Parameters(
{'importname': '.descriptor.gaussian.Gaussian',
'mode': 'atom-centered'})
p.version = version
p.cutoff = cutoff.todict()
p.Gs = Gs
p.elements = elements
self.dblabel = dblabel
self.fortran = fortran
if fmodules is None:
self.fortran = False
self.parent = None # Can hold a reference to main Amp instance.
[docs]
def tostring(self):
"""Returns an evaluatable representation of the calculator that can
be used to restart the calculator.
"""
return self.parameters.tostring()
[docs]
def calculate_fingerprints(self, images, parallel=None, log=None,
calculate_derivatives=False):
"""Calculates the fingerpints of the images, for the ones not already
done.
Parameters
----------
images : dict
Dictionary of images; the key is a unique ID assigned to each
image and each value is an ASE atoms object. Typically created
from amp.utilities.hash_images.
parallel : dict
Configuration for parallelization. Should be in same form as in
amp.Amp.
log : Logger object
Write function at which to log data. Note this must be a callable
function.
calculate_derivatives : bool
Decides whether or not fingerprintprimes should also be calculated.
"""
if parallel is None:
parallel = {'cores': 1}
log = Logger(file=None) if log is None else log
if (self.dblabel is None) and hasattr(self.parent, 'dblabel'):
self.dblabel = self.parent.dblabel
self.dblabel = 'amp-data' if self.dblabel is None else self.dblabel
if self.dblabel is False:
_db = EphemeralDatabase
_prefix = 'ephemeral'
else:
_db = FileDatabase
_prefix = self.dblabel
p = self.parameters
if p.elements is None:
log('Finding unique set of elements in training data.')
p.elements = set([atom.symbol for atoms in images.values()
for atom in atoms])
p.elements = sorted(p.elements)
if p.Gs is None:
log('No symmetry functions supplied; creating defaults.')
p.Gs = make_default_symmetry_functions(p.elements)
elif not hasattr(p.Gs, 'keys'):
log('List of symmetry functions supplied; assumed identical for '
'each element.')
p.Gs = {element: deepcopy(p.Gs) for element in p.elements}
_params_key = str((p.cutoff, p.elements, str(p.Gs)))
if not hasattr(self, '_logged_params_key') or \
self._logged_params_key != _params_key:
log('Cutoff function: %s' % repr(dict2cutoff(p.cutoff)))
log('%i unique elements included: ' % len(p.elements) +
', '.join(p.elements))
log('Number of symmetry functions for each element:')
for _ in p.Gs.keys():
log(' %2s: %i' % (_, len(p.Gs[_])))
for element, fingerprints in p.Gs.items():
log('{} feature vector functions:'.format(element))
for index, fp in enumerate(fingerprints):
if fp['type'] == 'G2':
log(' {}: {}, {}, eta = {}, offset = {}'
.format(index, fp['type'], fp['element'],
fp['eta'], fp['offset']))
elif fp['type'] in ('G4', 'G5'):
log(' {}: {}, ({}, {}), eta={}, gamma={}, zeta={}'
.format(index, fp['type'], fp['elements'][0],
fp['elements'][1], fp['eta'],
fp['gamma'], fp['zeta']))
else:
log(str(fp))
self._logged_params_key = _params_key
log('Calculating neighborlists...', tic='nl')
if not hasattr(self, 'neighborlist'):
calc = NeighborlistCalculator(cutoff=p.cutoff['kwargs']['Rc'])
self.neighborlist = \
Data(filename='%s-neighborlists' % _prefix,
db=_db, calculator=calc)
self.neighborlist.calculate_items(images, parallel=parallel, log=log)
log('...neighborlists calculated.', toc='nl')
log('Fingerprinting images...', tic='fp')
if not hasattr(self, 'fingerprints'):
calc = FingerprintCalculator(neighborlist=self.neighborlist,
Gs=p.Gs,
cutoff=p.cutoff,
fortran=self.fortran)
self.fingerprints = Data(filename='%s-fingerprints' % _prefix,
db=_db, calculator=calc)
self.fingerprints.calculate_items(images, parallel=parallel, log=log)
log('...fingerprints calculated.', toc='fp')
if calculate_derivatives:
log('Calculating fingerprint derivatives...',
tic='derfp')
if not hasattr(self, 'fingerprintprimes'):
calc = \
FingerprintPrimeCalculator(neighborlist=self.neighborlist,
Gs=p.Gs,
cutoff=p.cutoff,
fortran=self.fortran)
self.fingerprintprimes = \
Data(filename='%s-fingerprint-primes' % _prefix,
db=_db, calculator=calc)
self.fingerprintprimes.calculate_items(
images, parallel=parallel, log=log)
log('...fingerprint derivatives calculated.', toc='derfp')
# Calculators #################################################################
# Neighborlist Calculator
[docs]
class NeighborlistCalculator:
"""For integration with .utilities.Data
For each image fed to calculate, a list of neighbors with offset distances
is returned.
Parameters
----------
cutoff : float
Radius above which neighbor interactions are ignored.
"""
def __init__(self, cutoff):
self.globals = Parameters({'cutoff': cutoff})
self.keyed = Parameters()
self.parallel_command = 'calculate_neighborlists'
[docs]
def calculate(self, image, key):
"""For integration with .utilities.Data
For each image fed to calculate, a list of neighbors with offset
distances is returned.
Parameters
----------
image : object
ASE atoms object.
key : str
key of the image after being hashed.
"""
cutoff = self.globals.cutoff
n = NeighborList(cutoffs=[cutoff / 2.] * len(image),
self_interaction=False,
bothways=True,
skin=0.)
n.update(image)
return [n.get_neighbors(index) for index in range(len(image))]
[docs]
class FingerprintCalculator:
"""For integration with .utilities.Data
Parameters
----------
neighborlist : list of str
List of neighbors.
Gs : dict
Dictionary of symbols and lists of dictionaries for making symmetry
functions. Either auto-genetrated, or given in the following form, for
example:
>>> Gs = {"O": [{"type":"G2", "element":"O", "eta":10.,
... "offset": 2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":5., "gamma":1., "zeta":1.0}],
... "Au": [{"type":"G2", "element":"O", "eta":2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":2., "gamma":1., "zeta":5.0}]}
cutoff : float
Radius above which neighbor interactions are ignored.
fortran : bool
If True, will use fortran modules, if False, will not.
"""
def __init__(self, neighborlist, Gs, cutoff, fortran):
self.globals = Parameters({'cutoff': cutoff,
'Gs': Gs})
self.keyed = Parameters({'neighborlist': neighborlist})
self.parallel_command = 'calculate_fingerprints'
self.fortran = fortran
[docs]
def calculate(self, image, key):
"""Makes a list of fingerprints, one per atom, for the fed image.
Parameters
----------
image : object
ASE atoms object.
key : str
key of the image after being hashed.
"""
self.atoms = image
nl = self.keyed.neighborlist[key]
fingerprints = []
chemical_symbols = np.array(image.get_chemical_symbols())
for atom in image:
symbol = atom.symbol
index = atom.index
neighborindices, neighboroffsets = nl[index]
neighborsymbols = chemical_symbols[neighborindices]
neighborpositions = (image.positions[neighborindices] +
np.dot(neighboroffsets, image.get_cell()))
indexfp = self.get_fingerprint(
index, symbol, neighborsymbols, neighborpositions)
fingerprints.append(indexfp)
return fingerprints
[docs]
def get_fingerprint(self, index, symbol,
neighborsymbols, neighborpositions):
"""Returns the fingerprint of symmetry function values for atom
specified by its index and symbol.
neighborsymbols and neighborpositions are lists of neighbors' symbols
and Cartesian positions, respectively.
Parameters
----------
index : int
Index of the center atom.
symbol : str
Symbol of the center atom.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
Returns
-------
symbol, fingerprint : list of float
fingerprints for atom specified by its index and symbol.
"""
Ri = self.atoms[index].position
num_symmetries = len(self.globals.Gs[symbol])
fingerprint = [None] * num_symmetries
neighbornumbers = [atomic_numbers[_] for _ in neighborsymbols]
for count in range(num_symmetries):
G = self.globals.Gs[symbol][count]
if G['type'] == 'G2':
ridge = calculate_G2(neighbornumbers, neighborsymbols,
neighborpositions, G['element'], G['eta'],
G['offset'], self.globals.cutoff, Ri,
self.fortran)
elif G['type'] == 'G4':
ridge = calculate_G4(neighbornumbers, neighborsymbols,
neighborpositions, G['elements'],
G['gamma'], G['zeta'], G['eta'],
self.globals.cutoff, Ri, self.fortran)
elif G['type'] == 'G5':
ridge = calculate_G5(neighbornumbers, neighborsymbols,
neighborpositions, G['elements'],
G['gamma'], G['zeta'], G['eta'],
self.globals.cutoff, Ri, self.fortran)
else:
raise NotImplementedError('Unknown G type: %s' % G['type'])
fingerprint[count] = ridge
return symbol, fingerprint
[docs]
class FingerprintPrimeCalculator:
"""For integration with .utilities.Data
Parameters
----------
neighborlist : list of str
List of neighbors.
Gs : dict
Dictionary of symbols and lists of dictionaries for making symmetry
functions. Either auto-genetrated, or given in the following form, for
example:
>>> Gs = {"O": [{"type":"G2", "element":"O", "eta":10.,
... "offset": 2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":5., "gamma":1., "zeta":1.0}],
... "Au": [{"type":"G2", "element":"O", "eta":2.},
... {"type":"G4", "elements":["O", "Au"],
... "eta":2., "gamma":1., "zeta":5.0}]}
cutoff : float
Radius above which neighbor interactions are ignored.
fortran : bool
If True, will use fortran modules, if False, will not.
"""
def __init__(self, neighborlist, Gs, cutoff, fortran):
self.globals = Parameters({'cutoff': cutoff,
'Gs': Gs})
self.keyed = Parameters({'neighborlist': neighborlist})
self.parallel_command = 'calculate_fingerprint_primes'
self.fortran = fortran
[docs]
def calculate(self, image, key):
"""Makes a list of fingerprint derivatives, one per atom,
for the fed image.
Parameters
----------
image : object
ASE atoms object.
key : str
key of the image after being hashed.
"""
self.atoms = image
nl = self.keyed.neighborlist[key]
fingerprintprimes = {}
chemical_symbols = np.array(image.get_chemical_symbols())
for atom in image:
selfsymbol = atom.symbol
selfindex = atom.index
selfneighborindices, selfneighboroffsets = nl[selfindex]
selfneighborsymbols = chemical_symbols[selfneighborindices]
selfneighborpositions = (image.positions[selfneighborindices] +
np.dot(selfneighboroffsets,
image.get_cell()))
for direction in range(3):
# Calculating derivative of fingerprints of self atom w.r.t.
# coordinates of itself.
fpprime = self.get_fingerprintprime(
selfindex, selfsymbol,
selfneighborindices,
selfneighborsymbols,
selfneighborpositions, selfindex, direction)
fingerprintprimes[(selfindex, selfsymbol, selfindex,
selfsymbol, direction)] = fpprime
# Calculating derivative of fingerprints of neighbor atom
# w.r.t. coordinates of self atom.
for nindex, nsymbol, noffset in \
zip(selfneighborindices,
selfneighborsymbols,
selfneighboroffsets):
if (selfindex, selfsymbol, nindex,
nsymbol, direction) not in fingerprintprimes:
nneighborindices, nneighboroffsets = nl[nindex]
nneighborsymbols = \
chemical_symbols[nneighborindices]
neighborpositions = \
(image.positions[nneighborindices] +
np.dot(nneighboroffsets, image.get_cell()))
fpprime = self.get_fingerprintprime(
nindex, nsymbol,
nneighborindices,
nneighborsymbols,
neighborpositions, selfindex, direction)
fingerprintprimes[(selfindex, selfsymbol, nindex,
nsymbol, direction)] = fpprime
return fingerprintprimes
[docs]
def get_fingerprintprime(self, index, symbol,
neighborindices,
neighborsymbols,
neighborpositions,
m, l):
""" Returns the value of the derivative of G for atom with index and
symbol with respect to coordinate x_{l} of atom index m.
neighborindices, neighborsymbols and neighborpositions are lists of
neighbors' indices, symbols and Cartesian positions, respectively.
Parameters
----------
index : int
Index of the center atom.
symbol : str
Symbol of the center atom.
neighborindices : list of int
List of neighbors' indices.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
m : int
Index of the pair atom.
l : int
Direction of the derivative; is an integer from 0 to 2.
Returns
-------
fingerprintprime : list of float
The value of the derivative of the fingerprints for atom with index
and symbol with respect to coordinate x_{l} of atom index m.
"""
num_symmetries = len(self.globals.Gs[symbol])
Rindex = self.atoms.positions[index]
fingerprintprime = [None] * num_symmetries
neighbornumbers = [atomic_numbers[_] for _ in neighborsymbols]
for count in range(num_symmetries):
G = self.globals.Gs[symbol][count]
if G['type'] == 'G2':
ridge = calculate_G2_prime(
neighborindices,
neighbornumbers,
neighborsymbols,
neighborpositions,
G['element'],
G['eta'],
G['offset'],
self.globals.cutoff,
index,
Rindex,
m,
l,
self.fortran)
elif G['type'] == 'G4':
ridge = calculate_G4_prime(
neighborindices,
neighbornumbers,
neighborsymbols,
neighborpositions,
G['elements'],
G['gamma'],
G['zeta'],
G['eta'],
self.globals.cutoff,
index,
Rindex,
m,
l,
self.fortran)
elif G['type'] == 'G5':
ridge = calculate_G5_prime(
neighborindices,
neighbornumbers,
neighborsymbols,
neighborpositions,
G['elements'],
G['gamma'],
G['zeta'],
G['eta'],
self.globals.cutoff,
index,
Rindex,
m,
l,
self.fortran)
else:
raise NotImplementedError('Unknown G type: %s' % G['type'])
fingerprintprime[count] = ridge
return fingerprintprime
# Auxiliary functions #########################################################
[docs]
def calculate_G2(neighbornumbers, neighborsymbols, neighborpositions,
G_element, eta, offset, cutoff, Ri, fortran):
"""Calculate G2 symmetry function.
Ideally this will not be used but will be a template for how to build the
fortran version (and serves as a slow backup if the fortran one goes
uncompiled). See Eq. 13a of the supplementary information of Khorshidi,
Peterson, CPC(2016).
Parameters
----------
neighbornumbers : list of int
List of neighbors' chemical numbers.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_element : str
Chemical symbol of the center atom.
eta : float
Parameter of Gaussian symmetry functions.
offset: float
offset values for gaussians in G2 fingerprints
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
Ri : list
Position of the center atom. Should be fed as a list of three floats.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
G2 fingerprint.
"""
if fortran: # fortran version; faster
G_number = [atomic_numbers[G_element]]
if len(neighbornumbers) == 0:
ridge = 0.
else:
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g2 = dict(
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_number=G_number,
g_eta=eta,
offset=offset,
rc=Rc,
cutofffn_code=cutofffn_code,
ri=Ri
)
if cutofffn_code == 2:
args_calculate_g2['p_gamma'] = cutoff['kwargs']['gamma']
ridge = fmodules.calculate_g2(**args_calculate_g2)
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0. # One aspect of a fingerprint :)
num_neighbors = len(neighborpositions) # number of neighboring atoms
for count in range(num_neighbors):
symbol = neighborsymbols[count]
Rj = neighborpositions[count]
if symbol == G_element:
Rij = np.linalg.norm(Rj - Ri)
args_cutoff_fxn = dict(Rij=Rij)
if cutoff['name'] == 'Polynomial':
args_cutoff_fxn['gamma'] = cutoff['kwargs']['gamma']
ridge += (np.exp(-eta * ((Rij - offset) ** 2.) / (Rc ** 2.)) *
cutoff_fxn(**args_cutoff_fxn))
return ridge
[docs]
def calculate_G4(neighbornumbers, neighborsymbols, neighborpositions,
G_elements, gamma, zeta, eta, cutoff, Ri, fortran):
"""Calculate G4 symmetry function.
Ideally this will not be used but will be a template for how to build the
fortran version (and serves as a slow backup if the fortran one goes
uncompiled). See Eq. 13c of the supplementary information of Khorshidi,
Peterson, CPC(2016).
Parameters
----------
neighbornumbers : list of int
List of neighbors' chemical numbers.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_elements : list of str
A list of two members, each member is the chemical species of one of
the neighboring atoms forming the triangle with the center atom.
gamma : float
Parameter of Gaussian symmetry functions.
zeta : float
Parameter of Gaussian symmetry functions.
eta : float
Parameter of Gaussian symmetry functions.
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
Ri : list
Position of the center atom. Should be fed as a list of three floats.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
G4 fingerprint.
"""
if len(neighborpositions) == 0:
return 0.
if fortran: # fortran version; faster
G_numbers = sorted([atomic_numbers[el] for el in G_elements])
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g4 = dict(
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_numbers=G_numbers,
g_gamma=gamma,
g_zeta=zeta,
g_eta=eta,
rc=Rc,
cutofffn_code=cutofffn_code,
ri=Ri
)
if cutofffn_code == 2:
args_calculate_g4['p_gamma'] = cutoff['kwargs']['gamma']
return fmodules.calculate_g4(**args_calculate_g4)
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0.
counts = range(len(neighborpositions))
for j in counts:
for k in counts[(j + 1):]:
els = sorted([neighborsymbols[j], neighborsymbols[k]])
if els != G_elements:
continue
Rij_vector = neighborpositions[j] - Ri
Rij = np.linalg.norm(Rij_vector)
Rik_vector = neighborpositions[k] - Ri
Rik = np.linalg.norm(Rik_vector)
Rjk_vector = neighborpositions[k] - neighborpositions[j]
Rjk = np.linalg.norm(Rjk_vector)
cos_theta_ijk = np.dot(Rij_vector, Rik_vector) / Rij / Rik
if cos_theta_ijk < -1.:
# Can occur by rounding error.
cos_theta_ijk = -1.
term = (1. + gamma * cos_theta_ijk) ** zeta
term *= np.exp(-eta * (Rij ** 2. + Rik ** 2. + Rjk ** 2.) /
(Rc ** 2.))
_Rij = dict(Rij=Rij)
_Rik = dict(Rij=Rik)
_Rjk = dict(Rij=Rjk)
if cutoff['name'] == 'Polynomial':
_Rij['gamma'] = cutoff['kwargs']['gamma']
_Rik['gamma'] = cutoff['kwargs']['gamma']
_Rjk['gamma'] = cutoff['kwargs']['gamma']
term *= cutoff_fxn(**_Rij)
term *= cutoff_fxn(**_Rik)
term *= cutoff_fxn(**_Rjk)
ridge += term
ridge *= 2. ** (1. - zeta)
return ridge
[docs]
def calculate_G5(neighbornumbers, neighborsymbols, neighborpositions,
G_elements, gamma, zeta, eta, cutoff,
Ri, fortran):
"""Calculate G5 symmetry function.
Ideally this will not be used but will be a template for how to build the
fortran version (and serves as a slow backup if the fortran one goes
uncompiled). In G5, the Gaussians and cutoff functions with respect to
R_ijk are omitted. This symmetry function is more useful for larger atomic
separations, and useful for angular configurations in which R_jk is larger
than Rc but still inside the cutoff radius e.g. triplets of 180 degrees.
For more information see: J. Behler, Int. J. Quantum Chem. 115, 1032
(2015).
Parameters
----------
neighbornumbers : list of int
List of neighbors' chemical numbers.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_elements : list of str
A list of two members, each member is the chemical species of one of
the neighboring atoms forming the triangle with the center atom.
gamma : float
Parameter of Gaussian symmetry functions.
zeta : float
Parameter of Gaussian symmetry functions.
eta : float
Parameter of Gaussian symmetry functions.
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
Ri : list
Position of the center atom. Should be fed as a list of three floats.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
G5 fingerprint.
"""
if fortran: # fortran version; faster
G_numbers = sorted([atomic_numbers[el] for el in G_elements])
neighbornumbers = \
[atomic_numbers[symbol] for symbol in neighborsymbols]
if len(neighborpositions) == 0:
return 0.
else:
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g5 = dict(
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_numbers=G_numbers,
g_gamma=gamma,
g_zeta=zeta,
g_eta=eta,
rc=Rc,
cutofffn_code=cutofffn_code,
ri=Ri
)
if cutofffn_code == 2:
args_calculate_g5['p_gamma'] = cutoff['kwargs']['gamma']
ridge = fmodules.calculate_g5(**args_calculate_g5)
return ridge
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0.
counts = range(len(neighborpositions))
for j in counts:
for k in counts[(j + 1):]:
els = sorted([neighborsymbols[j], neighborsymbols[k]])
if els != G_elements:
continue
Rij_vector = neighborpositions[j] - Ri
Rij = np.linalg.norm(Rij_vector)
Rik_vector = neighborpositions[k] - Ri
Rik = np.linalg.norm(Rik_vector)
cos_theta_ijk = np.dot(Rij_vector, Rik_vector) / Rij / Rik
if cos_theta_ijk < -1:
# Can occur by rounding error.
cos_theta_ijk = -1.
term = (1. + gamma * cos_theta_ijk) ** zeta
term *= np.exp(-eta * (Rij ** 2. + Rik ** 2.) /
(Rc ** 2.))
_Rij = dict(Rij=Rij)
_Rik = dict(Rij=Rik)
if cutoff['name'] == 'Polynomial':
_Rij['gamma'] = cutoff['kwargs']['gamma']
_Rik['gamma'] = cutoff['kwargs']['gamma']
term *= cutoff_fxn(**_Rij)
term *= cutoff_fxn(**_Rik)
ridge += term
ridge *= 2. ** (1. - zeta)
return ridge
[docs]
def make_symmetry_functions(elements, type, etas, offsets=None,
zetas=None, gammas=None):
"""Helper function to create Gaussian symmetry functions.
Returns a list of dictionaries with symmetry function parameters
in the format expected by the Gaussian class.
Parameters
----------
elements : list of str
List of element types to be observed in this fingerprint.
type : str
Either G2, G4, or G5.
etas : list of floats
eta values to use in G2, G4 or G5 fingerprints
offsets: list of floats
offset values to use in G2 fingerprints
zetas : list of floats
zeta values to use in G4, and G5 fingerprints
gammas : list of floats
gamma values to use in G4, and G5 fingerprints
Returns
-------
G : list of dicts
A list, each item in the list contains a dictionary of fingerprint
parameters.
"""
if type == 'G2':
offsets = [0.] if offsets is None else offsets
G = [{'type': 'G2', 'element': element, 'eta': eta, 'offset': offset}
for eta in etas
for element in elements
for offset in offsets]
return G
elif type == 'G4':
G = []
for eta in etas:
for zeta in zetas:
for gamma in gammas:
for i1, el1 in enumerate(elements):
for el2 in elements[i1:]:
els = sorted([el1, el2])
G.append({'type': 'G4',
'elements': els,
'eta': eta,
'gamma': gamma,
'zeta': zeta})
return G
elif type == 'G5':
G = []
for eta in etas:
for zeta in zetas:
for gamma in gammas:
for i1, el1 in enumerate(elements):
for el2 in elements[i1:]:
els = sorted([el1, el2])
G.append({'type': 'G5',
'elements': els,
'eta': eta,
'gamma': gamma,
'zeta': zeta})
return G
raise NotImplementedError('Unknown type: {}.'.format(type))
[docs]
def make_default_symmetry_functions(elements):
"""Makes default set of G2 and G4 symmetry functions.
Parameters
----------
elements : list of str
List of the elements, as in: ["C", "O", "H", "Cu"].
Returns
-------
G : dict of lists
The generated symmetry function parameters.
"""
G = {}
for element in elements:
# Radial symmetry functions.
etas = np.logspace(np.log10(0.05), np.log10(5.), num=4)
_G = make_symmetry_functions(type='G2', etas=etas, elements=elements)
# Angular symmetry functions.
_G += make_symmetry_functions(type='G4', etas=[0.005],
zetas=[1., 4.], gammas=[+1., -1.],
elements=elements)
G[element] = _G
return G
[docs]
def Kronecker(i, j):
"""Kronecker delta function.
Parameters
----------
i : int
First index of Kronecker delta.
j : int
Second index of Kronecker delta.
Returns
-------
int
The value of the Kronecker delta.
"""
if i == j:
return 1
else:
return 0
[docs]
def dRij_dRml_vector(i, j, m, l):
"""Returns the derivative of the position vector R_{ij} with respect to
x_{l} of itomic index m.
See Eq. 14d of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
----------
i : int
Index of the first atom.
j : int
Index of the second atom.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
Returns
-------
list of float
The derivative of the position vector R_{ij} with respect to x_{l} of
atomic index m.
"""
if (m != i) and (m != j):
return [0, 0, 0]
else:
dRij_dRml_vector = [None, None, None]
c1 = Kronecker(m, j) - Kronecker(m, i)
dRij_dRml_vector[0] = c1 * Kronecker(0, l)
dRij_dRml_vector[1] = c1 * Kronecker(1, l)
dRij_dRml_vector[2] = c1 * Kronecker(2, l)
return dRij_dRml_vector
[docs]
def dRij_dRml(i, j, Ri, Rj, m, l):
"""Returns the derivative of the norm of position vector R_{ij} with
respect to coordinate x_{l} of atomic index m.
See Eq. 14c of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
----------
i : int
Index of the first atom.
j : int
Index of the second atom.
Ri : float
Position of the first atom.
Rj : float
Position of the second atom.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
Returns
-------
dRij_dRml : list of float
The derivative of the noRi of position vector R_{ij} with respect to
x_{l} of atomic index m.
"""
Rij = np.linalg.norm(Rj - Ri)
if m == i and i != j: # i != j is necessary for periodic systems
dRij_dRml = -(Rj[l] - Ri[l]) / Rij
elif m == j and i != j: # i != j is necessary for periodic systems
dRij_dRml = (Rj[l] - Ri[l]) / Rij
else:
dRij_dRml = 0
return dRij_dRml
[docs]
def dCos_theta_ijk_dR_ml(i, j, k, Ri, Rj, Rk, m, l):
"""Returns the derivative of Cos(theta_{ijk}) with respect to
x_{l} of atomic index m.
See Eq. 14f of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
----------
i : int
Index of the center atom.
j : int
Index of the first atom.
k : int
Index of the second atom.
Ri : float
Position of the center atom.
Rj : float
Position of the first atom.
Rk : float
Position of the second atom.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
Returns
-------
dCos_theta_ijk_dR_ml : float
Derivative of Cos(theta_{ijk}) with respect to x_{l} of atomic index m.
"""
Rij_vector = Rj - Ri
Rij = np.linalg.norm(Rij_vector)
Rik_vector = Rk - Ri
Rik = np.linalg.norm(Rik_vector)
dCos_theta_ijk_dR_ml = 0
dRijdRml = dRij_dRml_vector(i, j, m, l)
if np.array(dRijdRml).any() != 0:
dCos_theta_ijk_dR_ml += np.dot(dRijdRml, Rik_vector) / (Rij * Rik)
dRikdRml = dRij_dRml_vector(i, k, m, l)
if np.array(dRikdRml).any() != 0:
dCos_theta_ijk_dR_ml += np.dot(Rij_vector, dRikdRml) / (Rij * Rik)
dRijdRml = dRij_dRml(i, j, Ri, Rj, m, l)
if dRijdRml != 0:
dCos_theta_ijk_dR_ml += - np.dot(Rij_vector, Rik_vector) * dRijdRml / \
((Rij ** 2.) * Rik)
dRikdRml = dRij_dRml(i, k, Ri, Rk, m, l)
if dRikdRml != 0:
dCos_theta_ijk_dR_ml += - np.dot(Rij_vector, Rik_vector) * dRikdRml / \
(Rij * (Rik ** 2.))
return dCos_theta_ijk_dR_ml
[docs]
def calculate_G2_prime(neighborindices, neighbornumbers, neighborsymbols,
neighborpositions, G_element, eta, offset, cutoff,
i, Ri, m, l, fortran):
"""Calculates coordinate derivative of G2 symmetry function for atom at
index i and position Ri with respect to coordinate x_{l} of atom index
m.
See Eq. 13b of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
---------
neighborindices : list of int
List of int of neighboring atoms.
neighbornumbers : list of int
List of neighbors' chemical numbers.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_element : dict
Symmetry functions of the center atom.
eta : float
Parameter of Behler symmetry functions.
offset: float
Offset for gaussians in G2 fingerprints
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
i : int
Index of the center atom.
Ri : list
Position of the center atom. Should be fed as a list of three floats.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
Coordinate derivative of G2 symmetry function for atom at index a and
position Ri with respect to coordinate x_{l} of atom index m.
"""
if fortran: # fortran version; faster
G_number = [atomic_numbers[G_element]]
if len(neighborpositions) == 0:
ridge = 0.
else:
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g2_prime = dict(
neighborindices=list(neighborindices),
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_number=G_number,
g_eta=eta,
rc=Rc,
offset=offset,
cutofffn_code=cutofffn_code,
i=i,
ri=Ri,
m=m,
l=l
)
if cutofffn_code == 2:
args_calculate_g2_prime['p_gamma'] = cutoff['kwargs']['gamma']
ridge = fmodules.calculate_g2_prime(**args_calculate_g2_prime)
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0. # One aspect of a fingerprint :)
num_neighbors = len(neighborpositions) # number of neighboring atoms
for count in range(num_neighbors):
symbol = neighborsymbols[count]
Rj = neighborpositions[count]
j = neighborindices[count]
if symbol == G_element:
dRijdRml = dRij_dRml(i, j, Ri, Rj, m, l)
if dRijdRml != 0:
Rij = np.linalg.norm(Rj - Ri)
args_cutoff_fxn = dict(Rij=Rij)
if cutoff['name'] == 'Polynomial':
args_cutoff_fxn['gamma'] = cutoff['kwargs']['gamma']
term1 = (-2. * eta * (Rij - offset) *
cutoff_fxn(**args_cutoff_fxn) / (Rc ** 2.) +
cutoff_fxn.prime(**args_cutoff_fxn))
ridge += np.exp(- eta * ((Rij - offset) ** 2.) /
(Rc ** 2.)) * term1 * dRijdRml
return ridge
[docs]
def calculate_G4_prime(neighborindices, neighbornumbers, neighborsymbols,
neighborpositions, G_elements, gamma, zeta, eta,
cutoff, i, Ri, m, l, fortran):
"""Calculates coordinate derivative of G4 symmetry function for atom at
index i and position Ri with respect to coordinate x_{l} of atom index m.
See Eq. 13d of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
----------
neighborindices : list of int
List of int of neighboring atoms.
neighbornumbers : list of int
List of neighbors' chemical numbers.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_elements : list of str
A list of two members, each member is the chemical species of one of
the neighboring atoms forming the triangle with the center atom.
gamma : float
Parameter of Behler symmetry functions.
zeta : float
Parameter of Behler symmetry functions.
eta : float
Parameter of Behler symmetry functions.
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
i : int
Index of the center atom.
Ri : list
Position of the center atom. Should be fed as a list of three floats.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
Coordinate derivative of G4 symmetry function for atom at index i and
position Ri with respect to coordinate x_{l} of atom index m.
"""
if fortran: # fortran version; faster
G_numbers = sorted([atomic_numbers[el] for el in G_elements])
if len(neighborpositions) == 0:
ridge = 0.
else:
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g4_prime = dict(
neighborindices=list(neighborindices),
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_numbers=G_numbers,
g_gamma=gamma,
g_zeta=zeta,
g_eta=eta,
rc=Rc,
cutofffn_code=cutofffn_code,
i=i,
ri=Ri,
m=m,
l=l
)
if cutofffn_code == 2:
args_calculate_g4_prime['p_gamma'] = cutoff['kwargs']['gamma']
ridge = fmodules.calculate_g4_prime(**args_calculate_g4_prime)
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0.
# number of neighboring atoms
counts = range(len(neighborpositions))
for j in counts:
for k in counts[(j + 1):]:
els = sorted([neighborsymbols[j], neighborsymbols[k]])
if els != G_elements:
continue
Rj = neighborpositions[j]
Rk = neighborpositions[k]
Rij_vector = neighborpositions[j] - Ri
Rij = np.linalg.norm(Rij_vector)
Rik_vector = neighborpositions[k] - Ri
Rik = np.linalg.norm(Rik_vector)
Rjk_vector = neighborpositions[k] - neighborpositions[j]
Rjk = np.linalg.norm(Rjk_vector)
cos_theta_ijk = np.dot(Rij_vector, Rik_vector) / Rij / Rik
if cos_theta_ijk < -1.:
# Can occur by rounding error.
cos_theta_ijk = -1.
c1 = (1. + gamma * cos_theta_ijk)
_Rij = dict(Rij=Rij)
_Rik = dict(Rij=Rik)
_Rjk = dict(Rij=Rjk)
if cutoff['name'] == 'Polynomial':
_Rij['gamma'] = cutoff['kwargs']['gamma']
_Rik['gamma'] = cutoff['kwargs']['gamma']
_Rjk['gamma'] = cutoff['kwargs']['gamma']
fcRij = cutoff_fxn(**_Rij)
fcRik = cutoff_fxn(**_Rik)
fcRjk = cutoff_fxn(**_Rjk)
if zeta == 1:
term1 = \
np.exp(- eta * (Rij ** 2. + Rik ** 2. + Rjk ** 2.) /
(Rc ** 2.))
else:
term1 = c1 ** (zeta - 1.) * \
np.exp(- eta * (Rij ** 2. + Rik ** 2. + Rjk ** 2.) /
(Rc ** 2.))
term2 = 0.
fcRijfcRikfcRjk = fcRij * fcRik * fcRjk
dCosthetadRml = dCos_theta_ijk_dR_ml(i,
neighborindices[j],
neighborindices[k],
Ri, Rj,
Rk, m, l)
if dCosthetadRml != 0:
term2 += gamma * zeta * dCosthetadRml
dRijdRml = dRij_dRml(i, neighborindices[j], Ri, Rj, m, l)
if dRijdRml != 0:
term2 += -2. * c1 * eta * Rij * dRijdRml / (Rc ** 2.)
dRikdRml = dRij_dRml(i, neighborindices[k], Ri, Rk, m, l)
if dRikdRml != 0:
term2 += -2. * c1 * eta * Rik * dRikdRml / (Rc ** 2.)
dRjkdRml = dRij_dRml(neighborindices[j],
neighborindices[k],
Rj, Rk, m, l)
if dRjkdRml != 0:
term2 += -2. * c1 * eta * Rjk * dRjkdRml / (Rc ** 2.)
term3 = fcRijfcRikfcRjk * term2
term4 = cutoff_fxn.prime(**_Rij) * dRijdRml * fcRik * fcRjk
term5 = fcRij * cutoff_fxn.prime(**_Rik) * dRikdRml * fcRjk
term6 = fcRij * fcRik * cutoff_fxn.prime(**_Rjk) * dRjkdRml
ridge += term1 * (term3 + c1 * (term4 + term5 + term6))
ridge *= 2. ** (1. - zeta)
return ridge
[docs]
def calculate_G5_prime(neighborindices, neighbornumbers, neighborsymbols,
neighborpositions, G_elements, gamma, zeta, eta,
cutoff, i, Ri, m, l, fortran):
"""Calculates coordinate derivative of G5 symmetry function for atom at
index i and position Ri with respect to coordinate x_{l} of atom index m.
See Eq. 13d of the supplementary information of Khorshidi, Peterson,
CPC(2016).
Parameters
----------
neighborindices : list of int
List of int of neighboring atoms.
neighborsymbols : numpy.ndarray of str
Array of neighbors' symbols.
neighborpositions : numpy.ndarray of float
Array of Cartesian atomic positions.
G_elements : list of str
A list of two members, each member is the chemical species of one of
the neighboring atoms forming the triangle with the center atom.
gamma : float
Parameter of Behler symmetry functions.
zeta : float
Parameter of Behler symmetry functions.
eta : float
Parameter of Behler symmetry functions.
cutoff : dict
Cutoff function, typically from amp.descriptor.cutoffs. Should be also
formatted as a dictionary by todict method, e.g.
cutoff=Cosine(6.5).todict()
i : int
Index of the center atom.
Ri : list
Position of the center atom. Should be fed as a list of three floats.
m : int
Index of the atom force is acting on.
l : int
Direction of force.
fortran : bool
If True, will use the fortran subroutines, else will not.
Returns
-------
ridge : float
Coordinate derivative of G5 symmetry function for atom at index i and
position Ri with respect to coordinate x_{l} of atom index m.
"""
if fortran: # fortran version; faster
G_numbers = sorted([atomic_numbers[el] for el in G_elements])
if len(neighborpositions) == 0:
ridge = 0.
else:
Rc = cutoff['kwargs']['Rc']
if cutoff['name'] == 'Cosine':
cutofffn_code = 1
elif cutoff['name'] == 'Polynomial':
cutofffn_code = 2
else:
print("Unknown cutoff function specified! \
Only supports 'Cosine' and 'Polynomial'.")
args_calculate_g5_prime = dict(
neighborindices=list(neighborindices),
neighbornumbers=neighbornumbers,
neighborpositions=neighborpositions,
g_numbers=G_numbers,
g_gamma=gamma,
g_zeta=zeta,
g_eta=eta,
rc=Rc,
cutofffn_code=cutofffn_code,
i=i,
ri=Ri,
m=m,
l=l
)
if cutofffn_code == 2:
args_calculate_g5_prime['p_gamma'] = cutoff['kwargs']['gamma']
ridge = fmodules.calculate_g5_prime(**args_calculate_g5_prime)
else:
Rc = cutoff['kwargs']['Rc']
cutoff_fxn = dict2cutoff(cutoff)
ridge = 0.
# number of neighboring atoms
counts = range(len(neighborpositions))
for j in counts:
for k in counts[(j + 1):]:
els = sorted([neighborsymbols[j], neighborsymbols[k]])
if els != G_elements:
continue
Rj = neighborpositions[j]
Rk = neighborpositions[k]
Rij_vector = neighborpositions[j] - Ri
Rij = np.linalg.norm(Rij_vector)
Rik_vector = neighborpositions[k] - Ri
Rik = np.linalg.norm(Rik_vector)
cos_theta_ijk = np.dot(Rij_vector, Rik_vector) / Rij / Rik
if cos_theta_ijk < -1.:
cos_theta_ijk = -1.
c1 = (1. + gamma * cos_theta_ijk)
_Rij = dict(Rij=Rij)
_Rik = dict(Rij=Rik)
if cutoff['name'] == 'Polynomial':
_Rij['gamma'] = cutoff['kwargs']['gamma']
_Rik['gamma'] = cutoff['kwargs']['gamma']
fcRij = cutoff_fxn(**_Rij)
fcRik = cutoff_fxn(**_Rik)
if zeta == 1:
term1 = \
np.exp(- eta * (Rij ** 2. + Rik ** 2.) /
(Rc ** 2.))
else:
term1 = c1 ** (zeta - 1.) * \
np.exp(- eta * (Rij ** 2. + Rik ** 2.) /
(Rc ** 2.))
term2 = 0.
fcRijfcRik = fcRij * fcRik
dCosthetadRml = dCos_theta_ijk_dR_ml(i,
neighborindices[j],
neighborindices[k],
Ri, Rj,
Rk, m, l)
if dCosthetadRml != 0:
term2 += gamma * zeta * dCosthetadRml
dRijdRml = dRij_dRml(i, neighborindices[j], Ri, Rj, m, l)
if dRijdRml != 0:
term2 += -2. * c1 * eta * Rij * dRijdRml / (Rc ** 2.)
dRikdRml = dRij_dRml(i, neighborindices[k], Ri, Rk, m, l)
if dRikdRml != 0:
term2 += -2. * c1 * eta * Rik * dRikdRml / (Rc ** 2.)
term3 = fcRijfcRik * term2
term4 = cutoff_fxn.prime(**_Rij) * dRijdRml * fcRik
term5 = fcRij * cutoff_fxn.prime(**_Rik) * dRikdRml
ridge += term1 * (term3 + c1 * (term4 + term5))
ridge *= 2. ** (1. - zeta)
return ridge
if __name__ == "__main__":
"""Directly calling this module; apparently from another node.
Calls should come as
python -m amp.descriptor.gaussian id hostname:port
This session will then start a zmq session with that socket, labeling
itself with id. Instructions on what to do will come from the socket.
"""
import os
import sys
import tempfile
import zmq
from ..utilities import MessageDictionary
fortran = False if fmodules is None else True
hostsocket = sys.argv[-1]
proc_id = sys.argv[-2]
msg = MessageDictionary(proc_id)
# Send standard lines to stdout signaling process started and where
# error is directed. This should be caught by pxssh. (This could
# alternatively be done by zmq, but this works.)
print('<amp-connect>') # Signal that program started.
if not os.path.exists('tempfiles'):
os.mkdir('tempfiles')
sys.stderr = tempfile.NamedTemporaryFile(
mode='w', delete=False,
suffix='.stderr', dir="%s/tempfiles" % os.getcwd())
print('Log and error written to %s<stderr>' % sys.stderr.name)
sys.stderr.write('initiated\n')
sys.stderr.flush()
# Establish client session via zmq; find purpose.
context = zmq.Context()
sys.stderr.write('context started\n')
sys.stderr.flush()
socket = context.socket(zmq.REQ)
sys.stderr.write('socket started\n')
sys.stderr.flush()
socket.connect('tcp://%s' % hostsocket)
sys.stderr.write('connection made\n')
sys.stderr.flush()
socket.send_pyobj(msg('<purpose>'))
sys.stderr.write('message sent\n')
sys.stderr.flush()
purpose = socket.recv_pyobj()
sys.stderr.write('purpose received\n')
sys.stderr.flush()
sys.stderr.write('purpose: %s \n' % purpose)
sys.stderr.flush()
if purpose == 'calculate_neighborlists':
# Request variables.
socket.send_pyobj(msg('<request>', 'cutoff'))
cutoff = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'images'))
images = socket.recv_pyobj()
# sys.stderr.write(str(images)) # Just to see if they are there.
# Perform the calculations.
calc = NeighborlistCalculator(cutoff=cutoff)
neighborlist = {}
# for key in images.iterkeys():
while len(images) > 0:
key, image = images.popitem() # Reduce memory.
neighborlist[key] = calc.calculate(image, key)
# Send the results.
socket.send_pyobj(msg('<result>', neighborlist))
socket.recv_string() # Needed to complete REQ/REP.
elif purpose == 'calculate_fingerprints':
# Request variables.
socket.send_pyobj(msg('<request>', 'cutoff'))
cutoff = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'Gs'))
Gs = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'neighborlist'))
neighborlist = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'images'))
images = socket.recv_pyobj()
calc = FingerprintCalculator(neighborlist, Gs, cutoff,
fortran)
result = {}
while len(images) > 0:
key, image = images.popitem() # Reduce memory.
result[key] = calc.calculate(image, key)
if len(images) % 100 == 0:
socket.send_pyobj(msg('<info>', len(images)))
socket.recv_string() # Needed to complete REQ/REP.
# Send the results.
socket.send_pyobj(msg('<result>', result))
socket.recv_string() # Needed to complete REQ/REP.
elif purpose == 'calculate_fingerprint_primes':
# Request variables.
socket.send_pyobj(msg('<request>', 'cutoff'))
cutoff = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'Gs'))
Gs = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'neighborlist'))
neighborlist = socket.recv_pyobj()
socket.send_pyobj(msg('<request>', 'images'))
images = socket.recv_pyobj()
calc = FingerprintPrimeCalculator(neighborlist, Gs, cutoff,
fortran)
result = {}
while len(images) > 0:
key, image = images.popitem() # Reduce memory.
result[key] = calc.calculate(image, key)
if len(images) % 100 == 0:
socket.send_pyobj(msg('<info>', len(images)))
socket.recv_string() # Needed to complete REQ/REP.
# Send the results.
socket.send_pyobj(msg('<result>', result))
socket.recv_string() # Needed to complete REQ/REP.
else:
socket.close() # May be needed in python3 / ZMQ.
raise NotImplementedError('purpose %s unknown.' % purpose)
socket.close() # May be needed in python3 / ZMQ.