MDAF/SourceCode/AlgorithmAnalyser.py
# This module runs the DOE directly; existing surrogates can be explored with another workflow
from os import path
import sys
import importlib.util
import multiprocessing
import time
import re
from numpy import random as r
from numpy import *
import statistics
from functools import partial
import shutil
# Surrogate modelling
import itertools
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Test function representation
from rpy2 import robjects as robjs
from rpy2.robjects.packages import importr
from rpy2 import rinterface
# Test function characteristics
import statistics as st
from scipy import signal, misc, ndimage
def installFlacco(mirror='https://utstat.toronto.edu/cran/'):
    # Install the R packages required for the landscape analysis
    utils = importr('utils')
    utils.install_packages('flacco', repos=mirror)
    utils.install_packages('list', repos=mirror)
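# Note: this installer only needs to run once per R environment; representfunc()
# below assumes the 'flacco' R package is already installed and importable.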
class counter:
    # Wraps a function to keep a running count of how many
    # times it has been called.
    def __init__(self, func):
        self.func = func
        self.count = 0

    def __call__(self, *args, **kwargs):
        self.count += 1
        return self.func(*args, **kwargs)
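# Illustrative example (not part of the workflow): any function wrapped by
# counter exposes a .count attribute, which simulate() below uses to tally
# objective-function calls:
#   @counter
#   def f(x):
#       return x ** 2
#   f(2); f(3)
#   f.count  # -> 2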
def simulate(algName, algPath, funcname, funcpath, objs, args, initpoint):
    # Loading the heuristic object into the namespace and memory
    spec = importlib.util.spec_from_file_location(algName, algPath)
    heuristic = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(heuristic)
    # Loading the test function object into the namespace and memory
    testspec = importlib.util.spec_from_file_location(funcname, funcpath)
    func = importlib.util.module_from_spec(testspec)
    testspec.loader.exec_module(func)

    # Defining a countable test function
    @counter
    def testfunc(args):
        return func.main(args)

    # A try statement handles any exception raised in the child process,
    # whether by the algorithm, the test function, or the pooling logic
    try:
        # This timer measures the CPU time of the process directly (nanoseconds)
        tic = time.process_time_ns()
        # Running the test by calling the heuristic script with the test function as argument
        quality = heuristic.main(testfunc, objs, initpoint, args)
        toc = time.process_time_ns()
        # CPU time in seconds, obtained by simple difference
        cpuTime = (toc - tic) * (10**-9)
        numCalls = testfunc.count
        converged = 1
    except Exception:
        quality = NaN
        cpuTime = NaN
        numCalls = testfunc.count
        converged = 0
    return cpuTime, quality, numCalls, converged
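# A minimal sketch of a standalone call to simulate(), reusing the sample paths
# and arguments from the __main__ block below (the initpoint is chosen arbitrarily):
#   cpuTime, quality, numCalls, converged = simulate(
#       "SimmulatedAnnealing", "SampleAlgorithms/SimmulatedAnnealing.py",
#       "Bukin2", "TestFunctions/Bukin2.py",
#       0, {"high": 200, "low": -200, "t": 1000, "p": 0.95}, [0.5, 0.5])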
def measure(heuristicpath, heuristic_name, funcpath, funcname, objs, args, scale, connection):
    '''
    Runs every optimization run of the heuristic against one test function
    '''
    # Seeding the random module for generating the initial point of the optimizer;
    # a random starting point is used for experimental validity
    r.seed(int(time.time()))
    # Defining random initial points from which to start testing the algorithms
    initpoints = [[r.random() * scale, r.random() * scale] for run in range(30)]  # update the inner list to [r.random() * scale for i in range(testfuncDimensions)]
    # Building the iterable arguments
    partfunc = partial(simulate, heuristic_name, heuristicpath, funcname, funcpath, objs, args)
    with multiprocessing.Pool(processes=3) as pool:
        # Running the simulations
        newRun = pool.map(partfunc, initpoints)
    cpuTime = [resl[0] for resl in newRun]
    quality = [resl[1] for resl in newRun]
    numCalls = [resl[2] for resl in newRun]
    converged = [resl[3] for resl in newRun]
    results = dict()
    results['cpuTime'] = array([statistics.mean(cpuTime), statistics.stdev(cpuTime)])
    results['quality'] = array([statistics.mean(quality), statistics.stdev(quality)])
    results['numCalls'] = array([statistics.mean(numCalls), statistics.stdev(numCalls)])
    results['convRate'] = array([statistics.mean(converged), statistics.stdev(converged)])
    connection.send((results, newRun))
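# A minimal sketch of how measure() is consumed (mirroring doe() below): the
# parent creates a one-way pipe, passes the sending end in, and reads the
# (results, newRun) tuple from the receiving end once the process exits:
#   recv_end, send_end = multiprocessing.Pipe(duplex=False)
#   p = multiprocessing.Process(target=measure, args=(heuristicpath,
#       heuristic_name, funcpath, funcname, objs, args, scale, send_end))
#   p.start(); p.join()
#   results, newRun = recv_end.recv()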
def writerepresentation(funcpath, charas):
    # Save a backup copy of the function file
    shutil.copyfile(funcpath, funcpath + '.old')
    # Create a string representation of the characteristic variables
    representation = ''
    for line in list(charas):
        representation += '\n\t#_# ' + line + ': ' + repr(charas[line]).replace('\n', '')
    representation += '\n\n\t#_# Represented: 1\n\n'
    # Creating the new docstring to be inserted into the file
    with open(funcpath, "r") as file:
        content = file.read()
    docstrs = re.findall(r"def main\(.*?\):.*?'''(.*?)'''.*?return\s+.*?", content, re.DOTALL)[0]
    docstrs += representation
    repl = "\\1" + docstrs + "\t\\2"
    # Create the new file content to replace the old, substituting the whole docstring
    pattrn = re.compile(r"(def main\(.*?\):.*?''').*?('''.*?return\s+.*?\n|$)", flags=re.DOTALL)
    newContent = pattrn.sub(repl, content, count=1)
    # Overwrite the test function file
    with open(funcpath, "w") as file:
        file.write(newContent)
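# Illustrative example (assumed values): writerepresentation() appends one
# '#_#' line per characteristic to the test function's docstring, followed by
# a completion marker, e.g.
#   #_# cm_angle: array([...])
#   #_# Represented: 1
# representfunc() below parses these '#_#' lines back out of the docstring.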
def representfunc(funcpath):
    # Defining the function name
    funcname = path.splitext(path.basename(funcpath))[0]
    # Loading the function to be represented
    spec = importlib.util.spec_from_file_location(funcname, funcpath)
    funcmodule = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(funcmodule)
    # Finding the function characteristics inside the docstring
    results = {}
    if funcmodule.main.__doc__:
        # This regular expression matches the characteristics already specified in the
        # docstring section of the function
        # (old expression: "#_#\s?(\w+):\s?([-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?)")
        regex = re.compile(r"#_#\s?(\w+):(.+)?\n")
        characs = re.findall(regex, funcmodule.main.__doc__)
        for charac in characs:
            results[charac[0]] = eval(charac[1])
    # Automatically generate the representation if the docstring did not provide one
    if not ('Represented' in results):
        print("Warning: the representation of the test function has not been specified\n===\n****** Calculating the characteristics ******")
        n = int(results['dimmensions'])
        # Importing FLACCO using rpy2
        flacco = importr('flacco')
        # Creating the R functions
        rlist = robjs.r['list']
        rapply = robjs.r['apply']
        rvector = robjs.r['c']
        r_unlist = robjs.r['unlist']
        rtestfunc = rinterface.rternalize(funcmodule.main)
        lower = r_unlist(rvector(results['lower']))
        upper = r_unlist(rvector(results['upper']))
        X = flacco.createInitialSample(n_obs=500, dim=n, control=rlist(**{'init_sample.type': 'lhs', 'init_sample.lower': lower, 'init_sample.upper': upper}))
        y = rapply(X, 1, rtestfunc)
        testfuncobj = flacco.createFeatureObject(X=X, y=y, fun=rtestfunc, lower=lower, upper=upper, blocks=10)
        # These are the retained feature sets. Some sets are excluded for being problematic
        # and to avoid overcomplicating the neural network: the feature sets are redundant,
        # so only the most relevant ones have been retained.
        # Excluded feature sets: 'bt', 'ela_level'
        # Feature sets requiring special attention: 'cm_angle', 'cm_grad', 'limo', 'gcm' (large set with some NaNs)
        featureset = ['cm_angle', 'cm_conv', 'cm_grad', 'ela_conv', 'ela_curv', 'ela_distr', 'ela_local', 'ela_meta', 'basic', 'disp', 'limo', 'nbc', 'pca', 'gcm', 'ic']
        pyfeats = dict()
        for feature in featureset:
            rawfeats = flacco.calculateFeatureSet(testfuncobj, set=feature)
            pyfeats[feature] = asarray(rawfeats)
        writerepresentation(funcpath, pyfeats)
    return results
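# Note: representfunc() expects the test function's docstring to already carry
# '#_# dimmensions', '#_# lower', and '#_# upper' entries (the spelling
# 'dimmensions' is the key actually read above) before features can be computed;
# '#_# Represented: 1' marks functions whose features already exist.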
def doe(heuristicpath, heuristic_name, testfunctionpaths, funcnames, objs, args, scale):
    # Logic variables to manage the processes
    proc = []
    connections = {}
    # Loading the test functions into the namespace and memory
    for idx, funcpath in enumerate(testfunctionpaths):
        funcname = funcnames[idx]
        # Creating the connection objects for communication between the heuristic and this module
        connections[funcname] = multiprocessing.Pipe(duplex=False)
        proc.append(multiprocessing.Process(target=measure, name=funcname, args=(heuristicpath, heuristic_name, funcpath, funcname, objs, args, scale, connections[funcname][1])))
    # Defining the response variables
    responses = {}
    failedfunctions = {}
    processtiming = {}
    # Starting the processes
    for idx, process in enumerate(proc):
        process.start()
    # Waiting for all the runs to finish
    # multiprocessing.connection.wait([process.sentinel for process in proc])
    for process in proc:
        process.join()
    for process in proc:
        run = process.name
        if process.exitcode == 0:
            responses[run] = connections[run][0].recv()
        else:
            responses[run] = "this run was not successful"
            failedfunctions[run] = process.exitcode
        connections[run][0].close()
        connections[run][1].close()
    # Display the output
    print("\n\n||||| Responses: [mean, stdDev] |||||")
    for process in proc:
        print(process.name + "____\n" + str(responses[process.name][0]) + "\n_________________")
    # Return the output
    return responses
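# Each successful entry of the returned dict is the (results, newRun) tuple sent
# through the pipe by measure(); for example, data['Bukin2'][0]['cpuTime'] would
# be the [mean, stdDev] array of CPU times for the Bukin2 runs.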
if __name__ == '__main__':
    heuristicpath = "SampleAlgorithms/SimmulatedAnnealing.py"
    heuristic_name = "SimmulatedAnnealing"
    testfunctionpaths = ["TestFunctions/Bukin2.py", "TestFunctions/Bukin4.py", "TestFunctions/Brown.py"]
    funcnames = ["Bukin2", "Bukin4", "Brown"]
    # testfunctionpaths = ["/home/remi/Documents/MDAF-GitLAB/SourceCode/TestFunctions/Bukin4.py"]
    # funcnames = ["Bukin4"]
    objs = 0
    args = {"high": 200, "low": -200, "t": 1000, "p": 0.95}
    scale = 1
    # data = doe(heuristicpath, heuristic_name, testfunctionpaths, funcnames, objs, args, scale)
    # print([point[2] for point in data['Bukin2'][1]])
    representfunc("TestFunctions/Bukin2.py")
# %%