Руководство по Drupal 8 Queue API и Cron Queueing

Перевод
Автор оригинала Daniel Sipos
Оригинал Drupal 8 Queue API – Powerful Manual and Cron Queueing
Руководство по Drupal 8 Queue API и Cron Queueing

Queue API в Drupal позволяет нам обрабатывать несколько задач в порядке очереди. Это означает, что мы можем поместить элементы в очередь, и каждый отдельный элемент в очереди будет обрабатываться отдельно, через какой-то промежуток времени. Как правило такие очереди запускаются с помощью CRON, и Drupal 8 позволяет нам быстро устанавливать очереди на основе cronjob. Хотя, очереди не обязательно запускать через CRON.

В этой статье мы рассмотрим использование Queue API в Drupal 8, на двух простых примерах. В первом примере мы посмотрим, как очередь можно вызвать через CRON, а во втором вызовем очередь вручную. Но в обоих примерах, фактически обработчик будет один. Модуль, который мы напишем в этой статье, находится в этом Git-репозитории.

Модуль, с которым мы будем работать называется Node Publisher Queue. Он автоматически добавляет в очередь на публикацию новые материалы, у которых при сохранении статус публикации стоял "Не опубликовано", и позже их публикует. Для начала нам нужно разобраться в некоторых основных понятиях, связанных с очередями в Drupal 8.

Теория

Есть несколько основных компонентов из которых состоит Queue API в Drupal 8.

Наиболее важную роль в этом API играет реализация QueueInterface, которая собственно и представляет из себя очередь. DatabaseQueue - это тип очереди, который присутствует в Drupal 8 в настоящее время по умолчанию. Он является типом надежной очереди, которая гарантирует, что все ее элементы будут обработаны по порядку, по крайней мере один раз (FIFO).

Типичная роль объекта очереди, это создавать элементы, затем запрашивать их из очереди и удалять их после того как они будут обработаны. Кроме того, он может освобождать элементы, если обработка не закончена, или другой worker'а должен обработать их снова перед удалением.

Реализация QueueInterface создается с помощью более общего QueueFactory. В случае с DatabaseQueue нужно использовать DatabaseQueueFactory. Перед тем как использовать очереди, их нужно создать. В нашем случае DatabaseQueue был создан при установке Drupal, и не требует никаких дополнительных настроек.

Queue Worker'ы отвечают за обработку элементов очереди. В Drupal 8, это QueueWorker плагины, которые реализуют QueueWorkerInterface. Используя QueueWorkerManager, мы создаем объекты таких плагинов и обрабатываем элементы, при каждом запуске очереди.

Node Publish Queue модуль

Сейчас, когда мы рассмотрели базовые концепции Queue API в Drupal 8, давайте все-таки запачкаем наши с вами руки и реализуем все то, что написано в теории. Наш npq.info.yml файл будет очень простым:

name: Node Publish Queue
description: Demo module illustrating the Queue API in Drupal 8
core: 8.x
type: module

Создание элементов очереди

В файле npq.module мы позаботимся о логике создания элементов очереди всякий раз, когда материал сохранен, но у него стоит галочка "Опубликовано".

use Drupal\Core\Entity\EntityInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;

/**
 * Implements hook_entity_insert().
 */
function npq_entity_insert(EntityInterface $entity) {
  if ($entity->getEntityTypeId() !== 'node') {
    return;
  }

  if ($entity->isPublished()) {
    return;
  }

  /** @var QueueFactory $queue_factory */
  $queue_factory = \Drupal::service('queue');
  /** @var QueueInterface $queue */
  $queue = $queue_factory->get('cron_node_publisher');
  $item = new \stdClass();
  $item->nid = $entity->id();
  $queue->createItem($item);
}

Внутри hook'a hook_entity_insert(), мы выполняем одну очень простую задачу. Для начала мы получаем объект QueueInterface из сервис-контейнера и используем его для получения очереди под названием cron_node_publisher. Если углубиться, то можно увидеть, что метод get(), просто создает новый объект DatabaseQueue  с именем, которое мы ему передали.

И наконец, мы создаем небольшой PHP-объект, содержащий в себе идентификатор материла и создаем элемент очереди с этими данными. Все просто.

CRON queue worker

