qxsch/WorkerPool

WorkerPool

Parallel Processing WorkerPool for PHP

10K downloads within 4 months, thank you very much! We add features as users request them.

Examples

The WorkerPool class provides a very simple interface for passing data to a pool of workers and having it processed. You can fetch the results from the workers at any time. Each worker child can receive and return any value that can be serialized.
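Since inputs and results have to be serializable, it can help to check a value before handing it to the pool. The sketch below is not part of WorkerPool: `survivesSerialization()` is a hypothetical helper, and it assumes that PHP's native `serialize()`/`unserialize()` is a reasonable stand-in for whatever transport the pool uses internally.

```php
<?php
// Hypothetical helper, NOT part of WorkerPool: checks whether a value
// survives a round trip through PHP's native serialize()/unserialize().
function survivesSerialization($value): bool
{
    try {
        $copy = unserialize(serialize($value));
    } catch (\Throwable $e) {
        // e.g. closures cannot be serialized and make serialize() throw
        return false;
    }
    return $copy == $value;
}

var_dump(survivesSerialization(['id' => 7, 'tags' => ['a', 'b']])); // bool(true)
var_dump(survivesSerialization('plain text'));                      // bool(true)
var_dump(survivesSerialization(function () { return 1; }));         // bool(false)
```

Plain scalars and arrays pass the check, while a closure does not, so wrap such values in serializable data before calling `run()`.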

A simple example

<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
 ->create(new \QXS\WorkerPool\ClosureWorker(
 /**
 * @param mixed $input the input from the WorkerPool::run() Method
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject $storage a persistent storage for the current child process
 */
 function($input, $semaphore, $storage) {
 echo "[".getmypid()."]"." hi $input\n";
 sleep(rand(1,3)); // this is the working load!
 return $input; // return null here, in case you do not want to pass any data to the parent 
 }
 )
);
for($i=0; $i<10; $i++) {
 $wp->run($i);
}
$wp->waitForAllWorkers(); // wait for all workers
foreach($wp as $val) {
	echo $val->dump() . "\n"; // dump the returned values
 // var_dump($val); // dump the returned values
}

A more sophisticated example

<?php
use QXS\WorkerPool\WorkerPool;
use QXS\WorkerPool\WorkerInterface;
use QXS\WorkerPool\Semaphore;
/**
 * Our Worker Class
 */
Class MyWorker implements WorkerInterface {
 protected $sem;
 /**
 * after the worker has been forked into another process
 *
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to run synchronized tasks
 * @throws \Exception in case of a processing Error an Exception will be thrown
 */
 public function onProcessCreate(Semaphore $semaphore) {
 // semaphore can be used in the run method to synchronize the workers
 $this->sem=$semaphore;
 // write something to the stdout
 echo "\t[".getmypid()."] has been created.\n";
 // initialize mt_rand
 list($usec, $sec) = explode(' ', microtime());
 mt_srand((int)( (float) $sec + ((float) $usec * 100000) ));
 }
 /**
 * before the worker process is getting destroyed
 *
 * @throws \Exception in case of a processing Error an Exception will be thrown
 */
 public function onProcessDestroy() {
 // write something to the stdout
 echo "\t[".getmypid()."] will be destroyed.\n";
 }
 /**
 * run the work
 *
 * @param Serializeable $input the data, that the worker should process
 * @return Serializeable Returns the result
 * @throws \Exception in case of a processing Error an Exception will be thrown
 */
 public function run($input) {
 $input=(string)$input;
 echo "\t[".getmypid()."] Hi $input\n";
 sleep(mt_rand(0,10)); // this is the workload!
 // and sometimes exceptions might occur
 if(mt_rand(0,10)==9) {
 throw new \RuntimeException('We have a problem for '.$input.'.');
 }
 return "Hi $input"; // return null here, in case you do not want to pass any data to the parent
 }
}
$wp=new WorkerPool();
$wp->setWorkerPoolSize(10)
 ->create(new MyWorker());
// produce some tasks
for($i=1; $i<=50; $i++) {
 $wp->run($i);
}
// some statistics
echo "Busy Workers:".$wp->getBusyWorkers()." Free Workers:".$wp->getFreeWorkers()."\n";
// wait for completion of all tasks
$wp->waitForAllWorkers();
// collect all the results
foreach($wp as $val) {
 if(isset($val['data'])) {
 echo "RESULT: ".$val['data']."\n";
 }
 elseif(isset($val['workerException'])) {
 echo "WORKER EXCEPTION: ".$val['workerException']['class'].": ".$val['workerException']['message']."\n".$val['workerException']['trace']."\n";
 }
 elseif(isset($val['poolException'])) {
 echo "POOL EXCEPTION: ".$val['poolException']['class'].": ".$val['poolException']['message']."\n".$val['poolException']['trace']."\n";
 }
}
// write something, before the parent exits
echo "ByeBye\n";
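The collection loop above branches on three keys. As a minimal sketch, the same branching can be pulled into a small function that is easy to test without forking any workers. `describeResult()` and the sample entries are hypothetical and exist only for illustration; the keys `data`, `workerException` and `poolException` are the ones read in the loop above.

```php
<?php
// Hypothetical helper: turns one result entry into a single log line.
// It accepts anything that supports isset() and [] access, so plain
// arrays and ArrayAccess objects both work.
function describeResult($val): string
{
    if (isset($val['data'])) {
        return 'RESULT: ' . $val['data'];
    }
    foreach (['workerException' => 'WORKER', 'poolException' => 'POOL'] as $key => $label) {
        if (isset($val[$key])) {
            return $label . ' EXCEPTION: ' . $val[$key]['class'] . ': ' . $val[$key]['message'];
        }
    }
    return 'EMPTY'; // none of the three keys is set
}

// Hand-written sample entries (not real pool output).
$samples = [
    ['data' => 'Hi 1'],
    ['workerException' => ['class' => 'RuntimeException', 'message' => 'We have a problem for 2.', 'trace' => '']],
    [],
];
foreach ($samples as $sample) {
    echo describeResult($sample), "\n";
}
```

Inside the real loop this would be called as `echo describeResult($val) . "\n";`.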

Synchronize your workers

If you need to access shared resources, you can synchronize your workers.

<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
    ->create(new \QXS\WorkerPool\ClosureWorker(
        /**
         * @param mixed $input the input from the WorkerPool::run() Method
         * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
         * @param \ArrayObject $storage a persistent storage for the current child process
         */
        function($input, $semaphore, $storage) {
            $semaphore->synchronizedBegin();
            try {
                // this code is synchronized across all workers,
                // so only one worker at a time runs it
                echo "[A][".getmypid()."]"." hi $input\n";
            }
            finally {
                $semaphore->synchronizedEnd();
            }

            // alternative example
            $semaphore->synchronize(function() use ($input, $storage) {
                // this code is synchronized across all workers,
                // so only one worker at a time runs it
                echo "[B][".getmypid()."]"." hi $input\n";
            });
            sleep(rand(1,3)); // this is the working load!
            return $input;
        }
    )
);
for($i=0; $i<10; $i++) {
 $wp->run($i);
}
$wp->waitForAllWorkers(); // wait for all workers
foreach($wp as $val) {
 var_dump($val); // dump the returned values
}
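The `try`/`finally` around the synchronized section matters because the lock has to be released even when the critical section throws. The sketch below shows only that control flow. `RecordingLock` and `runSynchronized()` are hypothetical stand-ins that record calls; they are not the library's `Semaphore` and they do no real locking.

```php
<?php
// Hypothetical stand-in for a lock, NOT the library's Semaphore class.
// It only records calls so the control flow can be observed.
class RecordingLock
{
    public $log = [];
    public function synchronizedBegin(): void { $this->log[] = 'begin'; }
    public function synchronizedEnd(): void { $this->log[] = 'end'; }
}

// Runs $criticalSection between begin/end and always releases the lock.
function runSynchronized(RecordingLock $lock, callable $criticalSection)
{
    $lock->synchronizedBegin();
    try {
        return $criticalSection();
    } finally {
        $lock->synchronizedEnd(); // runs on normal return and on exception
    }
}

$lock = new RecordingLock();
try {
    runSynchronized($lock, function () {
        throw new \RuntimeException('failure inside the critical section');
    });
} catch (\RuntimeException $e) {
    $lock->log[] = 'caught';
}
echo implode(',', $lock->log), "\n"; // begin,end,caught
```

The recorded order shows that the release happens before the exception reaches the caller, which is the behavior the `finally` block in the worker above relies on.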

Disable semaphore (ability to synchronize workers)

You can disable the semaphore. Some users objected to the pool opening semaphores that they do not need at all.

<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
 ->disableSemaphore() // <--- this disables the semaphore support (you can still use it in the worker, but it will have no effect)
 ->create(new \QXS\WorkerPool\ClosureWorker(
 /**
 * @param mixed $input the input from the WorkerPool::run() Method
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject $storage a persistent storage for the current child process
 */
 function($input, $semaphore, $storage) {
 echo "[".getmypid()."]"." hi $input\n";
 sleep(rand(1,3)); // this is the working load!
 return $input; // return null here, in case you do not want to pass any data to the parent 
 }
 )
);
for($i=0; $i<10; $i++) {
 $wp->run($i);
}
$wp->waitForAllWorkers(); // wait for all workers
foreach($wp as $val) {
 var_dump($val); // dump the returned values
}

Automatic respawn

You can choose to automatically respawn dead workers.

<?php
$wp=new \QXS\WorkerPool\WorkerPool();
$wp->setWorkerPoolSize(4)
 ->respawnAutomatically()
 ->create(new \QXS\WorkerPool\ClosureWorker(
 /**
 * @param mixed $input the input from the WorkerPool::run() Method
 * @param \QXS\WorkerPool\Semaphore $semaphore the semaphore to synchronize calls across all workers
 * @param \ArrayObject $storage a persistent storage for the current child process
 */
 function($input, $semaphore, $storage) {
 echo "[".getmypid()."]"." hi $input\n";
 sleep(rand(1,3)); // this is the working load!
 // Simulate unexpected worker death
 if (rand(1, 10) > 5) exit;
 return $input; // return null here, in case you do not want to pass any data to the parent 
 }
 )
);
for($i=0; $i<10; $i++) {
 $wp->run($i);
}
$wp->waitForAllWorkers(); // wait for all workers
foreach($wp as $val) {
 var_dump($val); // dump the returned values
}

Each time a worker dies, a new one will be created with an incremented index.

You should avoid situations in which a worker dies, but the respawn capability can be a useful workaround until you have fixed the underlying problem.

Transparent output to ps

See what is happening when you run ps:

root 2378 \_ simpleExample.php: Parent
root 2379 \_ simpleExample.php: Worker 1 of QXS\WorkerPool\ClosureWorker [busy]
root 2380 \_ simpleExample.php: Worker 2 of QXS\WorkerPool\ClosureWorker [busy]
root 2381 \_ simpleExample.php: Worker 3 of QXS\WorkerPool\ClosureWorker [free]
root 2382 \_ simpleExample.php: Worker 4 of QXS\WorkerPool\ClosureWorker [free]

Documentation

The documentation can be found at http://qxsch.github.io/WorkerPool/doc/
