Глава 10.3: Очереди и Jobs — фоновые задачи, Redis, обработка email, ретраи
Введение: Зачем нужны очереди?
Представьте: пользователь регистрируется на вашем сайте. Что должно произойти?
- Сохранить данные в базу
- Отправить приветственное письмо
- Создать профиль в CRM
- Отправить уведомление в Slack
- Сгенерировать PDF-сертификат
- Обновить статистику
Если делать всё это синхронно — пользователь будет ждать 5-10 секунд. А если email-сервер завис? Регистрация упадёт с ошибкой.
Решение: критичные операции выполняем сразу, остальное — отправляем в фоновую очередь.
// Плохо: пользователь ждёт
User::create($data);
Mail::send(new WelcomeEmail($user)); // 2 секунды
CRM::createProfile($user); // 3 секунды
Slack::notify($user); // 1 секунда
PDF::generate($user); // 4 секунды
// Итого: ~10 секунд ожидания
// Хорошо: мгновенный ответ
User::create($data);
dispatch(new SendWelcomeEmail($user));
dispatch(new CreateCRMProfile($user));
dispatch(new NotifySlack($user));
dispatch(new GenerateCertificate($user));
// Ответ пользователю: 200ms
// Остальное выполнится в фонеКак работают очереди
Архитектура системы очередей
┌─────────────┐
│ Laravel │
│ Application │
└──────┬──────┘
│ dispatch(new Job)
↓
┌─────────────┐
│ Queue │ ← хранилище задач (database/redis/sqs)
│ Driver │
└──────┬──────┘
│
↓
┌─────────────┐
│ Worker │ ← php artisan queue:work
│ Process │ забирает задачи и выполняет
└─────────────┘Ключевые компоненты:
- Job — класс задачи (что делать)
- Queue — хранилище задач (database, Redis, Beanstalkd, SQS)
- Worker — процесс, который выполняет задачи
- Dispatcher — отправляет задачи в очередь
Настройка очередей
1. Конфигурация драйверов
config/queue.php:
return [
'default' => env('QUEUE_CONNECTION', 'sync'),
'connections' => [
'sync' => [
'driver' => 'sync', // Выполняется сразу (для разработки)
],
'database' => [
'driver' => 'database',
'table' => 'jobs',
'queue' => 'default',
'retry_after' => 90,
],
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90,
'block_for' => null,
],
'sqs' => [
'driver' => 'sqs',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE', 'default'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
],
],
];2. Миграция для database-драйвера
php artisan queue:table
php artisan migrateСоздастся таблица jobs:
CREATE TABLE jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
queue VARCHAR(255) NOT NULL,
payload LONGTEXT NOT NULL, -- сериализованная задача
attempts TINYINT UNSIGNED,
reserved_at INT UNSIGNED, -- когда задачу взял worker
available_at INT UNSIGNED, -- когда можно выполнять
created_at INT UNSIGNED
);3. Установка Redis (рекомендуется для production)
# Ubuntu/Debian
sudo apt install redis-server
# macOS
brew install redis
# Запуск
redis-servercomposer.json:
composer require predis/predis.env:
QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379Создание Job-классов
Генерация Job
php artisan make:job SendWelcomeEmailСоздаётся app/Jobs/SendWelcomeEmail.php:
namespace App\Jobs;
use App\Models\User;
use App\Mail\WelcomeEmail;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Mail;
class SendWelcomeEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* @var User
*/
public $user;
/**
* Количество попыток выполнения
*/
public $tries = 3;
/**
* Таймаут выполнения (секунды)
*/
public $timeout = 120;
/**
* Create a new job instance.
*/
public function __construct(User $user)
{
$this->user = $user;
}
/**
* Execute the job.
*/
public function handle(): void
{
Mail::to($this->user->email)->send(new WelcomeEmail($this->user));
}
/**
* Что делать если задача упала
*/
public function failed(\Throwable $exception): void
{
// Логируем ошибку
\Log::error('Failed to send welcome email', [
'user_id' => $this->user->id,
'error' => $exception->getMessage(),
]);
// Уведомляем администратора
// Mail::to('admin@example.com')->send(...);
}
}Разбор трейтов
- Dispatchable — позволяет
dispatch(new Job) - InteractsWithQueue — методы для работы с очередью (
$this->release(),$this->delete()) - Queueable — настройки очереди (
onQueue(),delay()) - SerializesModels — автоматически сериализует Eloquent-модели
Отправка задач в очередь
1. Базовая отправка
use App\Jobs\SendWelcomeEmail;
// Самый простой способ
SendWelcomeEmail::dispatch($user);
// Или через глобальный хелпер
dispatch(new SendWelcomeEmail($user));2. Отложенное выполнение
// Выполнить через 10 минут
SendWelcomeEmail::dispatch($user)
->delay(now()->addMinutes(10));
// Выполнить в конкретное время
SendWelcomeEmail::dispatch($user)
->delay(now()->addHours(2));3. Выбор очереди
// Разные приоритеты
SendWelcomeEmail::dispatch($user)
->onQueue('emails');
ProcessPayment::dispatch($order)
->onQueue('payments'); // высокий приоритет
GenerateReport::dispatch()
->onQueue('reports'); // низкий приоритет4. Условная отправка
SendWelcomeEmail::dispatchIf($user->wants_emails, $user);
SendWelcomeEmail::dispatchUnless($user->unsubscribed, $user);5. Цепочки задач
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessOrder($order),
new SendInvoice($order),
new NotifyCustomer($order),
])->dispatch();
// Если одна упадёт — остальные не выполнятся6. Пакетная обработка
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
$batch = Bus::batch([
new ImportCsvRow($row1),
new ImportCsvRow($row2),
new ImportCsvRow($row3),
])->then(function (Batch $batch) {
// Все задачи выполнены
})->catch(function (Batch $batch, \Throwable $e) {
// Одна из задач упала
})->finally(function (Batch $batch) {
// Всегда выполняется
})->dispatch();
// Проверить прогресс
$batch->progress(); // 66 (процентов)Запуск Worker-процесса
Основные команды
# Запустить worker (слушает очередь default)
php artisan queue:work
# Указать конкретную очередь
php artisan queue:work --queue=emails
# Несколько очередей (по приоритету)
php artisan queue:work --queue=payments,emails,default
# Только одна задача (для тестирования)
php artisan queue:work --once
# Остановить после текущей задачи
php artisan queue:work --stop-when-empty
# С таймаутом
php artisan queue:work --timeout=60Важные опции
# Количество попыток
php artisan queue:work --tries=3
# Задержка между попытками
php artisan queue:work --backoff=3,5,10
# Задержка при пустой очереди
php artisan queue:work --sleep=3
# Перезагружать worker при изменении кода
php artisan queue:work --max-jobs=1000 --max-time=3600Graceful Shutdown
# Отправить сигнал worker'у для остановки
php artisan queue:restartWorker завершит текущую задачу и остановится.
Обработка ошибок и ретраи
1. Настройка попыток в Job
class ProcessPayment implements ShouldQueue
{
public $tries = 5; // Попыток
public $backoff = [10, 30, 60, 120]; // Задержки в секундах
public function handle()
{
// Текущая попытка
$attempt = $this->attempts();
if ($attempt > 3) {
\Log::warning("Payment processing attempt #{$attempt}");
}
// Логика
}
}2. Условное освобождение задачи
public function handle()
{
try {
$this->processPayment();
} catch (TemporaryException $e) {
// Вернуть в очередь через 30 секунд
$this->release(30);
} catch (PermanentException $e) {
// Удалить из очереди (не пытаться снова)
$this->delete();
}
}3. Обработка неудач
public function failed(\Throwable $exception)
{
// Откатить транзакцию
DB::rollBack();
// Уведомить администратора
Mail::to('admin@example.com')->send(
new JobFailedNotification($this, $exception)
);
// Сохранить в лог
\Log::error('Payment processing failed', [
'job' => get_class($this),
'order_id' => $this->order->id,
'error' => $exception->getMessage(),
'trace' => $exception->getTraceAsString(),
]);
}4. Таблица неудачных задач
php artisan queue:failed-table
php artisan migrateСоздаётся таблица failed_jobs:
CREATE TABLE failed_jobs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
uuid VARCHAR(255) UNIQUE,
connection TEXT,
queue TEXT,
payload LONGTEXT,
exception LONGTEXT,
failed_at TIMESTAMP
);Управление неудачными задачами
# Посмотреть список
php artisan queue:failed
# Повторить конкретную
php artisan queue:retry 5
# Повторить все
php artisan queue:retry all
# Удалить неудачную задачу
php artisan queue:forget 5
# Очистить все неудачные
php artisan queue:flushПрактический пример: Email-рассылка
1. Job для отправки письма
namespace App\Jobs;
use App\Models\User;
use App\Mail\NewsletterEmail;
use Illuminate\Support\Facades\Mail;
class SendNewsletterEmail implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public $tries = 3;
public $backoff = [60, 180, 600]; // 1 мин, 3 мин, 10 мин
public $timeout = 120;
public function __construct(
public User $user,
public string $subject,
public string $content
) {}
public function handle(): void
{
// Проверка перед отправкой
if (!$this->user->subscribed) {
$this->delete(); // Не пытаться снова
return;
}
Mail::to($this->user->email)->send(
new NewsletterEmail($this->subject, $this->content)
);
// Обновить статистику
$this->user->increment('newsletters_sent');
}
public function failed(\Throwable $exception): void
{
// Пометить пользователя как недоступного
$this->user->update(['email_bounced' => true]);
\Log::error('Newsletter failed', [
'user_id' => $this->user->id,
'error' => $exception->getMessage(),
]);
}
}2. Отправка рассылки
namespace App\Http\Controllers;
use App\Models\User;
use App\Jobs\SendNewsletterEmail;
use Illuminate\Support\Facades\Bus;
class NewsletterController extends Controller
{
public function send(Request $request)
{
$validated = $request->validate([
'subject' => 'required|string|max:255',
'content' => 'required|string',
]);
$users = User::where('subscribed', true)
->where('email_bounced', false)
->get();
// Создаём батч задач
$jobs = $users->map(function ($user) use ($validated) {
return new SendNewsletterEmail(
$user,
$validated['subject'],
$validated['content']
);
});
$batch = Bus::batch($jobs)
->then(function (Batch $batch) {
\Log::info("Newsletter sent to {$batch->totalJobs} users");
})
->catch(function (Batch $batch, \Throwable $e) {
\Log::error('Newsletter batch failed', [
'error' => $e->getMessage()
]);
})
->dispatch();
return response()->json([
'message' => 'Newsletter queued',
'batch_id' => $batch->id,
'total_recipients' => $users->count(),
]);
}
public function batchStatus($batchId)
{
$batch = Bus::findBatch($batchId);
return response()->json([
'progress' => $batch->progress(),
'total_jobs' => $batch->totalJobs,
'pending_jobs' => $batch->pendingJobs,
'failed_jobs' => $batch->failedJobs,
'finished' => $batch->finished(),
]);
}
}Rate Limiting (ограничение скорости)
1. Базовое ограничение
use Illuminate\Support\Facades\RateLimiter;
class SendEmailJob implements ShouldQueue
{
public function handle()
{
RateLimiter::attempt(
'send-email:' . $this->user->id,
$perMinute = 5,
function () {
Mail::to($this->user)->send(new SomeEmail());
},
$decaySeconds = 60
);
}
}2. Middleware для Job
use Illuminate\Queue\Middleware\RateLimited;
class SendEmailJob implements ShouldQueue
{
public function middleware()
{
return [
new RateLimited('emails'), // 5 задач в минуту
];
}
}Конфигурация в RouteServiceProvider:
RateLimiter::for('emails', function (object $job) {
return Limit::perMinute(100); // 100 писем в минуту
});3. Throttling по внешнему API
use Illuminate\Queue\Middleware\ThrottlesExceptions;
public function middleware()
{
return [
(new ThrottlesExceptions(10, 5)) // 10 ошибок за 5 минут
->backoff(5), // потом пауза 5 минут
];
}Мониторинг очередей
1. Laravel Horizon (для Redis)
composer require laravel/horizon
php artisan horizon:install
php artisan migrateconfig/horizon.php:
'environments' => [
'production' => [
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default', 'emails'],
'balance' => 'auto',
'processes' => 10,
'tries' => 3,
],
],
],Запуск:
php artisan horizonDashboard: http://localhost/horizon
2. Команды для мониторинга
# Статистика задач
php artisan queue:monitor redis:default,redis:emails --max=100
# Очистить очередь
php artisan queue:clear redis
# Очистить конкретную очередь
php artisan queue:clear redis --queue=emails
# Посмотреть задачи в БД
php artisan queue:work --once --verbose3. События очередей
use Illuminate\Support\Facades\Queue;
Queue::before(function (JobProcessing $event) {
\Log::info('Job starting', ['job' => $event->job->resolveName()]);
});
Queue::after(function (JobProcessed $event) {
\Log::info('Job finished', ['job' => $event->job->resolveName()]);
});
Queue::failing(function (JobFailed $event) {
\Log::error('Job failed', [
'job' => $event->job->resolveName(),
'error' => $event->exception->getMessage(),
]);
});Production: Supervisor
Worker должен всегда работать в production. Используем Supervisor.
Установка
sudo apt install supervisorКонфигурация
/etc/supervisor/conf.d/laravel-worker.conf:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/html/artisan queue:work redis --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/var/www/html/storage/logs/worker.log
stopwaitsecs=3600Управление
# Перечитать конфигурацию
sudo supervisorctl reread
sudo supervisorctl update
# Запустить workers
sudo supervisorctl start laravel-worker:*
# Остановить
sudo supervisorctl stop laravel-worker:*
# Перезапустить (после деплоя)
php artisan queue:restart
sudo supervisorctl restart laravel-worker:*Оптимизация производительности
1. Используйте Redis
Database-драйвер медленный из-за блокировок таблицы.
QUEUE_CONNECTION=redis2. Несколько workers
# 4 процесса
php artisan queue:work --queue=default --sleep=3 &
php artisan queue:work --queue=default --sleep=3 &
php artisan queue:work --queue=default --sleep=3 &
php artisan queue:work --queue=default --sleep=3 &Или через Supervisor (numprocs=8).
3. Приоритизация очередей
# Сначала payments, потом emails, потом default
php artisan queue:work --queue=payments,emails,default4. Chunk большие задачи
// Плохо: одна задача на 10 000 пользователей
dispatch(new SendNewsletterToAll());
// Хорошо: 100 задач по 100 пользователей
User::chunk(100, function ($users) {
dispatch(new SendNewsletterChunk($users));
});5. Lazy Collections
use App\Models\User;
User::cursor()->each(function ($user) {
dispatch(new ProcessUser($user));
});Безопасность
1. Никогда не передавайте пароли в Job
// ПЛОХО
class SendEmail implements ShouldQueue
{
public function __construct(
public string $password // Сериализуется в БД!
) {}
}
// ХОРОШО
class SendEmail implements ShouldQueue
{
public function __construct(
public int $userId
) {}
public function handle()
{
$user = User::find($this->userId);
// Работаем с $user
}
}2. Шифрование Job (если передаёте чувствительные данные)
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
class ProcessPayment implements ShouldQueue, ShouldBeEncrypted
{
public function __construct(
public string $creditCardNumber
) {}
}3. Аутентификация перед выполнением
public function handle()
{
if (!$this->user->exists) {
$this->delete();
return;
}
// Продолжаем
}Практические задачи
Задача 1: Импорт CSV
Создайте систему импорта CSV-файла с пользователями:
- Job
ImportCsvчитает файл и создаёт задачиImportUserRowдля каждой строки ImportUserRowсоздаёт пользователя (или обновляет, если email существует)- После завершения батча отправьте email администратору со статистикой
Подсказки:
- Используйте
Bus::batch() - Обрабатывайте ошибки (невалидные строки)
- Логируйте прогресс
Задача 2: Напоминания о корзине
Создайте Job, который отправляет email пользователям, у которых в корзине есть товары более 24 часов:
- Найдите все активные корзины старше 24 часов
- Для каждой отправьте
SendAbandonedCartEmail - После 3 дней удалите корзину через
DeleteAbandonedCart
Подсказки:
- Используйте
delay()для отложенных задач - Проверяйте, не купил ли пользователь товар уже
Задача 3: Обработка изображений
Загрузка фото профиля должна:
- Сохранить оригинал
- Job
ProcessAvatar: создать 3 размера (thumbnail, medium, large) - Оптимизировать с помощью
spatie/image-optimizer - Обновить модель пользователя
Подсказки:
- Используйте цепочки задач
- Если оптимизация упала — оставьте неоптимизированные версии
Проверь себя
В чём разница между
dispatch()иdispatchSync()?Что произойдёт, если Job упадёт 3 раза?
Зачем нужен
SerializesModelsтрейт?Как отправить задачу в конкретную очередь?
Как ограничить количество писем до 100 в минуту?
Что такое
queue:restartи когда его использовать?Как обработать ситуацию, когда внешний API временно недоступен?
В чём разница между
triesиbackoff?
Частые ошибки
❌ Worker не запущен
dispatch(new SendEmail($user));
// Ничего не происходитРешение: проверьте php artisan queue:work запущен.
❌ Serialization Error
class SendEmail implements ShouldQueue
{
public function __construct(
public $pdfResource // Невозможно сериализовать
) {}
}Решение: передавайте только ID, пути к файлам, скаляры.
❌ Memory Leak
// Worker работает бесконечно и съедает всю память
php artisan queue:workРешение:
php artisan queue:work --max-jobs=1000 --max-time=3600❌ Забыли implements ShouldQueue
class SendEmail // Нет ShouldQueue
{
use Dispatchable, Queueable;
}
dispatch(new SendEmail($user)); // Выполнится синхронно!Чеклист перед production
- [ ] Установлен Redis
- [ ] Настроен Supervisor для workers
- [ ] Включен
php artisan queue:restartв деплой-скрипт - [ ] Настроен мониторинг (Horizon / CloudWatch)
- [ ] Добавлены логи для
failed() - [ ] Настроены уведомления о неудачных задачах
- [ ] Проверено, что чувствительные данные не попадают в очередь
- [ ] Добавлены тесты для критичных Jobs
Резюме
| Что | Как |
|---|---|
| Создать Job | php artisan make:job SendEmail |
| Отправить в очередь | dispatch(new SendEmail($user)) |
| Отложить | ->delay(now()->addMinutes(10)) |
| Выбрать очередь | ->onQueue('emails') |
| Запустить worker | php artisan queue:work |
| Повторить неудачную | php artisan queue:retry all |
| Мониторинг | Laravel Horizon |
Золотое правило: всё, что может подождать — должно быть в очереди.
Следующая глава: Events и Listeners — событийная архитектура, полное разделение логики.