Drupal 8: Плагин QueueWorker — выполнение очередей по крону

Автоматическая обработка очередей во время крон операций.

21.04.2019
7 комментариев
8 мин.

QueueWorker — тип плагина, позволяющий реализовать выполнение очередей в cron операциях. Данный тип плагина предоставляется ядром, поэтому вам не потребуется никаких дополнительных модулей для его работы.

Данный тип плагина берет на себя всю работу с очередью, предоставляя вам данные на обработку. Все что вам будет нобходимо делать, это обрабатывать входящие данные.

Состав плагина

Аннотация @QueueWorker

Аннотация плагина имеет следующие значения:

  • id: Идентификатор плагина. Очень важно, чтобы название плагина было равным названию очереди. Именно так будет получаться очередь. Если ваша очередь имеет id равный my_queue, то и id плагина должен быть именно таким.
  • title: Человеко-понятное название плагина.
  • cron: (опционально) Настройки для крона.
    • time: Время в секундах, выделяемое на выполнение очереди. Пока данное время не истечет, будут производиться попытки получения элементов из очереди и отправка их на выполнение плагину. По умолчанию присваивается 15 секунд.

Пример аннотации:

/**
 * @QueueWorker(
 *   id = "my_queue_name_to_process",
 *   title = @Translation("My queue worker"),
 *   cron = {"time" = 60}
 * )
 */

Объект плагина

Плагины данного типа создаются в src/Plugin/QueueWorker.

Объект плагина должен расширять Drupal\Core\Queue\QueueWorkerBase. Вам необходимо объявить один единственный метод processItem($data), в котором и описать всю логику обработки элемента очереди.

В качестве аргумента метод принимает один единственный аргумент, который содержит элемент очереди, в том виде, как вы его добавили в данную очередь $queue->createItem().

Метод не должен ничего возвращать, вы просто выполняете необходимую логику и всё. При этом вы можете вызвать одно из следующих исключений, которые будут корректно обработаны менеджером плагина:

  • RequeueException: Прерывает обработку текущего элемента очереди, возвращая его на повторную обработку. После чего выполнение очереди данным плагином продолжится.
  • SuspendQueueException: Прерывает обработку текущего элемента очереди, возвращая его на повторную обработку. После чего выполнение плагина прекращается и вызывается следующий.
  • Любое другое исключение будет записано в лог сайта и элемент не удалится из очереди.

Пример плагина:

<?php

namespace Drupal\dummy\Plugin\QueueWorker;

use Drupal\Core\Queue\QueueWorkerBase;

/**
 * Process a queue.
 *
 * @QueueWorker(
 *   id = "my_queue_name_to_process",
 *   title = @Translation("My queue worker"),
 *   cron = {"time" = 60}
 * )
 */
class MyQueueWorker extends QueueWorkerBase {

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    // Do something with $data.
  }

}

Примеры

В качестве примеров использования мы разберемся с реализацией данного типа плагинов из ядра.

Пример №1. aggregator_feeds

Плагин aggregator_feeds отвечает за фоновое обновление фидов модуля Aggregator из ядра.

Очередь с данным id собирается в крон операции aggregator_cron(). В днном хуке производится загрузка всех фидов, которые необходимо обновить. Фиду устанавливается время текущего запроса, как время когда он добавлен в очередь, чтобы избежать повторного вызова, а в очередь передается экземпляр сущности feed (FeedInterface). Иными словами, данная очередь, хранит сущности feed, которые нобходимо обработать.

А теперь посмотрим на плагин, который обрабатывает данную очередь:

core/modules/aggregator/src/Plugin/QueueWorker/AggregatorRefresh.php
<?php

namespace Drupal\aggregator\Plugin\QueueWorker;

use Drupal\aggregator\FeedInterface;
use Drupal\Core\Queue\QueueWorkerBase;

/**
 * Updates a feed's items.
 *
 * @QueueWorker(
 *   id = "aggregator_feeds",
 *   title = @Translation("Aggregator refresh"),
 *   cron = {"time" = 60}
 * )
 */
