I have implemented two pieces of code as part of an Evolutionary Algorithm study: Particle Swarm Optimisation (PSO) and a Genetic Algorithm (GA). Both are designed to find an optimal solution to a problem, in this case the global minimum of a mathematical function.
PSO performs efficiently, requiring very few iterations. GA, however, uses a lot of memory, and I cannot see why.
The memory used depends on the input dimensions and population size, but it has exceeded 8 GB in some cases without reaching the maximum number of iterations (I terminated the run myself).
I have tried using the garbage collector, gc, without success. Can anyone see why this may be? Also, feel free to comment on the coding style.
To run the code, you will need cec2005real (pip install cec2005real). The idea is that a list of numbers is passed into a function from cec2005real, and a single value > 0 called the fitness is returned (0 is the optimal solution).
import numpy as np
from cec2005real.cec2005 import Function

np.random.seed(12)

class GeneticAlgorithm():
    def __init__(self, pop_size, mutation_rate, dimen):
        self.pop_size = pop_size
        self.mutation_rate = mutation_rate
        self.dimen = dimen

    def _initialize(self):
        self.population = np.empty(shape=(self.pop_size, self.dimen))
        for i in range(self.pop_size):
            individual = np.random.uniform(-100.0, 100.0, self.dimen)
            self.population[i] = individual

    def _calculate_fitness(self):
        pop_fitness = np.empty(self.pop_size)
        for i, individual in enumerate(self.population):
            fbench = Function(1, self.dimen)
            fitness_func = fbench.get_eval_function()
            fitness = fitness_func(individual)
            pop_fitness[i] = fitness
        return pop_fitness

    def _tournamentSelection(self, pop_fitness):
        N = len(pop_fitness)
        best_fitness = 0.0
        idx = 0
        for i_ in range(3):
            j = np.random.randint(0, N)
            fit = pop_fitness[j]
            if fit > best_fitness:
                best_fitness = fit
                idx = j
        return idx

    def _mutate(self, individual):
        for j in range(len(individual)):
            if np.random.random() < self.mutation_rate:
                individual[j] = np.random.uniform(-100.0, 100.0)
        return individual

    def _twoPointCrossover(self, parent1, parent2):
        N = len(parent1)
        cx1 = np.random.randint(0, N)
        cx2 = np.random.randint(0, N)
        if cx1 == cx2:
            if cx1 == 0:
                cx2 += 1
            else:
                cx1 -= 1
        if cx2 < cx1:
            cx1, cx2 = cx2, cx1
        child1 = np.concatenate((parent1[:cx1], parent2[cx1:cx2], parent1[cx2:]))
        child2 = np.concatenate((parent2[:cx1], parent1[cx1:cx2], parent2[cx2:]))
        return child1, child2

    def main(self, iterations):
        self._initialize()
        for epoch in range(iterations):
            pop_fitness = self._calculate_fitness()
            fittest_individual = self.population[np.argmin(pop_fitness)]
            min_fitness = min(pop_fitness)
            pop_fitness = 1 / np.array(pop_fitness)
            mfit = 1 / min_fitness
            probabilities = [f / sum(pop_fitness) for f in pop_fitness]
            # Determine the next generation
            new_pop = np.empty(shape=(self.pop_size, self.dimen))
            for i in np.arange(0, self.pop_size, 2):
                idx1 = self._tournamentSelection(pop_fitness)
                idx2 = self._tournamentSelection(pop_fitness)
                # Perform crossover to produce children
                child1, child2 = self._twoPointCrossover(self.population[idx1],
                                                         self.population[idx2])
                # Save mutated children for next generation
                new_pop[i] = self._mutate(child1)
                new_pop[i + 1] = self._mutate(child2)
            self.population = new_pop
            if epoch % 1000 == 0:
                print("[Epoch %d, Fitness: %f]" % (epoch + 1, min_fitness))

if __name__ == "__main__":
    ga = GeneticAlgorithm(10, 0.01, 10)
    ga.main(100000)
2 Answers
All of these work only if all of your objects are numpy.arrays, so just make sure they are (or uncomment the casts in each method, but that makes it slightly slower).
In the _tournamentSelection method you can use np.random.choice with the optional second argument to get three random elements (with replacement, as in your code, but you can also set replace=False if you want). Note that the caller expects an index into the population, so draw random indices and return the one whose (inverted) fitness is largest:

    def _tournamentSelection(self, pop_fitness):
        # pop_fitness = np.array(pop_fitness)
        idx = np.random.choice(len(pop_fitness), 3)
        return idx[pop_fitness[idx].argmax()]
Your _mutate method can also be vectorized using numpy.where:

    def _mutate(self, individual):
        # individual = np.array(individual)
        return np.where(np.random.random(len(individual)) < self.mutation_rate,
                        np.random.uniform(-100., 100., len(individual)),
                        individual)
Note that this trades using more memory (it keeps three arrays the size of an individual in memory) for more speed (iterating at C speed instead of at Python interpreter speed).
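As a quick standalone sanity check (a sketch not from the original post, using an all-zero individual for illustration), the vectorized mutation only changes values at the masked positions:

```python
import numpy as np

np.random.seed(0)
mutation_rate = 0.5
individual = np.zeros(8)

# Vectorized mutation: draw one boolean mask and one array of replacement
# values, then select per position with np.where.
mask = np.random.random(len(individual)) < mutation_rate
mutated = np.where(mask,
                   np.random.uniform(-100.0, 100.0, len(individual)),
                   individual)

# Unmasked positions are untouched; masked ones land in [-100, 100).
assert np.all(mutated[~mask] == 0.0)
assert np.all((mutated[mask] >= -100.0) & (mutated[mask] <= 100.0))
```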
As you rightfully observed, your memory problem was caused by initializing a new function on every call. You can go one step further and use the numpy.vectorize wrapper to make the code very short and readable:
    def _calculate_fitness(self):
        # self.population = np.array(self.population)
        fbench = Function(1, self.dimen)
        # signature='(n)->()' makes vectorize pass whole rows to the
        # function instead of single scalars.
        fitness_func = np.vectorize(fbench.get_eval_function(),
                                    signature='(n)->()')
        return fitness_func(self.population)
Note that this is not faster than writing the for loop, but it is easier to read. :)
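The row-wise behaviour of the signature argument can be checked with a stand-in function (a hypothetical sphere function here, standing in for the real cec2005real benchmark, which likewise maps a whole vector to one scalar):

```python
import numpy as np

# Hypothetical stand-in fitness function: length-n vector -> scalar.
def sphere(x):
    return float(np.sum(x ** 2))

# signature='(n)->()' tells np.vectorize to pass whole rows, not scalars.
fitness_func = np.vectorize(sphere, signature='(n)->()')

population = np.array([[0.0, 0.0], [1.0, 2.0], [3.0, 0.0]])
print(fitness_func(population))  # [0. 5. 9.]
```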
You can go even further if self.dimen never changes and set everything up only once, in your constructor:
    def __init__(self, pop_size, mutation_rate, dimen):
        self.pop_size = pop_size
        self.mutation_rate = mutation_rate
        self.dimen = dimen
        self._fbench = Function(1, dimen)
        self._calculate_fitness = np.vectorize(self._fbench.get_eval_function(),
                                               signature='(n)->()')
In your _initialize method you can directly make the population the right shape:

    def _initialize(self):
        self.population = np.random.uniform(-100., 100., (self.pop_size, self.dimen))
Finding the fittest individual then becomes min_fitness = pop_fitness.min() instead of min_fitness = min(pop_fitness).
Comment: Thank you. I'll get implementing these. Useful information for the future. – McGuile, Nov 12, 2018 at 14:48
The heavy memory usage was because of the _calculate_fitness() function.
I declared a new fitness function from CEC2005 every time I wanted to calculate the fitness, rather than passing in the already-declared function.
There are no memory issues anymore. Apologies to anyone who may have tried to figure it out.
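A minimal sketch of the pattern behind the fix (with a hypothetical stand-in for the CEC2005 benchmark, since the real Function object is what was being recreated): construct the fitness function once, in the constructor, and reuse it every generation.

```python
import numpy as np

def make_benchmark():
    """Hypothetical stand-in for Function(1, dimen).get_eval_function():
    takes a vector, returns a single fitness value (0 is optimal)."""
    return lambda x: float(np.sum(x ** 2))

class GeneticAlgorithm:
    def __init__(self, pop_size, dimen):
        self.pop_size = pop_size
        self.dimen = dimen
        # Created once here, NOT once per fitness evaluation.
        self.fitness_func = make_benchmark()

    def calculate_fitness(self, population):
        # Reuse the same function object for every individual.
        return np.array([self.fitness_func(ind) for ind in population])

ga = GeneticAlgorithm(pop_size=4, dimen=3)
print(ga.calculate_fitness(np.zeros((4, 3))))  # [0. 0. 0. 0.]
```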