Ранее я описал механизм delta для регистрации изменений в таблицах БД Oracle, позволяющий обрабатывать изменения независимо нескольким клиентам.
Работа описанного механизма опирается на таблицу изменений с rowdependencies
. Что делает его зависимым от СУБД Oracle. А в один прекрасный день возникает желание использовать хорошо зарекомендовавший себя механизм в СУБД PostgreSQL или другой СУБД с триггерами.
Можно ли использовать более универсальный подход при реализации таблицы изменений вместо таблицы с rowdependencies
?
Напомню, как работает механизм delta и для чего нужна таблица изменений c rowdependencies
.
На таблице, изменения которой мы хотим регистрировать (назовем ее базовой), создается триггер, который записывает в специальную таблицу (таблицу изменений) значения первичного ключа добавленных, обновленных и удаленных строк. Таблица изменений - это таблица Oracle с rowdependencies
, где каждая строка имеет индивидуальный system change number (SCN), возрастающий с каждой транзакцией и доступный через псевдостолбец ora_rowscn
. Для клиентов, которым необходимо обрабатывать изменения в базовой таблице, организовано хранение SCN последнего обработанного клиентом изменения. В каждом сеансе обработки изменений клиент обрабатывает изменения, где ora_rowscn between <последний обработанный SCN> + 1 and <текущий SCN>
и после этого сохраняет текущий SCN как последний обработанный.
Поскольку каждый из клиентов использует номер последнего обработанного им изменения чтобы выбрать новые, еще не обработанные, изменения, то критически важно, чтобы номера изменений в таблице изменений только возрастали - и не возникали бы изменения с номерами меньшими, чем уже обработанные. А такая ситуация возможна, если номера изменений берутся из последовательности (sequence) по мере выполнения команд DML на базовой таблице.
Пусть транзакция A зарегистрировала изменение с номером 1001 и пока не завершилась - следовательно, строка с номером 1001 в таблице изменений не видна другим сеансам. Транзакция B зарегистрировала изменение с номером 1002 и завершилась - строка с номером 1002 видна другим сеансам. В это время клиент, у которого номер последнего обработанного изменения 990, начинает работу, обрабатывает изменения с 991 по 1002 (но не видит строку с номером 1001), и запоминает 1002 как номер последнего обработанного изменения. После этого завершается транзакция A и строка с номером 1001 в таблице изменений становится доступна другим сеансам. Но наш клиент уже ушел вперед и никогда не обработает сроку с номером 1001.
А вот очередной SCN присваивается строке таблицы изменений с rowdependencies
в момент завершения транзакции (когда строка становится видна другим сеансам), следовательно, описанная проблема исключена.
Как же без SCN обеспечить возрастание номеров изменений, которые становятся видны другим сеансам по мере завершения транзакций? Для этого достаточно время от времени присваивать очередной возрастающий номер новым изменениям в таблице изменений, ставшим доступными благодаря завершению транзакций, и потребовать, чтобы клиенты использовали этот номер для выборки необработанных изменений.
Разберем пример.
Есть базовая таблица
create table items (
id number(5) primary key,
name varchar2(50) not null
);
и таблица изменений для нее (без rowdependencies
)
create table at_cdc_items (
-- service columns
seqn number not null,
fixn number,
when timestamp with time zone default current_timestamp,
oper varchar2(10),
-- base table primary key
id number(5)
);
Изменения в базовой таблице регистирируются триггером; для получения последовательно возрастающих номеров изменений используется последовательность (sequence).
create sequence at_cdc_items_seq;
create or replace trigger at_cdc_items_aiudr
after insert or update or delete on items
for each row
declare
l_oper varchar2(10);
begin
l_oper := case when deleting then 'delete' when inserting or updating then 'change' else '?' end
;
merge into at_cdc_items tgt
using (
select nvl(:new.id, :old.id) id from dual
) src
on (src.id = tgt.id and oper in ('change', 'delete'))
when matched then
update set when = current_timestamp,
oper = l_oper,
seqn = at_cdc_items_seq.nextval,
fixn = null
when not matched then
insert (oper, seqn, id)
values (l_oper, at_cdc_items_seq.nextval, src.id)
;
end;
/
Вставим строки в таблицу items
, затем удалим и изменим отдельные строки и понаблюдаем за таблицей изменений:
insert into items values (1, 'Линейка');
insert into items values (2, 'Карандаш');
insert into items values (3, 'Блокнот');
commit;
select * from at_cdc_items order by seqn;
SEQN FIXN WHEN OPER ID
---- ---- ------------------ ------ ---
1 24-JAN-23 07.34.36 change 1
2 24-JAN-23 07.34.36 change 2
3 24-JAN-23 07.34.37 change 3
delete from items where id = 1;
update items set name = 'Ручка' where id = 2;
update items set name = 'Тетрадь' where id = 3;
commit;
select * from at_cdc_items order by seqn;
SEQN FIXN WHEN OPER ID
---- ---- ------------------ ------ ---
4 24-JAN-23 07.35.30 delete 1
5 24-JAN-23 07.35.31 change 2
6 24-JAN-23 07.35.31 change 3
Как видим, столбец seqn
содержит значение, получаемое из последовательности в момент выполнения команд DML над базовой таблицей. Если клиенты будут обрабатывать изменения в диапазоне номеров seqn
, то некоторые изменения могут быть потеряны для клиентов, как это описано выше.
Чтобы этого не произошло, в таблице изменений есть столбец fixn
, значение которому присваивает клиент в начале обработки изменений. Вместе столбцы fixn
и seqn
позволяют организовть обработку изменений клиентами без потерь и в порядке выполнения этих изменений (seqn
).
Клиент, приступая к обработке изменений, делает следующее:
declare
l_curr_fixn number := at_cdc_items_seq.nextval;
begin
update at_cdc_items set fixn = l_curr_fixn
where fixn is null
;
-- Параллельное обновление fixn в другом сеансе не присвоит другое значение,
-- так как после снятия блокировки по завершении данной транзакции
-- условие fixn is null будет переоценено.
commit;
end;
И далее обрабатывает измененния в диапазоне
fixn between l_last_fixn + 1 and l_curr_fixn
в порядке
order by fixn, seqn
В пакет at_delta
из библиотеки atop-PL/SQL добавлена поддержка таблиц изменений типа seqn
, что позволяет создавать таблицы изменений без rowdependencies
, со столбцами seqn
и fixn
.
Продемонстрирую, как реализовать захват и обработку изменений в таблице items
с помощью пакетов at_delta
и at_delta2
.
-- удалить демонстрационные объекты
drop sequence at_cdc_items_seq;
drop table at_cdc_items;
drop trigger at_cdc_items_aiudr;
-- очистить таблицу items
delete from items;
-- создать таблицу изменений типа seqn и последовательность для нее
begin
at_delta.create_capture(
p_capture => 'items',
p_type => 'seqn',
p_descr => 'items changes'
);
end;
/
-- добавить к таблице изменений первичный ключ базовой таблицы
alter table at_cdc_items add (id number(5));
-- создать триггер для регистрации изменений
create or replace trigger at_cdc_items_aiudr
after insert or update or delete on items
for each row
declare
l_oper varchar2(10);
begin
l_oper :=
case
when deleting then
'd' -- delete
when inserting or updating then
'c' -- change
else
'?'
end
;
merge into at_cdc_items tgt
using (
select nvl(:old.id, :new.id) id from dual
) src
on (src.id = tgt.id and oper in ('c', 'd'))
when matched then
update set when = current_timestamp,
oper = l_oper,
seqn = at_cdc_items_seq.nextval,
fixn = null
when not matched then
insert (oper, seqn, id)
values (l_oper, at_cdc_items_seq.nextval, src.id)
;
end;
/
Пусть нам необходимо воспроизводить все изменения таблицы items
в другой "удаленной" таблице remote_items
. Обработка изменений будет состоять в применении зарегистрированных изменений к таблице remote_items
.
create table remote_items (
id number(5) primary key,
name varchar2(50) not null
);
Чтобы организовать хранение номера последнего обработанного изменения, создадим клиента test_client
и сервис items_service
:
begin
at_delta.create_client(
p_client => 'test_client'
);
at_delta.create_service(
p_client => 'test_client',
p_service => 'items_service',
p_capture => 'items',
p_descr => 'items changes'
);
end;
/
commit;
select * from at_svs_test_client;
SERVICE CAPTURE CDC_TYPE LAST_WHEN LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS seqn 24-JAN-23 12.06.46 0 1
Наконец, создадим процедуру для обработки изменений, использующую процедуры пакета at_delta2
для получения и сохранения номеров обработанных изменений:
create or replace procedure update_remote_items as
c_client constant varchar2(20) := 'test_client';
c_service constant varchar2(20) := 'items_service';
l_last_fixn number;
l_curr_fixn number;
begin
-- получить диапазон номеров изменений для обработки
at_delta2.get_range(c_client, c_service, l_last_fixn, l_curr_fixn);
--commit;
for r in (
select fixn, seqn, oper, items.id, items.name
from at_cdc_items cdc join items on items.id = cdc.id
where oper = 'c'
and fixn between l_last_fixn + 1 and l_curr_fixn
union all
select fixn, seqn, oper, id, null
from at_cdc_items cdc
where oper = 'd'
and fixn between l_last_fixn + 1 and l_curr_fixn
order by fixn, seqn
) loop
if r.oper = 'd' then
delete from remote_items where id = r.id
;
elsif r.oper = 'c' then
merge into remote_items tgt
using (select r.id id, r.name name from dual) src
on (tgt.id = src.id)
when matched then
update set name = src.name
when not matched then
insert (id, name) values (src.id, src.name)
;
end if;
end loop;
-- сохранить номер последнего обработанного изменения
at_delta2.acknowledge(c_client, c_service, l_curr_fixn);
end update_remote_items;
/
Добавим данные в таблицу items
:
insert into items values (1, 'Линейка');
insert into items values (2, 'Карандаш');
insert into items values (3, 'Блокнот');
delete from items where id = 1;
update items set name = 'Тетрадь' where id = 3;
commit;
select * from items;
ID NAME
--- --------------
2 Карандаш
3 Тетрадь
select * from remote_items;
no rows
Посмотрим зарегистрированные изменения и диапазон номеров для обработки в сервисе items_service
:
select * from at_cdc_items order by seqn;
OPER WHEN SEQN FIXN ID
---- ------------------ ---- ---- ---
m 24-JAN-23 12.11.19 3 2
d 24-JAN-23 12.11.20 5 1
m 24-JAN-23 12.11.20 6 3
select * from at_svs_test_client;
SERVICE CAPTURE CDC_TYPE LAST_WHEN LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS seqn 24-JAN-23 12.06.46 0 7
Обработаем изменения:
begin update_remote_items; end;
/
select * from at_cdc_items order by seqn;
OPER WHEN SEQN FIXN ID
---- ------------------ ---- ---- ---
m 24-JAN-23 12.11.19 3 8 2
d 24-JAN-23 12.11.20 5 8 1
m 24-JAN-23 12.11.20 6 8 3
select * from at_svs_test_client;
SERVICE CAPTURE CDC_TYPE LAST_WHEN LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS seqn 24-JAN-23 12.18.50 8 9
Проверим результат в таблице remote_items
:
select * from remote_items;
ID NAME
--- --------------
2 Карандаш
3 Тетрадь
Итак, были продемонстрированы захват и обработка изменений клиентом при помощи таблицы изменений без rowdependencies
, что делает данную технологию применимой не только в СУБД Oracle, но и в других СУБД с триггерами.
В заключение, удаляю демонстрационные объекты:
drop procedure update_remote_items;
drop table remote_items;
begin
at_delta.delete_service('test_client', 'items_service');
at_delta.delete_client('test_client');
at_delta.delete_capture('items');
end;
/
drop table items;
Комментариев нет:
Отправить комментарий