Далее, давайте создадим QueueWorker плагин, который будет обрабатывать элементы очереди, при запуске Cron. Так как мы знаем, что в дальнейшем возможно будем запускать эту самую очередь вручную, мы можем вынести большую часть логики в базовый, абстрактный класс. Внутри папки src/Plugin/QueueWorker мы создадим класс NodePublishBase, со следующим содержанием:

/**
 * @file
 * Contains Drupal\npq\Plugin\QueueWorker\NodePublishBase.php
 */

namespace Drupal\npq\Plugin\QueueWorker;

use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\node\NodeInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;


/**
 * Provides base functionality for the NodePublish Queue Workers.
 */
abstract class NodePublishBase extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The node storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $nodeStorage;

  /**
   * Creates a new NodePublishBase object.
   *
   * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage
   *   The node storage.
   */
  public function __construct(EntityStorageInterface $node_storage) {
    $this->nodeStorage = $node_storage;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $container->get('entity.manager')->getStorage('node')
    );
  }

  /**
   * Publishes a node.
   *
   * @param NodeInterface $node
   * @return int
   */
  protected function publishNode($node) {
    $node->setPublished(TRUE);
    return $node->save();
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    /** @var NodeInterface $node */
    $node = $this->nodeStorage->load($data->nid);
    if (!$node->isPublished() && $node instanceof NodeInterface) {
      return $this->publishNode($node);
    }
  }
}

Здесь видно, что мы используем Dependency Injection для внедрения NodeStorage в наш класс. За большей информацией о Dependency Injection и Service Container вы можете обратиться к этой статье.

В этом базовом классе у нас есть два метода: publishNode() и обязательный processItem(). Первый публикует и сохраняет материалы, которые ему передаются. Второй загружает материалы используя идентификатор, который содержится в объекте $data и публикует их, если они не опубликованы.

Теперь давайте создадим CronNodePublisher плагин, который будет реализовывать логику обработки очереди при запуске CRON:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes on CRON run.
 *
 * @QueueWorker(
 *   id = "cron_node_publisher",
 *   title = @Translation("Cron Node Publisher"),
 *   cron = {"time" = 10}
 * )
 */
class CronNodePublisher extends NodePublishBase {}

Это весь класс. Все что нам нужно уже есть в базовом классе. Обратите внимание на то, что в аннотации, мы говорим Drupal'у, что этот worker должен использовать Cron для обработки стольких элементов, сколько он может обработать в течении 10 секунд. Что это значит?

При каждом запуске Cron'a, он использует QueueManager для загрузки всех объявленных плагинов. Если любой из загруженных плагинов в аннотации содержит ключ Cron, очереди с таким же именем как, и идентификатор worker'a загружаются для обработки. И наконец, каждый элемент в очереди обрабатывается, пока не закончится выделенное (в аннотации) время.

Если теперь сохранить, неопубликованный материал, то он автоматически опубликуется при первом запуске CRON.

Ручной worker

Теперь давайте сделаем возможным ручной запуск обработки очереди. Во-первых нам нужно изменить реализацию hook_entity_insert() и изменить следующую строку:

$queue = $queue_factory->get('cron_node_publisher');

на эту:

$queue = $queue_factory->get('manual_node_publisher');

Конечно, при желании можно сделать страницу настроек, где администратор сам сможет выбирать какой тип обработки должно использовать приложение.

Во-вторых, создадим ManualNodePublisher плагин:

namespace Drupal\npq\Plugin\QueueWorker;

/**
 * A Node Publisher that publishes nodes via a manual action triggered by an admin.
 *
 * @QueueWorker(
 *   id = "manual_node_publisher",
 *   title = @Translation("Manual Node Publisher"),
 * )
 */
class ManualNodePublisher extends NodePublishBase {}

Почти один в один, как и плагин для CRON, только без ключа Cron в аннотации.

В-третьих, создадим форму, на которой мы сможем видеть сколько элементов сейчас находится в очереди, а при нажатии на кнопку будем их обрабатывать. В корневой папке модуля создаем файл npq.routing.yml и пишем следующее:

demo.form:
  path: '/npq'
  defaults:
    _form: '\Drupal\npq\Form\NodePublisherQueueForm'
    _title: 'Node Publisher'
  requirements:
    _permission: 'administer site configuration'

Здесь мы определили путь /npq, который привязан к форме с определенным пространством имен. Теперь нужно создать саму форму. Для этого в папке src/Form создаем файл NodePublisherQueueForm с таким содержанием:

