вторник, 14 августа 2018 г.

Захват и обработка изменений в БД Oracle (at_delta)

Ранее я рассказывал о том, как получить изменения данных в исходных таблицах за период с помощью операции MINUS.

Однако, есть ряд сценариев, когда необходимо обрабатывать изменения оперативно или в реальном времени. Это может быть нужно для немедленной передачи изменений удаленному клиенту, или для отправки уведомлений об изменениях заинтересованным лицам по электронной почте, или для отправки документа по назначению при изменении его статуса. В этом случае наиболее универсальным средством захвата изменений будет триггер на исходной таблице, записывающий изменения в таблицу изменений.

Почему триггер не может сделать работу непосредственно и зачем нужна (казалось бы лишняя) таблица изменений? Она нужна для того, чтобы однажды зарегистрированные изменения могли быть обработаны несколькими клиентами независимо друг от друга, для разных целей и в разное время.

Поскольку захват изменений в таблицах БД и предоставление их клиентам для обработки задача достаточно распространенная, имеет смысл решить ее в общем виде и создать механизм для быстрой реализации частных решений. Такой механизм включает:

  • таблицы изменений и триггеры, вставляющие в них строки при изменении исходных таблиц,
  • реестр клиентов, обрабатывающих захваченные изменения,
  • сервисы изменений, связанные с конкретным клиентом и использующие таблицу изменений,
  • протокол получения изменений клиентом.

Таблица изменений в обязательном порядке должна содержать столбцы первичного ключа исходной таблицы, а также столбцы oper (операция) и when (время изменения). Кроме этого, она может содержать другие столбцы, соответствующие столбцам исходной таблицы - для сохранения изменяемых данных.

В таблице изменений можно регистрировать

  1. каждое изменение строк исходной таблицы,
  2. факты изменения строк исходной таблицы.

В варианте 1 в таблицу изменений последовательно записываются все случаи изменений исходной таблицы. Если одна и та же строка изменилась несколько раз, то в таблице изменений будет по одной записи для каждого изменения. Такой режим работы имеет смысл, когда в таблице изменений сохраняются изменяемые значения интересующих нас столбцов: получаем историю всех изменений. Например, история изменений полей status и amount для строки с первичным ключом 1012:

id   oper   when                status    amount
1012 insert 2018-05-15 20:22:16      1    1001.0
1012 update 2018-05-15 20:22:30      2    1001.0
1012 update 2018-05-15 20:23:15      2    1101.0
1012 update 2018-05-15 20:23:31      3    1101.0
1012 update 2018-05-15 20:23:44      4    1101.0
1012 delete 2018-05-15 20:25:05      4    1101.0

В варианте 2 регистрируются факты изменений каждой отдельной строки, но не вся история изменений и не измененные данные. При этом в таблице изменений имеется максимум одна запись для одной строки исходной таблицы, а значения в столбцах oper и when отражают, какая команда DML была выполнена последней для данной строки и время ее выполнения:

id   oper   when
1012 delete 2018-05-15 20:22:16
2519 update 2018-05-15 20:22:44
2541 update 2018-05-15 20:24:05

Для каждой из трех строк исходной таблицы в приведенном примере могли быть выполнены одно или несколько изменений, однако таблица изменений хранит только данные последнего изменения.

Такой режим подходит для передачи изменений в исходной таблице внешнему клиенту через интерфейсное вью (представление). Строки исходной таблицы при этом соединяются со строками таблицы изменений по первичному ключу, чтобы получить только измененные строки. Строки, удаленные из исходной таблицы, представлены в таблице изменений первичным ключом, и передаются клиенту с признаком удаления. Ниже будет приведен детальный пример передачи изменений клиенту через интерфейсное вью.

В зависимости от решаемой задачи, триггер может захватывать не все, а только особые, представляющие интерес, изменения исходной таблицы. Например, только изменения поля статуса, или только операции update, после которых выполняется определенное условие с участием нескольких полей. Тогда записи в таблице изменений могут выглядеть как-то так:

