Building My Own Async PHP

A lot of what I do involves data processing. I retrieve resources like database records or files, then iterate through each one to process it. The biggest problem I run into is performance due to the sheer amount of data that needs processing. Since PHP runs on a single thread, if I need to process a million records, PHP has to go through each one sequentially. However, modern computers are capable of executing multiple tasks simultaneously, so can PHP leverage that hardware to process data in parallel?

Existing Solutions

There are many libraries and frameworks that help achieve asynchronous PHP, such as Swoole, ReactPHP, and Amp, as well as extensions like pthreads and parallel. However, my goal was to use native PHP features without additional dependencies while still being able to receive data after it has been processed. This left me with the following options:

  • pcntl_fork - Requires the Process Control extension and does not work on non-Unix systems.
  • stream_socket_server - Requires creating a server, which seems excessive.
  • shell_exec() or exec() - Cannot capture real-time output.
  • proc_open() - Does not require extensions, can capture output in real-time, and is lightweight.

I decided to go with proc_open(). In a serious application, you would likely use existing solutions, but this was an experiment to see how close I could get to async PHP using built-in features.

Building Blocks

These are the key functions that enable us to implement something resembling async PHP:

  • proc_open - Starts an external process and allows interaction via pipes (stdin, stdout, stderr).
  • stream_set_blocking - Sets a stream to blocking or non-blocking mode, allowing us to avoid execution being halted while waiting for input.
  • stream_select - Monitors multiple streams for readability, writability, or errors, making it useful for handling multiple socket connections or non-blocking I/O.
  • stream_get_contents - Reads the full content from a stream, useful for reading from pipes, sockets, or remote connections.
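
To make the list above concrete, here is a minimal sketch that combines all four functions to run a single child process and collect its output. The inline child script and the variable names are illustrative assumptions, not code from the library, and the sketch assumes PHP 7.4 or later (for the array form of the command) on a Unix-like system.

```php
<?php
// Minimal sketch: run one child PHP process and read its stdout without blocking.
// The inline child script and variable names are illustrative only.

$descriptorSpec = [
    0 => ['pipe', 'r'], // child's stdin  (parent writes)
    1 => ['pipe', 'w'], // child's stdout (parent reads)
    2 => ['pipe', 'w'], // child's stderr (parent reads)
];

// PHP_BINARY is the path of the interpreter running this script.
$cmd = [PHP_BINARY, '-r', 'usleep(200000); echo "hello from child";'];

$process = proc_open($cmd, $descriptorSpec, $pipes);
if (!is_resource($process)) {
    throw new RuntimeException('Could not start child process.');
}

fclose($pipes[0]);                     // the child needs no input
stream_set_blocking($pipes[1], false); // reads return immediately

$output = '';
while (!feof($pipes[1])) {
    $read = [$pipes[1]];
    $write = null;
    $except = null;

    // Sleep until the child writes something or closes its stdout.
    if (stream_select($read, $write, $except, null) > 0) {
        $output .= stream_get_contents($pipes[1]);
    }
}

fclose($pipes[1]);
fclose($pipes[2]);
proc_close($process);

echo $output, PHP_EOL;
```

The loop keeps reading until end-of-file rather than reading once, because a non-blocking read returns only what has arrived so far.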

The Plan

The library should be able to take in tasks (callables) and process each one in a separate process so they do not block each other. If I have 1000 tasks, I should not have all 1000 running simultaneously, as too much time would be spent on context switching. Instead, the number of active tasks should be limited, ideally to the number of available CPU threads. Data from each process should be retrieved as soon as it becomes available, and when a process ends, another should take its place so that the maximum number of allowed processes is always working on tasks.
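
The plan above can be sketched as a small bounded pool. The runPool() function and its return shape are hypothetical and exist only for this illustration; they are not Pumice's API. The sketch assumes a Unix-like system and treats a child closing its stdout as the signal that it has finished.

```php
<?php
// Sketch of a bounded process pool: at most $maxConcurrent children run at once,
// and a finished child is immediately replaced by the next queued command.
// runPool() and its return shape are hypothetical, not the library's API.

function runPool(array $commands, int $maxConcurrent): array
{
    $spec = [1 => ['pipe', 'w']]; // only capture the child's stdout
    $next = 0;                    // index of the next command to start
    $running = [];                // index => ['proc' => ..., 'out' => ...]
    $buffers = [];                // index => collected output
    $peak = 0;                    // highest number of simultaneous children

    while ($next < count($commands) || $running !== []) {
        // Fill free slots.
        while (count($running) < $maxConcurrent && $next < count($commands)) {
            $proc = proc_open($commands[$next], $spec, $pipes);
            if (!is_resource($proc)) {
                throw new RuntimeException("Could not start command #$next.");
            }
            stream_set_blocking($pipes[1], false);
            $running[$next] = ['proc' => $proc, 'out' => $pipes[1]];
            $buffers[$next] = '';
            $next++;
        }
        $peak = max($peak, count($running));

        // Wait for output or end-of-file on any running child.
        $read = array_column($running, 'out');
        $write = null;
        $except = null;
        if (stream_select($read, $write, $except, null) === false) {
            throw new RuntimeException('stream_select() failed.');
        }

        foreach ($running as $i => $child) {
            if (!in_array($child['out'], $read, true)) {
                continue;
            }
            $buffers[$i] .= stream_get_contents($child['out']);

            if (feof($child['out'])) { // child closed stdout: treat as finished
                fclose($child['out']);
                proc_close($child['proc']);
                unset($running[$i]);   // frees a slot for the next command
            }
        }
    }

    return ['outputs' => $buffers, 'peak' => $peak];
}

// Usage: five short-lived children, never more than two at a time.
$commands = [];
for ($i = 0; $i < 5; $i++) {
    $commands[] = [PHP_BINARY, '-r', "usleep(100000); echo $i * $i;"];
}
$result = runPool($commands, 2);
print_r($result['outputs']);
```

Refilling slots at the top of every loop iteration is what keeps the pool saturated: as soon as one child is removed from $running, the next pass starts a replacement before waiting again.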

Execution

I liked the API of the Spatie Async library, so I decided to mimic it. In the example below, 15 tasks are created and executed in parallel, with their results summed up:

use Enxas\Pumice;

$pumice = Pumice::create();
$total = 0;

for ($i = 0; $i < 15; $i++) {
	$pumice
		->add([new StuffCalculator, 'calculate'])
		->then(function (mixed $output) use (&$total) {
			$total += $output;
		})->catch(function (Throwable $exception) {
			echo "Error: {$exception->getMessage()}\n";
		});
}

$pumice->wait();

echo $total;

If no maximum number of concurrent processes is provided, then the available thread count is used:

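private function getProcessorCores(): int
{
	$cores = (int) match (PHP_OS_FAMILY) {
		'Windows' => shell_exec('echo %NUMBER_OF_PROCESSORS%'),
		'Linux' => shell_exec('nproc'),
		'Darwin' => shell_exec('sysctl -n hw.ncpu'), // macOS
		default => 8, // arbitrary fallback for other platforms
	};

	// shell_exec() returns null or false on failure, which casts to 0.
	// Never report fewer than one core, or no task would ever start.
	return max(1, $cores);
}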

Since tasks are objects and PHP does not allow passing objects as command-line arguments, I serialize and base64 encode the object before passing it to a separate worker process. Once the process is created, I interact with it through its stdin, stdout, and stderr pipes. The stdout and stderr pipes are set to non-blocking mode so that reading from one process never stalls the others.

$serializedTask = base64_encode(serialize($task));
$cmd = ["php", "-f", __DIR__ . "/worker.php", $serializedTask];

$process = proc_open($cmd, $this->descriptorSpec, $this->pipes[$index]);

if (is_resource($process)) {
	stream_set_blocking($this->pipes[$index][self::STDOUT], false);
	stream_set_blocking($this->pipes[$index][self::STDERR], false);

	$this->processes[$index] = $process;
}
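
The encode and decode steps can be checked in isolation. The sketch below performs the round trip inside a single script; Greeter is a hypothetical stand-in for a real task class, and it assumes PHP 8.0 or later for constructor promotion.

```php
<?php
// Sketch of the encode/decode round trip for a task. Greeter is a
// hypothetical stand-in for a real task class.

final class Greeter
{
    public function __construct(private string $name) {}

    public function greet(): string
    {
        return "Hello, {$this->name}!";
    }
}

$task = [new Greeter('worker'), 'greet'];

// serialize() output can contain NUL bytes (used for private properties)
// and quotes, so base64 makes it safe to pass as a single argv entry.
$encoded = base64_encode(serialize($task));

// What the worker would do with $argv[1]. The class definition must be
// loadable in the worker process for unserialize() to rebuild the object.
$decoded = unserialize(base64_decode($encoded));

echo $decoded(), PHP_EOL; // prints "Hello, worker!"
```

One consequence of this design is that closures cannot be used as tasks directly, because PHP refuses to serialize Closure objects; array callables and invokable objects work.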

The worker.php script is straightforward. It decodes and unserializes the received command-line argument (the task object), executes the callable, and then writes the serialized, encoded response to stdout for the main process to read.

if ($argc >= 2) {
	try {
		$task = unserialize(base64_decode($argv[1]));

		if (!is_callable($task)) {
			throw new Exception("Task is not callable.");
		}

		// Execute task
		$output = $task();

		// Wrap successful output
		$response = [
			'status' => 'success',
			'data'   => $output
		];
	} catch (Throwable $e) {
		// Wrap error output
		$response = [
			'status'  => 'error',
			'message' => $e->getMessage()
		];
	}

	// Always return a structured response
	echo base64_encode(serialize($response));

	flush();
}

The Main Logic

The core logic happens in the wait() function. It starts an initial batch of tasks, ensuring that no more than the allowed number of tasks run at the same time. As each task completes, a new one starts to maintain the limit.

// Start a batch of $maxConcurrent tasks
while (count($this->processes) < $this->maxConcurrent && $this->taskIndex < count($this->tasks)) {
	$this->startNewTask();
}

We monitor the output pipes of the running processes. With a null timeout, stream_select() blocks until at least one pipe has data to read or has been closed by its process:

$readStreams = array_map(fn($pipes) => $pipes[self::STDOUT], $this->pipes);
$writeStreams = null;
$exceptStreams = null;

// Wait until one or more streams are ready to read
if (stream_select($readStreams, $writeStreams, $exceptStreams, null) > 0) {
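
As a standalone illustration of waiting on several pipes at once, the sketch below starts two children with different delays and records which one finishes first. The labels and delays are made up for the example, and it assumes a Unix-like system.

```php
<?php
// Sketch: wait on several child stdout pipes at once and record which child
// finishes first. Labels and delays are illustrative.

$jobs = ['slow' => 500000, 'fast' => 0]; // label => delay in microseconds
$procs = [];
$pipes = [];
$buffers = [];

foreach ($jobs as $label => $delay) {
    $cmd = [PHP_BINARY, '-r', "usleep($delay); echo '$label done';"];
    $proc = proc_open($cmd, [1 => ['pipe', 'w']], $p);
    if (!is_resource($proc)) {
        throw new RuntimeException("Could not start the $label child.");
    }
    stream_set_blocking($p[1], false);
    $procs[$label] = $proc;
    $pipes[$label] = $p[1];
    $buffers[$label] = '';
}

$finished = [];
while ($pipes !== []) {
    $read = array_values($pipes); // stream_select() rewrites this array
    $write = null;
    $except = null;

    if (stream_select($read, $write, $except, null) === false) {
        break;
    }

    foreach ($pipes as $label => $pipe) {
        if (!in_array($pipe, $read, true)) {
            continue; // nothing new from this child yet
        }
        // A single read may return only part of the output, so append
        // and keep going until the child closes its end of the pipe.
        $buffers[$label] .= stream_get_contents($pipe);
        if (feof($pipe)) {
            fclose($pipe);
            proc_close($procs[$label]);
            unset($pipes[$label]);
            $finished[] = $label;
        }
    }
}

print_r($finished);
```

Although the slow child is started first, the fast one is handled first, which is the behavior the library relies on to free a slot as early as possible.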

When output is available, we retrieve, decode, and deserialize it:

foreach ($this->processes as $i => $process) {
	if (in_array($this->pipes[$i][self::STDOUT], $readStreams, true)) {
		$encodedOutput = stream_get_contents($this->pipes[$i][self::STDOUT]);

		if ($encodedOutput !== false && $encodedOutput !== '') {
			$output = unserialize(base64_decode($encodedOutput));

Finally, we call the success or error callbacks with the resulting data:

// Handle success/failure callbacks
if (isset($output['status']) && $output['status'] === 'error') {
	if ($this->callbacks[$i]['catch']) {
		call_user_func($this->callbacks[$i]['catch'], new \Exception($output['message']));
	}
} elseif ($this->callbacks[$i]['then']) {
	call_user_func($this->callbacks[$i]['then'], $output['data']);
}
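
If a worker prints anything other than the encoded payload (for example, a PHP warning sent to stdout), decoding will not yield the expected array. A defensive decoder might look like the sketch below. decodeWorkerReply() is a hypothetical helper, not part of the library; the 'status', 'data', and 'message' keys follow the response shape produced by worker.php.

```php
<?php
// Sketch of a defensive decoder for a worker's reply. decodeWorkerReply()
// is a hypothetical helper; the array keys match worker.php's response.

function decodeWorkerReply(string $encoded): array
{
    $raw = base64_decode($encoded, true); // strict: reject non-base64 characters
    if ($raw === false) {
        return ['status' => 'error', 'message' => 'Reply is not valid base64.'];
    }

    // unserialize() returns false and emits a diagnostic on bad input;
    // the @ operator silences that diagnostic so we can report it ourselves.
    $reply = @unserialize($raw);
    if (!is_array($reply) || !isset($reply['status'])) {
        return ['status' => 'error', 'message' => 'Reply could not be unserialized.'];
    }

    return $reply;
}

// Usage: one well-formed reply and one that is clearly not a payload.
$ok = decodeWorkerReply(base64_encode(serialize(['status' => 'success', 'data' => 42])));
$bad = decodeWorkerReply('!!not-base64!!');

var_dump($ok['data'], $bad['status']);
```

Returning the same array shape for malformed input lets the existing success and error branches handle it without extra checks.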

The Result

Let's test the performance improvement by comparing async execution with synchronous execution. First, let's create a worker class.

<?php

namespace App\Workers;

class StuffCalculator
{
	public function calculate(): float
	{
		$result = 0;

		for ($i = 0; $i < 10_000_000; $i++) {
			$result += sqrt($i) * log($i + 1) * sin($i) * cos($i) * exp($i % 10);
		}

		return $result;
	}
}

Then measure how long the async and sync versions each take to complete 15 tasks.

$startTime1 = microtime(true);

$pumice = Pumice::create();
$total1 = 0;

for ($i = 0; $i < 15; $i++) {
	$pumice
		->add([new StuffCalculator, 'calculate'])
		->then(function (mixed $output) use (&$total1) {
			$total1 += $output;
		})->catch(function (Throwable $exception) {
			echo "Error: {$exception->getMessage()}\n";
		});
}

$pumice->wait();

$elapsedTime1 = (microtime(true) - $startTime1) * 1000;

$startTime2 = microtime(true);

$total2 = 0;
for ($i = 0; $i < 15; $i++) {
	$total2 += (new StuffCalculator)->calculate();
}

$elapsedTime2 = (microtime(true) - $startTime2) * 1000;


var_dump([$elapsedTime1, $elapsedTime2, $total1, $total2]);

Results:

array(4) {
  [0]=> float(7125.4589557647705)
  [1]=> float(32699.30911064148)
  [2]=> float(2001088973.8368132)
  [3]=> float(2001088973.8368132)
}

The async version took about 7.1 seconds, while the synchronous version took about 32.7 seconds, making the async run roughly 4.6 times faster. Both versions computed the same total, so the speedup did not come at the cost of correctness.

In the process, I created a small library named Pumice. The source code is available on GitHub. This was a challenging but rewarding project, proving that PHP can achieve parallel processing with built-in features.

This article was updated on March 7, 2025