Source code for amp.model


import numpy as np
from ase.calculators.calculator import Parameters
from ..utilities import (Logger, ConvergenceOccurred, make_sublists, now,
                         setup_parallel)
try:
    from .. import fmodules
except ImportError:
    fmodules = None


[docs]class Model(object): """ Class that includes common methods between different models. """ @property def log(self): """Method to set or get a logger. Should be an instance of amp.utilities.Logger. Parameters ---------- log : Logger object Write function at which to log data. Note this must be a callable function. """ if hasattr(self, '_log'): return self._log if hasattr(self.parent, 'log'): return self.parent.log return Logger(None) @log.setter def log(self, log): self._log = log
[docs] def tostring(self): """Returns an evaluatable representation of the calculator that can be used to re-establish the calculator.""" # Make sure numpy prints out enough data. np.set_printoptions(precision=30, threshold=999999999) return self.parameters.tostring()
[docs] def calculate_energy(self, fingerprints): """Calculates the model-predicted energy for an image, based on its fingerprint. Parameters ---------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': self.atomic_energies = [] energy = 0.0 for index, (symbol, afp) in enumerate(fingerprints): atom_energy = self.calculate_atomic_energy(afp=afp, index=index, symbol=symbol) self.atomic_energies.append(atom_energy) energy += atom_energy return energy
[docs] def calculate_forces(self, fingerprints, fingerprintprimes): """Calculates the model-predicted forces for an image, based on derivatives of fingerprints. Parameters ---------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. fingerprintprimes : dict Dictionary with images hashs as keys and the corresponding fingerprint derivatives as values. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': selfindices = set([key[0] for key in fingerprintprimes.keys()]) forces = np.zeros((len(selfindices), 3)) for (selfindex, selfsymbol, nindex, nsymbol, i), derafp in \ fingerprintprimes.iteritems(): afp = fingerprints[nindex][1] dforce = self.calculate_force(afp=afp, derafp=derafp, nindex=nindex, nsymbol=nsymbol, direction=i,) forces[selfindex][i] += dforce return forces
[docs] def calculate_dEnergy_dParameters(self, fingerprints): """Calculates a list of floats corresponding to the derivative of model-predicted energy of an image with respect to model parameters. Parameters ---------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': denergy_dparameters = None for index, (symbol, afp) in enumerate(fingerprints): temp = self.calculate_dAtomicEnergy_dParameters(afp=afp, index=index, symbol=symbol) if denergy_dparameters is None: denergy_dparameters = temp else: denergy_dparameters += temp return denergy_dparameters
[docs] def calculate_numerical_dEnergy_dParameters(self, fingerprints, d=0.00001): """Evaluates dEnergy_dParameters using finite difference. This will trigger two calls to calculate_energy(), with each parameter perturbed plus/minus d. Parameters ---------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. d : float The amount of perturbation in each parameter. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': vector = self.vector denergy_dparameters = [] for _ in range(len(vector)): vector[_] += d self.vector = vector eplus = self.calculate_energy(fingerprints) vector[_] -= 2 * d self.vector = vector eminus = self.calculate_energy(fingerprints) denergy_dparameters += [(eplus - eminus) / (2 * d)] vector[_] += d self.vector = vector denergy_dparameters = np.array(denergy_dparameters) return denergy_dparameters
[docs] def calculate_dForces_dParameters(self, fingerprints, fingerprintprimes): """Calculates an array of floats corresponding to the derivative of model-predicted atomic forces of an image with respect to model parameters. Parameters ---------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. fingerprintprimes : dict Dictionary with images hashs as keys and the corresponding fingerprint derivatives as values. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': selfindices = set([key[0] for key in fingerprintprimes.keys()]) dforces_dparameters = {(selfindex, i): None for selfindex in selfindices for i in range(3)} for (selfindex, selfsymbol, nindex, nsymbol, i), derafp in \ fingerprintprimes.iteritems(): afp = fingerprints[nindex][1] temp = self.calculate_dForce_dParameters(afp=afp, derafp=derafp, direction=i, nindex=nindex, nsymbol=nsymbol,) if dforces_dparameters[(selfindex, i)] is None: dforces_dparameters[(selfindex, i)] = temp else: dforces_dparameters[(selfindex, i)] += temp return dforces_dparameters
[docs] def calculate_numerical_dForces_dParameters(self, fingerprints, fingerprintprimes, d=0.00001): """Evaluates dForces_dParameters using finite difference. This will trigger two calls to calculate_forces(), with each parameter perturbed plus/minus d. Parameters --------- fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. fingerprintprimes : dict Dictionary with images hashs as keys and the corresponding fingerprint derivatives as values. d : float The amount of perturbation in each parameter. """ if self.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif self.parameters.mode == 'atom-centered': selfindices = set([key[0] for key in fingerprintprimes.keys()]) dforces_dparameters = {(selfindex, i): [] for selfindex in selfindices for i in range(3)} vector = self.vector for _ in range(len(vector)): vector[_] += d self.vector = vector fplus = self.calculate_forces(fingerprints, fingerprintprimes) vector[_] -= 2 * d self.vector = vector fminus = self.calculate_forces(fingerprints, fingerprintprimes) for selfindex in selfindices: for i in range(3): dforces_dparameters[(selfindex, i)] += \ [(fplus[selfindex][i] - fminus[selfindex][i]) / ( 2 * d)] vector[_] += d self.vector = vector for selfindex in selfindices: for i in range(3): dforces_dparameters[(selfindex, i)] = \ np.array(dforces_dparameters[(selfindex, i)]) return dforces_dparameters
[docs]class LossFunction: """Basic loss function, which can be used by the model.get_loss method which is required in standard model classes. This version is pure python and thus will be slow compared to a fortran/parallel implementation. If parallel is None, it will pull it from the model itself. Only use this keyword to override the model's specification. Also has parallelization methods built in. See self.default_parameters for the default values of parameters specified as None. Parameters ---------- energy_coefficient : float Coefficient of the energy contribution in the loss function. force_coefficient : float Coefficient of the force contribution in the loss function. Can set to None as shortcut to turn off force training. convergence : dict Dictionary of keys and values defining convergence. Keys are 'energy_rmse', 'energy_maxresid', 'force_rmse', and 'force_maxresid'. If 'force_rmse' and 'force_maxresid' are both set to None, force training is turned off and force_coefficient is set to None. parallel : dict Parallel configuration dictionary. Will pull from model itself if not specified. overfit : float Multiplier of the weights norm penalty term in the loss function. raise_ConvergenceOccurred : bool If True will raise convergence notice. log_losses : bool If True will log the loss function value in the log file else will not. d : None or float If d is None, both loss function and its gradient are calculated analytically. If d is a float, then gradient of the loss function is calculated by perturbing each parameter plus/minus d. """ default_parameters = {'convergence': {'energy_rmse': 0.001, 'energy_maxresid': None, 'force_rmse': 0.005, 'force_maxresid': None, } } def __init__(self, energy_coefficient=1.0, force_coefficient=0.04, convergence=None, parallel=None, overfit=0., raise_ConvergenceOccurred=True, log_losses=True, d=None): p = self.parameters = Parameters( {'importname': '.model.LossFunction'}) # 'dict' creates a copy; otherwise mutable in class. c = p['convergence'] = dict(self.default_parameters['convergence']) if convergence is not None: for key, value in convergence.iteritems(): p['convergence'][key] = value p['energy_coefficient'] = energy_coefficient p['force_coefficient'] = force_coefficient p['overfit'] = overfit self.raise_ConvergenceOccurred = raise_ConvergenceOccurred self.log_losses = log_losses self.d = d self._step = 0 self._initialized = False self._data_sent = False self._parallel = parallel if (c['force_rmse'] is None) and (c['force_maxresid'] is None): p['force_coefficient'] = None if p['force_coefficient'] is None: c['force_rmse'] = None c['force_maxresid'] = None
[docs] def attach_model(self, model, fingerprints=None, fingerprintprimes=None, images=None): """Attach the model to be used to the loss function. fingerprints and training images need not be supplied if they are already attached to the model via model.trainingparameters. Parameters ---------- model : object Class representing the regression model. fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. fingerprintprimes : dict Dictionary with images hashs as keys and the corresponding fingerprint derivatives as values. images : list or str List of ASE atoms objects with positions, symbols, energies, and forces in ASE format. This is the training set of data. This can also be the path to an ASE trajectory (.traj) or database (.db) file. Energies can be obtained from any reference, e.g. DFT calculations. """ self._model = model self.fingerprints = fingerprints self.fingerprintprimes = fingerprintprimes self.images = images
def _initialize(self): """Procedures to be run on the first call only, such as establishing SSH sessions, etc.""" if self._initialized is True: return if self._parallel is None: self._parallel = self._model._parallel log = self._model.log if self.fingerprints is None: self.fingerprints = \ self._model.trainingparameters.descriptor.fingerprints # May also make sense to decide whether or not to calculate # fingerprintprimes based on the value of train_forces. if ((self.parameters.force_coefficient is not None) and (self.fingerprintprimes is None)): self.fingerprintprimes = \ self._model.trainingparameters.descriptor.fingerprintprimes if self.images is None: self.images = self._model.trainingparameters.trainingimages if self._parallel['cores'] != 1: # Initialize workers. workercommand = 'python -m %s' % self.__module__ server, connections, n_pids = setup_parallel(self._parallel, workercommand, log) self._sessions = {'master': server, 'connections': connections, # SSH's/nodes 'n_pids': n_pids} # total no. of workers if self.log_losses: p = self.parameters convergence = p['convergence'] log(' Loss function convergence criteria:') log(' energy_rmse: ' + str(convergence['energy_rmse'])) log(' energy_maxresid: ' + str(convergence['energy_maxresid'])) log(' force_rmse: ' + str(convergence['force_rmse'])) log(' force_maxresid: ' + str(convergence['force_maxresid'])) log(' Loss function set-up:') log(' energy_coefficient: ' + str(p.energy_coefficient)) log(' force_coefficient: ' + str(p.force_coefficient)) log(' overfit: ' + str(p.overfit)) log('\n') if p.force_coefficient is None: header = '%5s %19s %12s %12s %12s' log(header % ('', '', '', '', 'Energy')) log(header % ('Step', 'Time', 'Loss (SSD)', 'EnergyRMSE', 'MaxResid')) log(header % ('=' * 5, '=' * 19, '=' * 12, '=' * 12, '=' * 12)) else: header = '%5s %19s %12s %12s %12s %12s %12s' log(header % ('', '', '', '', 'Energy', '', 'Force')) log(header % ('Step', 'Time', 'Loss (SSD)', 'EnergyRMSE', 'MaxResid', 'ForceRMSE', 'MaxResid')) log(header % ('=' * 5, '=' * 19, '=' * 12, '=' * 12, '=' * 12, '=' * 12, '=' * 12)) self._initialized = True def _send_data_to_fortran(self,): """Procedures to be run in fortran mode for a single requested core only. Also just on the first call for sending data to fortran modules. """ if self._data_sent is True: return num_images = len(self.images) p = self.parameters energy_coefficient = p.energy_coefficient overfit = p.overfit if p.force_coefficient is None: train_forces = False force_coefficient = 0. else: train_forces = True force_coefficient = p.force_coefficient mode = self._model.parameters.mode if mode == 'atom-centered': num_atoms = None elif mode == 'image-centered': raise NotImplementedError('Image-centered mode is not coded yet.') (actual_energies, actual_forces, elements, atomic_positions, num_images_atoms, atomic_numbers, raveled_fingerprints, num_neighbors, raveled_neighborlists, raveled_fingerprintprimes) = (None,) * 10 value = ravel_data(train_forces, mode, self.images, self.fingerprints, self.fingerprintprimes,) if mode == 'image-centered': if not train_forces: (actual_energies, atomic_positions) = value else: (actual_energies, actual_forces, atomic_positions) = value else: if not train_forces: (actual_energies, elements, num_images_atoms, atomic_numbers, raveled_fingerprints) = value else: (actual_energies, actual_forces, elements, num_images_atoms, atomic_numbers, raveled_fingerprints, num_neighbors, raveled_neighborlists, raveled_fingerprintprimes) = value send_data_to_fortran(fmodules, energy_coefficient, force_coefficient, overfit, train_forces, num_atoms, num_images, actual_energies, actual_forces, atomic_positions, num_images_atoms, atomic_numbers, raveled_fingerprints, num_neighbors, raveled_neighborlists, raveled_fingerprintprimes, self._model, self.d) self._data_sent = True def _cleanup(self): """Closes SSH sessions.""" self._initialized = False if not hasattr(self, '_sessions'): return server = self._sessions['master'] finished = np.array([False] * self._sessions['n_pids']) while not finished.all(): message = server.recv_pyobj() if (message['subject'] == '<request>' and message['data'] == 'parameters'): server.send_pyobj('<stop>') finished[int(message['id'])] = True for _ in self._sessions['connections']: if hasattr(_, 'logout'): _.logout() del self._sessions['connections']
[docs] def get_loss(self, parametervector, lossprime): """Returns the current value of the loss function for a given set of parameters, or, if the energy is less than the energy_tol raises a ConvergenceException. Parameters ---------- parametervector : list Parameters of the regression model in the form of a list. lossprime : bool If True, will calculate and return dloss_dparameters, else will only return zero for dloss_dparameters. """ self._initialize() if self._parallel['cores'] == 1: if self._model.fortran: self._model.vector = parametervector self._send_data_to_fortran() (loss, dloss_dparameters, energy_loss, force_loss, energy_maxresid, force_maxresid) = \ fmodules.calculate_loss(parameters=parametervector, num_parameters=len( parametervector), lossprime=lossprime) else: loss, dloss_dparameters, energy_loss, force_loss, \ energy_maxresid, force_maxresid = \ self.calculate_loss(parametervector, lossprime=lossprime) else: server = self._sessions['master'] n_pids = self._sessions['n_pids'] # Subdivide tasks. keys = make_sublists(self.images.keys(), n_pids) args = {'lossprime': lossprime, 'd': self.d} results = self.process_parallels(parametervector, server, n_pids, keys, args=args) loss = results['loss'] dloss_dparameters = results['dloss_dparameters'] energy_loss = results['energy_loss'] force_loss = results['force_loss'] energy_maxresid = results['energy_maxresid'] force_maxresid = results['force_maxresid'] self.loss, self.energy_loss, self.force_loss, \ self.energy_maxresid, self.force_maxresid = \ loss, energy_loss, force_loss, energy_maxresid, force_maxresid if lossprime: self.dloss_dparameters = dloss_dparameters if self.raise_ConvergenceOccurred: # Only during calculation of loss function (and not lossprime) # convergence is checked and values are printed out in the log # file. if lossprime is False: self._model.vector = parametervector converged = self.check_convergence(loss, energy_loss, force_loss, energy_maxresid, force_maxresid) if converged: self._cleanup() raise ConvergenceOccurred() return {'loss': self.loss, 'dloss_dparameters': (self.dloss_dparameters if lossprime is True else dloss_dparameters), 'energy_loss': self.energy_loss, 'force_loss': self.force_loss, 'energy_maxresid': self.energy_maxresid, 'force_maxresid': self.force_maxresid, }
[docs] def calculate_loss(self, parametervector, lossprime): """Method that calculates the loss, derivative of the loss with respect to parameters (if requested), and max_residual. Parameters ---------- parametervector : list Parameters of the regression model in the form of a list. lossprime : bool If True, will calculate and return dloss_dparameters, else will only return zero for dloss_dparameters. """ self._model.vector = parametervector p = self.parameters energyloss = 0. forceloss = 0. energy_maxresid = 0. force_maxresid = 0. dloss_dparameters = np.array([0.] * len(parametervector)) model = self._model for hash, image in self.images.iteritems(): no_of_atoms = len(image) amp_energy = model.calculate_energy(self.fingerprints[hash]) actual_energy = image.get_potential_energy(apply_constraint=False) residual_per_atom = abs(amp_energy - actual_energy) / \ len(image) if residual_per_atom > energy_maxresid: energy_maxresid = residual_per_atom energyloss += residual_per_atom**2 # Calculates derivative of the loss function with respect to # parameters if lossprime is true if lossprime: if model.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif model.parameters.mode == 'atom-centered': if self.d is None: denergy_dparameters = \ model.calculate_dEnergy_dParameters( self.fingerprints[hash]) else: denergy_dparameters = \ model.calculate_numerical_dEnergy_dParameters( self.fingerprints[hash], d=self.d) temp = p.energy_coefficient * 2. * \ (amp_energy - actual_energy) * \ denergy_dparameters / \ (no_of_atoms ** 2.) dloss_dparameters += temp if p.force_coefficient is not None: amp_forces = \ model.calculate_forces(self.fingerprints[hash], self.fingerprintprimes[hash]) actual_forces = image.get_forces(apply_constraint=False) for index in xrange(no_of_atoms): for i in xrange(3): force_resid = abs(amp_forces[index][i] - actual_forces[index][i]) if force_resid > force_maxresid: force_maxresid = force_resid temp = (1. / 3.) * (amp_forces[index][i] - actual_forces[index][i]) ** 2. / \ no_of_atoms forceloss += temp # Calculates derivative of the loss function with respect to # parameters if lossprime is true if lossprime: if model.parameters.mode == 'image-centered': raise NotImplementedError('This needs to be coded.') elif model.parameters.mode == 'atom-centered': if self.d is None: dforces_dparameters = \ model.calculate_dForces_dParameters( self.fingerprints[hash], self.fingerprintprimes[hash]) else: dforces_dparameters = \ model.calculate_numerical_dForces_dParameters( self.fingerprints[hash], self.fingerprintprimes[hash], d=self.d) for selfindex in range(no_of_atoms): for i in range(3): temp = p.force_coefficient * (2.0 / 3.0) * \ (amp_forces[selfindex][i] - actual_forces[selfindex][i]) * \ dforces_dparameters[(selfindex, i)] \ / no_of_atoms dloss_dparameters += temp loss = p.energy_coefficient * energyloss if p.force_coefficient is not None: loss += p.force_coefficient * forceloss dloss_dparameters = np.array(dloss_dparameters) # if overfit coefficient is more than zero, overfit contribution to # loss and dloss_dparameters is also added. if p.overfit > 0.: overfitloss = 0. for component in parametervector: overfitloss += component ** 2. overfitloss *= p.overfit loss += overfitloss doverfitloss_dparameters = \ 2 * p.overfit * np.array(parametervector) dloss_dparameters += doverfitloss_dparameters return loss, dloss_dparameters, energyloss, forceloss, \ energy_maxresid, force_maxresid
# All incoming requests will be dictionaries with three keys. # d['id']: process id number, assigned when process created above. # d['subject']: what the message is asking for / telling you. # d['data']: optional data passed from worker.
[docs] def process_parallels(self, vector, server, n_pids, keys, args): """ Parameters ---------- vector : list Parameters of the regression model in the form of a list. server : object Master session of parallel processing. processes: list of objects Worker sessions for parallel processing. keys : list List of images keys for worker processes. args : dict Dictionary containing arguments of the method to be called on each worker process. """ # For each process finished = np.array([False] * n_pids) results = {'loss': 0., 'dloss_dparameters': [0.] * len(vector), 'energy_loss': 0., 'force_loss': 0., 'energy_maxresid': 0., 'force_maxresid': 0.} while not finished.all(): message = server.recv_pyobj() if message['subject'] == '<purpose>': server.send_string('calculate_loss_function') elif message['subject'] == '<request>': request = message['data'] # Variable name. if request == 'images': subimages = {k: self.images[k] for k in keys[int(message['id'])]} server.send_pyobj(subimages) elif request == 'fortran': server.send_pyobj(self._model.fortran) elif request == 'modelstring': server.send_pyobj(self._model.tostring()) elif request == 'lossfunctionstring': server.send_pyobj(self.parameters.tostring()) elif request == 'fingerprints': server.send_pyobj({k: self.fingerprints[k] for k in keys[int(message['id'])]}) elif request == 'fingerprintprimes': if self.fingerprintprimes is not None: server.send_pyobj({k: self.fingerprintprimes[k] for k in keys[int(message['id'])]}) else: server.send_pyobj(None) elif request == 'args': server.send_pyobj(args) elif request == 'parameters': if finished[int(message['id'])]: server.send_pyobj('<continue>') else: server.send_pyobj(vector) else: raise NotImplementedError() elif message['subject'] == '<result>': result = message['data'] server.send_string('meaningless reply') results['loss'] += result['loss'] results['dloss_dparameters'] += result['dloss_dparameters'] results['energy_loss'] += result['energy_loss'] results['force_loss'] += result['force_loss'] if result['energy_maxresid'] > results['energy_maxresid']: results['energy_maxresid'] = result['energy_maxresid'] if result['force_maxresid'] > results['force_maxresid']: results['force_maxresid'] = result['force_maxresid'] finished[int(message['id'])] = True return results
[docs] def check_convergence(self, loss, energy_loss, force_loss, energy_maxresid, force_maxresid): """Check convergence Checks to see whether convergence is met; if it is, raises ConvergenceException to stop the optimizer. Parameters ---------- loss : float Value of the loss function. energy_loss : float Value of the energy contribution of the loss function. force_loss : float Value of the force contribution of the loss function. energy_maxresid : float Maximum energy residual. force_maxresid : float Maximum force residual. """ p = self.parameters energy_rmse_converged = True log = self._model.log if p.convergence['energy_rmse'] is not None: energy_rmse = np.sqrt(energy_loss / len(self.images)) if energy_rmse > p.convergence['energy_rmse']: energy_rmse_converged = False energy_maxresid_converged = True if p.convergence['energy_maxresid'] is not None: if energy_maxresid > p.convergence['energy_maxresid']: energy_maxresid_converged = False if p.force_coefficient is not None: force_rmse_converged = True if p.convergence['force_rmse'] is not None: force_rmse = np.sqrt(force_loss / len(self.images)) if force_rmse > p.convergence['force_rmse']: force_rmse_converged = False force_maxresid_converged = True if p.convergence['force_maxresid'] is not None: if force_maxresid > p.convergence['force_maxresid']: force_maxresid_converged = False if self.log_losses: log('%5i %19s %12.4e %10.4e %1s' ' %10.4e %1s %10.4e %1s %10.4e %1s' % (self._step, now(), loss, energy_rmse, 'C' if energy_rmse_converged else '-', energy_maxresid, 'C' if energy_maxresid_converged else '-', force_rmse, 'C' if force_rmse_converged else '-', force_maxresid, 'C' if force_maxresid_converged else '-')) self._step += 1 return energy_rmse_converged and energy_maxresid_converged and \ force_rmse_converged and force_maxresid_converged else: if self.log_losses: log('%5i %19s %12.4e %10.4e %1s %10.4e %1s' % (self._step, now(), loss, energy_rmse, 'C' if energy_rmse_converged else '-', energy_maxresid, 'C' if energy_maxresid_converged else '-')) self._step += 1 return energy_rmse_converged and energy_maxresid_converged
[docs]def calculate_fingerprints_range(fp, images): """Calculates the range for the fingerprints corresponding to images, stored in fp. fp is a fingerprints object with the fingerprints data stored in a dictionary-like object at fp.fingerprints. (Typically this is a .utilties.Data structure.) images is a hashed dictionary of atoms for which to consider the range. In image-centered mode, returns an array of (min, max) values for each fingerprint. In atom-centered mode, returns a dictionary of such arrays, one per element. """ if fp.parameters.mode == 'image-centered': raise NotImplementedError() elif fp.parameters.mode == 'atom-centered': fprange = {} for hash, image in images.iteritems(): imagefingerprints = fp.fingerprints[hash] for element, fingerprint in imagefingerprints: if element not in fprange: fprange[element] = [[_, _] for _ in fingerprint] else: assert len(fprange[element]) == len(fingerprint) for i, ridge in enumerate(fingerprint): if ridge < fprange[element][i][0]: fprange[element][i][0] = ridge elif ridge > fprange[element][i][1]: fprange[element][i][1] = ridge for key, value in fprange.iteritems(): fprange[key] = value return fprange
[docs]def ravel_data(train_forces, mode, images, fingerprints, fingerprintprimes,): """ Reshapes data of images into lists. Parameters --------- train_forces : bool Determining whether forces are also trained or not. mode : str Can be either 'atom-centered' or 'image-centered'. images : list or str List of ASE atoms objects with positions, symbols, energies, and forces in ASE format. This is the training set of data. This can also be the path to an ASE trajectory (.traj) or database (.db) file. Energies can be obtained from any reference, e.g. DFT calculations. fingerprints : dict Dictionary with images hashs as keys and the corresponding fingerprints as values. fingerprintprimes : dict Dictionary with images hashs as keys and the corresponding fingerprint derivatives as values. """ from ase.data import atomic_numbers as an actual_energies = [image.get_potential_energy(apply_constraint=False) for hash, image in images.iteritems()] if mode == 'atom-centered': num_images_atoms = [len(image) for hash, image in images.iteritems()] atomic_numbers = [an[atom.symbol] for hash, image in images.iteritems() for atom in image] def ravel_fingerprints(images, fingerprints): """ Reshape fingerprints of images into a list. """ raveled_fingerprints = [] elements = [] for hash, image in images.iteritems(): for index in range(len(image)): elements += [fingerprints[hash][index][0]] raveled_fingerprints += [fingerprints[hash][index][1]] elements = sorted(set(elements)) # Could also work without images: # raveled_fingerprints = [afp # for hash, value in fingerprints.iteritems() # for (element, afp) in value] return elements, raveled_fingerprints elements, raveled_fingerprints = ravel_fingerprints(images, fingerprints) else: atomic_positions = [image.positions.ravel() for hash, image in images.iteritems()] if train_forces is True: actual_forces = \ [image.get_forces(apply_constraint=False)[index] for hash, image in images.iteritems() for index in range(len(image))] if mode == 'atom-centered': def ravel_neighborlists_and_fingerprintprimes(images, fingerprintprimes): """ Reshape neighborlists and fingerprintprimes of images into a list and a matrix, respectively. """ # Only neighboring atoms of type II (within the main cell) # need to be sent to fortran for force training. # All keys in fingerprintprimes are for type II neighborhoods. # Also note that each atom is considered as neighbor of # itself in fingerprintprimes. num_neighbors = [] raveled_neighborlists = [] raveled_fingerprintprimes = [] for hash, image in images.iteritems(): for atom in image: selfindex = atom.index selfsymbol = atom.symbol selfneighborindices = [] selfneighborsymbols = [] for key, derafp in fingerprintprimes[hash].iteritems(): # key = (selfindex, selfsymbol, nindex, nsymbol, i) # i runs from 0 to 2. neighbor indices and symbols # should be added just once. if key[0] == selfindex and key[4] == 0: selfneighborindices += [key[2]] selfneighborsymbols += [key[3]] neighborcount = 0 for nindex, nsymbol in zip(selfneighborindices, selfneighborsymbols): raveled_neighborlists += [nindex] neighborcount += 1 for i in range(3): fpprime = fingerprintprimes[hash][(selfindex, selfsymbol, nindex, nsymbol, i)] raveled_fingerprintprimes += [fpprime] num_neighbors += [neighborcount] return (num_neighbors, raveled_neighborlists, raveled_fingerprintprimes) (num_neighbors, raveled_neighborlists, raveled_fingerprintprimes) = \ ravel_neighborlists_and_fingerprintprimes(images, fingerprintprimes) if mode == 'image-centered': if not train_forces: return (actual_energies, atomic_positions) else: return (actual_energies, actual_forces, atomic_positions) else: if not train_forces: return (actual_energies, elements, num_images_atoms, atomic_numbers, raveled_fingerprints) else: return (actual_energies, actual_forces, elements, num_images_atoms, atomic_numbers, raveled_fingerprints, num_neighbors, raveled_neighborlists, raveled_fingerprintprimes)
[docs]def send_data_to_fortran(_fmodules, energy_coefficient, force_coefficient, overfit, train_forces, num_atoms, num_images, actual_energies, actual_forces, atomic_positions, num_images_atoms, atomic_numbers, raveled_fingerprints, num_neighbors, raveled_neighborlists, raveled_fingerprintprimes, model, d): """ Function that sends images data to fortran code. Is used just once on each core. """ from ase.data import atomic_numbers as an if model.parameters.mode == 'image-centered': mode_signal = 1 elif model.parameters.mode == 'atom-centered': mode_signal = 2 _fmodules.images_props.num_images = num_images _fmodules.images_props.actual_energies = actual_energies if train_forces: _fmodules.images_props.actual_forces = actual_forces _fmodules.model_props.energy_coefficient = energy_coefficient _fmodules.model_props.force_coefficient = force_coefficient _fmodules.model_props.overfit = overfit _fmodules.model_props.train_forces = train_forces _fmodules.model_props.mode_signal = mode_signal if d is None: _fmodules.model_props.numericprime = False else: _fmodules.model_props.numericprime = True _fmodules.model_props.d = d if model.parameters.mode == 'atom-centered': fprange = model.parameters.fprange elements = sorted(fprange.keys()) num_elements = len(elements) elements_numbers = [an[elm] for elm in elements] min_fingerprints = \ [[fprange[elm][_][0] for _ in range(len(fprange[elm]))] for elm in elements] max_fingerprints = [[fprange[elm][_][1] for _ in range(len(fprange[elm]))] for elm in elements] num_fingerprints_of_elements = \ [len(fprange[elm]) for elm in elements] _fmodules.images_props.num_elements = num_elements _fmodules.images_props.elements_numbers = elements_numbers _fmodules.images_props.num_images_atoms = num_images_atoms _fmodules.images_props.atomic_numbers = atomic_numbers if train_forces: _fmodules.images_props.num_neighbors = num_neighbors _fmodules.images_props.raveled_neighborlists = \ raveled_neighborlists _fmodules.fingerprint_props.num_fingerprints_of_elements = \ num_fingerprints_of_elements _fmodules.fingerprint_props.raveled_fingerprints = raveled_fingerprints _fmodules.neuralnetwork.min_fingerprints = min_fingerprints _fmodules.neuralnetwork.max_fingerprints = max_fingerprints if train_forces: _fmodules.fingerprint_props.raveled_fingerprintprimes = \ raveled_fingerprintprimes else: _fmodules.images_props.num_atoms = num_atoms _fmodules.images_props.atomic_positions = atomic_positions # for neural neyworks only if model.parameters['importname'] == '.model.neuralnetwork.NeuralNetwork': hiddenlayers = model.parameters.hiddenlayers activation = model.parameters.activation if model.parameters.mode == 'atom-centered': from collections import OrderedDict no_layers_of_elements = \ [3 if isinstance(hiddenlayers[elm], int) else (len(hiddenlayers[elm]) + 2) for elm in elements] nn_structure = OrderedDict() for elm in elements: len_of_fps = len(fprange[elm]) if isinstance(hiddenlayers[elm], int): nn_structure[elm] = \ ([len_of_fps] + [hiddenlayers[elm]] + [1]) else: nn_structure[elm] = \ ([len_of_fps] + [layer for layer in hiddenlayers[elm]] + [1]) no_nodes_of_elements = [nn_structure[elm][_] for elm in elements for _ in range(len(nn_structure[elm]))] else: num_atoms = model.parameters.num_atoms if isinstance(hiddenlayers, int): no_layers_of_elements = [3] else: no_layers_of_elements = [len(hiddenlayers) + 2] if isinstance(hiddenlayers, int): nn_structure = ([3 * num_atoms] + [hiddenlayers] + [1]) else: nn_structure = ([3 * num_atoms] + [layer for layer in hiddenlayers] + [1]) no_nodes_of_elements = [nn_structure[_] for _ in range(len(nn_structure))] _fmodules.neuralnetwork.no_layers_of_elements = no_layers_of_elements _fmodules.neuralnetwork.no_nodes_of_elements = no_nodes_of_elements if activation == 'tanh': activation_signal = 1 elif activation == 'sigmoid': activation_signal = 2 elif activation == 'linear': activation_signal = 3 _fmodules.neuralnetwork.activation_signal = activation_signal