id   oper   when
2517 status 2018-05-15 20:22:16
2030 status 2018-05-15 20:22:30
1045 alarm  2018-05-15 20:23:15

В столбце oper вместо вида команды DML регистрируется тип наступившего события. В зависимости от решаемой задачи, многократное наступление события для одной и той же строки исходной таблицы может регистрироваться в отдельных записях таблицы изменений (по варианту 1) или приводить к обновлению единственной записи (по варианту 2).

Для одной исходной таблицы можно создать несколько триггеров, например, триггер для захвата специальных изменений и триггер для захвата фактов изменений строк.

В другом сценарии исходная таблица может иметь дочерние таблицы, ссылающиеся на ее первичный ключ, и вместе с дочерними таблицами описывать единую сущность предметной области. Тогда, для захвата изменений сущности, нужно создать триггеры как на родительской, так и на дочерних таблицах; все эти триггеры будут записывать изменения в общую таблицу изменений.

Таблица изменений имеет особенность, которая позволяет организовать выборку из нее в порядке внесения изменений в БД. Таблица изменений создается в режиме rowdependencies, что обеспечивает сохранение номера SCN для отдельных строк таблицы в момент завершения транзакции (см. Два кейса для ora_rowscn в Oracle 11g). Псевдостолбец ora_rowscn таблицы изменений, таким образом, содержит числовое значение, которое непрерывно возрастает с каждой завершенной транзакцией.

Номер изменения в псевдостолбце ora_rowscn таблицы изменений можно с успехом использовать для организации обработки изменений несколькими клиентами. Для этого клиент должен каждый раз обрабатывать изменения с номерами, большими, чем обработанный в прошлый раз. В специальной таблице будем хранить связку (таблица изменений, клиент, последний обработанный номер ora_rowscn). Назовем такие связки сервисами, дадим каждой связке имя, так, чтобы сочетание (клиент, сервис) было уникальным, и предоставим клиентам возможность получать и изменять последний обработанный номер изменения для данного сервиса.

Полезным побочным эффектом от хранения номера последнего обработанного изменения на стороне БД-источника (а не на стороне клиента) является возможность автоматически удалять из таблицы изменений строки, уже обработанные всеми клиентами.

Протокол получения и обработки клиентом последних изменений будет выглядеть так:

  1. получить номер последнего обработанного клиентом изменения и текущий максимальный номер изменения для сервиса А,
  2. обработать строки из соответствующей таблицы изменений (или интерфейсного вью) с номерами ora_rowscn, большими, чем номер последнего обработанного изменения и меньшими или равными текущему максимальному номеру изменения из п. 1,
  3. обновить номер последнего обработанного изменения для сервиса A, присвоив ему текущий максимальный номер изменения из п. 1.

Следующие две таблицы и пакет at_delta предоставляют инфраструктуру для реализации вышеописанного механизма захвата изменений для передачи клиентам:

SQL> create table at_cdc_ (
    capture    varchar2(20),
    cdc_type   varchar2(10) not null,
    descr      varchar2(4000),
    constraint at_cdc_pk primary key (capture),
    constraint at_cdc_ck check (cdc_type in ('deltascn', 'orarowscn'))
);
comment on table at_cdc_ is 'Change data captures';
comment on column at_cdc_.capture is 'Capture name';
comment on column at_cdc_.cdc_type is 'CDC type';
comment on column at_cdc_.descr is 'Capture description';

create table at_svs_ (
    service    varchar2(30),
    client     varchar2(20),
    capture    varchar2(20),
    descr      varchar2(4000),
    last_scn   number default 0 not null,
    last_when  timestamp with time zone default systimestamp,
    constraint at_svs_pk primary key (client, service),
    constraint at_svs_cdc_fk foreign key (capture) references at_cdc_
);
comment on table at_svs_ is 'Change data capture client''s services';
comment on column at_svs_.service is 'Service name';
comment on column at_svs_.client is 'Client name';
comment on column at_svs_.capture is 'Capture name';
comment on column at_svs_.descr is 'Description';
comment on column at_svs_.last_scn is 'Max SCN already processed by client';
comment on column at_svs_.last_when is 'Last time client used the service';

