Руководство по Drupal 8 Queue API и Cron Queueing

Queue API в Drupal позволяет нам обрабатывать несколько задач в порядке очереди. Это означает, что мы можем поместить элементы в очередь, и каждый отдельный элемент в очереди будет обрабатываться отдельно, через какой-то промежуток времени. Как правило такие очереди запускаются с помощью CRON, и Drupal 8 позволяет нам быстро устанавливать очереди на основе cronjob. Хотя, очереди не обязательно запускать через CRON.

В этой статье мы рассмотрим использование Queue API в Drupal 8, на двух простых примерах. В первом примере мы посмотрим, как очередь можно вызвать через CRON, а во втором вызовем очередь вручную. Но в обоих примерах, фактически обработчик будет один. Модуль, который мы напишем в этой статье, находится в этом Git-репозитории.

Модуль, с которым мы будем работать называется Node Publisher Queue. Он автоматически добавляет в очередь на публикацию новые материалы, у которых при сохранении статус публикации стоял «Не опубликовано», и позже их публикует. Для начала нам нужно разобраться в некоторых основных понятиях, связанных с очередями в Drupal 8.

Теория

Есть несколько основных компонентов из которых состоит Queue API в Drupal 8.

Наиболее важную роль в этом API играет реализация QueueInterface, которая собственно и представляет из себя очередь. DatabaseQueue — это тип очереди, который присутствует в Drupal 8 в настоящее время по умолчанию. Он является типом надежной очереди, которая гарантирует, что все ее элементы будут обработаны по порядку, по крайней мере один раз (FIFO).

Типичная роль объекта очереди, это создавать элементы, затем запрашивать их из очереди и удалять их после того как они будут обработаны. Кроме того, он может освобождать элементы, если обработка не закончена, или другой worker’а должен обработать их снова перед удалением.

Реализация QueueInterface создается с помощью более общего QueueFactory. В случае с DatabaseQueue нужно использовать DatabaseQueueFactory. Перед тем как использовать очереди, их нужно создать. В нашем случае DatabaseQueue был создан при установке Drupal, и не требует никаких дополнительных настроек.

Queue Worker’ы отвечают за обработку элементов очереди. В Drupal 8, это QueueWorker плагины, которые реализуют QueueWorkerInterface. Используя QueueWorkerManager, мы создаем объекты таких плагинов и обрабатываем элементы, при каждом запуске очереди.

Node Publish Queue модуль

Сейчас, когда мы рассмотрели базовые концепции Queue API в Drupal 8, давайте все-таки запачкаем наши с вами руки и реализуем все то, что написано в теории. Наш npq.info.yml файл будет очень простым:

Создание элементов очереди

В файле npq.module мы позаботимся о логике создания элементов очереди всякий раз, когда материал сохранен, но у него стоит галочка «Опубликовано».

Внутри hook’a hook_entity_insert(), мы выполняем одну очень простую задачу. Для начала мы получаем объект QueueInterface из сервис-контейнера и используем его для получения очереди под названием cron_node_publisher. Если углубиться, то можно увидеть, что метод get(), просто создает новый объект DatabaseQueue  с именем, которое мы ему передали.

И наконец, мы создаем небольшой PHP-объект, содержащий в себе идентификатор материла и создаем элемент очереди с этими данными. Все просто.

CRON queue worker

Далее, давайте создадим QueueWorker плагин, который будет обрабатывать элементы очереди, при запуске Cron. Так как мы знаем, что в дальнейшем возможно будем запускать эту самую очередь вручную, мы можем вынести большую часть логики в базовый, абстрактный класс. Внутри папки src/Plugin/QueueWorker мы создадим класс NodePublishBase, со следующим содержанием:

Здесь видно, что мы используем Dependency Injection для внедрения NodeStorage в наш класс. За большей информацией о Dependency Injection и Service Container вы можете обратиться к этой статье.

В этом базовом классе у нас есть два метода: publishNode() и обязательный processItem(). Первый публикует и сохраняет материалы, которые ему передаются. Второй загружает материалы используя идентификатор, который содержится в объекте $data и публикует их, если они не опубликованы.

Теперь давайте создадим CronNodePublisher плагин, который будет реализовывать логику обработки очереди при запуске CRON:

Это весь класс. Все что нам нужно уже есть в базовом классе. Обратите внимание на то, что в аннотации, мы говорим Drupal’у, что этот worker должен использовать Cron для обработки стольких элементов, сколько он может обработать в течении 10 секунд. Что это значит?

При каждом запуске Cron’a, он использует QueueManager для загрузки всех объявленных плагинов. Если любой из загруженных плагинов в аннотации содержит ключ Cron, очереди с таким же именем как, и идентификатор worker’a загружаются для обработки. И наконец, каждый элемент в очереди обрабатывается, пока не закончится выделенное (в аннотации) время.

Если теперь сохранить, неопубликованный материал, то он автоматически опубликуется при первом запуске CRON.

Ручной worker

Теперь давайте сделаем возможным ручной запуск обработки очереди. Во-первых нам нужно изменить реализацию hook_entity_insert() и изменить следующую строку:

на эту:

Конечно, при желании можно сделать страницу настроек, где администратор сам сможет выбирать какой тип обработки должно использовать приложение.

Во-вторых, создадим ManualNodePublisher плагин:

Почти один в один, как и плагин для CRON, только без ключа Cron в аннотации.

В-третьих, создадим форму, на которой мы сможем видеть сколько элементов сейчас находится в очереди, а при нажатии на кнопку будем их обрабатывать. В корневой папке модуля создаем файл npq.routing.yml и пишем следующее:

Здесь мы определили путь /npq, который привязан к форме с определенным пространством имен. Теперь нужно создать саму форму. Для этого в папке src/Form создаем файл NodePublisherQueueForm с таким содержанием:

Мы снова используем Dependency Injection для введения QueueFactory и менеджера в QueueWorker плагины. Внутри метода buildForm() мы создаем структуру основной формы и используем метод очереди nubmerOfItems() чтобы сказать пользователю сколько элементов приготовлено для обработки. Но как мы это делаем?

Во-первых, мы загружаем очередь и созданный объект worker’а (в обоих случаях мы используем идентификатор manual_node_publisher). После этого мы запускаем цикл while пока все элементы не будут обработаны. Метод claimItem() отвечает за блокировку элемента очереди, и возвращает его для обработки. После того как полученный элемент обработан worker’ом, он удаляется из очереди. В следующей итерации берется следующий элемент из очереди, и над ним выполняются все те же действия. И так пока все элементы не закончатся.

Теперь, если мы создадим несколько материалов и не станем их при создании публиковать, то на странице /npq мы сможем увидеть количество таких материалов. После отправки формы, все материалы опубликуются.

Важно учитывать, что это был всего лишь пример для того чтобы показать, как работает Queue API. Всегда нужно учитывать потенциальную нагрузку при обработке большого количества элементов, делать ограничение, тайм-аут или использовать Batch API, чтобы разделить их на несколько итераций.

Заключение

В этой статье мы рассмотрели Queue API в Drupal 8. Мы изучили основные понятия, связанные с тем как устроена очередь и как она работает, а также рассмотрели некоторые примеры. А если быть точнее, то мы поигрались с двумя вариантами использования, с помощью которых мы можем публиковать материалы с помощью Cron или с помощью ручного вызова.

Комментарии

Добавить комментарий