[ English | English (United Kingdom) | русский | Deutsch ]

Конфигурация обмена сообщениями

В этом разделе представлен обзор концепций развертывания гибридных сообщений и описаны необходимые шаги для рабочего развертывания OpenStack-Ansible (OSA), в котором RPC-коммуникации и сообщения Notify разделены и интегрированы с различными серверными частями обмена сообщениями.

Библиотека oslo.messaging

Библиотека oslo.messaging является частью проекта OpenStack Oslo, который предоставляет возможности обмена сообщениями между сервисами. Библиотека поддерживает два паттерна взаимодействия (RPC и уведомления) и предоставляет абстракцию, которая прячет детали процесса обмена сообщениями от сервисов OpenStack.

Уведомления

Уведомления это асинхронный обмен сообщениями от уведомителя к получателю. Передаваемые сообщения обычно содержат информацию об обновлениях или возникновении событий, которые публикуются сервисом OpenStack. Получатель не должен присутствовать, когда сообщение отправляется так как сообщения уведомлений временно отделены. Такое отделение между уведомителем и получателем требует от сервиса обмена хранения сообщений, такого как очередь брокера или хранилище логов. Стоит упомянуть, что передача сообщения от уведомителя получателю однонаправленная и направление сообщений обратно к уведомителю отсутствует.

RPC

RPC предназначался для синхронного обмена между клиентом и сервером, который временно ограничен. Передаваемая информация обычно отвечает паттерну запроса-ответа для вызова команд сервиса. Если сервер отсутствует в момент выполнения команды, вызов должен завершиться ошибкой. Временная связь требует, что бы сервер обмена сообщениями имел поддержку двунаправленной передачи запроса от вызывающего к серверу и связного ответа от сервера обратно к вызывающему. Это требование может быть удовлетворено очередью брокера или прямым обменом сообщениями сервера.

Транспорт для обмена сообщениями

Библиотека oslo.messaging поддерживает возможность transport plugin , так что RPC и уведомления могут быть разъединены и для них могут быть развернуты различные серверы для обмена сообщениями.

Драйверы oslo.messaging предоставляют интеграцию транспорта для выбранного протокола и сервера. Следующая таблица резюмирует поддерживаемые драйверы oslo.messaging и сервисы взаимодействия, которые они поддерживают.

+----------------+-----------+-----------+-----+--------+-----------+
| Oslo.Messaging | Transport |  Backend  | RPC | Notify | Messaging |
|     Driver     | Protocol  |  Server   |     |        |   Type    |
+================+===========+===========+=====+========+===========+
|     rabbit     | AMQP V0.9 | rabbitmq  | yes |   yes  |   queue   |
+----------------+-----------+-----------+-----+--------+-----------+
|     kafka      |  kafka    |  kafka    |     |   yes  |   queue   |
| (experimental) |  binary   |           |     |        |  (stream) |
+----------------+-----------+-----------+-----+--------+-----------+

Стандартная установка rabbitmq сервера

Единый бекэнд rabbitmq сервера (сервер или кластер серверов) это стандартная конфигурация развертывания OSA. Данный брокер обмена сообщениями предоставляет сервисы очередей как для RPC, так и для обмена уведомлениями при помощи интеграции с rabbit драйвером oslo.messaging. Файл oslo-messaging.yml предоставляет стандартную настройку для связи oslo.messaging сервисов RPC и уведомлений с rabbitmq сервером.

# Quorum Queues
oslomsg_rabbit_quorum_queues: "{{ rabbitmq_queue_replication }}"

# NOTE(noonedeadpunk): Disabled due to missing oslo.concurrency lock_path defenition
#                      for services
oslomsg_rabbit_queue_manager: False

# RPC
oslomsg_rpc_transport: 'rabbit'
oslomsg_rpc_port: "{{ rabbitmq_port }}"
oslomsg_rpc_servers: "{{ rabbitmq_servers }}"
oslomsg_rpc_use_ssl: "{{ rabbitmq_use_ssl }}"
oslomsg_rpc_host_group: "{{ rabbitmq_host_group }}"
oslomsg_rpc_policies: "{{ rabbitmq_policies }}"

# Notify
oslomsg_notify_transport: "{{ (groups[rabbitmq_host_group] | length > 0) | ternary('rabbit', 'none') }}"
oslomsg_notify_port: "{{ rabbitmq_port }}"
oslomsg_notify_servers: "{{ rabbitmq_servers }}"
oslomsg_notify_use_ssl: "{{ rabbitmq_use_ssl }}"
oslomsg_notify_host_group: "{{ rabbitmq_host_group }}"
oslomsg_notify_policies: "{{ rabbitmq_policies }}"

Управление политикой потока RabbitMQ

При развертывании RabbitMQ с поддержкой очередей кворума и потоков, поведение хранения сообщений меняется. Потоковые очереди поддерживают журнал только для добавления на диск всех полученных сообщений до тех пор, пока политика хранения не укажет, что их следует удалить. По умолчанию эта политика установлена ​​с x-max-age для каждого потока в 1800 секунд. Однако, как отмечено в RabbitMQ docs, это вступает в силу только тогда, когда поток накопил достаточно сообщений для заполнения сегмента, размер которого по умолчанию составляет 500 МБ.

Если вы хотите сократить использование диска, можно применить дополнительную политику через OpenStack Ansible, как показано ниже:

rabbitmq_policies:
  - name: CQv2
    pattern: '.*'
    priority: 0
    tags:
      queue-version: 2
    state: >-
      {{
        ((oslomsg_rabbit_quorum_queues | default(True) or not rabbitmq_queue_replication) and rabbitmq_install_method | default('') != 'distro'
          ) | ternary('present', 'absent')
      }}
# The following is an example of an additional policy which applies to fanout/stream queues only
# By default, each stream uses RabbitMQ's 500MB segment size, and no messages will be discarded
# until that size is reached. This may result in undesirable disk usage.
# If using this policy, it must be applied BEFORE any stream queues are created.
# See also https://bugs.launchpad.net/oslo.messaging/+bug/2089845 and https://www.rabbitmq.com/docs/streams#retention
#  - name: CQv2F
#    pattern: '^.*_fanout'
#    priority: 1
#    tags:
#      queue-version: 2
#      stream-max-segment-size-bytes: 1000000
#    state: >-
#      {{
#        ((oslomsg_rabbit_quorum_queues | default(True) or not rabbitmq_queue_replication) and rabbitmq_install_method | default('') != 'distro'
#          ) | ternary('present', 'absent')
#      }}

Однако следует отметить, что эта политика будет применяться только в том случае, если она установлена ​​до создания очередей потоков. Если они уже существуют, их необходимо вручную удалить и заново создать с помощью соответствующей службы OpenStack.

Эта проблема отслеживается в oslo.messaging bug.