wal2json

Логическая репликация в формате json.
Для реализации возможности логической репликации в формате JSON в состав продукта дополнительно включен плагин вывода для логического декодирования wal2json.
Параметры плагина, задаваемые при подключении к слоту репликации:
  • include-xids - добавить xid к каждому набору изменений. Значение по умолчанию - false;
  • use-xid32 - использовать 32-битный xid в наборе изменений. Значение по умолчанию - false;
  • include-timestamp - добавить временную метку (timestamp) к каждому набору изменений. Значение по умолчанию - false;
  • include-schemas - добавить схему (schema) к каждому изменению. Значение по умолчанию - true;
  • include-types - добавить тип к каждому изменению. Значение по умолчанию - true;
  • include-typmod - добавить модификатор к типам, у которых он есть (например, varchar(20) вместо varchar). Значение по умолчанию - true;
  • include-type-oids - добавить oids типа. Значение по умолчанию - false;
  • include-domain-data-type - заменить доменное имя базовым типом данных. Значение по умолчанию - false;
  • include-column-positions - добавить позицию столбца (pg_attribute.attnum). Значение по умолчанию - false;
  • include-origin - добавить источник фрагмента данных. Значение по умолчанию - false;
  • include-not-null - добавить ненулевую информацию в качестве columnoptionals. Значение по умолчанию - false;
  • include-default - добавить выражение (expression) по умолчанию. Значение по умолчанию - false;
  • include-pk - добавить информацию о primary key в виде pk. Имя столбца и тип данных включены. Значение по умолчанию - false;
  • numeric-data-types-as-string - использовать строку для числовых типов данных. Спецификация JSON не распознает Infinity и NaN как допустимые числовые значения. Для чисел двойной точности могут возникнуть потенциальные проблемы совместимости. Значение по умолчанию - false;
  • pretty-print - добавить пробелы и отступы в структуры JSON. Значение по умолчанию - false;
  • write-in-chunks - запись после каждого изменения вместо каждого набора изменений. Используется только тогда, когда format-version в значении 1. Значение по умолчанию - false; include-lsn - добавить nextlsn к каждому набору изменений (changeset). Значение по умолчанию - false;
  • include-transaction - выдает записи, обозначающие начало и конец каждой транзакции. Значение по умолчанию - true;
  • filter-origins - исключить изменения из указанных источников. Значение по умолчанию пустое, что означает, что источник не будет отфильтрован. Набор значений, разделенных запятой;
  • filter-tables - исключить строки (rows) из указанных таблиц. Значение по умолчанию пустое, что означает, что ни одна таблица не будет отфильтрована. Набор значений, разделенных запятой. Таблицы должны соответствовать требованиям схемы. Имя вида *.foo означает таблицу foo во всех схемах, а имя вида bar.* означает все таблицы в схеме bar. Специальные символы (пробел, одинарная кавычка, запятая, точка, звездочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру;
  • add-tables: включить только строки (rows) из указанных таблиц. По умолчанию используются все таблицы из всех схем. В нем действуют те же правила, что и в filter-tables;
  • filter-msg-prefixes - исключить сообщения, если префикс есть в списке. Значение по умолчанию пустое, что означает, что ни одно сообщение не будет отфильтровано. Задается в виде набора значений, разделенных запятой;
  • add-msg-prefixes - включить только сообщения, если префикс есть в списке. По умолчанию используются все префиксы. Задается в виде набора значений, разделенных запятой. Значение filter-msg-prefixes применяется перед этим параметром;
  • format-version - определяет, какой формат вывода использовать. Значение по умолчанию равно 1;
  • actions - определяет, какие операции будут отправляться. По умолчанию используются все действия (INSERT, UPDATE, DELETE и TRUNCATE). Однако, если используется format-version 1, TRUNCATE не будет включен (для поддержания обратной совместимости).
Плагин имеет доступ к кортежам, созданным с помощью INSERT и UPDATE. Для настройки доступа к UPDATE/DELETE старых версий строк (row) при создании/изменении таблицы задается свойство REPLICA IDENTITY. Изменения могут быть применены с помощью протокола потока изменений (слоты логической репликации) или с помощью специального SQL API.
Поддерживается два формата вывода:
  • объект JSON для каждой транзакции: все новые/старые кортежи доступны в объекте JSON. Есть опции для включения таких свойств, как временная метка транзакции, соответствие схеме, типы данных и идентификаторы транзакций;
  • объект JSON для каждого кортежа: для начала и окончания транзакции объект JSON необязателен; дополнительно представлен набор опций для включения отдельных свойств объектов.
Ниже приведен пример JSON-схемы, генерируемой для операции вставки:
{
   "kind": "insert",
   "schema": "public",
   "table": "test_table",
   "columnnames": ["num", "txt"],
   "columntypes": ["integer", "text"],
   "columnvalues": [1, "A first record in a table"]
}
Для использования плагина требуется уровень WAL >= logical, для этого необходимо задать значение параметра wal_level = 'logical'. После смены значения этого параметра требуется перезапустить сервер Pangolin.

Использование модуля

Для создания слота логической репликации с использованием wal2json необходимо вызвать функцию pg_create_logical_replication_slot:
postgres$ psql -p 6544 \
-c "select pg_create_logical_replication_slot('test_slot1', 'wal2json')"
Для подключения к созданному слоту в отдельном терминале вызывается pg_recvlogical:
pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
В соответствующем терминале будет отображаться мониторинг содержимого слота в формате JSON.
Ниже приведен пример создания тестовых таблиц и их заполнения данными:
CREATE table test01 (num INTEGER PRIMARY KEY, txt TEXT);
CREATE table test02 (num INTEGER, txt TEXT);
INSERT INTO test01 VALUES(1, 'A first record in a table');
INSERT INTO test01 VALUES(2, 'A second record in a table'),(3, 'Third row');
INSERT INTO test02 SELECT * FROM test01;
Пример вывода результата в терминале мониторинга:
Раскрыть type=sql
**CREATE TABLE
{
   "change": [
   ]
}
**CREATE TABLE
{
   "change": [
   ]
}

**INSERT 0 1
{
   "change": [
      {
            "kind": "insert",
            "schema": "public",
            "table": "test01",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "A first record in a table"]
      }
    ]
}

