Designing Queue Topology for High Load
Queue topology — which queues exist, what jobs go in each, how many workers consume each — is
the architectural decision that determines whether your system holds up under load or falls apart.
Poor topology means a surge of low-priority jobs starves critical ones; good topology gives
you isolation, prioritization, and independent scaling.
The core design principle: jobs that must not affect each other should not share a queue.
A video encoding job that takes 5 minutes should never share a queue with a payment confirmation
job that should complete in under a second. If they share a queue, a burst of video jobs
creates a multi-minute delay for payment confirmations.
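The penalty is easy to estimate. As a back-of-envelope sketch (burst size and durations are illustrative), a job's wait behind a burst on a shared FIFO queue is roughly the burst size divided by the worker count, times the slow job's duration:

```php
// Approximate wait for a job queued behind $burstSize slow jobs, each taking
// $jobSeconds, on a FIFO queue drained by $workers parallel workers.
function headOfLineDelaySeconds(int $burstSize, int $jobSeconds, int $workers): int
{
    // The newly queued job starts only after every earlier job has been picked up.
    return intdiv($burstSize, $workers) * $jobSeconds;
}

// 100 five-minute transcodes ahead of a payment confirmation, 10 shared workers:
// intdiv(100, 10) * 300 = 3000 seconds, a 50-minute wait on the critical path.
```

Separate queues with dedicated workers make that number zero for the payment job, regardless of the video backlog.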
// config/horizon.php — production topology for a media platform
'environments' => [
'production' => [
// Critical path: payments, auth, security — SLA < 2 seconds
'supervisor-critical' => [
'queue' => ['critical'],
'balance' => 'simple',
'maxProcesses' => 5,
'minProcesses' => 3, // always warm — never scaled to zero
'timeout' => 30,
'tries' => 3,
],
// User-facing real-time: notifications, webhooks — SLA < 10 seconds
'supervisor-realtime' => [
'queue' => ['webhooks', 'notifications-push', 'notifications-email'],
'balance' => 'auto',
'maxProcesses' => 20,
'minProcesses' => 2,
'timeout' => 60,
],
// Media processing: CPU-intensive, long-running — SLA: minutes
'supervisor-media' => [
'queue' => ['video-transcode', 'image-process'],
'balance' => 'simple',
'maxProcesses' => 8, // GPU/CPU limited — don't over-provision
'minProcesses' => 1,
'timeout' => 900, // 15 minutes
'memory' => 1024,
],
// Batch/analytics: low priority, runs in background — SLA: hours
'supervisor-batch' => [
'queue' => ['analytics', 'reporting', 'data-export'],
'balance' => 'auto',
'maxProcesses' => 4,
'minProcesses' => 0, // can scale to zero during peak
'timeout' => 3600, // keep this below retry_after in config/queue.php, or jobs re-run mid-flight
],
// External integrations: rate-limited per service
'supervisor-integrations' => [
'queue' => ['stripe', 'salesforce', 'mailchimp'],
'balance' => 'auto',
'maxProcesses' => 6,
'minProcesses' => 1,
'timeout' => 120,
],
],
],
Video & Image Processing Pipeline
A production video processing pipeline must handle: upload completion detection, multiple
output quality variants, CDN distribution, thumbnail generation, and failure recovery —
all as a choreographed sequence of jobs rather than one monolithic job.
// Pipeline architecture: S3 event → chain of specialized jobs
// Triggered by S3 ObjectCreated event via SQS or directly from upload controller
class VideoUploadedJob implements ShouldQueue
{
public int $tries = 3;
public int $timeout = 60;
public function __construct(
private readonly int $videoId,
private readonly string $s3Key, // e.g., "uploads/raw/uuid.mp4"
) {}
public function handle(VideoRepository $videos, S3Client $s3): void
{
    $video = $videos->findOrFail($this->videoId);
    // Validate the upload before starting expensive processing.
    // getObjectMetadata() is assumed to be an app-level wrapper around S3 headObject.
    $metadata = $s3->getObjectMetadata($this->s3Key);
    if ($metadata['size'] > 5_368_709_120) { // 5 GB limit
        $video->update(['status' => 'rejected', 'rejection_reason' => 'file_too_large']);
        return;
    }
$video->update(['status' => 'processing', 'raw_s3_key' => $this->s3Key]);
// Fan-out to parallel quality transcodes
// Each quality level is independent — they can run concurrently
$transcodingJobs = collect(['360p', '720p', '1080p', '4k'])
->filter(fn ($quality) => $this->qualityNeeded($metadata, $quality))
->map(fn ($quality) => new TranscodeVideoJob($this->videoId, $this->s3Key, $quality));
// After ALL transcodes complete, run the post-processing chain.
// Batch completion callbacks must be closures, not job instances,
// so capture the ID and dispatch a chain from inside the callback.
$videoId = $this->videoId;
Bus::batch($transcodingJobs->all())
    ->then(function (Batch $batch) use ($videoId) {
        Bus::chain([
            new GenerateThumbnailsJob($videoId),
            new InvalidateCdnCacheJob($videoId),
            new MarkVideoReadyJob($videoId),
            new NotifySubscribersJob($videoId),
        ])->dispatch();
    })
    ->catch(function (Batch $batch, Throwable $e) use ($videoId) {
        HandleVideoProcessingFailureJob::dispatch($videoId);
    })
    ->onQueue('video-transcode')
    ->dispatch();
}
private function qualityNeeded(array $metadata, string $quality): bool
{
    // Source height is probed at upload time (e.g. via ffprobe) and stored
    // with the object's metadata; S3 itself only knows size and content type.
    $minimumHeights = ['360p' => 360, '720p' => 720, '1080p' => 1080, '4k' => 2160];
    return ($metadata['height'] ?? 0) >= $minimumHeights[$quality];
}
}
// The actual transcode job — shells out to ffmpeg or calls a transcoding service
class TranscodeVideoJob implements ShouldQueue
{
public int $timeout = 900; // 15 minutes for long videos
public function __construct(
private readonly int $videoId,
private readonly string $sourceKey,
private readonly string $quality,
) {}
public function handle(VideoTranscoder $transcoder): void
{
$outputKey = "processed/{$this->videoId}/{$this->quality}.mp4";
$transcoder->transcode(
source: $this->sourceKey,
output: $outputKey,
quality: $this->quality,
);
// Record the output for this quality level
VideoVariant::create([
'video_id' => $this->videoId,
'quality' => $this->quality,
's3_key' => $outputKey,
'cdn_url' => "https://cdn.yourapp.com/{$outputKey}",
]);
}
}
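The VideoTranscoder service is left abstract above. One plausible sketch builds ffmpeg arguments per quality level; the preset heights and bitrates here are illustrative assumptions, not tuned recommendations:

```php
// Map a quality label to ffmpeg CLI arguments. Bitrates are placeholder values;
// real presets depend on codec, content, and delivery targets.
function ffmpegArgsFor(string $quality, string $input, string $output): array
{
    $presets = [
        '360p'  => ['height' => 360,  'bitrate' => '800k'],
        '720p'  => ['height' => 720,  'bitrate' => '2500k'],
        '1080p' => ['height' => 1080, 'bitrate' => '5000k'],
        '4k'    => ['height' => 2160, 'bitrate' => '16000k'],
    ];
    $preset = $presets[$quality];

    return [
        'ffmpeg', '-y', '-i', $input,
        '-vf', "scale=-2:{$preset['height']}", // -2 keeps the width even for x264
        '-c:v', 'libx264', '-b:v', $preset['bitrate'],
        '-c:a', 'aac',
        '-movflags', '+faststart',             // moov atom up front for streaming
        $output,
    ];
}
```

A transcoder implementation would hand this array to a process runner such as Symfony Process (or delegate to a managed service entirely) and upload the result under the processed/ prefix.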
// Image processing pipeline — simpler but same principles
class ProcessUploadedImageJob implements ShouldQueue
{
public int $timeout = 120;
public function __construct(
private readonly int $imageId,
private readonly string $s3Key,
) {}
public function handle(ImageProcessor $processor): void
{
// Generate all size variants in parallel via batching
$sizes = [
['name' => 'thumbnail', 'width' => 150, 'height' => 150, 'fit' => 'crop'],
['name' => 'preview', 'width' => 800, 'height' => 600, 'fit' => 'contain'],
['name' => 'full', 'width' => 1920, 'height' => 1080, 'fit' => 'max'],
['name' => 'webp', 'width' => null, 'height' => null, 'format' => 'webp'],
];
$imageId = $this->imageId;
Bus::batch(
    collect($sizes)->map(fn ($size) => new ResizeImageJob($this->imageId, $this->s3Key, $size))
)
    ->then(function (Batch $batch) use ($imageId) {
        // Batch callbacks must be closures, not job instances
        MarkImageReadyJob::dispatch($imageId);
    })
    ->onQueue('image-process')
    ->dispatch();
}
}
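The 'fit' modes in the size map imply different dimension math. A sketch of the 'contain' case (the mode names mirror the array above; the helper itself is hypothetical):

```php
// 'contain': scale to fit inside the box, preserving aspect ratio,
// and never upscale beyond the source dimensions.
function containDimensions(int $srcW, int $srcH, int $boxW, int $boxH): array
{
    $scale = min($boxW / $srcW, $boxH / $srcH, 1.0); // cap at 1.0 to avoid upscaling
    return [(int) round($srcW * $scale), (int) round($srcH * $scale)];
}
```

'crop' would instead scale by the larger ratio and trim the overflow; 'max' behaves like 'contain' with only one bound enforced.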
Bulk Notification Fan-Out Architecture
Sending 1 million push notifications is not a single job — it's a fan-out tree. A single
coordinator job reads the audience, splits it into chunks, and dispatches per-chunk delivery
jobs that are further specialized by delivery channel. This architecture allows independent
failure handling per channel and per chunk.
// Fan-out pattern: coordinator → channel dispatchers → per-user senders
// Level 1: Campaign coordinator — dispatches per-channel jobs
class DispatchNotificationCampaignJob implements ShouldQueue
{
public int $timeout = 300;
public function __construct(private readonly int $campaignId) {}
public function handle(): void
{
$campaign = NotificationCampaign::findOrFail($this->campaignId);
$campaign->update(['status' => 'dispatching', 'dispatch_started_at' => now()]);
// Stream through the audience without loading all IDs into memory
// Use chunks of 1000 to balance dispatch overhead vs memory usage
$chunkIndex = 0;
$campaign->audience()
->select('user_id')
->cursor()
->chunk(1000)
->each(function ($chunk) use ($campaign, &$chunkIndex) {
$userIds = $chunk->pluck('user_id')->toArray();
// Dispatch per-channel jobs for this chunk
if ($campaign->channels->contains('push')) {
SendPushNotificationBatchJob::dispatch($campaign->id, $userIds)
->onQueue('notifications-push');
}
if ($campaign->channels->contains('email')) {
SendEmailBatchJob::dispatch($campaign->id, $userIds)
->delay(now()->addSeconds($chunkIndex * 2)) // stagger email batches
->onQueue('notifications-email');
}
if ($campaign->channels->contains('sms')) {
SendSmsBatchJob::dispatch($campaign->id, $userIds)
->delay(now()->addSeconds($chunkIndex * 5)) // SMS has tightest rate limits
->onQueue('notifications-sms');
}
$chunkIndex++;
});
$campaign->update(['status' => 'dispatched']);
}
}
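Worth checking before shipping a large campaign: because the stagger grows with the chunk index, the total dispatch spread scales linearly with audience size. A quick sketch of the arithmetic:

```php
// Delay (seconds) applied to the LAST chunk of a campaign, given the
// audience size, chunk size, and per-chunk stagger used by the coordinator.
function lastChunkDelaySeconds(int $audience, int $chunkSize, int $staggerSeconds): int
{
    $chunks = (int) ceil($audience / $chunkSize);
    return ($chunks - 1) * $staggerSeconds;
}

// 1M users in chunks of 1000 with the 2-second email stagger:
// (1000 - 1) * 2 = 1998 seconds, so the final email batch leaves ~33 minutes in.
```

For the 5-second SMS stagger the spread is ~83 minutes, which may or may not be acceptable for time-sensitive campaigns.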
// Level 2: Per-channel batch sender
class SendPushNotificationBatchJob implements ShouldQueue
{
public int $timeout = 120;
public function __construct(
private readonly int $campaignId,
private readonly array $userIds,
) {}
public function handle(PushNotificationService $push): void
{
$campaign = NotificationCampaign::find($this->campaignId);
if (! $campaign || $campaign->status === 'cancelled') {
return; // campaign was cancelled mid-dispatch
}
// Fetch device tokens for this batch
$tokens = PushToken::whereIn('user_id', $this->userIds)
->where('platform', 'fcm')
->whereNotNull('token')
->get();
// Use FCM multicast — sends to up to 500 tokens in one API call
$tokens->chunk(500)->each(function ($batch) use ($campaign, $push) {
$result = $push->sendMulticast(
tokens: $batch->pluck('token')->toArray(),
title: $campaign->title,
body: $campaign->body,
data: $campaign->metadata,
);
// Track delivery stats
NotificationDelivery::insert(
$result->map(fn ($r) => [
'campaign_id' => $this->campaignId,
'token_id' => $r->token_id,
'status' => $r->success ? 'delivered' : 'failed',
'error_code' => $r->errorCode,
'sent_at' => now(),
])->toArray()
);
});
}
}
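The failure branch records an error code, but at scale you also need to act on it: FCM reports permanently dead tokens that should be pruned rather than retried. A sketch of the classification (the error-code names follow FCM's v1 API; the helper is hypothetical):

```php
// Decide what to do with a push token based on the FCM error code.
// 'UNREGISTERED' and 'SENDER_ID_MISMATCH' mean the token is permanently dead.
function tokenActionFor(?string $errorCode): string
{
    return match ($errorCode) {
        null => 'keep',                                        // delivered fine
        'UNREGISTERED', 'SENDER_ID_MISMATCH' => 'delete',      // stale or misrouted token
        'UNAVAILABLE', 'INTERNAL', 'QUOTA_EXCEEDED' => 'retry', // transient, back off
        default => 'keep',                                      // unknown: keep and log
    };
}
```

A nightly job that deletes every token whose last delivery was classified 'delete' keeps the PushToken table from accumulating dead entries.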
API Rate Limiting via Queue Workers
When you integrate with third-party APIs that have rate limits (Stripe: 100 req/s, Twilio:
1 req/s per account, Mailchimp: 10 simultaneous calls), the queue becomes your rate-limiting
layer. The pattern: dedicate a queue per API, dedicate a fixed number of workers per queue
(to cap concurrency), and use Laravel's RateLimiter for within-worker throttling.
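The arithmetic behind "worker count as concurrency cap" is worth making explicit. A sketch, assuming each job makes one API call and the p50 job duration bounds how fast one worker can hit the API:

```php
// Fixed worker count that keeps total request rate under an API limit,
// given the typical (p50) duration of one job (one API call per job assumed).
function workersForRateLimit(float $apiLimitPerSec, float $jobSeconds): int
{
    $perWorkerRate = 1.0 / $jobSeconds; // requests/second one busy worker emits
    return max(1, (int) floor($apiLimitPerSec / $perWorkerRate));
}

// At ~100 req/s with 100ms jobs: floor(100 / 10) = 10 workers.
```

Note the result is only a cap if job duration is stable; a burst of unusually fast jobs can still exceed the limit, which is why the token bucket below exists as a second layer.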
// Per-service queue architecture with worker count as concurrency control
// config/horizon.php (relevant supervisors)
'supervisor-stripe' => [
'queue' => ['stripe'],
'balance' => 'simple', // DON'T use auto-balance — fixed concurrency is intentional
'maxProcesses' => 10, // Stripe allows ~100 req/s; 10 workers * 10 jobs/s each = 100 req/s max
'minProcesses' => 10, // hold steady, don't scale down
'timeout' => 30,
],
'supervisor-twilio' => [
'queue' => ['twilio'],
'balance' => 'simple',
'maxProcesses' => 1, // Twilio: 1 concurrent request per account number
'minProcesses' => 1,
'timeout' => 15,
],
// Fine-grained throttling within workers via Laravel's RateLimited job middleware:
class SendSmsViaTwilioJob implements ShouldQueue
{
public function middleware(): array
{
return [
// Allow max 1 SMS per second using RateLimiter
new RateLimited('twilio-sms'),
];
}
}
// Register the limiter in AppServiceProvider (Limit::perSecond requires Laravel 11+):
RateLimiter::for('twilio-sms', function (object $job) {
    return Limit::perSecond(1); // 1 SMS per second across all workers
});
// Token bucket implementation for APIs that need burst capacity
// Allows bursting up to N requests but sustains at a lower average rate
class TokenBucketRateLimiter
{
private const BUCKET_KEY = 'rate_limit:bucket:';
private const TOKENS_KEY = ':tokens';
private const LAST_FILL_KEY = ':last_fill';
public function __construct(
private readonly string $apiName,
private readonly int $capacity, // max burst
private readonly int $fillRate, // tokens added per second
) {}
public function consume(int $tokens = 1): bool
{
    $key = self::BUCKET_KEY . $this->apiName;
    try {
        return Cache::lock($key . ':lock', 1)->block(1, function () use ($key, $tokens) {
            $now = microtime(true);
            $current = (float) Cache::get($key . self::TOKENS_KEY, $this->capacity);
            $lastFill = (float) Cache::get($key . self::LAST_FILL_KEY, $now);
            // Refill tokens based on time elapsed since the last fill
            $elapsed = $now - $lastFill;
            $refilled = min($this->capacity, $current + ($elapsed * $this->fillRate));
            if ($refilled < $tokens) {
                return false; // not enough tokens
            }
            Cache::put($key . self::TOKENS_KEY, $refilled - $tokens, 3600);
            Cache::put($key . self::LAST_FILL_KEY, $now, 3600);
            return true;
        });
    } catch (LockTimeoutException) {
        return false; // lock contention: treat as throttled rather than throwing
    }
}
}
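To verify the refill math without Redis, the same algorithm can be expressed against an in-memory store with an injectable clock (a sketch for unit testing, not a replacement for the shared, Cache-backed version):

```php
// In-memory token bucket with an injectable clock. Same refill math as the
// Cache-backed limiter, convenient for unit-testing burst/sustain behavior.
class InMemoryTokenBucket
{
    private float $tokens;
    private float $lastFill;

    public function __construct(
        private readonly int $capacity,
        private readonly float $fillRate, // tokens added per second
        float $now = 0.0,
    ) {
        $this->tokens = (float) $capacity; // start full: an initial burst is allowed
        $this->lastFill = $now;
    }

    public function consume(float $now, int $tokens = 1): bool
    {
        // Refill proportionally to elapsed time, capped at capacity
        $elapsed = $now - $this->lastFill;
        $this->tokens = min($this->capacity, $this->tokens + $elapsed * $this->fillRate);
        $this->lastFill = $now;
        if ($this->tokens < $tokens) {
            return false; // throttled; caller should release/retry
        }
        $this->tokens -= $tokens;
        return true;
    }
}
```

With capacity 5 and a fill rate of 1 token/second, five consumes at t=0 succeed (the burst), the sixth fails, and one more succeeds at t=1 after a single token has refilled.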
// Use in job middleware (job middleware is duck-typed in Laravel; no interface required):
class ThrottleWithTokenBucket
{
public function __construct(
private readonly TokenBucketRateLimiter $limiter,
) {}
public function handle(mixed $job, callable $next): void
{
if (! $this->limiter->consume()) {
// Release back to queue to retry after token refill period
$job->release(1); // retry in 1 second
return;
}
$next($job);
}
}
Handling 1M+ Jobs Per Day
One million jobs per day is approximately 11.6 jobs per second on average. That sounds
manageable — until you consider that real-world load is not uniform. Marketing emails
fire at 9am, batch syncs run at midnight, and user-driven events spike at lunch. Your
system must handle 5–10x the average rate during peaks.
// Capacity planning math for 1M jobs/day at 10x peak:
//
// Average rate: 1,000,000 / 86,400 = 11.6 jobs/second
// Peak rate (10x): 11.6 * 10 = 116 jobs/second
//
// Per-worker throughput (Redis, mixed jobs, p50 duration 200ms):
// 1 job / 200ms = 5 jobs/second per worker
//
// Workers needed at peak: 116 / 5 = 23.2 → 25 workers (with headroom)
//
// Redis ops per job (pop, delete, update delayed set): ~6 ops
// Redis peak load: 116 * 6 = 696 ops/second → well within single Redis capacity
//
// Database connections at peak (25 workers, each holding 1 connection): 25
// Plus web tier: say 50 connections
// Total: 75 — comfortable for MySQL max_connections=200
//
// Memory per worker (Laravel + application): ~128 MB
// Total worker memory: 25 * 128 = 3.2 GB → fits on 8GB worker server with headroom
// Architecture recommendation for 1M+/day:
// - Redis backend (mandatory)
// - 2 worker servers (8 workers each = 16 baseline) + auto-scale to 40 during peaks
// - Horizon with auto-balance between queues
// - Separate Redis instance from application cache (isolate queue I/O)
// - Failed job monitoring with alert on >0.1% failure rate
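The back-of-envelope above generalizes into a small calculator, using the same assumptions as the comments (a 10x peak factor and a 200ms p50 job duration):

```php
// Workers needed at peak for a given daily job volume.
function peakWorkersNeeded(int $jobsPerDay, float $peakFactor, float $p50Seconds): int
{
    $avgRate = $jobsPerDay / 86_400;    // jobs/second averaged over the day
    $peakRate = $avgRate * $peakFactor; // the burst rate you must provision for
    $perWorker = 1.0 / $p50Seconds;     // jobs/second one worker sustains
    return (int) ceil($peakRate / $perWorker);
}

// 1M/day, 10x peak, 200ms p50: ceil(115.7 / 5) = 24; round up to 25 for headroom.
```

Re-run the numbers whenever p50 duration drifts; a jump from 200ms to 400ms silently doubles the worker count you need at peak.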
// Optimizing job dispatch for bulk operations
// Dispatching 100,000 jobs one-by-one is slow — use pipeline for Redis batch insert
class BulkDispatcher
{
public function dispatch(array $jobs): void
{
// Use Redis pipeline to batch-insert up to 1000 jobs at once
$chunks = array_chunk($jobs, 1000);
foreach ($chunks as $chunk) {
Redis::pipeline(function ($pipe) use ($chunk) {
foreach ($chunk as $job) {
$payload = $this->createPayload($job);
$pipe->rpush("queues:{$job->queue}", $payload);
}
});
}
}
private function createPayload(object $job): string
{
return json_encode([
    'uuid' => (string) Str::uuid(),
    'displayName' => get_class($job),
    'job' => 'Illuminate\Queue\CallQueuedHandler@call',
    'maxTries' => $job->tries ?? null,
    'timeout' => $job->timeout ?? null,
    'attempts' => 0, // Redis workers expect this field for retry bookkeeping
    'data' => [
        'commandName' => get_class($job),
        'command' => serialize(clone $job),
    ],
]);
}
}
// Dispatching 100,000 jobs this way:
// Individual dispatch: ~25 seconds (100,000 Redis round-trips)
// Pipeline dispatch: ~0.8 seconds (100 pipeline batches of 1000)
// Monitoring queue health at scale with Prometheus metrics
// Exported via custom artisan command every 30 seconds
class QueueMetricsCommand extends Command
{
protected $signature = 'metrics:queue-export';
public function handle(): void
{
$queues = ['critical', 'notifications-push', 'notifications-email',
'video-transcode', 'stripe', 'analytics'];
$metrics = [];
foreach ($queues as $queue) {
$metrics[] = sprintf(
'laravel_queue_size{queue="%s"} %d %d',
$queue,
Redis::llen("queues:{$queue}") + Redis::zcard("queues:{$queue}:delayed"),
now()->getTimestampMs()
);
}
// Write atomically for the Node Exporter textfile collector: write to a temp
// file, then rename, so Prometheus never scrapes a half-written file
$path = '/var/lib/prometheus/queue_metrics.prom';
file_put_contents($path . '.tmp', implode("\n", $metrics) . "\n");
rename($path . '.tmp', $path);
}
}
Queue Topology: One Queue vs. Many
The debate between a single default queue and many specialized queues is a real architectural
decision with concrete tradeoffs. Here is the honest analysis:
One queue: simple to operate, no routing logic, every worker can process every job. Works well up to a few thousand jobs/day with uniform job types. Fails when a burst of slow jobs blocks fast ones.
Many queues: independent priority, independent scaling, independent failure domains. Required above moderate scale or when job types have significantly different SLAs. Adds operational complexity (more supervisors to configure, more metrics to track).
// Queue routing by job type — in each job class
class SendTransactionalEmailJob implements ShouldQueue
{
public string $queue = 'notifications-email'; // declared on the class
}
// Or set at dispatch time:
ProcessVideoJob::dispatch($videoId)->onQueue('video-transcode');
// Queue routing by tenant (multi-tenant apps):
class TenantAwareJob implements ShouldQueue
{
public string $queue;
public function __construct(private readonly int $tenantId)
{
// High-value tenants get dedicated queue with more workers
$tier = Tenant::find($tenantId)?->tier ?? 'standard';
$this->queue = match($tier) {
'enterprise' => 'enterprise-processing',
'pro' => 'pro-processing',
default => 'standard-processing',
};
}
}
// When to add a new queue — the signals:
// 1. A specific job type is blocking other jobs (high-volume, slow jobs)
// 2. A specific job type needs SLA guarantees different from others
// 3. A specific job type uses a different external resource (separate rate limit pool)
// 4. A specific job type needs dedicated infrastructure (GPU server for ML jobs)
// 5. A specific tenant needs isolation (enterprise tier SLA)
Conclusion
High-load queue systems are won or lost at the architecture stage, not the code optimization
stage. The decisions that matter most:
Topology first — separate queues for jobs with different SLAs, resource profiles, and failure characteristics. This is the foundational decision everything else builds on.
Fan-out for bulk operations — never put a million units of work in one job. A coordinator dispatches chunk jobs, which dispatch per-unit jobs if needed. Each level is independently retryable.
Worker count as rate control — for rate-limited APIs, the number of dedicated workers is your primary throttle. Don't over-provision them.
Bulk dispatch via pipeline — when dispatching tens of thousands of jobs programmatically, Redis pipeline mode reduces dispatch time by 30–50x.
Metrics as first-class infrastructure — at 1M+ jobs/day, you cannot operate blind. Queue depth per queue, p95 processing time, and failed job rate must be in your monitoring dashboard.