3
\$\begingroup\$

I wrote a simple multithreaded program which does particle swarm optimization. The optimization itself and some corresponding functions were taken from https://nathanrooy.github.io/posts/2016-08-17/simple-particle-swarm-optimization-with-python/. This is my first attempt at writing multithreaded Python code :)

Code has 4 files:

  1. consts.py - just some constants
  2. main.py - entry point
  3. particle.py - particle class as a thread
  4. particle_producer.py - class which synchronizes all particles and computes next position/velocity

Thank you for your help :)

consts.py

# Number of rounds: the producer loop and every particle loop each run this many times.
MAX_ITERATIONS = 300
# Number of particle threads, and of entries in each shared per-particle dict.
NUMBER_OF_PARTICLES = 10
# Length of every position and velocity vector.
NUM_DIMENSIONS = 2

main.py

import threading
import random
from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES
from particle import Particle
from particle_producer import ParticleSwarmProducer
import numpy as np
# Function to optimize
def fnc(x):
    """Return the sum of the squares of the elements of *x*.

    This is the objective function being minimised. An empty *x* yields 0.
    """
    return sum(value ** 2 for value in x)
if __name__ == '__main__':
    x_down, x_up = (-100, 100)

    # All shared data structures between particles and particle producer.
    # Every dict is keyed by the particle index 0..NUMBER_OF_PARTICLES-1.
    dict_shared_new_position = {
        i: list(np.random.uniform(x_down, x_up, NUM_DIMENSIONS))
        for i in range(NUMBER_OF_PARTICLES)
    }
    # Copy each inner list: the producer updates the current positions in
    # place, so a shallow dict copy would make every "best" position silently
    # follow the current one.
    dict_shared_best_position = {
        i: list(position) for i, position in dict_shared_new_position.items()
    }
    # Draw an independent initial velocity component for every dimension.
    dict_velocity = {
        i: [random.uniform(-1, 1) for _ in range(NUM_DIMENSIONS)]
        for i in range(NUMBER_OF_PARTICLES)
    }
    # -1 marks "not evaluated yet"; the producer treats -1 as its "no best yet" value.
    dict_shared_errors = {i: -1 for i in range(NUMBER_OF_PARTICLES)}
    dict_shared_is_ready = {i: False for i in range(NUMBER_OF_PARTICLES)}

    # One (lower, upper) pair per particle: the producer indexes bounds by
    # particle index when clamping positions.
    bounds = [(x_down, x_up) for _ in range(NUMBER_OF_PARTICLES)]

    condition_wait = threading.Condition()

    producer = ParticleSwarmProducer(
        initial_particle_position=dict_shared_new_position,
        bounds=bounds,
        dict_shared_errors=dict_shared_errors,
        dict_shared_is_ready=dict_shared_is_ready,
        dict_shared_new_position=dict_shared_new_position,
        dict_shared_best_positions=dict_shared_best_position,
        dict_velocity=dict_velocity,
        condition_wait=condition_wait,
    )

    particles = [
        Particle(
            thread_id=i,
            name='Thread ' + str(i + 1),
            dict_shared_errors=dict_shared_errors,
            dict_shared_is_ready=dict_shared_is_ready,
            dict_shared_new_position=dict_shared_new_position,
            dict_shared_best_positions=dict_shared_best_position,
            fnc=fnc,
            condition_wait=condition_wait,
        )
        for i in range(NUMBER_OF_PARTICLES)
    ]

    producer.start()
    for p in particles:
        p.start()

    producer.join()
    for p in particles:
        p.join()

particle.py