**INSERT 0 2
{
   "change": [
      {
            "kind": "insert",
            "schema": "public",
            "table": "test01",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [2, "A second record in a table"]
      }
      ,{
            "kind": "insert",
            "schema": "public",
            "table": "test01",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [3, "Third row"]
      }
   ]
}

**INSERT 0 3
{
   "change": [
      {
            "kind": "insert",
            "schema": "public",
            "table": "test02",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "A first record in a table"]
      }
      ,{
            "kind": "insert",
            "schema": "public",
            "table": "test02",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [2, "A second record in a table"]
      }
      ,{
            "kind": "insert",
            "schema": "public",
            "table": "test02",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [3, "Third row"]
      }
   ]
}
Содержимое тестовой таблицы test02:
Раскрыть type=sql
num |            txt
----+----------------------------
  1 | A first record in a table
  2 | A second record in a table
  3 | Third row
(3 rows)
В примере далее произведено удаление данных из двух таблиц:
DELETE FROM test01 WHERE num=3;
DELETE FROM test02 WHERE num=3;
Удаление строк таблицы test01 выполнено. Для таблицы test02 было получено предупреждение (WARNING), так как логическая репликация операции невозможна из-за отсутствия ключа.
Пример вывода wal2json:
Раскрыть type=sql
**DELETE 1
{
   "change": [
      {
            "kind": "delete",
            "schema": "public",
            "table": "test01",
            "oldkeys": {
               "keynames": ["num"],
               "keytypes": ["integer"],
               "keyvalues": [3]
            }
      }
   ]
}

**DELETE 1
WARNING:  table "test02" without primary key or replica identity is nothing
{
   "change": [
   ]
}
Аналогичным образом далее рассмотрен сценарий модификации строк в двух таблицах:
UPDATE test01 SET txt='modified' WHERE num=2;
UPDATE test02 SET txt='modified' WHERE num=2;
Обновление строк таблицы test01 выполнено. Для таблицы test02 указан WARNING, так как логическая репликация операции невозможна из-за отсутствия ключа.
Получен следующий вывод:
Раскрыть type=sql
**UPDATE 1
{
   "change": [
      {
            "kind": "update",
            "schema": "public",
            "table": "test01",
            "columnnames": ["num", "txt"],
            "columntypes": ["integer", "text"],
            "columnvalues": [2, "modified"],
            "oldkeys": {
               "keynames": ["num"],
               "keytypes": ["integer"],
               "keyvalues": [2]
            }
      }
   ]
}

**UPDATE 1
WARNING:  table "test02" without primary key or replica identity is nothing
{
   "change": [
   ]
}

Ссылки на документацию разработчика

Дополнительную информацию по поставляемому модулю wal2json можно получить по ссылке.
Предыдущий раздел
vacuumlo
Следующий раздел
xml2
Была ли страница полезной?