Source code for amp

import os
import sys
import shutil
import numpy as np
import tempfile
import platform
from getpass import getuser
from socket import gethostname
import subprocess
import warnings

import ase
from ase.calculators.calculator import Calculator, Parameters
from ase.utils.timing import Timer, timer
try:
    from ase import __version__ as aseversion
except ImportError:
    # We're on ASE 3.9 or older
    from ase.version import version as aseversion

from .utilities import (make_filename, hash_images, Logger, string2dict,
                        logo, now, assign_cores, TrainingConvergenceError,
                        check_images, hash_gc_images, get_gc_hash,
                        EphemeralDatabase)

# The compiled Fortran extension is optional at import time: a missing
# module only triggers a warning, but a module that is present and reports
# a version mismatch aborts the import.
try:
    from . import fmodules
except ImportError:
    warnings.warn('Did not find fortran modules.')
else:
    # Version of the compiled extension that this Python source expects.
    fmodules_version = 15
    # A truthy return value from check_version is treated as a mismatch.
    wrong_version = fmodules.check_version(version=fmodules_version)
    if wrong_version:
        raise RuntimeError('fortran modules are not updated. Recompile '
                           'with f2py as described in the README. '
                           'Correct version is %i.' % fmodules_version)

# The package version is stored in a plain-text VERSION file that sits in the
# same directory as this module.
version_file = os.path.join(os.path.split(os.path.abspath(__file__))[0],
                            'VERSION')
# Use a context manager so the file handle is closed deterministically
# instead of being left for the garbage collector.
with open(version_file) as _version_handle:
    _ampversion = _version_handle.read().strip()
__version__ = _ampversion


