pgq

Организация монопольных очередей сообщений.
Связанные компоненты: pgqd.
Схема размещения: pgq.

Описание

Модуль предназначен для организации очередей сообщений и состоит из непосредственно расширения pgq для postgres и демона pgqd.
При организации очередей следует избегать большого количества отдельных сообщений меньшего объема в пользу меньшего количества более объемных сообщений. Таблицы сообщений не содержат дополнительных индексов, позволяющих фильтровать условия видимости сообщений, и ориентированы, прежде всего, на скорость вставки. Компенсирующей мерой является штатное разделение таблицы очередей на набор наследуемых таблиц (INHERIT TABLE), регламентные операции по каждой из которых разделены по времени. По умолчанию, количество наследуемых таблиц равно 3, может быть скорректировано пользователем до создания очереди.
Функции:
ФункцияВходные параметрыВозвращаемое значениеОписание
create_queue
textintСоздает новую очередь с заданным именем
drop_queue
text, boolintegerУдаляет очередь и все связанные с ней таблицы
drop_queue
textintegerУдаляет очередь и все связанные с ней таблицы, при этом ни один из процессов не должен зависеть от очереди
set_queue_config
text, text, textintegerЗадает конфигурацию для конкретной очереди (возвращает 0, если событие уже в очереди, в противном случае возвращает 1)
insert_event
text, text, textbigintДобавляет событие в очередь
insert_event
text, text, text, text, text, text, textbigintДобавляет в очередь событие со всеми дополнительными полями
current_event_table
texttextВозвращает активное событие таблицы для конкретной очереди. Событие может быть добавлено в нее через функцию, например, при вызове COPY
register_consumer
text, text-Подписывает «потребителя» (consumer) на очередь, после чего он будет видеть все события в очереди
register_consumer
text, text, bigintintegerРасширяет регистрацию возможностью задать tick_id
register_consumer_at
text, text, bigintintegerИспользуется для отложенной регистрации
unregister_consumer
text, textintegerИсключает «потребителя» (consumer) из очереди
next_batch_info
text, text, int8, int8, int8, timestamptz, timestamptz, int8, int8int8Переходит к следующему элементу очереди, NULL при отсутствии
next_batch
text, textint8Возвращает идентификатор элемента очереди, NULL при отсутствии
next_batch_custom
text, text, interval, int4, interval, int8, int8, int8, timestamptz, timestamptz, int8, int8int8Возвращает идентификатор элемента очереди
get_batch_events
bigint, bigint, timestamptz, bigint, int4, text, text, text, text, text, textsetof recordВозвращает список всех элементов очереди
get_batch_cursor
bigint, text, int4, text, bigint, timestamptz, bigint, int4, text, text, text, text, text, textsetof recordВозвращает список событий
event_retry
bigint, bigint, timestamptzintegerПомещает событие в очередь для повторного вызова в дальнейшем
finish_batch
bigintintegerЗакрывает блок сообщений
get_queue_info
text, integer, integer, interval, timestamptz, boolean, boolean, integer, interval, interval, interval, float8, bigint, bigintsetof recordВыводит информацию о всех очередях
get_consumer_info
text, text, interval, bigint, bigint, bigint, bigintsetof recordВыводит информацию о «потребителях» (consumer) во всех очередях

Использование модуля

Примеры использования модуля:
  • создание очереди:
    select * from pgq.create_queue({имя очереди} text);
  • добавление в очередь сообщения:
    select * from pgq.insert_event({имя очереди} text, {тип события} text, {информация о событии} text);
  • создание «потребителя» (consumer):
    select * from pgq.register_consumer({имя очереди} text, {имя консьюмера} text);
    Потребитель будет получать события только после собственной регистрации, поэтому возможность создавать события требует первичной «регистрации» потребителя.
  • получение идентификатора (ID) блока последовательных сообщений в очереди:
    select * from pgq.next_batch({имя очереди} text, {имя консьюмера} text);
  • получение всех событий блока последовательных сообщений:
    select * from pgq.get_batch_events({id бача} bigint);
    Возвращенное значение может быть пустым.
  • повторное помещение события в очередь:
    select * from pgq.event_retry({id бача} bigint, {id события} bigint, {количество секунд до повторной попытки выполнить действие} integer);
  • приостановление очереди на прием:
    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','true');
    
  • возобновление работы очереди сообщений:
    SET ROLE tuz_pgq_admin;
    SELECT * FROM pgq.set_queue_config('<queue_name>','queue_ticker_paused','false');
    

Ссылки на документацию разработчика

Дополнительную информацию по поставляемому модулю pgq можно получить по ссылке.
Предыдущий раздел
pgq_coop
Следующий раздел
pg_repack
Была ли страница полезной?