Таблица at_cdc_ (от английского change data capture) хранит имена и типы созданных таблиц изменений. В частности, доступность списка всех таблиц изменений позволяет организовать их автоматическую очистку. (Столбец cdс_type определяет, каким образом захватываются изменения, - с помощью триггера и таблицы изменений или с помощью псевдостолбца ora_rowscn самой исходной таблицы. В последнем случае не нужны триггер и таблица изменений, но невозможно регистрировать удаление строк из исходной таблицы; можно получить только новые и измененные строки при условии, что исходная таблица была создана в режиме rowdependencies.)

Таблица at_svs_ хранит параметры сервисов, созданных для клиентов, и позволяет реализовать протокол получения изменений клиентом (получения дельты клиентом).

Приведу спецификацию пакета at_delta:

SQL> create or replace package at_delta is

    c_type_deltascn constant varchar2(10) := 'deltascn';
    c_type_orarowscn constant varchar2(10) := 'orarowscn';

    -- Create CDC table and register it.
    procedure create_capture(
        p_capture at_cdc_.capture%type,
        p_type    at_cdc_.cdc_type%type,
        p_descr   at_cdc_.descr%type
    );

    -- Unregister and drop CDC table.
    procedure delete_capture(
        p_capture at_cdc_.capture%type
    );

    -- Create CDC client's view and trigger.
    procedure create_client(
        p_client at_svs_.client%type
    );

    -- Drop CDC client's view and trigger.
    procedure delete_client(
        p_client at_svs_.client%type
    );

    -- Register client's service p_service based on capture.
    procedure create_service(
        p_client at_svs_.client%type,
        p_service at_svs_.service%type,
        p_capture at_svs_.capture%type,
        p_descr  at_svs_.descr%type
    );

    -- Unregister client's service p_service.
    procedure delete_service(
        p_client at_svs_.client%type,
        p_service at_svs_.service%type
    );

    -- Current change number for the capture.
    function current_scn(
        p_capture at_cdc_.capture%type,
        p_cdc_type at_cdc_.cdc_type%type
    ) return number;

    -- Delete utilized rows from CDC tables.
    -- (Create job to run at_delta.purge_cdc daily.)
    procedure purge_cdc;

end at_delta;
/

Пакет at_delta содержит процедуры для

  • создания таблицы изменений,
  • удаления таблицы изменений,
  • создания клиента,
  • удаления клиента,
  • создания сервиса,
  • удаления сервиса,
  • очистки таблиц изменений.

Описанные таблицы и пакет являются частью библиотеки atop-plsql, доступной на GitHub по адресу https://github.com/andorei/atop-plsql.

Для иллюстрации того, как работает предложенное решение, создам простую сущность - таблицу itemz, изменения в которой будут регистрироваться (по варианту 2) и предоставляться двум клиентам:

SQL> create table itemz (
  2      id number primary key,
  3      name varchar2(50) not null
  4  );
Table created

Захват изменений и их предоставление клиенту организуется за три шага:

  1. создание таблицы изменений, триггера и интерфейсного вью,
  2. создание клиента (если ранее не создан),
  3. создание сервиса изменений.

Создам таблицу изменений с помощью пакета at_delta:

SQL> begin
  2      at_delta.create_capture(
  3          p_capture => 'ITEMZ',
  4          p_type => at_delta.c_type_deltascn,
  5          p_descr => 'Журнал изменений таблицы ITEMZ'
  6      );
  7  end;
  8  /
PL/SQL procedure successfully completed

В результате выполнения процедуры at_delta.create_capture создана таблица изменений at_cdc_itemz в режиме rowdependencies и зарегистрирована в таблице at_cdc_:

SQL> desc at_cdc_itemz

