pgq
Организация монопольных очередей сообщений.
Связанные компоненты:
pgqd
.Схема размещения:
pgq
.Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения
pgq
для postgres и демона pgqd
.При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (
INHERIT TABLE
), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.Функции:
Функция | Входные параметры | Возвращаемое значение | Описание |
---|---|---|---|
create_queue | text | int | Создает новую очередь с заданным именем |
drop_queue | text, bool | integer | Удаляет очередь и все связанные с ней таблицы |
drop_queue | text | integer | Удаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди |
set_queue_config | text, text, text | integer | Задает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1) |
insert_event | text, text, text | bigint | Добавляет событие в очередь |
insert_event | text, text, text, text, text, text, text | bigint | Добавляет в очередь событие со всеми дополнительными полями |
current_event_table | text | text | Возвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове COPY |
register_consumer | text, text | - | Подписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди |
register_consumer | text, text, bigint | integer | Расширяет регистрацию возможностью задать tick_id |
register_consumer_at | text, text, bigint | integer | Используется для отложенной регистрации |
unregister_consumer | text, text | integer | Исключает «потребителя» (consumer) из очереди |
next_batch_info | text, text, int8, int8, int8, timestamptz, timestamptz, int8, int8 | int8 | Переходит к следующему элементу очереди, NULL при отсутствии |
next_batch | text, text | int8 | Возвращает идентификатор элемента очереди, NULL при отсутствии |
next_batch_custom | text, text, interval, int4, interval, int8, int8, int8, timestamptz, timestamptz, int8, int8 | int8 | Возвращает идентификатор элемента очереди |
get_batch_events | bigint, bigint, timestamptz, bigint, int4, text, text, text, text, text, text | setof record | Возвращает список всех элементов очереди |
get_batch_cursor | bigint, text, int4, text, bigint, timestamptz, bigint, int4, text, text, text, text, text, text | setof record | Возвращает список событий |
event_retry | bigint, bigint, timestamptz | integer | Помещает событие в очередь для повторного вызова в дальнейшем |
finish_batch | bigint | integer | Закрывает блок сообщений |
get_queue_info | text, integer, integer, interval, timestamptz, boolean, boolean, integer, interval, interval, interval, float8, bigint, bigint | setof record | Выводит информацию о всех очередях |
get_consumer_info | text, text, interval, bigint, bigint, bigint, bigint | setof record | Выводит информацию о «потребителях» (consumer) во всех очередях |
Примеры использования модуля:
-
создание очереди:
select * from pgq.create_queue({имя очереди} text);
-
добавление в очередь сообщения:
select * from pgq.insert_event({имя очереди} text, {тип события} text, {информация о событии} text);
-
создание «потребителя» (consumer):
select * from pgq.register_consumer({имя очереди} text, {имя консьюмера} text);
Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя. -
получение идентификатора (
ID
) блока последовательных сообщений в очереди:select * from pgq.next_batch({имя очереди} text, {имя консьюмера} text);
-
получение всех событий блока последовательных сообщений:
select * from pgq.get_batch_events({id бача} bigint);
Возвращенное значение может быть пустым. -
повторное помещение события в очередь:
select * from pgq.event_retry({id бача} bigint, {id события} bigint, {количество секунд до повторной попытки выполнить действие} integer);
-
приостановление очереди на прием:
SET ROLE tuz_pgq_admin; SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');
-
возобновление работы очереди сообщений:
SET ROLE tuz_pgq_admin; SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');
Дополнительную информацию по поставляемому модулю pgq можно получить по ссылке.