Ранее я рассказывал о том, как получить изменения данных в исходных таблицах за период с помощью операции MINUS.
Однако, есть ряд сценариев, когда необходимо обрабатывать изменения оперативно или в реальном времени. Это может быть нужно для немедленной передачи изменений удаленному клиенту, или для отправки уведомлений об изменениях заинтересованным лицам по электронной почте, или для отправки документа по назначению при изменении его статуса. В этом случае наиболее универсальным средством захвата изменений будет триггер на исходной таблице, записывающий изменения в таблицу изменений.
Почему триггер не может сделать работу непосредственно и зачем нужна (казалось бы лишняя) таблица изменений? Она нужна для того, чтобы однажды зарегистрированные изменения могли быть обработаны несколькими клиентами независимо друг от друга, для разных целей и в разное время.
Поскольку захват изменений в таблицах БД и предоставление их клиентам для обработки задача достаточно распространенная, имеет смысл решить ее в общем виде и создать механизм для быстрой реализации частных решений. Такой механизм включает:
- таблицы изменений и триггеры, вставляющие в них строки при изменении исходных таблиц,
- реестр клиентов, обрабатывающих захваченные изменения,
- сервисы изменений, связанные с конкретным клиентом и использующие таблицу изменений,
- протокол получения изменений клиентом.
Таблица изменений в обязательном порядке должна содержать столбцы первичного ключа исходной таблицы, а также столбцы oper
(операция) и when
(время изменения). Кроме этого, она может содержать другие столбцы, соответствующие столбцам исходной таблицы - для сохранения изменяемых данных.
В таблице изменений можно регистрировать
- каждое изменение строк исходной таблицы,
- факты изменения строк исходной таблицы.
В варианте 1 в таблицу изменений последовательно записываются все случаи изменений исходной таблицы. Если одна и та же строка изменилась несколько раз, то в таблице изменений будет по одной записи для каждого изменения. Такой режим работы имеет смысл, когда в таблице изменений сохраняются изменяемые значения интересующих нас столбцов: получаем историю всех изменений. Например, история изменений полей status
и amount
для строки с первичным ключом 1012
:
id oper when status amount
1012 insert 2018-05-15 20:22:16 1 1001.0
1012 update 2018-05-15 20:22:30 2 1001.0
1012 update 2018-05-15 20:23:15 2 1101.0
1012 update 2018-05-15 20:23:31 3 1101.0
1012 update 2018-05-15 20:23:44 4 1101.0
1012 delete 2018-05-15 20:25:05 4 1101.0
В варианте 2 регистрируются факты изменений каждой отдельной строки, но не вся история изменений и не измененные данные. При этом в таблице изменений имеется максимум одна запись для одной строки исходной таблицы, а значения в столбцах oper
и when
отражают, какая команда DML была выполнена последней для данной строки и время ее выполнения:
id oper when
1012 delete 2018-05-15 20:22:16
2519 update 2018-05-15 20:22:44
2541 update 2018-05-15 20:24:05
Для каждой из трех строк исходной таблицы в приведенном примере могли быть выполнены одно или несколько изменений, однако таблица изменений хранит только данные последнего изменения.
Такой режим подходит для передачи изменений в исходной таблице внешнему клиенту через интерфейсное вью (представление). Строки исходной таблицы при этом соединяются со строками таблицы изменений по первичному ключу, чтобы получить только измененные строки. Строки, удаленные из исходной таблицы, представлены в таблице изменений первичным ключом, и передаются клиенту с признаком удаления. Ниже будет приведен детальный пример передачи изменений клиенту через интерфейсное вью.
В зависимости от решаемой задачи, триггер может захватывать не все, а только особые, представляющие интерес, изменения исходной таблицы. Например, только изменения поля статуса, или только операции update, после которых выполняется определенное условие с участием нескольких полей. Тогда записи в таблице изменений могут выглядеть как-то так:
id oper when
2517 status 2018-05-15 20:22:16
2030 status 2018-05-15 20:22:30
1045 alarm 2018-05-15 20:23:15
В столбце oper
вместо вида команды DML регистрируется тип наступившего события. В зависимости от решаемой задачи, многократное наступление события для одной и той же строки исходной таблицы может регистрироваться в отдельных записях таблицы изменений (по варианту 1) или приводить к обновлению единственной записи (по варианту 2).
Для одной исходной таблицы можно создать несколько триггеров, например, триггер для захвата специальных изменений и триггер для захвата фактов изменений строк.
В другом сценарии исходная таблица может иметь дочерние таблицы, ссылающиеся на ее первичный ключ, и вместе с дочерними таблицами описывать единую сущность предметной области. Тогда, для захвата изменений сущности, нужно создать триггеры как на родительской, так и на дочерних таблицах; все эти триггеры будут записывать изменения в общую таблицу изменений.
Таблица изменений имеет особенность, которая позволяет организовать выборку из нее в порядке внесения изменений в БД. Таблица изменений создается в режиме rowdependencies
, что обеспечивает сохранение номера SCN для отдельных строк таблицы в момент завершения транзакции (см. Два кейса для ora_rowscn в Oracle 11g). Псевдостолбец ora_rowscn
таблицы изменений, таким образом, содержит числовое значение, которое непрерывно возрастает с каждой завершенной транзакцией.
Номер изменения в псевдостолбце ora_rowscn
таблицы изменений можно с успехом использовать для организации обработки изменений несколькими клиентами. Для этого клиент должен каждый раз обрабатывать изменения с номерами, большими, чем обработанный в прошлый раз. В специальной таблице будем хранить связку (таблица изменений, клиент, последний обработанный номер ora_rowscn
). Назовем такие связки сервисами, дадим каждой связке имя, так, чтобы сочетание (клиент, сервис) было уникальным, и предоставим клиентам возможность получать и изменять последний обработанный номер изменения для данного сервиса.
Полезным побочным эффектом от хранения номера последнего обработанного изменения на стороне БД-источника (а не на стороне клиента) является возможность автоматически удалять из таблицы изменений строки, уже обработанные всеми клиентами.
Протокол получения и обработки клиентом последних изменений будет выглядеть так:
- получить номер последнего обработанного клиентом изменения и текущий максимальный номер изменения для сервиса
А
, - обработать строки из соответствующей таблицы изменений (или интерфейсного вью) с номерами
ora_rowscn
, большими, чем номер последнего обработанного изменения и меньшими или равными текущему максимальному номеру изменения из п. 1, - обновить номер последнего обработанного изменения для сервиса
A
, присвоив ему текущий максимальный номер изменения из п. 1.
Следующие две таблицы и пакет at_delta
предоставляют инфраструктуру для реализации вышеописанного механизма захвата изменений для передачи клиентам:
SQL> create table at_cdc_ (
capture varchar2(20),
cdc_type varchar2(10) not null,
descr varchar2(4000),
constraint at_cdc_pk primary key (capture),
constraint at_cdc_ck check (cdc_type in ('deltascn', 'orarowscn'))
);
comment on table at_cdc_ is 'Change data captures';
comment on column at_cdc_.capture is 'Capture name';
comment on column at_cdc_.cdc_type is 'CDC type';
comment on column at_cdc_.descr is 'Capture description';
create table at_svs_ (
service varchar2(30),
client varchar2(20),
capture varchar2(20),
descr varchar2(4000),
last_scn number default 0 not null,
last_when timestamp with time zone default systimestamp,
constraint at_svs_pk primary key (client, service),
constraint at_svs_cdc_fk foreign key (capture) references at_cdc_
);
comment on table at_svs_ is 'Change data capture client''s services';
comment on column at_svs_.service is 'Service name';
comment on column at_svs_.client is 'Client name';
comment on column at_svs_.capture is 'Capture name';
comment on column at_svs_.descr is 'Description';
comment on column at_svs_.last_scn is 'Max SCN already processed by client';
comment on column at_svs_.last_when is 'Last time client used the service';
Таблица at_cdc_
(от английского change data capture) хранит имена и типы созданных таблиц изменений. В частности, доступность списка всех таблиц изменений позволяет организовать их автоматическую очистку. (Столбец cdс_type
определяет, каким образом захватываются изменения, - с помощью триггера и таблицы изменений или с помощью псевдостолбца ora_rowscn
самой исходной таблицы. В последнем случае не нужны триггер и таблица изменений, но невозможно регистрировать удаление строк из исходной таблицы; можно получить только новые и измененные строки при условии, что исходная таблица была создана в режиме rowdependencies
.)
Таблица at_svs_
хранит параметры сервисов, созданных для клиентов, и позволяет реализовать протокол получения изменений клиентом (получения дельты клиентом).
Приведу спецификацию пакета at_delta
:
SQL> create or replace package at_delta is
c_type_deltascn constant varchar2(10) := 'deltascn';
c_type_orarowscn constant varchar2(10) := 'orarowscn';
-- Create CDC table and register it.
procedure create_capture(
p_capture at_cdc_.capture%type,
p_type at_cdc_.cdc_type%type,
p_descr at_cdc_.descr%type
);
-- Unregister and drop CDC table.
procedure delete_capture(
p_capture at_cdc_.capture%type
);
-- Create CDC client's view and trigger.
procedure create_client(
p_client at_svs_.client%type
);
-- Drop CDC client's view and trigger.
procedure delete_client(
p_client at_svs_.client%type
);
-- Register client's service p_service based on capture.
procedure create_service(
p_client at_svs_.client%type,
p_service at_svs_.service%type,
p_capture at_svs_.capture%type,
p_descr at_svs_.descr%type
);
-- Unregister client's service p_service.
procedure delete_service(
p_client at_svs_.client%type,
p_service at_svs_.service%type
);
-- Current change number for the capture.
function current_scn(
p_capture at_cdc_.capture%type,
p_cdc_type at_cdc_.cdc_type%type
) return number;
-- Delete utilized rows from CDC tables.
-- (Create job to run at_delta.purge_cdc daily.)
procedure purge_cdc;
end at_delta;
/
Пакет at_delta
содержит процедуры для
- создания таблицы изменений,
- удаления таблицы изменений,
- создания клиента,
- удаления клиента,
- создания сервиса,
- удаления сервиса,
- очистки таблиц изменений.
Описанные таблицы и пакет являются частью библиотеки atop-plsql, доступной на GitHub по адресу https://github.com/andorei/atop-plsql.
Для иллюстрации того, как работает предложенное решение, создам простую сущность - таблицу itemz
, изменения в которой будут регистрироваться (по варианту 2) и предоставляться двум клиентам:
SQL> create table itemz (
2 id number primary key,
3 name varchar2(50) not null
4 );
Table created
Захват изменений и их предоставление клиенту организуется за три шага:
- создание таблицы изменений, триггера и интерфейсного вью,
- создание клиента (если ранее не создан),
- создание сервиса изменений.
Создам таблицу изменений с помощью пакета at_delta
:
SQL> begin
2 at_delta.create_capture(
3 p_capture => 'ITEMZ',
4 p_type => at_delta.c_type_deltascn,
5 p_descr => 'Журнал изменений таблицы ITEMZ'
6 );
7 end;
8 /
PL/SQL procedure successfully completed
В результате выполнения процедуры at_delta.create_capture
создана таблица изменений at_cdc_itemz
в режиме rowdependencies
и зарегистрирована в таблице at_cdc_
:
SQL> desc at_cdc_itemz
Name Type Nullable Default Comments
---- --------------------------- -------- ------------ --------
OPER CHAR(1)
WHEN TIMESTAMP(6) WITH TIME ZONE systimestamp
SQL> select table_name, dependencies
2 from user_tables
3 where table_name = 'AT_CDC_ITEMZ'
4 ;
TABLE_NAME DEPENDENCIES
------------------------------ ------------
AT_CDC_ITEMZ ENABLED
SQL> select * from at_cdc_;
CAPTURE CDC_TYPE DESCR
-------------------- ---------- -------------------------------
ITEMZ deltascn Журнал изменений таблицы ITEMZ
Созданная таблица изменений содержит только два обязательных столбца, oper
и when
. Другие столбцы, специфичные в каждом конкретном случае, необходимо добавить. Добавлю столбец id
для значений первичного ключа исходной таблицы; одновременно, этот столбец будет первичным ключом таблицы изменений:
SQL> alter table at_cdc_itemz add (
2 id number primary key
3 );
Table altered
SQL> comment on table at_cdc_itemz is 'Журнал изменений таблицы ITEMZ';
Comment added
SQL> comment on column at_cdc_itemz.oper is '[m]erge, [d]elete';
Comment added
SQL> comment on column at_cdc_itemz.when is 'Время изменения';
Comment added
SQL> comment on column at_cdc_itemz.id is 'ПК';
Comment added
В столбце oper
будет храниться буква m
для операций insert
и update
, и буква d
для операции delete
на исходной таблице.
Создам триггер, регистрирующий изменения в исходной таблице по варианту 2:
SQL> create or replace trigger at_cdc_itemz_aiudr
2 after insert or update or delete on itemz
3 for each row
4 declare
5 l_oper char;
6 begin
7 l_oper :=
8 case
9 when inserting or updating then
10 'm'
11 when deleting then
12 'd'
13 end;
14 merge into at_cdc_itemz tgt
15 using (select nvl(:old.id, :new.id) id from dual) src
16 on (tgt.id = src.id)
17 when matched then
18 update set oper = l_oper, when = systimestamp
19 when not matched then
20 insert (oper, id)
21 values (l_oper, src.id)
22 ;
23 end;
24 /
Trigger created
И, наконец, создам интерфейсное вью, из которого клиент будет забирать изменения исходной таблицы:
SQL> create view int_itemz as
2 select d.id,
3 i.name,
4 d.ora_rowscn deltascn
5 from itemz i, at_cdc_itemz d
6 where i.id = d.id
7 and oper = 'm'
8 union all
9 select id,
10 null,
11 ora_rowscn
12 from at_cdc_itemz
13 where oper = 'd'
14 ;
View created
Интерфейсное вью не является обязательным для предоставления изменений клиентам - ведь клиент может самостоятельно выполнять соединение таблицы изменений с исходной таблицей и делать необходимую обработку. Однако с интерфейсным вью работать удобнее и клиентам и БД-источнику, где оно позволяет настроить гибкий доступ к изменениям для внешних клиентов. (Кроме того, оказалось удобным давать сервисам такие же имена, как интерфейсным вью, см. ниже.)
Теперь, когда созданы таблица изменений и интерфейсное вью, мы готовы определить клиентов и сервисы, с помощью которых клиенты будут получать изменения. Продемонстрирую, как это делается, не углубляясь в детали реализации пакета at_delta
.
Пусть клиенты ABC
и XYZ
заинтересованы в получении изменений сущности itemz
. Зарегистрирую клиентов и сервисы для них:
SQL> begin
2 at_delta.create_client('ABC');
3 at_delta.create_client('XYZ');
4 end;
5 /
PL/SQL procedure successfully completed
SQL> begin
2 at_delta.create_service('ABC', 'INT_ITEMZ', 'ITEMZ', 'Изменения ITEMZ');
3 at_delta.create_service('XYZ', 'INT_ITEMZ', 'ITEMZ', 'Изменения ITEMZ');
4 end;
5 /
PL/SQL procedure successfully completed
SQL> commit;
Commit complete
При создании клиента для него создается сервисное вью at_svs_<client>
(на базе таблицы at_svs_
) и триггер instead of update
на этом вью. С помощью сервисного вью клиент видит все сервисы, с которыми работает, и может получить и обновить номер последнего обработанного изменения в столбце last_scn
. (Само изменение столбца таблицы at_svs_
выполняется процедурой пакета at_delta2
, вызываемой триггером.)
SQL> -- сервисное вью для клиента ABC
SQL> select * from at_svs_abc;
SERVICE CDC_TYPE LAST_WHEN LAST_SCN CURR_SCN
---------- ---------- ------------------------------- ---------- ----------
INT_ITEMZ deltascn 13.08.18 13:36:02,624000 +10:00 0 0
SQL> -- сервисное вью для клиента XYZ
SQL> select * from at_svs_xyz;
SERVICE CDC_TYPE LAST_WHEN LAST_SCN CURR_SCN
---------- ---------- ------------------------------- ---------- ----------
INT_ITEMZ deltascn 13.08.18 13:36:02,625000 +10:00 0 0
Теперь выполню от имени клиента ABC
сеанс получения последних изменений сущности itemz
:
SQL> select last_scn, curr_scn
2 from at_svs_abc
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
0 0
SQL> select *
2 from int_itemz
3 where deltascn between 0/*last_scn*/+1 and 0/*curr_scn*/
4 ;
ID NAME DELTASCN
---------- -------------------- ----------
SQL> update at_svs_abc
2 set last_scn = 0/*curr_scn*/
3 where service = 'INT_ITEMZ'
4 ;
1 row updated.
SQL> commit;
Commit complete
Пока неинтересно, так как никаких изменений в itemz
не было; таблица itemz
вообще пуста. Добавлю в нее строки и снова получу изменения от имени клиента ABC
:
SQL> insert into itemz values (1, 'Линейка');
1 row inserted
SQL> insert into itemz values (2, 'Простой карандаш');
1 row inserted
SQL> insert into itemz values (3, 'Точилка');
1 row inserted
SQL> commit;
Commit complete
SQL> select last_scn, curr_scn
2 from at_svs_abc
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
0 394219
SQL> select *
2 from int_itemz
3 where deltascn between 0/*last_scn*/+1 and 394219/*curr_scn*/
4 ;
ID NAME DELTASCN
---------- -------------------- ----------
1 Линейка 394219
2 Простой карандаш 394219
3 Точилка 394219
SQL> update at_svs_abc
2 set last_scn = 394219/*curr_scn*/
3 where service = 'INT_ITEMZ'
4 ;
1 row updated.
SQL> commit;
Commit complete.
Итак, клиент ABC
получил изменившиеся строки сущности itemz
и отразил это в сервисном вью:
SQL> select last_scn, curr_scn
2 from at_svs_abc
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
394219 394219
Клиент XYZ
, работающий по своему расписанию, пока не получил изменений:
SQL> select last_scn, curr_scn
2 from at_svs_xyz
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
0 394219
Внесу еще пару изменений в itemz
. Затем получу изменения от имени каждого из двух клиентов:
SQL> update itemz
2 set name = 'Циркуль'
3 where id = 1;
1 row updated
SQL> update itemz
2 set name = 'Карандаш'
3 where id = 2;
1 row updated
SQL> commit;
Commit complete
SQL> -- от имени клиента ABC
SQL> select last_scn, curr_scn
2 from at_svs_abc
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
394219 394495
SQL> select *
2 from int_itemz
3 where deltascn between 394219+1 and 394495;
ID NAME DELTASCN
---------- -------------------- ----------
1 Циркуль 394495
2 Карандаш 394495
SQL> update at_svs_abc
2 set last_scn = 394495
3 where service = 'INT_ITEMZ';
1 row updated.
SQL> commit;
Commit complete.
SQL> -- от имени клиента XYZ
SQL> select last_scn, curr_scn
2 from at_svs_xyz
3 where service = 'INT_ITEMZ';
LAST_SCN CURR_SCN
---------- ----------
0 394495
SQL> select *
2 from int_itemz
3 where deltascn between 0+1 and 394495;
ID NAME DELTASCN
---------- -------------------- ----------
1 Циркуль 394495
2 Карандаш 394495
3 Точилка 394219
SQL> update at_svs_xyz
2 set last_scn = 394495
3 where service = 'INT_ITEMZ';
1 row updated.
SQL> commit;
Commit complete.
Вот так это работает.
Нужно ли напоминать, что это только базовая демонстрация? При реальной работе внешних клиентов с БД, каждый клиент будет подключаться к отдельной схеме, в которой ему будут доступны только его сервисное вью at_svs_<client>
, пакет at_delta2
(обновляет для клиента номер последнего обработанного изменения в таблице at_svs_
) и интерфейсное вью. Внутренние клиенты, например, выполняемые по расписанию задачи в БД, могут пользоваться сервисом обработки изменений наравне с внешними клиентами.
Если описанное решение показалось вам неоправданно сложным, то примите во внимание эффект масштаба: когда у вас в БД множество сущностей, изменения в которых нужно отслеживать и так или иначе обрабатывать, то стандартное решение, охватывающее все случаи, оказывается очень выгодным. Попутно поддерживается актуальный список всех клиентов, сервисов и таблиц изменений.
Процедура at_delta.purge_cdc
очищает таблицы изменений уже обработанных клиентами строк:
SQL> select count(*) from at_cdc_itemz; language-SQL> select count(*) from at_cdc_itemz;"> COUNT(*)
----------
3
SQL> begin
2 at_delta.purge_cdc;
3 end;
4 /
PL/SQL procedure successfully completed.
SQL> select count(*) from at_cdc_itemz;
COUNT(*)
----------
0
Удалю объекты, с которыми экспериментировал:
SQL> begin
2 at_delta.delete_service('ABC', 'INT_ITEMZ');
3 at_delta.delete_service('XYZ', 'INT_ITEMZ');
4 at_delta.delete_client('ABC');
5 at_delta.delete_client('XYZ');
6 at_delta.delete_capture('ITEMZ');
7 end;
8 /
PL/SQL procedure successfully completed.
SQL> drop view int_itemz;
View dropped.
SQL> drop table itemz;
Table dropped.
В заключение объясню, почему таблица изменений использует псевдостолбец ora_rowscn
, а не обычный числовой столбец, заполняемый из некоторой непрерывно возрастающей последовательности (sequence). Потому что изменения в таблицы БД вносятся конкурентно в рамках разных транзакций. Числовой столбец, заполняющийся из последовательности, получит значение в момент срабатывания триггера, регистрирующего изменения в исходной таблице - то есть, до завершения транзакции. Возможно, задолго до завершения транзакции. Тем временем другие пользователи внесут другие изменения, которые будут помечены большими номерами изменений, и завершат свои транзакции, после чего клиент может обработать эти изменения и обновить максимальный номер обработанного изменения. А поскольку этот номер будет больше номера изменения из еще не завершенной транзакции, то клиент не увидит этого изменения даже после завершения транзакции! Ведь он всегда забирает изменения с номерами изменения большими, чем уже полученные ранее. Благодаря тому, что значение псевдостолбца ora_rowscn
изменяется при завершении транзакции, а не во время выполнения команды DML, описанной проблемы не возникает.
Напомню, что пакеты at_delta
, at_delta2
и сопутствующие им таблицы являются частью библиотеки atop-plsql, доступной на GitHub по адресу https://github.com/andorei/atop-plsql.
Комментариев нет:
Отправить комментарий