Name Type                        Nullable Default      Comments 
---- --------------------------- -------- ------------ -------- 
OPER CHAR(1)                                                    
WHEN TIMESTAMP(6) WITH TIME ZONE          systimestamp          

SQL> select table_name, dependencies
  2  from user_tables
  3  where table_name = 'AT_CDC_ITEMZ'
  4  ;

TABLE_NAME                     DEPENDENCIES
------------------------------ ------------
AT_CDC_ITEMZ                   ENABLED

SQL> select * from at_cdc_;

CAPTURE              CDC_TYPE   DESCR
-------------------- ---------- -------------------------------
ITEMZ                deltascn   Журнал изменений таблицы ITEMZ

Созданная таблица изменений содержит только два обязательных столбца, oper и when. Другие столбцы, специфичные в каждом конкретном случае, необходимо добавить. Добавлю столбец id для значений первичного ключа исходной таблицы; одновременно, этот столбец будет первичным ключом таблицы изменений:

SQL> alter table at_cdc_itemz add (
  2      id number primary key
  3  );
Table altered

SQL> comment on table at_cdc_itemz is 'Журнал изменений таблицы ITEMZ';
Comment added
SQL> comment on column at_cdc_itemz.oper is '[m]erge, [d]elete';
Comment added
SQL> comment on column at_cdc_itemz.when is 'Время изменения';
Comment added
SQL> comment on column at_cdc_itemz.id is 'ПК';
Comment added

В столбце oper будет храниться буква m для операций insert и update, и буква d для операции delete на исходной таблице.

Создам триггер, регистрирующий изменения в исходной таблице по варианту 2:

SQL> create or replace trigger at_cdc_itemz_aiudr
  2  after insert or update or delete on itemz
  3  for each row
  4  declare
  5      l_oper char;
  6  begin
  7      l_oper :=
  8          case
  9              when inserting or updating then
 10                  'm'
 11              when deleting then
 12                  'd'
 13          end;
 14      merge into at_cdc_itemz tgt
 15      using (select nvl(:old.id, :new.id) id from dual) src
 16      on (tgt.id = src.id)
 17      when matched then
 18          update set oper = l_oper, when = systimestamp
 19      when not matched then
 20          insert (oper, id)
 21          values (l_oper, src.id)
 22      ;
 23  end;
 24  /
Trigger created

И, наконец, создам интерфейсное вью, из которого клиент будет забирать изменения исходной таблицы:

SQL> create view int_itemz as
  2  select d.id,
  3      i.name,
  4      d.ora_rowscn deltascn
  5  from itemz i, at_cdc_itemz d
  6  where i.id = d.id
  7     and oper = 'm'
  8  union all
  9  select id,
 10      null,
 11      ora_rowscn
 12  from at_cdc_itemz
 13  where oper = 'd'
 14  ;
View created

Интерфейсное вью не является обязательным для предоставления изменений клиентам - ведь клиент может самостоятельно выполнять соединение таблицы изменений с исходной таблицей и делать необходимую обработку. Однако с интерфейсным вью работать удобнее и клиентам и БД-источнику, где оно позволяет настроить гибкий доступ к изменениям для внешних клиентов. (Кроме того, оказалось удобным давать сервисам такие же имена, как интерфейсным вью, см. ниже.)

Теперь, когда созданы таблица изменений и интерфейсное вью, мы готовы определить клиентов и сервисы, с помощью которых клиенты будут получать изменения. Продемонстрирую, как это делается, не углубляясь в детали реализации пакета at_delta.

Пусть клиенты ABC и XYZ заинтересованы в получении изменений сущности itemz. Зарегистрирую клиентов и сервисы для них:

SQL> begin
  2      at_delta.create_client('ABC');
  3      at_delta.create_client('XYZ');
  4  end;
  5  /
PL/SQL procedure successfully completed

SQL> begin
  2      at_delta.create_service('ABC', 'INT_ITEMZ', 'ITEMZ', 'Изменения ITEMZ');
  3      at_delta.create_service('XYZ', 'INT_ITEMZ', 'ITEMZ', 'Изменения ITEMZ');
  4  end;
  5  /
