Queues

The Berlioz Framework integrates with the Queue Manager component through the berlioz/queue-manager-package package, which provides:

  • Configuration-based queue setup (Database, AWS SQS, Memory)
  • Automatic service container registration (QueueManager, JobHandlerManager, Worker)
  • Built-in job handlers for running Berlioz commands and system commands asynchronously
  • CLI commands for running workers, purging queues, and monitoring queue sizes

Installation

composer require berlioz/queue-manager-package

The package is auto-discovered by the framework. No additional registration is needed.

For more details on package installation, refer to the Packages page.

Configuration

Queue configuration goes under the berlioz.queues key. You can define it in your main berlioz.json or in a dedicated queues.json file in your configuration directory.

Default configuration provided by the package:

{
    "berlioz": {
        "queues": {
            "queues": [],
            "handlers": {
                "berlioz:command": "Berlioz\\Package\\QueueManager\\Handler\\BerliozCommandJobHandler",
                "berlioz:system": "Berlioz\\Package\\QueueManager\\Handler\\BerliozSystemJobHandler"
            },
            "factories": [
                "Berlioz\\Package\\QueueManager\\Factory\\MemoryQueueFactory",
                "Berlioz\\Package\\QueueManager\\Factory\\AwsSqsQueueFactory",
                "Berlioz\\Package\\QueueManager\\Factory\\DbQueueFactory"
            ]
        }
    }
}

  • queues: Array of queue definitions, each with a type key matching a queue class
  • handlers: Map of job name to handler class
  • factories: Array of factory classes that know how to create queues from configuration

Database queue

{
    "berlioz": {
        "queues": {
            "queues": [
                {
                    "type": "Berlioz\\QueueManager\\Queue\\DbQueue",
                    "db": {
                        "dsn": "mysql:dbname=myapp;host=127.0.0.1;port=3306;charset=UTF8",
                        "username": "myuser",
                        "password": "mypassword",
                        "table_name": "queue_jobs"
                    },
                    "retry_time": 30,
                    "max_attempts": 5,
                    "name": ["emails", "notifications"]
                }
            ]
        }
    }
}

The name key accepts a list of queue names. All queues in the same definition share the same database connection and settings. See the Queue Manager component documentation for the required MySQL schema.

Warning: This configuration contains database credentials. If you keep it in a separate file, add that file to your .gitignore.

AWS SQS queue

{
    "berlioz": {
        "queues": {
            "queues": [
                {
                    "type": "Berlioz\\QueueManager\\Queue\\AwsSqsQueue",
                    "client": {
                        "region": "eu-west-1",
                        "version": "latest"
                    },
                    "retry_time": 30,
                    "name": {
                        "emails": "https://sqs.eu-west-1.amazonaws.com/123456789/emails",
                        "notifications": "https://sqs.eu-west-1.amazonaws.com/123456789/notifications"
                    }
                }
            ]
        }
    }
}

The client key is passed directly to the SqsClient constructor. The name key is a map of queue name to SQS URL.

Memory queue

Useful for testing or short-lived tasks:

{
    "berlioz": {
        "queues": {
            "queues": [
                {
                    "type": "Berlioz\\QueueManager\\Queue\\MemoryQueue",
                    "retry_time": 30,
                    "max_attempts": 5,
                    "name": ["default"]
                }
            ]
        }
    }
}

Per-queue overrides

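Each entry in the name list can be an object instead of a string. The object sets its own retry_time, max_attempts, and rate_limit, overriding the values of the enclosing queue definition: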

{
    "name": [
        {
            "name": "emails",
            "retry_time": 60,
            "max_attempts": 10,
            "rate_limit": "100/1m"
        },
        {
            "name": "notifications",
            "rate_limit": ["50/1m", "1000/1d"]
        }
    ]
}

Rate limits accept a string ("100/1m") or an array of strings for multiple constraints. Supported units: s (seconds), m (minutes), h (hours), d (days). You can use a multiplier: "10/2hours" means 10 per 2 hours.
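To make the rate limit syntax concrete, the following sketch parses a limit string into a job count and a window length in seconds. It is not the package's own parser: parseRateLimit() is a hypothetical helper, and accepting long unit names such as "hours" alongside the one-letter units is an assumption drawn from the "10/2hours" example above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): parse a string such as
 * "100/1m" into a job count and a window length in seconds.
 */
function parseRateLimit(string $limit): array
{
    // Format: <count> "/" <optional multiplier> <unit>
    if (!preg_match('/^(\d+)\/(\d*)([a-z]+)$/i', trim($limit), $m)) {
        throw new InvalidArgumentException("Invalid rate limit: $limit");
    }

    // Assumption: long unit names are accepted in addition to s, m, h, d
    $unitSeconds = match (strtolower($m[3])) {
        's', 'second', 'seconds' => 1,
        'm', 'minute', 'minutes' => 60,
        'h', 'hour', 'hours' => 3600,
        'd', 'day', 'days' => 86400,
        default => throw new InvalidArgumentException("Unknown unit: {$m[3]}"),
    };

    // A missing multiplier ("100/m") is treated as 1
    $multiplier = $m[2] === '' ? 1 : (int)$m[2];
    if ($multiplier < 1) {
        throw new InvalidArgumentException("Invalid window in rate limit: $limit");
    }

    return ['limit' => (int)$m[1], 'seconds' => $multiplier * $unitSeconds];
}
```

Under these assumptions, "100/1m" maps to 100 jobs per 60 seconds and "10/2hours" to 10 jobs per 7200 seconds. An array such as ["50/1m", "1000/1d"] would be parsed entry by entry, and every resulting constraint would have to hold at the same time.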

Dispatching jobs

The QueueManager service is automatically registered in the service container. You can inject it into your controllers or services:

use Berlioz\Http\Core\Controller\AbstractController;
use Berlioz\QueueManager\Job\JobDescriptor;
use Berlioz\QueueManager\QueueManager;
use Psr\Http\Message\ResponseInterface;

class OrderController extends AbstractController
{
    public function __construct(
        private QueueManager $queueManager,
    ) {
    }

    public function confirm(int $orderId): ResponseInterface
    {
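        // Describe the job: a name (used to select the handler) and a payload
        $job = new JobDescriptor(
            name: 'order.confirmation',
            payload: ['order_id' => $orderId, 'email' => 'customer@example.com'],
        );

        // The three calls below illustrate the available options;
        // in real code, keep only the one you need.

        // Push to the default queue
        $this->queueManager->push($job);

        // Push with a delay in seconds (process in 5 minutes)
        $this->queueManager->push($job, delay: 300);

        // Push to a specific queue by name
        $this->queueManager->push($job, queue: 'emails');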

        return $this->response('Order confirmed');
    }
}

Routing jobs to a specific queue

If a job should always go to a particular queue, implement the JobForQueue interface:

use Berlioz\QueueManager\Job\JobDescriptor;
use Berlioz\QueueManager\Job\JobForQueue;
use Berlioz\QueueManager\Job\PayloadInterface;

class EmailJobDescriptor extends JobDescriptor implements JobForQueue
{
    public function __construct(PayloadInterface|array $payload)
    {
        parent::__construct('send_email', $payload);
    }

    public function forQueue(): string
    {
        return 'emails';
    }
}

When pushed through the QueueManager, this job will automatically be routed to the emails queue regardless of the default.

Creating job handlers

A job handler implements JobHandlerInterface and processes a specific type of job:

use Berlioz\QueueManager\Handler\JobHandlerInterface;
use Berlioz\QueueManager\Job\JobInterface;

// MailerService and OrderRepository stand for your own application services
class OrderConfirmationHandler implements JobHandlerInterface
{
    public function __construct(
        private MailerService $mailer,
        private OrderRepository $orders,
    ) {
    }

    public function handle(JobInterface $job): void
    {
        $payload = $job->getPayload();
        $order = $this->orders->find($payload->getOrFail('order_id'));
        $email = $payload->getOrFail('email');

        $this->mailer->sendOrderConfirmation($order, $email);
    }
}

Register the handler in your configuration:

{
    "berlioz": {
        "queues": {
            "handlers": {
                "order.confirmation": "App\\Handler\\OrderConfirmationHandler"
            }
        }
    }
}

Handler classes are resolved from the service container, so dependency injection works automatically.

The JobHandlerManager supports wildcard matching: a handler registered as order.* will match job names like order.confirmation, order.shipped, etc.
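As an illustration of how such wildcard matching can work, here is a minimal sketch. It does not reproduce the JobHandlerManager implementation: resolveHandler() is a hypothetical function, App\Handler\OrderHandler is a made-up class name, and giving an exact match priority over a wildcard pattern is an assumption.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical resolver: returns the handler class registered for a job
 * name, or null if none matches. Assumption: an exact name wins over a
 * wildcard pattern; the real JobHandlerManager may decide differently.
 */
function resolveHandler(array $handlers, string $jobName): ?string
{
    if (isset($handlers[$jobName])) {
        return $handlers[$jobName];
    }

    foreach ($handlers as $pattern => $handlerClass) {
        // Escape regex metacharacters, then turn the escaped "*" into ".*"
        $regex = '/^' . str_replace('\*', '.*', preg_quote((string)$pattern, '/')) . '$/';

        if (preg_match($regex, $jobName) === 1) {
            return $handlerClass;
        }
    }

    return null;
}

$handlers = [
    'order.*' => 'App\\Handler\\OrderHandler',
    'order.confirmation' => 'App\\Handler\\OrderConfirmationHandler',
];
```

With this map, order.shipped resolves to the wildcard handler, order.confirmation to its dedicated handler, and invoice.paid to no handler at all.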

Built-in job handlers

The package provides two job handlers out of the box.

berlioz:command

Executes a Berlioz CLI command asynchronously. The payload must contain a command key with an array of arguments (without the berlioz prefix):

use Berlioz\QueueManager\Job\JobDescriptor;

// Run "vendor/bin/berlioz berlioz:cache-clear --all" asynchronously
$job = new JobDescriptor('berlioz:command', [
    'command' => ['berlioz:cache-clear', '--all'],
]);
$queueManager->push($job);

berlioz:system

Executes a system command via passthru(). The payload command key is an array of arguments joined with spaces:

use Berlioz\QueueManager\Job\JobDescriptor;

// Run a system command asynchronously
$job = new JobDescriptor('berlioz:system', [
    'command' => ['php', '/path/to/script.php', '--option=value'],
]);
$queueManager->push($job);

Warning: The berlioz:system handler executes arbitrary system commands. Ensure job payloads are never built from untrusted user input.
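One way to follow this advice is to let user input select a task from a fixed allowlist, so that no user-supplied string ever reaches the command line. The sketch below assumes this pattern; ALLOWED_SYSTEM_TASKS, the rebuild-index key, and systemCommandFor() are hypothetical names, not part of the package.

```php
<?php

declare(strict_types=1);

/** Hypothetical allowlist: task keys mapped to fixed argument lists. */
const ALLOWED_SYSTEM_TASKS = [
    'rebuild-index' => ['php', '/path/to/script.php', '--option=value'],
];

/**
 * Returns the fixed argument list for a known task and rejects anything
 * else. The caller only ever chooses a key, never the arguments.
 */
function systemCommandFor(string $task): array
{
    if (!array_key_exists($task, ALLOWED_SYSTEM_TASKS)) {
        throw new InvalidArgumentException("Unknown system task: $task");
    }

    return ALLOWED_SYSTEM_TASKS[$task];
}
```

The returned array can then be used as the command value of a berlioz:system job, for example new JobDescriptor('berlioz:system', ['command' => systemCommandFor($task)]).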

CLI commands

The package registers three CLI commands for managing queues. See the dedicated Queue CLI commands page for full usage details and production deployment tips.

  • queue:worker — Start a worker to process queue jobs
  • queue:purge — Purge all jobs from queues
  • queue:size — Display queue sizes

Custom queue factories

The package includes factories for Database, AWS SQS, and Memory queues. To use other backends (Redis, AMQP, or your own), you can write a custom factory.

A factory must implement the QueueFactory interface:

use Berlioz\Package\QueueManager\Factory\QueueFactory;
use Berlioz\Package\QueueManager\Factory\QueueFactoryTrait;
use Berlioz\QueueManager\Queue\RedisQueue;
use Generator;
use Redis;

class RedisQueueFactory implements QueueFactory
{
    use QueueFactoryTrait;

    public static function getQueueClass(): string
    {
        return RedisQueue::class;
    }

    public static function createFromConfig(array $config): Generator
    {
        $redis = new Redis();
        $redis->connect(
            $config['redis']['host'] ?? '127.0.0.1',
            (int)($config['redis']['port'] ?? 6379),
        );

        foreach ((array)($config['name'] ?? []) as $queue) {
            // Accept both plain names ("emails") and per-queue objects
            if (!is_array($queue)) {
                $queue = ['name' => (string)$queue];
            }

            yield new RedisQueue(
                redis: $redis,
                name: $queue['name'] ?? 'default',
                limiter: self::getRateLimiterFromConfig($queue['rate_limit'] ?? null),
            );
        }
    }
}

Register it in your configuration:

{
    "berlioz": {
        "queues": {
            "factories": [
                "App\\Queue\\RedisQueueFactory"
            ],
            "queues": [
                {
                    "type": "Berlioz\\QueueManager\\Queue\\RedisQueue",
                    "redis": {
                        "host": "127.0.0.1",
                        "port": 6379
                    },
                    "name": ["emails", "notifications"]
                }
            ]
        }
    }
}

The QueueFactoryTrait provides a getRateLimiterFromConfig() helper that parses rate limit strings from configuration into RateLimiterInterface instances.

Note: Custom factories are merged with the default ones. You don’t need to re-declare the built-in factories.

Last updated: Wed, 18 Feb 2026 11:34