среда, 8 февраля 2023 г.

delta revisited. Работа с обычной таблицей изменений

Ранее я описал механизм delta для регистрации изменений в таблицах БД Oracle, позволяющий обрабатывать изменения независимо нескольким клиентам.

Работа описанного механизма опирается на таблицу изменений с rowdependencies. Что делает его зависимым от СУБД Oracle. А в один прекрасный день возникает желание использовать хорошо зарекомендовавший себя механизм в СУБД PostgreSQL или другой СУБД с триггерами.

Можно ли использовать более универсальный подход при реализации таблицы изменений вместо таблицы с rowdependencies?

Напомню, как работает механизм delta и для чего нужна таблица изменений c rowdependencies.

На таблице, изменения которой мы хотим регистрировать (назовем ее базовой), создается триггер, который записывает в специальную таблицу (таблицу изменений) значения первичного ключа добавленных, обновленных и удаленных строк. Таблица изменений - это таблица Oracle с rowdependencies, где каждая строка имеет индивидуальный system change number (SCN), возрастающий с каждой транзакцией и доступный через псевдостолбец ora_rowscn. Для клиентов, которым необходимо обрабатывать изменения в базовой таблице, организовано хранение SCN последнего обработанного клиентом изменения. В каждом сеансе обработки изменений клиент обрабатывает изменения, где ora_rowscn between <последний обработанный SCN> + 1 and <текущий SCN> и после этого сохраняет текущий SCN как последний обработанный.

Поскольку каждый из клиентов использует номер последнего обработанного им изменения чтобы выбрать новые, еще не обработанные, изменения, то критически важно, чтобы номера изменений в таблице изменений только возрастали - и не возникали бы изменения с номерами меньшими, чем уже обработанные. А такая ситуация возможна, если номера изменений берутся из последовательности (sequence) по мере выполнения команд DML на базовой таблице.

Пусть транзакция A зарегистрировала изменение с номером 1001 и пока не завершилась - следовательно, строка с номером 1001 в таблице изменений не видна другим сеансам. Транзакция B зарегистрировала изменение с номером 1002 и завершилась - строка с номером 1002 видна другим сеансам. В это время клиент, у которого номер последнего обработанного изменения 990, начинает работу, обрабатывает изменения с 991 по 1002 (но не видит строку с номером 1001), и запоминает 1002 как номер последнего обработанного изменения. После этого завершается транзакция A и строка с номером 1001 в таблице изменений становится доступна другим сеансам. Но наш клиент уже ушел вперед и никогда не обработает сроку с номером 1001.

А вот очередной SCN присваивается строке таблицы изменений с rowdependencies в момент завершения транзакции (когда строка становится видна другим сеансам), следовательно, описанная проблема исключена.

Как же без SCN обеспечить возрастание номеров изменений, которые становятся видны другим сеансам по мере завершения транзакций? Для этого достаточно время от времени присваивать очередной возрастающий номер новым изменениям в таблице изменений, ставшим доступными благодаря завершению транзакций, и потребовать, чтобы клиенты использовали этот номер для выборки необработанных изменений.

Разберем пример.

Есть базовая таблица

create table items (
   id        number(5) primary key,
   name      varchar2(50) not null
);

и таблица изменений для нее (без rowdependencies)

create table at_cdc_items (
    -- service columns
    seqn number not null,
    fixn number,
    when timestamp with time zone default current_timestamp,
    oper varchar2(10),
    -- base table primary key
    id number(5)
);

Изменения в базовой таблице регистирируются триггером; для получения последовательно возрастающих номеров изменений используется последовательность (sequence).

create sequence at_cdc_items_seq;

create or replace trigger at_cdc_items_aiudr
after insert or update or delete on items
for each row
declare
    l_oper varchar2(10);
begin
    l_oper := case when deleting then 'delete' when inserting or updating then 'change' else '?' end
    ;
    merge into at_cdc_items tgt
    using (
        select nvl(:new.id, :old.id) id from dual
    ) src
    on (src.id = tgt.id and oper in ('change', 'delete'))
    when matched then
        update set when = current_timestamp,
            oper = l_oper,
            seqn = at_cdc_items_seq.nextval,
            fixn = null
    when not matched then
        insert (oper, seqn, id)
        values (l_oper, at_cdc_items_seq.nextval, src.id)
    ;