PL/SQL procedure successfully completed

SQL> commit;
Commit complete

При создании клиента для него создается сервисное вью at_svs_<client> (на базе таблицы at_svs_) и триггер instead of update на этом вью. С помощью сервисного вью клиент видит все сервисы, с которыми работает, и может получить и обновить номер последнего обработанного изменения в столбце last_scn. (Само изменение столбца таблицы at_svs_ выполняется процедурой пакета at_delta2, вызываемой триггером.)

SQL> -- сервисное вью для клиента ABC
SQL> select * from at_svs_abc;

SERVICE    CDC_TYPE   LAST_WHEN                       LAST_SCN   CURR_SCN
---------- ---------- ------------------------------- ---------- ----------
INT_ITEMZ  deltascn   13.08.18 13:36:02,624000 +10:00          0          0


SQL> -- сервисное вью для клиента XYZ
SQL> select * from at_svs_xyz;

SERVICE    CDC_TYPE   LAST_WHEN                       LAST_SCN   CURR_SCN
---------- ---------- ------------------------------- ---------- ----------
INT_ITEMZ  deltascn   13.08.18 13:36:02,625000 +10:00          0          0

Теперь выполню от имени клиента ABC сеанс получения последних изменений сущности itemz:

SQL> select last_scn, curr_scn
  2  from at_svs_abc
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
         0          0

SQL> select *
  2  from int_itemz
  3  where deltascn between 0/*last_scn*/+1 and 0/*curr_scn*/
  4  ;
        ID NAME                 DELTASCN
---------- -------------------- ----------

SQL> update at_svs_abc
  2  set last_scn = 0/*curr_scn*/
  3  where service = 'INT_ITEMZ'
  4  ;

1 row updated.

SQL> commit;
Commit complete

Пока неинтересно, так как никаких изменений в itemz не было; таблица itemz вообще пуста. Добавлю в нее строки и снова получу изменения от имени клиента ABC:

SQL> insert into itemz values (1, 'Линейка');
1 row inserted
SQL> insert into itemz values (2, 'Простой карандаш');
1 row inserted
SQL> insert into itemz values (3, 'Точилка');
1 row inserted

SQL> commit;
Commit complete

SQL> select last_scn, curr_scn
  2  from at_svs_abc
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
         0     394219

SQL> select *
  2  from int_itemz
  3  where deltascn between 0/*last_scn*/+1 and 394219/*curr_scn*/
  4  ;
        ID NAME                 DELTASCN
---------- -------------------- ----------
         1 Линейка                  394219
         2 Простой карандаш         394219
         3 Точилка                  394219

SQL> update at_svs_abc
  2  set last_scn = 394219/*curr_scn*/
  3  where service = 'INT_ITEMZ'
  4  ;

1 row updated.

SQL> commit;
Commit complete.

Итак, клиент ABC получил изменившиеся строки сущности itemz и отразил это в сервисном вью:

SQL> select last_scn, curr_scn
  2  from at_svs_abc
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
    394219     394219

Клиент XYZ, работающий по своему расписанию, пока не получил изменений:

SQL> select last_scn, curr_scn
  2  from at_svs_xyz
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
         0     394219

Внесу еще пару изменений в itemz. Затем получу изменения от имени каждого из двух клиентов:

SQL> update itemz
  2  set name = 'Циркуль'
  3  where id = 1;
1 row updated

SQL> update itemz
  2  set name = 'Карандаш'
  3  where id = 2;
1 row updated

SQL> commit;
Commit complete

SQL> -- от имени клиента ABC

SQL> select last_scn, curr_scn
  2  from at_svs_abc
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
    394219     394495

SQL> select *
  2  from int_itemz
  3  where deltascn between 394219+1 and 394495;

        ID NAME                 DELTASCN
---------- -------------------- ----------
         1 Циркуль                  394495
         2 Карандаш                 394495