class Amp(Calculator, object):
    """Atomistic Machine-Learning Potential (Amp) ASE calculator

    Parameters
    ----------
    descriptor : object
        Class representing local atomic environment.
    model : object
        Class representing the regression model. The calculate method has
        branches for models whose class is named NeuralNetwork,
        ChargeNeuralNetwork, or KernelRidge. Input arguments for
        NeuralNetwork are hiddenlayers, activation, weights, and scalings;
        for more information see docstring for the class NeuralNetwork.
    label : str
        Default prefix/location used for all files. Setting to None
        disables logging.
    dblabel : str, None, or False
        Prefix/location for fingerprint database files (neighborlists,
        fingerprints, fingerprint derivatives). If None, defaults to label.
        Set to False to disable all disk writes and keep fingerprints only
        in memory, discarding each image's data as soon as it is used; this
        is the default when loading via Amp.load() since inference and MD
        runs rarely revisit the same image. A shared string path lets
        multiple calculator instances reuse the same on-disk cache.
    cores : int
        Can specify cores to use for parallel training; if None, will
        determine from environment
    envcommand : string
        For parallel processing across nodes, a command can be supplied
        here to load the appropriate environment before starting workers.
    logging : boolean
        Option to turn off logging; e.g., to speed up force calls.
    atoms : object
        ASE atoms objects with positions, symbols, energy, and forces in
        ASE format.
    """
    implemented_properties = ['energy', 'forces', 'charges']

    def __init__(self, descriptor, model, label='amp', dblabel=None,
                 cores=None, envcommand=None, logging=True, atoms=None):

        self.timer = Timer()
        # self.logging must exist before Calculator.__init__ runs, because
        # that call assigns the label and the label setter reads it.
        self.logging = logging
        Calculator.__init__(self, label=label, atoms=atoms)
        # Note self._log is set when Calculator.__init__ sets the label.
        self._printheader(self._log)

        self._parallel = {'envcommand': envcommand}

        # Note the following are properties: these are setter functions.
        self.descriptor = descriptor
        self.model = model
        self.cores = cores  # Note this calls 'assign_cores'.

        self.dblabel = label if dblabel is None else dblabel

    def __del__(self):
        """ Write timings to log file when calculator is closed."""
        # Either attribute can be missing if __init__ failed part-way
        # through; a finalizer must not raise in that case.
        log = getattr(self, '_log', None)
        if hasattr(self, 'timer') and log is not None \
                and log.file is not None:
            self.timer.write(log.file)

    @property
    def cores(self):
        """
        Get or set the cores for the parallel environment.

        Parameters
        ----------
        cores : int or dictionary
            Parallel configuration. If cores is an integer, parallelizes
            over this many processes on machine localhost. cores can also
            be a dictionary of the type {'node324': 16, 'node325': 16}. If
            not specified, tries to determine from environment, using
            amp.utilities.assign_cores.
        """
        return self._parallel['cores']

    @cores.setter
    def cores(self, cores):
        self._parallel['cores'] = assign_cores(cores, log=self._log)

    @property
    def descriptor(self):
        """Get or set the atomic descriptor.

        Parameters
        ----------
        descriptor : object
            Class instance representing the local atomic environment.
        """
        return self._descriptor

    @descriptor.setter
    def descriptor(self, descriptor):
        descriptor.parent = self  # gives the descriptor object a reference to
        # the main Amp instance. Then descriptor can pull parameters directly
        # from Amp without needing them to be passed in each method call.
        self._descriptor = descriptor
        self.reset()  # Clears any calculation results.

    @property
    def model(self):
        """Get or set the machine-learning model.

        Parameters
        ----------
        model : object
            Class instance representing the regression model.
        """
        return self._model

    @model.setter
    def model(self, model):
        model.parent = self  # gives the model object a reference to the main
        # Amp instance. Then model can pull parameters directly from Amp
        # without needing them to be passed in each method call.
        self._model = model
        self.reset()  # Clears any calculation results.

    @classmethod
    def load(Cls, file, Descriptor=None, Model=None, dblabel=False, **kwargs):
        """Attempts to load calculators and return a new instance of Amp.

        Only a filename or file-like object is required, in typical cases.

        If using a home-rolled descriptor or model, also supply
        uninstantiated classes to those models, as in Model=MyModel.  (Not as
        Model=MyModel()!)

        Additional keyword arguments (such as label) are fed through to Amp.

        Parameters
        ----------
        file : str
            Name of the file to load data from. An object with a ``read``
            method is also accepted and is read directly.
        Descriptor : object
            Class representing local atomic environment.
        Model : object
            Class representing the regression model.
        dblabel : str, None, or False
            Database label; defaults to 'False' here, meaning fingerprints,
            etc., are not saved to disk (typically preferred for inference)

        Returns
        -------
        Amp
            New calculator built from the stored descriptor and model
            parameters.
        """
        if hasattr(file, 'read'):
            text = file.read()
        else:
            with open(file) as f:
                text = f.read()

        # Unpack parameter dictionaries.
        p = string2dict(text)
        for key in ['descriptor', 'model']:
            p[key] = string2dict(p[key])

        # If modules are not specified, find them.
        # NOTE(review): 'importname' is only popped when the class is looked
        # up here; when Descriptor/Model is supplied by the caller the key,
        # if present, is still passed to the constructor below -- confirm
        # that this is intended.
        if Descriptor is None:
            Descriptor = importhelper(p['descriptor'].pop('importname'))
        if Model is None:
            Model = importhelper(p['model'].pop('importname'))
        # Key 'importname' and the value removed so that it is not splatted
        # into the keyword arguments used to instantiate in the next line.

        # Instantiate the descriptor and model.
        descriptor = Descriptor(**p['descriptor'])
        # ** sends all the key-value pairs at once.
        model = Model(**p['model'])

        # Instantiate Amp.
        calc = Cls(descriptor=descriptor, model=model, dblabel=dblabel,
                   **kwargs)
        calc._log('Loaded file: %s' % file)
        return calc

    def set(self, **kwargs):
        """Function to set parameters.

        Forwards the keyword arguments to Calculator.set and clears any
        stored results if that call reports changed parameters. Most
        settings live in the model and descriptor; calculate() reads
        'electrode_potential' from the parameters set here when the model
        is a ChargeNeuralNetwork.
        """
        changed_parameters = Calculator.set(self, **kwargs)
        if len(changed_parameters) > 0:
            self.reset()

    @property
    def label(self):
        # Note this only needed so we can define a setter.
        return Calculator.label.__get__(self)

    @label.setter
    def label(self, label):
        Calculator.label.__set__(self, label)

        # Create directories for output structure if needed.
        # Note ASE doesn't do this for us.
        if self.label:
            if not os.path.isdir(self.directory):
                os.makedirs(self.directory)

        # Create logger corresponding to label.
        if self.logging is True:
            self._log = Logger(make_filename(self.label, '-log.txt'))
        else:
            self._log = Logger(None)

    @timer('calculate')
    def calculate(self, atoms, properties, system_changes):
        """Calculation of the energy of system and forces of all atoms.
        If charge learning mode is applied, calculation of the atomic
        charges of all atoms also is performed

        Each property is computed only when ``properties`` is exactly the
        single-element list ['energy'], ['forces'], or ['charges']; results
        are stored in ``self.results``.
        """
        # The inherited method below just sets the atoms object,
        # if specified, to self.atoms.
        Calculator.calculate(self, atoms, properties, system_changes)

        # With dblabel=False nothing is kept between calls, so the
        # in-memory databases are emptied before each calculation.
        if self.dblabel is False:
            EphemeralDatabase.clear_all()

        log = self._log
        log('Calculation requested.')

        # The branches below dispatch on the model's class name.
        model_name = self.model.__class__.__name__

        if model_name != 'ChargeNeuralNetwork':
            images = hash_images([self.atoms])
            key = list(images.keys())[0]
        else:
            # Charge training requires Ne and potential info
            # Resolve electrode_potential from this calculator's own
            # parameters.
            # Must be set by the caller via calc.set(electrode_potential=...)
            # before any call to get_potential_energy() / get_forces().
            # (During training, the potential is read from image.calc.results
            # directly inside the training loop -- not through calculate().)
            try:
                _ep = self.parameters['electrode_potential']
            except KeyError:
                raise RuntimeError(
                    "electrode_potential not found. "
                    "Set it with calc.set(electrode_potential=...) "
                    "before calling get_potential_energy() / get_forces()."
                )
            key = get_gc_hash(atoms, electrode_potential=_ep)
            images = {key: atoms}
            electrode_potentials = {key: _ep}
            (charge_fp_append, charge_fpprime_append) = \
                self.model.calculate_charge_fp_append(
                    images,
                    self.model.parameters.slab_metal,
                    self.model.parameters.surface_correction,
                    self.model.parameters.etas,
                    electrode_potentials=electrode_potentials)

        if properties == ['energy']:
            log('Calculating potential energy...', tic='pot-energy')
            self.timer.start('calculate_fingerprints')
            self.descriptor.calculate_fingerprints(
                images=images,
                log=log,
                calculate_derivatives=False)
            self.timer.stop('calculate_fingerprints')
            if model_name == 'NeuralNetwork':
                self.timer.start('calculate_energy')
                energy = self.model.calculate_energy(
                    self.descriptor.fingerprints[key])
                self.timer.stop('calculate_energy')
            elif model_name == 'ChargeNeuralNetwork':
                self.timer.start('calculate_energy')
                wf = electrode_potentials[key]
                energy, charge = self.model.calculate_gc_energy(
                    self.descriptor.fingerprints[key], wf,
                    qfp_append=charge_fp_append[key])
                self.timer.stop('calculate_energy')
            elif model_name == 'KernelRidge':
                # KRR needs training images.
                fingerprints = self.descriptor.fingerprints
                log('Loading the training set')
                if isinstance(self.model.trainingimages, str):
                    trainingimages = hash_images(
                        ase.io.Trajectory(self.model.trainingimages)
                    )
                else:
                    trainingimages = hash_images(self.model.trainingimages)
                self.descriptor.calculate_fingerprints(
                    images=trainingimages,
                    log=log,
                    calculate_derivatives=False
                )
                fp_trainingimages = self.descriptor.fingerprints
                energy = self.model.calculate_energy(
                    fingerprints,
                    hash=key,
                    trainingimages=trainingimages,
                    fp_trainingimages=fp_trainingimages
                )
            self.results['energy'] = energy
            log('...potential energy calculated.', toc='pot-energy')

        if properties == ['forces']:
            if model_name != 'KernelRidge':
                log('Calculating forces...', tic='forces')
                with self.timer('calculate_fingerprint_w_der'):
                    self.descriptor.calculate_fingerprints(
                        images=images,
                        log=log,
                        calculate_derivatives=True)
                with self.timer('calculate_forces'):
                    if model_name == 'NeuralNetwork':
                        forces = self.model.calculate_forces(
                            self.descriptor.fingerprints[key],
                            self.descriptor.fingerprintprimes[key])
                    elif model_name == 'ChargeNeuralNetwork':
                        wf = electrode_potentials[key]
                        forces = self.model.calculate_gc_forces(
                            self.descriptor.fingerprints[key],
                            self.descriptor.fingerprintprimes[key],
                            wf,
                            qfp_append=charge_fp_append[key],
                            qfpprime_append=charge_fpprime_append[key],)
                self.results['forces'] = forces
                log('...forces calculated.', toc='forces')
            else:
                log('Calculating forces...', tic='forces')
                self.descriptor.calculate_fingerprints(
                    images=images,
                    log=log,
                    calculate_derivatives=True
                )
                log('Loading the training set')
                if isinstance(self.model.trainingimages, str):
                    trainingimages = hash_images(
                        ase.io.Trajectory(self.model.trainingimages)
                    )
                else:
                    trainingimages = hash_images(self.model.trainingimages)
                self.descriptor.calculate_fingerprints(
                    images=trainingimages,
                    log=log,
                    calculate_derivatives=True
                )
                t_descriptor = self.descriptor
                forces = \
                    self.model.calculate_forces(
                        self.descriptor.fingerprints[key],
                        self.descriptor.fingerprintprimes[key],
                        hash=key,
                        trainingimages=trainingimages,
                        t_descriptor=t_descriptor,
                    )
                self.results['forces'] = forces
                log('...forces calculated.', toc='forces')

        if properties == ['charges']:
            log('Calculating charges...', tic='charges')
            self.timer.start('calculate_fingerprints')
            self.descriptor.calculate_fingerprints(
                images=images,
                log=log,
                calculate_derivatives=False)
            self.timer.stop('calculate_fingerprints')
            if model_name == 'ChargeNeuralNetwork':
                self.timer.start('calculate_charge')
                wf = electrode_potentials[key]
                energy, charge = self.model.calculate_gc_energy(
                    self.descriptor.fingerprints[key], wf,
                    qfp_append=charge_fp_append[key],)
                self.timer.stop('calculate_charge')
            else:
                raise NotImplementedError('Charge predictions can only be '
                                          'processed with ChargeNeuralNetwork')
            # Atomic charges are returned to be consistent with ASE. The
            # number of excess electrons is the sum of atomic charges with an
            # opposite sign.
            self.results['charges'] = self.model.atomic_charges
            log('...charge calculated.', toc='charges')

        # Mirror SJM behaviour: store the potential used so downstream code
        # can read calc.results['electrode_potential'] regardless of which
        # calculator type is attached.
        if model_name == 'ChargeNeuralNetwork':
            self.results['electrode_potential'] = electrode_potentials[key]

    @timer('train')
    def train(self,
              images,
              overwrite=False,
              max_attempts=1,
              ):
        """Fits the model to the training images.

        Parameters
        ----------
        images : list or str
            List of ASE atoms objects with positions, symbols, energies, and
            forces in ASE format. This is the training set of data. This can
            also be the path to an ASE trajectory (.traj) or database (.db)
            file. Energies can be obtained from any reference, e.g. DFT
            calculations.
        overwrite : bool
            If an output file with the same name exists, overwrite it.
        max_attempts : int
            Maximum number of training attempts. If training does not
            converge, the model weights (including packed biases) are reset
            to new random values and training is retried. The first attempt
            always uses whatever weights are currently on the model (e.g.,
            from a loaded checkpoint); randomization only applies to
            subsequent attempts. Fingerprints are computed only once and
            reused across attempts. Default is 1 (no retries).

        Raises
        ------
        TrainingConvergenceError
            If no attempt converged. The current parameters are still saved
            (to an '-untrained-parameters.amp' file) before raising.
        """

        log = self._log
        log('\nAmp training started. ' + now() + '\n')
        log('Descriptor: %s\n (%s)' % (self.descriptor.__class__.__name__,
                                      self.descriptor))
        log('Model: %s\n (%s)' % (self.model.__class__.__name__, self.model))

        # Charge-aware models use their own hashing and image checks.
        is_charge_model = \
            self.model.__class__.__name__ == 'ChargeNeuralNetwork'

        with self.timer('hash_images'):
            if not is_charge_model:
                images = hash_images(images, log=log)
            else:
                images = hash_gc_images(images, log=log)

        log('\nDescriptor\n==========')
        train_forces = self.model.forcetraining  # True / False
        if not is_charge_model:
            check_images(images, forces=train_forces, charges=False)
        else:
            check_images(images, forces=train_forces, charges=True)
        with self.timer('calculate_fingerprints'):
            self.descriptor.calculate_fingerprints(
                images=images,
                parallel=self._parallel,
                log=log,
                calculate_derivatives=train_forces)

        log('\nModel fitting\n=============')
        result = False
        for attempt in range(max_attempts):
            if attempt > 0:
                log('Training did not converge (attempt %i of %i); '
                    'retrying with new random weights.'
                    % (attempt, max_attempts))
                # Dropping the stored weights makes the next fit start from
                # freshly initialized ones.
                self.model.parameters['weights'] = None
                if 'charge_weights' in self.model.parameters:
                    self.model.parameters['charge_weights'] = None
                self.model.lossfunction.reset()
                # Remove artifacts of the failed attempt so the retry does
                # not pick them up.
                for suffix in ('-checkpoint.amp', '-initial-parameters.amp'):
                    f = make_filename(self.label, suffix)
                    if os.path.exists(f):
                        os.remove(f)
                checkpoint_dir = self.label + '-checkpoints'
                if os.path.exists(checkpoint_dir):
                    shutil.rmtree(checkpoint_dir)

            with self.timer('fit model'):
                result = self.model.fit(trainingimages=images,
                                        descriptor=self.descriptor,
                                        log=log,
                                        parallel=self._parallel)
            if result is True:
                break

        if result is True:
            log('Amp successfully trained. Saving current parameters.')
            filename = self.label + '.amp'
            self.reset()  # Clears any calculation results.
        else:
            log('Amp not trained successfully. Saving current parameters.')
            filename = make_filename(self.label, '-untrained-parameters.amp')
        # save() may redirect to a different path; report the one used.
        filename = self.save(filename, overwrite)
        log('Parameters saved in file "%s".' % filename)
        log("This file can be opened with `calc = Amp.load('%s')`" %
            filename)
        if result is False:
            raise TrainingConvergenceError(
                'Amp did not converge upon training after %i attempt(s). '
                'See log file for more information.' % max_attempts)

    def save(self, filename, overwrite=False):
        """Saves the calculator in a way that it can be re-opened with
        load.

        If ``filename`` already exists, nothing is lost: with
        ``overwrite=False`` the parameters are written to a fresh temporary
        '.amp' file instead; with ``overwrite=True`` the existing file is
        first moved to a temporary '.amp' file.

        Parameters
        ----------
        filename : str
            Path to the file to write to.
        overwrite : bool
            If an output file with the same name exists, overwrite it.

        Returns
        -------
        str
            Path the parameters were actually written to.
        """
        if os.path.exists(filename):
            # Only the unique name is needed. Close the handle right away so
            # it is not leaked and so the path can be written to or moved
            # onto on platforms that lock open files.
            with tempfile.NamedTemporaryFile(mode='w', delete=False,
                                             suffix='.amp') as handle:
                sparename = handle.name
            if overwrite is False:
                oldfilename = filename
                filename = sparename
                self._log('File "%s" exists. Instead saving to "%s".' %
                          (oldfilename, filename))
            else:
                oldfilename = sparename
                self._log('Overwriting file: "%s". Moving original to "%s".'
                          % (filename, oldfilename))
                shutil.move(filename, oldfilename)
        descriptor = self.descriptor.tostring()
        model = self.model.tostring()
        p = Parameters({'descriptor': descriptor,
                        'model': model})
        p.write(filename)
        return filename

    def _printheader(self, log):
        """Prints header to log file; inspired by that in GPAW.

        Parameters
        ----------
        log : callable
            Logger that is called once per header line.
        """
        log(logo)
        log('Amp: Atomistic Machine-learning Package')
        log('Developed by Andrew Peterson, Alireza Khorshidi, and others,')
        log('Brown University.')
        log('PI Website: http://brown.edu/go/catalyst')
        log('Official repository: http://bitbucket.org/andrewpeterson/amp')
        log('Official documentation: http://amp.readthedocs.io/')
        log('Citation:')
        log(' Alireza Khorshidi & Andrew A. Peterson,')
        log(' Computer Physics Communications 207: 310-324 (2016).')
        log(' http://doi.org/10.1016/j.cpc.2016.05.010')
        log('=' * 70)
        log('User: %s' % getuser())
        log('Hostname: %s' % gethostname())
        log('Date: %s' % now(with_utc=True))
        uname = platform.uname()
        log('Architecture: %s' % uname[4])
        log('PID: %s' % os.getpid())
        log('Amp version: %s' % _ampversion)
        ampdirectory = os.path.dirname(os.path.abspath(__file__))
        log('Amp directory: %s' % ampdirectory)
        commithash, commitdate = get_git_commit(ampdirectory)
        log(' Last commit: {:s}'.format(commithash))
        log(' Last commit date: {:s}'.format(commitdate))
        # str.format fills in the version numbers first; the remaining %s is
        # then filled with the interpreter path.
        log('Python: v{0}.{1}.{2}: %s'.format(*sys.version_info[:3]) %
            sys.executable)
        log('ASE v%s: %s' % (aseversion, os.path.dirname(ase.__file__)))
        log('NumPy v%s: %s' %
            (np.version.version, os.path.dirname(np.__file__)))
        # SciPy is not a strict dependency.
        try:
            import scipy
            log('SciPy v%s: %s' %
                (scipy.version.version, os.path.dirname(scipy.__file__)))
        except ImportError:
            log('SciPy: not available')
        # ZMQ and pxssh are only necessary for parallel calculations.
        try:
            import zmq
            log('ZMQ/PyZMQ v%s/v%s: %s' %
                (zmq.zmq_version(), zmq.pyzmq_version(),
                 os.path.dirname(zmq.__file__)))
        except ImportError:
            log('ZMQ: not available')
        # pxssh is looked for as a standalone module first, then inside
        # pexpect.
        try:
            import pxssh
            log('pxssh: %s' % os.path.dirname(pxssh.__file__))
        except ImportError:
            log('pxssh: Not available from pxssh.')
            try:
                from pexpect import pxssh
            except ImportError:
                log('pxssh: Not available from pexpect.')
            else:
                import pexpect
                log('pxssh (via pexpect v%s): %s' %
                    (pexpect.__version__, pxssh.__file__))
        log('=' * 70)
