I wrote a simple multithreading program which does particle swarm optimization. The optimization itself and some corresponding functions were taken from https://nathanrooy.github.io/posts/2016-08-17/simple-particle-swarm-optimization-with-python/. This is my first attempt at writing multithreaded Python code :)
The code has 4 files:
- consts.py - just some constants
- main.py - entry point
- particle.py - a particle class implemented as a thread
- particle_producer.py - a class which synchronizes all particles and computes the next positions/velocities
Thank you for your help :)
consts.py
MAX_ITERATIONS = 300
NUMBER_OF_PARTICLES = 10
NUM_DIMENSIONS = 2
main.py
import threading
import random
from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES
from particle import Particle
from particle_producer import ParticleSwarmProducer
import numpy as np
# Function to optimize
def fnc(x):
    total=0
    for i in range(len(x)):
        total+=x[i]**2
    return total
if __name__ == '__main__':
    x_down, x_up = (-100, 100)
    # All shared data structures between particles and particle producer
    dict_shared_new_position = {i: list(np.random.uniform(x_down, x_up, NUM_DIMENSIONS)) for i in range(0, NUMBER_OF_PARTICLES)}
    dict_shared_best_position = dict_shared_new_position.copy()
    dict_velocity = {i: [random.uniform(-1,1)] * NUM_DIMENSIONS for i in range(0, NUMBER_OF_PARTICLES)}
    dict_shared_errors = {i: -1 for i in range(0, NUMBER_OF_PARTICLES)}
    dict_shared_is_ready = {i: False for i in range(0, NUMBER_OF_PARTICLES)}
    bounds = [(x_down, x_up) for i in range(NUMBER_OF_PARTICLES)]
    condition_wait = threading.Condition()
    producer = ParticleSwarmProducer(initial_particle_position=dict_shared_new_position,
                                     bounds=bounds,
                                     dict_shared_errors=dict_shared_errors,
                                     dict_shared_is_ready=dict_shared_is_ready,
                                     dict_shared_new_position=dict_shared_new_position,
                                     dict_shared_best_positions=dict_shared_best_position,
                                     dict_velocity=dict_velocity,
                                     condition_wait=condition_wait
                                     )
    particles = []
    for i in range(NUMBER_OF_PARTICLES):
        p = Particle(thread_id=i,
                     name='Thread ' + str(i+1),
                     dict_shared_errors=dict_shared_errors,
                     dict_shared_is_ready=dict_shared_is_ready,
                     dict_shared_new_position=dict_shared_new_position,
                     dict_shared_best_positions=dict_shared_best_position,
                     fnc=fnc,
                     condition_wait=condition_wait)
        particles.append(p)
    producer.start()
    for p in particles:
        p.start()
    producer.join()
    for p in particles:
        p.join()
particle.py
import threading
from consts import MAX_ITERATIONS
threadLock = threading.Lock()
class Particle(threading.Thread):
""" Represents one particle object with specific position/velocity"""
def __init__(self,
thread_id,
name,
dict_shared_errors,
dict_shared_is_ready,
dict_shared_new_position,
dict_shared_best_positions,
fnc,
condition_wait
):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.name = name
self.dict_shared_errors = dict_shared_errors
self.dict_shared_is_ready = dict_shared_is_ready
self.dict_shared_new_positions = dict_shared_new_position
self.dict_shared_best_positions = dict_shared_best_positions
self.fnc = fnc
self.condition_wait = condition_wait
def run(self) -> None:
it = 1
best_particle_error = 9999
while it < MAX_ITERATIONS+1:
# print(f'{self.name} waiting for a new job...')
threadLock.acquire(blocking=True)
self.dict_shared_is_ready[self.thread_id] = True
threadLock.release()
with self.condition_wait:
self.condition_wait.wait()
threadLock.acquire(blocking=True)
position = self.dict_shared_new_positions[self.thread_id]
error = self.fnc(position)
if error < best_particle_error:
self.dict_shared_best_positions[self.thread_id] = position
best_particle_error = error
self.dict_shared_errors[self.thread_id] = self.fnc(position)
# set to not ready
self.dict_shared_is_ready[self.thread_id] = False
threadLock.release()
# print(f'{self.name} working on task')
it +=1
# print(f'{self.name}, {it}')
particle_producer.py
import threading
import time
import random
import numpy as np
from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES, MAX_ITERATIONS
threadLock = threading.Lock()
class ParticleSwarmProducer(threading.Thread):
""" Synchronizes all particles and computes their next position/velocity"""
def __init__(self,
initial_particle_position,
bounds,
dict_shared_errors,
dict_shared_is_ready,
dict_shared_new_position,
dict_shared_best_positions,
dict_velocity,
condition_wait):
threading.Thread.__init__(self)
self.initial_particle_positions = initial_particle_position
self.bounds = bounds
self.dict_velocity = dict_velocity
self.dict_best_positions = dict_shared_best_positions
self.dict_shared_errors = dict_shared_errors
self.dict_shared_is_ready = dict_shared_is_ready
self.dict_shared_new_position = dict_shared_new_position
self.condition_wait = condition_wait
self.current_iteration = 0
self.err_best_g = -1 # best error for group
self.pos_best_g = [] # best position for group
# Used for plotting
self.output_pos = {i: np.empty((0, NUM_DIMENSIONS)) for i in range(NUMBER_OF_PARTICLES)}
def run(self) -> None:
i = 1
while i < MAX_ITERATIONS+1:
threadLock.acquire(blocking=True)
ready = list(self.dict_shared_is_ready.values())
# If all particles have finished their current jobs...
if all(ready):
print(f'Iteration {i}')
print(f'Current positions: {self.dict_shared_new_position}')
print(f'Current errors: {self.dict_shared_errors}')
print(f'Current velocities: {self.dict_velocity}')
self.add_pos_to_out()
self.evaluate_all_particles()
self.update_all_particles()
print('All particles go!')
with self.condition_wait:
self.condition_wait.notifyAll()
i += 1
threadLock.release()
# time.sleep(0.2)
time.sleep(0.02)
with self.condition_wait:
self.condition_wait.notifyAll()
print(f'Current positions: {self.dict_shared_new_position}')
print(f'Error: {self.err_best_g}')
def evaluate_all_particles(self):
for i in range(NUMBER_OF_PARTICLES):
if self.dict_shared_errors[i] < self.err_best_g or self.err_best_g == -1:
self.pos_best_g = list(self.dict_shared_new_position[i])
self.err_best_g = float(self.dict_shared_errors[i])
def update_all_particles(self):
for i in range(NUMBER_OF_PARTICLES):
self.update_velocity(i)
self.update_position(i)
def add_pos_to_out(self):
for i in range(NUMBER_OF_PARTICLES):
self.output_pos[i] = np.vstack((self.output_pos[i], self.dict_shared_new_position[i]))
def update_velocity(self, i):
w = 0.5 # constant inertia weight (how much to weigh the previous velocity)
c1 = 1 # cognative constant
c2 = 2 # social constant
for j in range(0, NUM_DIMENSIONS):
r1 = random.random()
r2 = random.random()
vel_cognitive = c1 * r1 * (self.dict_best_positions[i][j] - self.dict_shared_new_position[i][j])
vel_social = c2 * r2 * (self.pos_best_g[j] - self.dict_shared_new_position[i][j])
self.dict_velocity[i][j] = w * self.dict_velocity[i][j] + vel_cognitive + vel_social
def update_position(self, i):
for j in range(0, NUM_DIMENSIONS):
self.dict_shared_new_position[i][j] = self.dict_shared_new_position[i][j] + self.dict_velocity[i][j]
# adjust maximum position if necessary
if self.dict_shared_new_position[i][j] > self.bounds[i][1]:
self.dict_shared_new_position[i][j] = self.bounds[i][1]
# adjust minimum position if neseccary
if self.dict_shared_new_position[i][j] < self.bounds[i][0]:
self.dict_shared_new_position[i][j] = self.bounds[i][0]
1 Answer
Some suggestions:

- Run the code through `black` and `isort --profile=black` to format it more idiomatically without any manual work.
- Run the code through `flake8` or `pylint` to check for other maintainability issues.
- Run the code through `mypy`, ideally with a strict configuration, and annotate accordingly. It'll help readers make sense of the arguments and return values, and makes type prefixes like `dict_` redundant (see the first sketch after this list).
- Code should ideally be grouped according to what it belongs with, rather than what it is. For example, you wouldn't group all classes in one file, so why group all the constants?
- `i = 1; while i < MAX_ITERATIONS+1:` can be simplified as `for iteration in range(1, MAX_ITERATIONS + 1)`, or more idiomatically, `for index in range(MAX_ITERATIONS)` (see the second sketch after this list).
- `range(0, limit)` can be simplified to `range(limit)`.
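To illustrate the `mypy` point, here is a minimal sketch of how `Particle.__init__` might look annotated, assuming the shapes the shared structures have in `main.py` (positions are lists of floats, errors are floats, readiness flags are bools). The `Position` alias and the shortened parameter names are illustrative, not from the original code:

import threading
from typing import Callable, Dict, List

Position = List[float]  # illustrative alias, not in the original code


class Particle(threading.Thread):
    def __init__(self,
                 thread_id: int,
                 name: str,
                 shared_errors: Dict[int, float],
                 shared_is_ready: Dict[int, bool],
                 shared_new_positions: Dict[int, Position],
                 shared_best_positions: Dict[int, Position],
                 fnc: Callable[[Position], float],
                 condition_wait: threading.Condition) -> None:
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.name = name
        self.shared_errors = shared_errors
        self.shared_is_ready = shared_is_ready
        self.shared_new_positions = shared_new_positions
        self.shared_best_positions = shared_best_positions
        self.fnc = fnc
        self.condition_wait = condition_wait

Once the signatures say `Dict[int, Position]`, names like `shared_errors` carry all the remaining information, so the `dict_` prefixes no longer pull their weight.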
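As a concrete sketch of the last two points: `Particle.run` is a plain counted loop, so `range` can replace the manual counter. (The producer's `run` is different: there `i` only advances when every particle is ready, so a `for` loop would not be a drop-in replacement for it.)

import threading

from consts import MAX_ITERATIONS


class Particle(threading.Thread):
    def run(self) -> None:
        best_particle_error = 9999
        # Replaces `it = 1`, `while it < MAX_ITERATIONS+1:` and `it +=1`.
        for it in range(1, MAX_ITERATIONS + 1):
            ...  # loop body unchanged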