SQL> update at_svs_abc
  2  set last_scn = 394495
  3  where service = 'INT_ITEMZ';
1 row updated.

SQL> commit;
Commit complete.

SQL> -- от имени клиента XYZ

SQL> select last_scn, curr_scn
  2  from at_svs_xyz
  3  where service = 'INT_ITEMZ';

  LAST_SCN   CURR_SCN
---------- ----------
         0     394495

SQL> select *
  2  from int_itemz
  3  where deltascn between 0+1 and 394495;

        ID NAME                 DELTASCN
---------- -------------------- ----------
         1 Циркуль                  394495
         2 Карандаш                 394495
         3 Точилка                  394219

SQL> update at_svs_xyz
  2  set last_scn = 394495
  3  where service = 'INT_ITEMZ';
1 row updated.

SQL> commit;
Commit complete.

Вот так это работает.

Нужно ли напоминать, что это только базовая демонстрация? При реальной работе внешних клиентов с БД, каждый клиент будет подключаться к отдельной схеме, в которой ему будут доступны только его сервисное вью at_svs_<client>, пакет at_delta2 (обновляет для клиента номер последнего обработанного изменения в таблице at_svs_) и интерфейсное вью. Внутренние клиенты, например, выполняемые по расписанию задачи в БД, могут пользоваться сервисом обработки изменений наравне с внешними клиентами.

Если описанное решение показалось вам неоправданно сложным, то примите во внимание эффект масштаба: когда у вас в БД множество сущностей, изменения в которых нужно отслеживать и так или иначе обрабатывать, то стандартное решение, охватывающее все случаи, оказывается очень выгодным. Попутно поддерживается актуальный список всех клиентов, сервисов и таблиц изменений.

Процедура at_delta.purge_cdc очищает таблицы изменений уже обработанных клиентами строк:

SQL> select count(*) from at_cdc_itemz; language-SQL> select count(*) from at_cdc_itemz;">  COUNT(*)
----------
         3

SQL> begin
  2      at_delta.purge_cdc;
  3  end;
  4  /

PL/SQL procedure successfully completed.

SQL> select count(*) from at_cdc_itemz;

  COUNT(*)
----------
         0

Удалю объекты, с которыми экспериментировал:

SQL> begin
  2      at_delta.delete_service('ABC', 'INT_ITEMZ');
  3      at_delta.delete_service('XYZ', 'INT_ITEMZ');
  4      at_delta.delete_client('ABC');
  5      at_delta.delete_client('XYZ');
  6      at_delta.delete_capture('ITEMZ');
  7  end;
  8  /
PL/SQL procedure successfully completed.

SQL> drop view int_itemz;
View dropped.

SQL> drop table itemz;
Table dropped.

В заключение объясню, почему таблица изменений использует псевдостолбец ora_rowscn, а не обычный числовой столбец, заполняемый из некоторой непрерывно возрастающей последовательности (sequence). Потому что изменения в таблицы БД вносятся конкурентно в рамках разных транзакций. Числовой столбец, заполняющийся из последовательности, получит значение в момент срабатывания триггера, регистрирующего изменения в исходной таблице - то есть, до завершения транзакции. Возможно, задолго до завершения транзакции. Тем временем другие пользователи внесут другие изменения, которые будут помечены большими номерами изменений, и завершат свои транзакции, после чего клиент может обработать эти изменения и обновить максимальный номер обработанного изменения. А поскольку этот номер будет больше номера изменения из еще не завершенной транзакции, то клиент не увидит этого изменения даже после завершения транзакции! Ведь он всегда забирает изменения с номерами изменения большими, чем уже полученные ранее. Благодаря тому, что значение псевдостолбца ora_rowscn изменяется при завершении транзакции, а не во время выполнения команды DML, описанной проблемы не возникает.

Напомню, что пакеты at_delta, at_delta2 и сопутствующие им таблицы являются частью библиотеки atop-plsql, доступной на GitHub по адресу https://github.com/andorei/atop-plsql.

Комментариев нет:

Отправить комментарий