import threading
from consts import MAX_ITERATIONS
# Module-level lock that Particle.run holds while it touches the shared dicts.
# NOTE(review): particle_producer.py creates its own, separate threadLock, so
# this lock serialises particles against each other but not against the
# producer - confirm that is intended.
threadLock = threading.Lock()
class Particle(threading.Thread):
    """Represents one particle: a thread that evaluates its own position.

    The particle does not move itself. Each round it flags itself as ready,
    waits on ``condition_wait`` until notified, evaluates ``fnc`` at the
    position stored for it in the shared position dict and publishes the
    resulting error (and, when improved, its best position).
    """

    def __init__(self,
                 thread_id,
                 name,
                 dict_shared_errors,
                 dict_shared_is_ready,
                 dict_shared_new_position,
                 dict_shared_best_positions,
                 fnc,
                 condition_wait
                 ):
        """Store the shared structures; nothing is evaluated here.

        thread_id -- key of this particle in every shared dict
        name -- thread name
        dict_shared_errors -- receives the latest error of this particle
        dict_shared_is_ready -- receives the ready flag of this particle
        dict_shared_new_position -- read for the position to evaluate
        dict_shared_best_positions -- receives the best position seen so far
        fnc -- objective function, called with the position
        condition_wait -- condition the particle waits on between rounds
        """
        super().__init__(name=name)
        self.thread_id = thread_id
        self.dict_shared_errors = dict_shared_errors
        self.dict_shared_is_ready = dict_shared_is_ready
        self.dict_shared_new_positions = dict_shared_new_position
        self.dict_shared_best_positions = dict_shared_best_positions
        self.fnc = fnc
        self.condition_wait = condition_wait

    def run(self) -> None:
        """Evaluate the particle's position once per notification, MAX_ITERATIONS times."""
        # Start from infinity so the first evaluation always becomes the best;
        # a finite start value would be beaten only by errors below it.
        best_particle_error = float('inf')

        for _ in range(MAX_ITERATIONS):
            with self.condition_wait:
                # Raise the flag while already holding the condition lock: a
                # notifier that saw the flag cannot notify before wait() has
                # released the lock, so the wake-up cannot be missed.
                self.dict_shared_is_ready[self.thread_id] = True
                self.condition_wait.wait()
                # Working again: lower the flag before evaluating so "ready"
                # is not reported while this round is still being computed.
                self.dict_shared_is_ready[self.thread_id] = False

            with threadLock:
                position = self.dict_shared_new_positions[self.thread_id]
                error = self.fnc(position)  # evaluate once, reuse below

                if error < best_particle_error:
                    # Store a copy: the position list is mutated in place
                    # elsewhere, which would otherwise change the stored best.
                    self.dict_shared_best_positions[self.thread_id] = list(position)
                    best_particle_error = error

                self.dict_shared_errors[self.thread_id] = error

