Files
MDAF/SourceCode/AlgorithmAnalyser.py
2021-05-14 23:49:40 -04:00

253 lines
9.3 KiB
Python

# directly running the DOE because existing surrogates can be explored with another workflow
from os import path
import importlib.util
import multiprocessing
import time
import re
from numpy import random as r
from numpy import *
import shutil
# Surrogate modelling
import itertools
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Test function characteristics
import statistics as st
from scipy import signal, misc, ndimage
def measure(heuristicpath, heuristic_name, funcpath, funcname, objs, args, scale, connection):
'''
This function runs each optimization process of the heuristic with one test function
'''
# Seeding the random module for generating the initial point of the optimizer: Utilising random starting point for experimental validity
r.seed(int(time.time()))
# loading the heuristic object into the namespace and memory
spec = importlib.util.spec_from_file_location(heuristic_name, heuristicpath)
heuristic = importlib.util.module_from_spec(spec)
spec.loader.exec_module(heuristic)
testspec = importlib.util.spec_from_file_location(funcname, funcpath)
func = importlib.util.module_from_spec(testspec)
testspec.loader.exec_module(func)
# Defining a random initial point to start testing the algorithms
initpoint = [r.random() * scale, r.random() * scale]
#This timer calculates directly the CPU time of the process (Nanoseconds)
tic = time.process_time_ns()
# running the test by calling the heuritic script with the test function as argument
best = heuristic.main(func, objs, initpoint, args)
toc = time.process_time_ns()
# ^^ The timer ends right above this; the CPU time is then calculated below by simple difference ^^
# Building the response
response = "The optimum point obtained is: " + str(best) + "\nThe CPU time of the process was: " + str((toc - tic)*(10**-9)) + " Seconds"
connection.send(response)
def writerepresentation(funcpath, charas):
# Save a backup copy of the function file
shutil.copyfile(funcpath, funcpath + '.old')
# create a string format of the representation variables
representation = ''
for line in list(charas):
representation += '\n\t#_# ' + line + ': ' + str(charas[line])
representation+='\n'
# Creating the new docstring to be inserted into the file
with open(funcpath, "r") as file:
content = file.read()
docstrs = re.findall("def main\(.*?\):.*?'''(.*?)'''.*?return\s+.*?", content, re.DOTALL)[0]
docstrs += representation
repl = "\\1"+docstrs+"\t\\2"
# Create the new content of the file to replace the old. Overwriting the whole thing
pattrn = re.compile("(def main\(.*?\):.*?''').*?('''.*?return\s+.*?\n|$)", flags=re.DOTALL)
newContent = pattrn.sub(repl, content, count=1)
# Overwrite the test function file
with open(funcpath,"w") as file:
file.write(newContent)
def representfunc(funcpath):
#defining the function name
funcname = path.splitext(path.basename(funcpath))[0]
# loading the function to be represented
spec = importlib.util.spec_from_file_location(funcname, funcpath)
funcmodule = importlib.util.module_from_spec(spec)
spec.loader.exec_module(funcmodule)
# Finding the function characteristics inside the docstring
if funcmodule.main.__doc__:
regex = re.compile("#_#\s?(\w+):\s?([-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?)")
characs = re.findall(regex, funcmodule.main.__doc__)
results = {}
for charac in characs:
results[charac[0]] = float(charac[1])
# Automatically generate the representation if the docstrings did not return anything
if not ('Represented' in results):
print("Warning, the Representation of the Test Function has not specified\n===\n******Calculating the Characteristics******")
n = int(results['dimmensions'])
# pickle these steps
coords = arange(-10,10,0.5)
samplemx = array([*itertools.product(coords, repeat=n)])
funcmap = array([* map(funcmodule.main, samplemx)])
# Arrays for plotting the test function
X = array([tp[0] for tp in samplemx])
Y = array([tp[1] for tp in samplemx])
Z = array(funcmap)
# reshaping the array into a 3D topology
topology = reshape(Z,(coords.size,coords.size))
ck = topology
# Plotting the test function
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
ax.plot_trisurf(X, Y, Z)
# plt.show()
# Number of Modes filter the data for local optima: look for circle like shapes, or squares or rectangles of very low derivative (tip of modes)
# Valleys and Bassins
# Alternative filter used for calculating derivatives
#derfilt = array([1.0, -2, 1.0], dtype=float32)
#alpha = signal.sepfir2d(ck, derfilt, [1]) + signal.sepfir2d(ck, [1], derfilt)
# Currently used filter for Valley detection
hor = array([[0,1,1],[-1,0,1], [-1,-1,0]])
vert = array([[-1,-1,0], [-1,0,1], [0,1,1]])
for i in range(1): betaH = signal.convolve(ck,hor,mode='valid')
for i in range(1): betaV = signal.convolve(ck,vert, mode='valid')
beta = sqrt(betaH ** 2 + betaV ** 2)
#beta = beta[5:-5][5:-5]
norm = linalg.norm(beta)
beta/= norm # normalized matrix
# custom filter for detection should light up the locaton of pattern
kernel = array([[1,1,1], [1,100,1], [1,1,1]])
beta = beta < average(beta)
beta = beta * 1
for i in range(100):
beta = ndimage.convolve(beta,kernel)
beta = beta >= 101
beta = beta * 1
if any(beta): results['Valleys'] = True
# Separability: calculate the derivatives in one dimension and see if independant from other dimension
# Dimensionality: number of objectives, inputs: call function once and see what it gives | for number of inputs call until it works; try catch
# Pareto fronts:
# Noisyness: use the previously generated DOE and calculate a noisyness factor; average of derivative
# Displaying the plots for development purposes
#img1 = plt.figure()
#ax2 = img1.add_subplot(111)
#ax2.imshow(alpha)
img2 = plt.figure()
ax3 = img2.add_subplot(111)
ax3.imshow(beta)
plt.show()
print("should be plotted")
# Writing the calculated representation into the test function file
# results['Represented'] = True
writerepresentation(funcpath, results)
return results
def doe(heuristicpath, heuristic_name, testfunctionpaths, funcnames, objs, args, scale):
# logic variables to deal with the processes
proc = []
connections = {}
# loading the test functions into the namespace and memory
for idx, funcpath in enumerate(testfunctionpaths):
funcname = funcnames[idx]
# Creating the connection objects for communication between the heuristic and this module
connections[funcname] = multiprocessing.Pipe(duplex=False)
proc.append(multiprocessing.Process(target=measure, name=funcname, args=(heuristicpath, heuristic_name, funcpath, funcname, objs, args, scale, connections[funcname][1])))
# defining the response variables
responses = {}
failedfunctions = {}
processtiming = {}
# defining some logic variables
for idx,process in enumerate(proc):
process.start()
# Waiting for all the runs to be
# multiprocessing.connection.wait([process.sentinel for process in proc])
for process in proc: process.join()
for process in proc:
run = process.name
if process.exitcode == 0: responses[run] = connections[run][0].recv()
else:
responses[run] = "this run was not successful"
failedfunctions[run] = process.exitcode
connections[run][0].close()
connections[run][1].close()
# display output
print("\n\n||||| Responses |||||")
for process in proc: print(process.name + "____\n" + str(responses[process.name]) + "\n_________________")
if __name__ == '__main__':
heuristicpath = "SampleAlgorithms/SimmulatedAnnealing.py"
heuristic_name = "SimmulatedAnnealing"
testfunctionpaths = ["TestFunctions/Bukin2.py", "TestFunctions/Bukin4.py", "TestFunctions/Brown.py"]
funcnames = ["Bukin2", "Bukin4", "Brown"]
# testfunctionpaths = ["/home/remi/Documents/MDAF-GitLAB/SourceCode/TestFunctions/Bukin4.py"]
# funcnames = ["Bukin4"]
objs = 0
args = {"high": 200, "low": -200, "t": 1000, "p": 0.95}
scale = 1
#doe (heuristicpath, heuristic_name, testfunctionpaths, funcnames, objs, args, scale)
representfunc("TestFunctions/Bukin6.py")
# %%