end;
/

Вставим строки в таблицу items, затем удалим и изменим отдельные строки и понаблюдаем за таблицей изменений:

insert into items values (1, 'Линейка');
insert into items values (2, 'Карандаш');
insert into items values (3, 'Блокнот');
commit;

select * from at_cdc_items order by seqn;

SEQN FIXN WHEN               OPER   ID
---- ---- ------------------ ------ ---
   1      24-JAN-23 07.34.36 change   1
   2      24-JAN-23 07.34.36 change   2
   3      24-JAN-23 07.34.37 change   3

delete from items where id = 1;
update items set name = 'Ручка' where id = 2;
update items set name = 'Тетрадь' where id = 3;
commit;

select * from at_cdc_items order by seqn;

SEQN FIXN WHEN               OPER   ID
---- ---- ------------------ ------ ---
   4      24-JAN-23 07.35.30 delete   1
   5      24-JAN-23 07.35.31 change   2
   6      24-JAN-23 07.35.31 change   3

Как видим, столбец seqn содержит значение, получаемое из последовательности в момент выполнения команд DML над базовой таблицей. Если клиенты будут обрабатывать изменения в диапазоне номеров seqn, то некоторые изменения могут быть потеряны для клиентов, как это описано выше.

Чтобы этого не произошло, в таблице изменений есть столбец fixn, значение которому присваивает клиент в начале обработки изменений. Вместе столбцы fixn и seqn позволяют организовть обработку изменений клиентами без потерь и в порядке выполнения этих изменений (seqn).

Клиент, приступая к обработке изменений, делает следующее:

declare
    l_curr_fixn number := at_cdc_items_seq.nextval;
begin
    update at_cdc_items set fixn = l_curr_fixn
    where fixn is null
    ;
    -- Параллельное обновление fixn в другом сеансе не присвоит другое значение,
    -- так как после снятия блокировки по завершении данной транзакции
    -- условие fixn is null будет переоценено.
    commit;
end;

И далее обрабатывает измененния в диапазоне

fixn between l_last_fixn + 1 and l_curr_fixn

в порядке

order by fixn, seqn

В пакет at_delta из библиотеки atop-PL/SQL добавлена поддержка таблиц изменений типа seqn, что позволяет создавать таблицы изменений без rowdependencies, со столбцами seqn и fixn.

Продемонстрирую, как реализовать захват и обработку изменений в таблице items с помощью пакетов at_delta и at_delta2.

-- удалить демонстрационные объекты
drop sequence at_cdc_items_seq;
drop table at_cdc_items;
drop trigger at_cdc_items_aiudr;

-- очистить таблицу items
delete from items;

-- создать таблицу изменений типа seqn и последовательность для нее
begin
    at_delta.create_capture(
        p_capture => 'items',
        p_type => 'seqn',
        p_descr => 'items changes'
    );
end;
/

-- добавить к таблице изменений первичный ключ базовой таблицы
alter table at_cdc_items add (id number(5));

-- создать триггер для регистрации изменений
create or replace trigger at_cdc_items_aiudr
after insert or update or delete on items
for each row
declare
    l_oper varchar2(10);
begin
    l_oper := 
        case
            when deleting then
                'd' -- delete
            when inserting or updating then
                'c' -- change
            else
                '?'
        end
    ;
    merge into at_cdc_items tgt
    using (
        select nvl(:old.id, :new.id) id from dual
    ) src
    on (src.id = tgt.id and oper in ('c', 'd'))
    when matched then
        update set when = current_timestamp,
            oper = l_oper,
            seqn = at_cdc_items_seq.nextval,
            fixn = null
    when not matched then
        insert (oper, seqn, id)
        values (l_oper, at_cdc_items_seq.nextval, src.id)
    ;
end;
/

Пусть нам необходимо воспроизводить все изменения таблицы items в другой "удаленной" таблице remote_items. Обработка изменений будет состоять в применении зарегистрированных изменений к таблице remote_items.

create table remote_items (
   id        number(5) primary key,
   name      varchar2(50) not null
);

Чтобы организовать хранение номера последнего обработанного изменения, создадим клиента test_client и сервис items_service:

begin
    at_delta.create_client(
        p_client => 'test_client'
    );
    at_delta.create_service(
        p_client => 'test_client',
        p_service => 'items_service',
        p_capture => 'items',
        p_descr => 'items changes'
    );
end;
/
commit;

select * from at_svs_test_client;

SERVICE       CAPTURE CDC_TYPE LAST_WHEN          LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS   seqn     24-JAN-23 12.06.46        0        1

Наконец, создадим процедуру для обработки изменений, использующую процедуры пакета at_delta2 для получения и сохранения номеров обработанных изменений:

create or replace procedure update_remote_items as
    c_client constant varchar2(20) := 'test_client';
    c_service constant varchar2(20) := 'items_service';
    
    l_last_fixn number;
    l_curr_fixn number;
begin
    -- получить диапазон номеров изменений для обработки
    at_delta2.get_range(c_client, c_service, l_last_fixn, l_curr_fixn);
    --commit;

    for r in (
        select fixn, seqn, oper, items.id, items.name
        from at_cdc_items cdc join items on items.id = cdc.id
        where oper = 'c'
            and fixn between l_last_fixn + 1 and l_curr_fixn
        union all
        select fixn, seqn, oper, id, null
        from at_cdc_items cdc
        where oper = 'd'
            and fixn between l_last_fixn + 1 and l_curr_fixn
        order by fixn, seqn
    ) loop
        if r.oper = 'd' then
            delete from remote_items where id = r.id
            ;
        elsif r.oper = 'c' then
            merge into remote_items tgt
            using (select r.id id, r.name name from dual) src
            on (tgt.id = src.id)
            when matched then
                update set name = src.name
            when not matched then
                insert (id, name) values (src.id, src.name)
            ;
        end if;
    end loop;

    -- сохранить номер последнего обработанного изменения
    at_delta2.acknowledge(c_client, c_service, l_curr_fixn);
end update_remote_items;
/

Добавим данные в таблицу items:

insert into items values (1, 'Линейка');
insert into items values (2, 'Карандаш');
insert into items values (3, 'Блокнот');
delete from items where id = 1;
update items set name = 'Тетрадь' where id = 3;
commit;

select * from items;

ID  NAME
--- --------------
  2 Карандаш
  3 Тетрадь

select * from remote_items;

no rows

Посмотрим зарегистрированные изменения и диапазон номеров для обработки в сервисе items_service:

select * from at_cdc_items order by seqn;

OPER WHEN               SEQN FIXN ID
---- ------------------ ---- ---- ---
m    24-JAN-23 12.11.19    3        2
d    24-JAN-23 12.11.20    5        1
m    24-JAN-23 12.11.20    6        3

select * from at_svs_test_client;

SERVICE       CAPTURE CDC_TYPE LAST_WHEN          LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS   seqn     24-JAN-23 12.06.46         0       7

Обработаем изменения:

begin update_remote_items; end;
/

select * from at_cdc_items order by seqn;

OPER WHEN               SEQN FIXN ID
---- ------------------ ---- ---- ---
m    24-JAN-23 12.11.19    3    8   2
d    24-JAN-23 12.11.20    5    8   1
m    24-JAN-23 12.11.20    6    8   3

select * from at_svs_test_client;

SERVICE       CAPTURE CDC_TYPE LAST_WHEN          LAST_SCN CURR_SCN
------------- ------- -------- ------------------ -------- --------
ITEMS_SERVICE ITEMS   seqn     24-JAN-23 12.18.50        8        9

Проверим результат в таблице remote_items:

select * from remote_items;

ID  NAME
--- --------------
  2 Карандаш
  3 Тетрадь

Итак, были продемонстрированы захват и обработка изменений клиентом при помощи таблицы изменений без rowdependencies, что делает данную технологию применимой не только в СУБД Oracle, но и в других СУБД с триггерами.

В заключение, удаляю демонстрационные объекты:

drop procedure update_remote_items;
drop table remote_items;

begin
    at_delta.delete_service('test_client', 'items_service');
    at_delta.delete_client('test_client');
    at_delta.delete_capture('items');
end;
/

drop table items;

Комментариев нет:

Отправить комментарий