/**
 * @file
 * Contains \Drupal\npq\Form\NodePublisherQueueForm.
 */

namespace Drupal\npq\Form;

use Drupal\Core\Form\FormBase;
use Drupal\Core\Form\FormStateInterface;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueInterface;
use Drupal\Core\Queue\QueueWorkerInterface;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\Core\Queue\SuspendQueueException;
use Symfony\Component\DependencyInjection\ContainerInterface;

class NodePublisherQueueForm extends FormBase {

  /**
   * @var QueueFactory
   */
  protected $queueFactory;

  /**
   * @var QueueWorkerManagerInterface
   */
  protected $queueManager;


  /**
   * {@inheritdoc}
   */
  public function __construct(QueueFactory $queue, QueueWorkerManagerInterface $queue_manager) {
    $this->queueFactory = $queue;
    $this->queueManager = $queue_manager;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container) {
    return new static(
      $container->get('queue'),
      $container->get('plugin.manager.queue_worker')
    );
  }
  
  /**
   * {@inheritdoc}.
   */
  public function getFormId() {
    return 'demo_form';
  }
  
  /**
   * {@inheritdoc}.
   */
  public function buildForm(array $form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('node_publisher');

    $form['help'] = array(
      '#type' => 'markup',
      '#markup' => $this->t('Submitting this form will process the Manual Queue which contains @number items.', array('@number' => $queue->numberOfItems())),
    );
    $form['actions']['#type'] = 'actions';
    $form['actions']['submit'] = array(
      '#type' => 'submit',
      '#value' => $this->t('Process queue'),
      '#button_type' => 'primary',
    );
    
    return $form;
  }
  
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    /** @var QueueInterface $queue */
    $queue = $this->queueFactory->get('manual_node_publisher');
    /** @var QueueWorkerInterface $queue_worker */
    $queue_worker = $this->queueManager->createInstance('manual_node_publisher');

    while($item = $queue->claimItem()) {
      try {
        $queue_worker->processItem($item->data);
        $queue->deleteItem($item);
      }
      catch (SuspendQueueException $e) {
        $queue->releaseItem($item);
        break;
      }
      catch (\Exception $e) {
        watchdog_exception('npq', $e);
      }
    }
  }
}

Мы снова используем Dependency Injection для введения QueueFactory и менеджера в QueueWorker плагины. Внутри метода buildForm() мы создаем структуру основной формы и используем метод очереди nubmerOfItems() чтобы сказать пользователю сколько элементов приготовлено для обработки. Но как мы это делаем?

Во-первых, мы загружаем очередь и созданный объект worker'а (в обоих случаях мы используем идентификатор manual_node_publisher). После этого мы запускаем цикл while пока все элементы не будут обработаны. Метод claimItem() отвечает за блокировку элемента очереди, и возвращает его для обработки. После того как полученный элемент обработан worker'ом, он удаляется из очереди. В следующей итерации берется следующий элемент из очереди, и над ним выполняются все те же действия. И так пока все элементы не закончатся.

Теперь, если мы создадим несколько материалов и не станем их при создании публиковать, то на странице /npq мы сможем увидеть количество таких материалов. После отправки формы, все материалы опубликуются.

Важно учитывать, что это был всего лишь пример для того чтобы показать, как работает Queue API. Всегда нужно учитывать потенциальную нагрузку при обработке большого количества элементов, делать ограничение, тайм-аут или использовать Batch API, чтобы разделить их на несколько итераций.

Заключение

В этой статье мы рассмотрели Queue API в Drupal 8. Мы изучили основные понятия, связанные с тем как устроена очередь и как она работает, а также рассмотрели некоторые примеры. А если быть точнее, то мы поигрались с двумя вариантами использования, с помощью которых мы можем публиковать материалы с помощью Cron или с помощью ручного вызова.

Комментарии

# 30 апреля 2016 00:48 kalabro
Почему источник не указан? http://www.sitepoint.com/drupal-8-queue-api-powerful-manual-and-cron-queueing/
# 3 май 2016 21:11 Антон Запевалов
Проглядел. Добавил)
# 13 сентября 2016 19:34 Punk_UnDeaD
if (!$node->isPublished() && $node instanceof NodeInterface) только наоборот условия сначала проверка на нодность, потом на опубликованность
Для того чтобы оставлять комментарии, вам необходимо авторизоваться. Вход