Thread-worker

PHP multi-thread worker message based library

View the Project on GitHub drealecs/thread-worker

drealecs/thread-worker

PHP multi-thread-worker message/event library Build Status

Introduction

Thread-worker is a library that allows execution of tasks in parallel by multiple PHP processes on the same computer or on different computers.

The library has a lot of concepts borrowed from other languages.

Concepts

Task, TaskResult and TaskException are the entities that are being serialized as "messages" and write to/read from the queue.

API

Task

A code that will be executed remotely cannot share variables or context with the calling code. When defining a function that can be executed asynchronously it must be created as a Task by extending \ThreadWorker\Task and implementing method run():

class AddTask extends ThreadWorker\Task
{
    public function run($a, $b)
    {
        returns $a + $b;
    }
}

and after that, the task can be used in this way:

$task = new AddTask(3, 5);
$result = $task();

and this will make $result equals 8.

Just like a function, a task can return a value or not.

Of course, the example above execute a task locally, synchronously. To execute it asynchronously we will need a Queue and an Executor.

Queue

To synchronize working tasks a queue concept is being used.

There is a queue, someone puts a task in the queue and there are workers that take it and run it.

Interface of a task queue:

Currently there is only one implementation of Queue: \ThreadWorker\RedisQueue and there are plans for: AMQPQueue, MySQLQueue

Executor

Executor wraps a queue and provide a simpler interface to queue task and work on tasks and get task results.

There is a QueueExecutor that has 2 methods:

Let's look at an example:

$queue = new ThreadWorker\RedisQueue('example');
$executor = new ThreadWorker\QueueExecutor($queue);

$task = new AddTask(3, 5);
$queuedTask = $executor->submit($task);
$result = $queuedTask->getResult()->getValue();

QueueExecutor is extended into RemoteExecutor that does the asynchronous running of the tasks. Worker's code would look like this:

$queue = new ThreadWorker\RedisQueue('example');
$worker = new ThreadWorker\RemoteExecutor($queue);

$worker->work();

An instance of RemoteExecutor is passed as an extra parameter to the run() method of the task and can be use to queue more tasks.