Автоматическая обработка очередей во время крон операций.
QueueWorker — тип плагина, позволяющий реализовать выполнение очередей в cron операциях. Данный тип плагина предоставляется ядром, поэтому вам не потребуется никаких дополнительных модулей для его работы.
Данный тип плагина берет на себя всю работу с очередью, предоставляя вам данные на обработку. Все что вам будет нобходимо делать, это обрабатывать входящие данные.
Состав плагина ¶
Аннотация @QueueWorker ¶
Аннотация плагина имеет следующие значения:
-
id
: Идентификатор плагина. Очень важно, чтобы название плагина было равным названию очереди. Именно так будет получаться очередь. Если ваша очередь имеет id равныйmy_queue
, то и id плагина должен быть именно таким. -
title
: Человеко-понятное название плагина. -
cron
: (опционально) Настройки для крона.-
time
: Время в секундах, выделяемое на выполнение очереди. Пока данное время не истечет, будут производиться попытки получения элементов из очереди и отправка их на выполнение плагину. По умолчанию присваивается 15 секунд.
-
Пример аннотации:
/**
* @QueueWorker(
* id = "my_queue_name_to_process",
* title = @Translation("My queue worker"),
* cron = {"time" = 60}
* )
*/
Объект плагина ¶
Плагины данного типа создаются в src/Plugin/QueueWorker
.
Объект плагина должен расширять Drupal\Core\Queue\QueueWorkerBase
. Вам необходимо объявить один единственный метод processItem($data)
, в котором и описать всю логику обработки элемента очереди.
В качестве аргумента метод принимает один единственный аргумент, который содержит элемент очереди, в том виде, как вы его добавили в данную очередь $queue->createItem()
.
Метод не должен ничего возвращать, вы просто выполняете необходимую логику и всё. При этом вы можете вызвать одно из следующих исключений, которые будут корректно обработаны менеджером плагина:
-
RequeueException
: Прерывает обработку текущего элемента очереди, возвращая его на повторную обработку. После чего выполнение очереди данным плагином продолжится. -
SuspendQueueException
: Прерывает обработку текущего элемента очереди, возвращая его на повторную обработку. После чего выполнение плагина прекращается и вызывается следующий. - Любое другое исключение будет записано в лог сайта и элемент не удалится из очереди.
Пример плагина:
<?php
namespace Drupal\dummy\Plugin\QueueWorker;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* Process a queue.
*
* @QueueWorker(
* id = "my_queue_name_to_process",
* title = @Translation("My queue worker"),
* cron = {"time" = 60}
* )
*/
class MyQueueWorker extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
// Do something with $data.
}
}
Примеры ¶
В качестве примеров использования мы разберемся с реализацией данного типа плагинов из ядра.
Пример №1. aggregator_feeds ¶
Плагин aggregator_feeds
отвечает за фоновое обновление фидов модуля Aggregator из ядра.
Очередь с данным id собирается в крон операции aggregator_cron()
. В днном хуке производится загрузка всех фидов, которые необходимо обновить. Фиду устанавливается время текущего запроса, как время когда он добавлен в очередь, чтобы избежать повторного вызова, а в очередь передается экземпляр сущности feed (FeedInterface
). Иными словами, данная очередь, хранит сущности feed
, которые нобходимо обработать.
А теперь посмотрим на плагин, который обрабатывает данную очередь:
<?php
namespace Drupal\aggregator\Plugin\QueueWorker;
use Drupal\aggregator\FeedInterface;
use Drupal\Core\Queue\QueueWorkerBase;
/**
* Updates a feed's items.
*
* @QueueWorker(
* id = "aggregator_feeds",
* title = @Translation("Aggregator refresh"),
* cron = {"time" = 60}
* )
*/
class AggregatorRefresh extends QueueWorkerBase {
/**
* {@inheritdoc}
*/
public function processItem($data) {
if ($data instanceof FeedInterface) {
$data->refreshItems();
}
}
}
Очень простой и понятный, пройдемся по нему сверху-внизу:
- В аннотации мы видим указание необходимой очереди для обработки, а также, что времени на обработку данной очереди в кроне будет выделено 60 секунд.
- В
processItem()
в качестве аргумента, ожидается что будет передан экземплярFeedInterface
, а значит - объект сущности feed. Если всё корректно, у данной сущности вызывается методrefreshItems()
. В данном методе производится обновление фида, а также сброс значения времени, когда он добавлен в очередь, чтобы при следующем кроне данная сущность опять была обнаружена вaggregator_cron()
и очередь была собрана заново.
Таким образом, обновление фидов производится полностью в автоматическом режиме и в фоне, при выполнении крон операций.
Пример №2. media_entity_thumbnail ¶
Данный плагин занимается обработкой очереди media_entity_thumbnail
, и отвечает за обработку превьюшек для медиа элементов.
Возьмем для примера медиа тип Remote Video, который позволяет добавлять на сайт YouTube и Vimeo видео. По умолчанию опция отложенной загрузки превьюшек отключена, но вы можете включить её в редактировании данного типа медиа сущности, установив галочку "Queue thumbnail downloads".
Установив её, после добавления новой сущности данного типа будет вызван Media::postSave()
метод сущности. В данном методе производится проверка, установлена ли данная галочка, и является ли сущность новой. Если условия удовлетворены, в очередь media_entity_thumbnail
будет добавлен id данной сущности на обработку в видео массива: ['id' => $translation->id()]
.
При следующем вызове крон операций будет вызван одноименный плагин.
<?php
namespace Drupal\media\Plugin\QueueWorker;
use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;
/**
* Process a queue of media items to fetch their thumbnails.
*
* @QueueWorker(
* id = "media_entity_thumbnail",
* title = @Translation("Thumbnail downloader"),
* cron = {"time" = 60}
* )
*/
class ThumbnailDownloader extends QueueWorkerBase implements ContainerFactoryPluginInterface {
/**
* The entity type manager service.
*
* @var \Drupal\Core\Entity\EntityTypeManagerInterface
*/
protected $entityTypeManager;
/**
* Constructs a new class instance.
*
* @param array $configuration
* A configuration array containing information about the plugin instance.
* @param string $plugin_id
* The plugin_id for the plugin instance.
* @param mixed $plugin_definition
* The plugin implementation definition.
* @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
* Entity type manager service.
*/
public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityTypeManagerInterface $entity_type_manager) {
parent::__construct($configuration, $plugin_id, $plugin_definition);
$this->entityTypeManager = $entity_type_manager;
}
/**
* {@inheritdoc}
*/
public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
return new static(
$configuration,
$plugin_id,
$plugin_definition,
$container->get('entity_type.manager')
);
}
/**
* {@inheritdoc}
*/
public function processItem($data) {
/** @var \Drupal\media\Entity\Media $media */
if ($media = $this->entityTypeManager->getStorage('media')->load($data['id'])) {
$media->updateQueuedThumbnail();
$media->save();
}
}
}
Пройдемся по нему сверху-вниз:
- Первым делом смотрим на аннотацию, где видем, что будет обрабатываться очередь
media_entity_thumbnail
, а также, что времени на выполнение данной очереди будет выделяться 60 секунд. - В
processItem()
методе мы видим, что первым делом пытается загрузиться медиа сущность с id из очереди. Если она загрузилась, то вызывается её методupdateQueuedThumbnail()
, который, в свою очередь, попробует загрузить файл изображения для первью, а затем осхранит сущность.
Таким образом, во время крон операций будут обновляться превьюшки для новых медиа сущностей.
Также вы можете обратить внимание на то, что данный пример более комплексный из-за Dependency Injection сервисов.
Заключение ¶
Данный тип плагина может помочь при обработке очередей по крону, вам не придется писать свой процесс обработки, вам достаточно создать плагин и описать в нем процесс обработки элемента очереди, а всю остальную работу возьмет на себя ядро.
Это очень полезнный плагин для обработки неопределенного кол-ва данных, в неопределенное время. На нем можно спокойно реализовать массовые рассылки, обновления содержимого (различные импорты) и т.д. Круг применения у него огромный и по большей части зависит от того, подходят вам очереди или нет. И если очереди вам подходят для решения задачи, и вы не хотите использовать Batch API, который требует участия юзера, то данный плагин поможет вам поставить всё на автоматическую обработку при помощи стандартных инструментов.
Комментарии
Там зависит от сервера.
Если на сервере лимит на выполнение (max_execution_time
) больше чем лимит у операции плагина то: Если началась выполняться операции на 59 секунде, а лимит стоит 60 (у плагина), то задача выполнится, а следующая отложится уже на будущий крон.
Если на сервер лимит на выполнение равен лимиту у плагина или меньше, это приведет к "жесткому" завершению операции на уровне процесса. В таком случае, задача, которая в данный момент выполнялась, выполнится заново при следующем кроне.
А если не использовать данный QueueWorker
, то каким образом обычно реализуется обработка очередей? Своя функция в hook_cron()
, которая ручками берёт из очереди элемент, лочит, обрабатывает, и удаляет при успехе? Или есть ещё какие-то способы?
Просто мне необходимо реализовать параллельную обработку очереди, нашёл модуль для этого: https://www.drupal.org/project/parallel_queue но с ним QueueWorker
почему-то валится с вот такой ошибкой: https://www.drupal.org/project/parallel_queue/issues/3184570
При помощи сервиса queue
, вот тут об этом есть: https://niklan.net/blog/79
Если коротко:
$queue = $queue = \Drupal::queue('QUEUE_NAME');
while ($item = $queue->claimItem()) {
// Do something with $item.
$queue->deleteItem($item);
}
Привет. Подскажи одну вещь. Вокер (элементы очереди) выполняется последовательно, или параллельно по умолчанию? Просто нам нужно строить сложный отчет. Он для каждого элемента очереди загружает отчет из БД, производит вычисления и сохраняет все обратно. И у нас ошибки лезут, особенно когда cron очередь обрабатывает
Они выполняются так, как диктует очередь, из которой они получаются обработчиком. То есть, это не решает обработчик, это решает очередь. Обработчик делает запрос элемента из очереди — что дали, с тем и работает.
По умолчанию очередь на ::claimItem()
получает ближайший элемент очереди и блокирует его, затем кто-то что-то с ним делает, и либо удалит из очереди, в случае успешной обработки, либо отправляет на повторную обработку, либо происходит ошибка и этот элемент останется заблокированным на какой-то период, после чего разблокируется и снова станет доступен. Так вот очередь отдаёт ближайший не заблокированный элемент.
Иными словами, дефолтная очередь не гарантирует однопоточность и последовательность обработки данных. Выходит, что если организовать запуск обработки очереди одновременно в разных процессах — то очередь будет обрабатываться многопоточно, но обрабатывать они будут разные элементы.
Я предложу следующие решения (ведь они будут индивидуальны):
- Отключить для обработчика работу в кроне, запускать исключительно в предсказуемом сценарии, например системным кроном при помощи
drush queue:run NAME
. Отключается просто — из аннотации обработчика нужно убрать параметрcron
, тогда такой обработчик и не будет запускаться на регулярные операции запускаемыеdrush cron
,/cron/{key}
и вызовом из админки, но сработает на прямой вызовdrush queue:run NAME
. - Задекорировать нужную очередь и написать свою логику, как получаются элементы. Вот там как раз можно сделать чтобы они выполнялись строго последовательно, потому что процесс добавления в очередь и получение из очереди будет написан вами.
Благодарю за развернутый ответ!
Спасибо за материал Есть один вопросик, если в очереди что-то есть, или например не успело все обработаться за 60 секунд указанных, то следующее выполнение начнется в момент запуска крона?