class AggregatorRefresh extends QueueWorkerBase {

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    if ($data instanceof FeedInterface) {
      $data->refreshItems();
    }
  }

}

Очень простой и понятный, пройдемся по нему сверху-внизу:

  • В аннотации мы видим указание необходимой очереди для обработки, а также, что времени на обработку данной очереди в кроне будет выделено 60 секунд.
  • В processItem() в качестве аргумента, ожидается что будет передан экземпляр FeedInterface, а значит - объект сущности feed. Если всё корректно, у данной сущности вызывается метод refreshItems(). В данном методе производится обновление фида, а также сброс значения времени, когда он добавлен в очередь, чтобы при следующем кроне данная сущность опять была обнаружена в aggregator_cron() и очередь была собрана заново.

Таким образом, обновление фидов производится полностью в автоматическом режиме и в фоне, при выполнении крон операций.

Пример №2. media_entity_thumbnail

Данный плагин занимается обработкой очереди media_entity_thumbnail, и отвечает за обработку превьюшек для медиа элементов.

Возьмем для примера медиа тип Remote Video, который позволяет добавлять на сайт YouTube и Vimeo видео. По умолчанию опция отложенной загрузки превьюшек отключена, но вы можете включить её в редактировании данного типа медиа сущности, установив галочку "Queue thumbnail downloads".

Установив её, после добавления новой сущности данного типа будет вызван Media::postSave() метод сущности. В данном методе производится проверка, установлена ли данная галочка, и является ли сущность новой. Если условия удовлетворены, в очередь media_entity_thumbnail будет добавлен id данной сущности на обработку в видео массива: ['id' => $translation->id()].

При следующем вызове крон операций будет вызван одноименный плагин.

core/modules/media/src/Plugin/QueueWorker/ThumbnailDownloader.php
<?php

namespace Drupal\media\Plugin\QueueWorker;

use Drupal\Core\Entity\EntityTypeManagerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Process a queue of media items to fetch their thumbnails.
 *
 * @QueueWorker(
 *   id = "media_entity_thumbnail",
 *   title = @Translation("Thumbnail downloader"),
 *   cron = {"time" = 60}
 * )
 */
class ThumbnailDownloader extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The entity type manager service.
   *
   * @var \Drupal\Core\Entity\EntityTypeManagerInterface
   */
  protected $entityTypeManager;

  /**
   * Constructs a new class instance.
   *
   * @param array $configuration
   *   A configuration array containing information about the plugin instance.
   * @param string $plugin_id
   *   The plugin_id for the plugin instance.
   * @param mixed $plugin_definition
   *   The plugin implementation definition.
   * @param \Drupal\Core\Entity\EntityTypeManagerInterface $entity_type_manager
   *   Entity type manager service.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityTypeManagerInterface $entity_type_manager) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->entityTypeManager = $entity_type_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('entity_type.manager')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    /** @var \Drupal\media\Entity\Media $media */
    if ($media = $this->entityTypeManager->getStorage('media')->load($data['id'])) {
      $media->updateQueuedThumbnail();
      $media->save();
    }
  }

}

Пройдемся по нему сверху-вниз:

  • Первым делом смотрим на аннотацию, где видем, что будет обрабатываться очередь media_entity_thumbnail, а также, что времени на выполнение данной очереди будет выделяться 60 секунд.
  • В processItem() методе мы видим, что первым делом пытается загрузиться медиа сущность с id из очереди. Если она загрузилась, то вызывается её метод updateQueuedThumbnail(), который, в свою очередь, попробует загрузить файл изображения для первью, а затем осхранит сущность.

Таким образом, во время крон операций будут обновляться превьюшки для новых медиа сущностей.

Также вы можете обратить внимание на то, что данный пример более комплексный из-за Dependency Injection сервисов.

Заключение

Данный тип плагина может помочь при обработке очередей по крону, вам не придется писать свой процесс обработки, вам достаточно создать плагин и описать в нем процесс обработки элемента очереди, а всю остальную работу возьмет на себя ядро.