particle_producer.py

 import threading
 import time
 import random
 import numpy as np
 
 from consts import NUM_DIMENSIONS, NUMBER_OF_PARTICLES, MAX_ITERATIONS
 
 # Module-level lock held by ParticleSwarmProducer.run during each polling pass.
 # NOTE(review): particle.py creates its own, separate threadLock, so holding
 # this one does not exclude the particle threads - confirm that is intended.
 threadLock = threading.Lock()
 
 
 class ParticleSwarmProducer(threading.Thread):
     """Synchronizes all particles and computes their next position/velocity.

     The thread polls the shared ready flags. Whenever every flag is set it
     records the current positions, updates the group best from the shared
     errors, moves every particle and then notifies all threads waiting on
     ``condition_wait``.
     """

     def __init__(self,
                  initial_particle_position,
                  bounds,
                  dict_shared_errors,
                  dict_shared_is_ready,
                  dict_shared_new_position,
                  dict_shared_best_positions,
                  dict_velocity,
                  condition_wait):
         """Store the shared structures and reset the group-best bookkeeping.

         initial_particle_position -- stored as-is; not read by this class
         bounds -- sequence of (lower, upper) pairs, indexed by particle index
         dict_shared_errors -- latest error per particle
         dict_shared_is_ready -- ready flag per particle
         dict_shared_new_position -- current position per particle, updated in place
         dict_shared_best_positions -- best known position per particle
         dict_velocity -- velocity per particle, updated in place
         condition_wait -- condition used to wake the waiting particles
         """
         threading.Thread.__init__(self)

         self.initial_particle_positions = initial_particle_position
         self.bounds = bounds

         self.dict_velocity = dict_velocity

         self.dict_best_positions = dict_shared_best_positions
         self.dict_shared_errors = dict_shared_errors
         self.dict_shared_is_ready = dict_shared_is_ready
         self.dict_shared_new_position = dict_shared_new_position

         self.condition_wait = condition_wait

         self.current_iteration = 0
         self.err_best_g = -1  # best error for group (-1 means "none yet")
         self.pos_best_g = []  # best position for group

         # Used for plotting: one (rounds, NUM_DIMENSIONS) array per particle
         self.output_pos = {i: np.empty((0, NUM_DIMENSIONS)) for i in range(NUMBER_OF_PARTICLES)}

     def run(self) -> None:
         """Run MAX_ITERATIONS update rounds, then print the final state."""
         iteration = 1
         while iteration <= MAX_ITERATIONS:
             # "with" releases the lock even if a step of the round raises.
             with threadLock:
                 ready = list(self.dict_shared_is_ready.values())

                 # If all particles have finished their current jobs...
                 if all(ready):
                     print(f'Iteration {iteration}')
                     print(f'Current positions: {self.dict_shared_new_position}')
                     print(f'Current errors: {self.dict_shared_errors}')
                     print(f'Current velocities: {self.dict_velocity}')

                     self.add_pos_to_out()

                     self.evaluate_all_particles()
                     self.update_all_particles()

                     print('All particles go!')

                     with self.condition_wait:
                         self.condition_wait.notify_all()
                     iteration += 1

             # Back off between polls instead of spinning on the lock.
             # NOTE(review): the original listing lost its indentation; this
             # sleep is assumed to belong inside the polling loop - confirm.
             time.sleep(0.02)

         # One more notification after the last round; presumably to release
         # any particle that is still waiting.
         with self.condition_wait:
             self.condition_wait.notify_all()
         print(f'Current positions: {self.dict_shared_new_position}')
         print(f'Error: {self.err_best_g}')

     def evaluate_all_particles(self):
         """Update the group best error/position from the shared errors."""
         for i in range(NUMBER_OF_PARTICLES):
             error = self.dict_shared_errors[i]
             # -1 is the "no best yet" marker, so any error replaces it.
             if self.err_best_g == -1 or error < self.err_best_g:
                 # Copy the position: the shared list is modified in place later.
                 self.pos_best_g = list(self.dict_shared_new_position[i])
                 self.err_best_g = float(error)

     def update_all_particles(self):
         """Update velocity, then position, of every particle."""
         for i in range(NUMBER_OF_PARTICLES):
             self.update_velocity(i)
             self.update_position(i)

     def add_pos_to_out(self):
         """Append every particle's current position to its plotting history."""
         for i in range(NUMBER_OF_PARTICLES):
             self.output_pos[i] = np.vstack((self.output_pos[i], self.dict_shared_new_position[i]))

     def update_velocity(self, i):
         """Recompute the velocity of particle *i* in place."""
         w = 0.5  # constant inertia weight (how much to weigh the previous velocity)
         c1 = 1  # cognitive constant
         c2 = 2  # social constant

         position = self.dict_shared_new_position[i]
         velocity = self.dict_velocity[i]

         for j in range(NUM_DIMENSIONS):
             r1 = random.random()
             r2 = random.random()

             # Pull towards the particle's own best and towards the group best.
             vel_cognitive = c1 * r1 * (self.dict_best_positions[i][j] - position[j])
             vel_social = c2 * r2 * (self.pos_best_g[j] - position[j])

             velocity[j] = w * velocity[j] + vel_cognitive + vel_social

     def update_position(self, i):
         """Move particle *i* by its velocity, clamped to ``bounds[i]``."""
         lower, upper = self.bounds[i][0], self.bounds[i][1]
         position = self.dict_shared_new_position[i]
         velocity = self.dict_velocity[i]

         for j in range(NUM_DIMENSIONS):
             moved = position[j] + velocity[j]
             # Cap at the upper bound first, then raise to the lower bound.
             position[j] = max(min(moved, upper), lower)
asked Jun 12, 2021 at 19:43
\$\endgroup\$

1 Answer 1

2
\$\begingroup\$

Some suggestions:

  1. Run the code through black and isort --profile=black to format it more idiomatically without any manual work.
  2. Run the code through flake8 or pylint to check for other maintainability issues.
  3. Run the code through mypy, ideally with a strict configuration, and annotate accordingly. It'll help readers make sense of the arguments and return values, and makes type prefixes like dict_ redundant.
  4. Code should ideally be grouped according to what it belongs with, rather than what it is. For example, you wouldn't group all classes in one file, so why group all the constants?
  5. i = 1; while i < MAX_ITERATIONS+1: can be simplified as for iteration in range(1, MAX_ITERATIONS + 1), or more idiomatically, for index in range(MAX_ITERATIONS).
  6. range(0, limit) can be simplified to range(limit).

[cat overflow. please hold.]

answered Jun 13, 2021 at 10:28
\$\endgroup\$

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.