def importhelper(importname):
    """Return the class registered under the dotted name *importname*.

    Only names from the hand-maintained list below are accepted, which is
    meant to prevent the execution of arbitrary (potentially malicious)
    code. However, since there is an `eval` statement in string2dict maybe
    this is silly.

    Each import is performed lazily, only when its name is requested.

    Raises
    ------
    NotImplementedError
        If *importname* is not one of the known names.
    """
    # Descriptors.
    if importname == '.descriptor.gaussian.Gaussian':
        from .descriptor.gaussian import Gaussian
        return Gaussian
    if importname == '.descriptor.zernike.Zernike':
        from .descriptor.zernike import Zernike
        return Zernike
    if importname == '.descriptor.bispectrum.Bispectrum':
        from .descriptor.bispectrum import Bispectrum
        return Bispectrum

    # Models.
    if importname == '.model.neuralnetwork.NeuralNetwork':
        from .model.neuralnetwork import NeuralNetwork
        return NeuralNetwork
    if importname == '.model.chargeneuralnetwork.ChargeNeuralNetwork':
        from .model.chargeneuralnetwork import ChargeNeuralNetwork
        return ChargeNeuralNetwork
    if importname == '.model.neuralnetwork.tflow':
        # This name resolves to the NeuralNetwork class of .model.tflow.
        from .model.tflow import NeuralNetwork as TflowNeuralNetwork
        return TflowNeuralNetwork
    if importname == '.model.kernelridge.KernelRidge':
        from .model.kernelridge import KernelRidge
        return KernelRidge
    if importname == '.model.LossFunction':
        from .model import LossFunction
        return LossFunction

    # Anything else is refused.
    raise NotImplementedError(
        'Attempt to import the module %s. Was this intended? '
        'If so, trying manually importing this module and '
        'feeding it to Amp.load. To avoid this error, this '
        'module can be added to amp.importhelper.' %
        importname)
def get_git_commit(ampdirectory):
    """Attempts to get the last git commit from the amp directory.

    Parameters
    ----------
    ampdirectory : str
        Directory in which ``git log`` is run.

    Returns
    -------
    tuple of (str, str)
        Commit hash and commit date of the last commit. If git cannot be
        run there (git missing, directory missing, or git exits with an
        error), ('unknown hash', 'unknown date') is returned.
    """
    try:
        with open(os.devnull, 'w') as devnull:
            # Run git in the target directory via cwd= rather than
            # os.chdir, so the working directory of this process is never
            # changed and cannot be left changed by an exception.
            output = subprocess.check_output(
                ['git', 'log', '-1', '--pretty=%H\t%ci'],
                stderr=devnull,
                cwd=ampdirectory)
    except (subprocess.CalledProcessError, OSError):
        output = b'unknown hash\tunknown date'
    output = output.decode('utf-8')
    output = output.strip()
    # The format string separates hash and date with a single tab.
    commithash, commitdate = output.split('\t')
    return commithash, commitdate