Это очень полезнный плагин для обработки неопределенного кол-ва данных, в неопределенное время. На нем можно спокойно реализовать массовые рассылки, обновления содержимого (различные импорты) и т.д. Круг применения у него огромный и по большей части зависит от того, подходят вам очереди или нет. И если очереди вам подходят для решения задачи, и вы не хотите использовать Batch API, который требует участия юзера, то данный плагин поможет вам поставить всё на автоматическую обработку при помощи стандартных инструментов.

Drupal
Drupal 8
Plugin API
Queue API
Batch API

Комментарии

Evgeny   ср, 30/10/2019 - 15:34

Спасибо за материал Есть один вопросик, если в очереди что-то есть, или например не успело все обработаться за 60 секунд указанных, то следующее выполнение начнется в момент запуска крона?

Niklan   пт, 01/11/2019 - 09:30

Там зависит от сервера.

Если на сервере лимит на выполнение (max_execution_time) больше чем лимит у операции плагина то: Если началась выполняться операции на 59 секунде, а лимит стоит 60 (у плагина), то задача выполнится, а следующая отложится уже на будущий крон.

Если на сервер лимит на выполнение равен лимиту у плагина или меньше, это приведет к "жесткому" завершению операции на уровне процесса. В таком случае, задача, которая в данный момент выполнялась, выполнится заново при следующем кроне.

Murz   ср, 25/11/2020 - 09:15

А если не использовать данный QueueWorker, то каким образом обычно реализуется обработка очередей? Своя функция в hook_cron(), которая ручками берёт из очереди элемент, лочит, обрабатывает, и удаляет при успехе? Или есть ещё какие-то способы?

Просто мне необходимо реализовать параллельную обработку очереди, нашёл модуль для этого: https://www.drupal.org/project/parallel_queue но с ним QueueWorker почему-то валится с вот такой ошибкой: https://www.drupal.org/project/parallel_queue/issues/3184570

Niklan   ср, 25/11/2020 - 14:19

При помощи сервиса queue, вот тут об этом есть: https://niklan.net/blog/79

Если коротко:

$queue = $queue = \Drupal::queue('QUEUE_NAME');
while ($item = $queue->claimItem()) {
  // Do something with $item.
  $queue->deleteItem($item);
}
Антон Банников   чт, 30/09/2021 - 14:21

Привет. Подскажи одну вещь. Вокер (элементы очереди) выполняется последовательно, или параллельно по умолчанию? Просто нам нужно строить сложный отчет. Он для каждого элемента очереди загружает отчет из БД, производит вычисления и сохраняет все обратно. И у нас ошибки лезут, особенно когда cron очередь обрабатывает

Niklan   чт, 30/09/2021 - 15:17

Они выполняются так, как диктует очередь, из которой они получаются обработчиком. То есть, это не решает обработчик, это решает очередь. Обработчик делает запрос элемента из очереди — что дали, с тем и работает.

По умолчанию очередь на ::claimItem() получает ближайший элемент очереди и блокирует его, затем кто-то что-то с ним делает, и либо удалит из очереди, в случае успешной обработки, либо отправляет на повторную обработку, либо происходит ошибка и этот элемент останется заблокированным на какой-то период, после чего разблокируется и снова станет доступен. Так вот очередь отдаёт ближайший не заблокированный элемент.

Иными словами, дефолтная очередь не гарантирует однопоточность и последовательность обработки данных. Выходит, что если организовать запуск обработки очереди одновременно в разных процессах — то очередь будет обрабатываться многопоточно, но обрабатывать они будут разные элементы.

Я предложу следующие решения (ведь они будут индивидуальны):

  • Отключить для обработчика работу в кроне, запускать исключительно в предсказуемом сценарии, например системным кроном при помощи drush queue:run NAME. Отключается просто — из аннотации обработчика нужно убрать параметр cron, тогда такой обработчик и не будет запускаться на регулярные операции запускаемые drush cron, /cron/{key} и вызовом из админки, но сработает на прямой вызов drush queue:run NAME.
  • Задекорировать нужную очередь и написать свою логику, как получаются элементы. Вот там как раз можно сделать чтобы они выполнялись строго последовательно, потому что процесс добавления в очередь и получение из очереди будет написан вами.