В посте о диапазонных таблицах фактов (ДТФ) я привел алгоритм их заполнения из ежедневных снэпшотов, содержащих данные на текущую дату. Алгоритм также годится для заполнения ДТФ в случае, когда на входе не регулярные (ежедневные) снэпшоты, а только нерегулярные изменения (дельта).
Но время от времени возникает задача добавления в непустую ДТФ новых или измененных данных за прошлые даты, и упомянутый алгоритм для этого не годится. Поэтому рассмотрим универсальный алгоритм заполнения ДТФ из источника, откуда поступают данные как на текущую дату, так и на прошлые даты — если данные за прошлую дату изменились.
Для разработки и тестирования нового алгоритма воспользуюсь примером из упомянутого поста, слегка модифицировав его. Данные на входе представляют собой нерегулярные изменения (дельту) остатков двух товаров 12333 и 12345 на складе за неделю с 2025-11-15 по 2025-11-21.
Загрузим изменения остатков за неделю в таблицу src:
create table src (
fact_date date,
item_code int,
qty int,
primary key (fact_date, item_code)
);
create or replace procedure init_delta()
language sql as
$$
truncate table src;
insert into src
values
(date '2025-11-15', 12333, 5),
(date '2025-11-15', 12345, 90),
(date '2025-11-16', 12345, 80),
(date '2025-11-19', 12345, 60),
(date '2025-11-20', 12333, 0)
;
$$
call init_delta();
select * from src order by fact_date, item_code;
|fact_date |item_code|qty|
|----------|---------|---|
|2025-11-15| 12333| 5|
|2025-11-15| 12345| 90|
|2025-11-16| 12345| 80|
|2025-11-19| 12345| 60|
|2025-11-20| 12333| 0|
Для целей тестирования будем считать последний день недели, 2025-11-21, "сегодняшним".
Создадим диапазонную таблицу фактов facts:
create table facts (
first_date date,
last_date date,
item_code int,
qty int,
primary key (item_code, first_date)
);
Вот как выглядят данные в таблице facts после загрузки всех данных из src в хронологическом порядке:
|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19| 12333| 5|
|2025-11-15|2025-11-15| 12345| 90|
|2025-11-16|2025-11-18| 12345| 80|
|2025-11-19|2099-12-31| 12345| 60|
|2025-11-20|2099-12-31| 12333| 0|
Здесь дата 2099-12-31 — это произвольно выбранная дата в неблизком будущем, закрывающая текущий диапазон, то есть, диапазон, содержащий сегодняшнюю дату.
История изменения остатков товаров 12333 и 12345 в таблице facts является непрерывной с даты 2025-11-15, когда товары впервые появились на складе, по дату 2099-12-31.
Алгоритм, предложенный в первом посте о ДТФ, ежедневно загружает в ДТФ текущие данные из источника. При этом из источника могут поступать как ежедневные снэпшоты, содержащие остатки всех товаров, так и только строки с товарами, остаток которых изменился — дельта, — и результат обработки одинаково корректен. Но алгоритм, загружающий снэпшот за прошлую дату, и алгоритм, загружающий дельту за прошлую дату, будут разными. И вот почему.
Представим остаток товара 12345 из таблицы facts на рисунке, где сверху приведены даты (дни месяца для краткости), под ними остаток в штуках, а диапазоны отчеркнуты вертикальными разделительными чертами:
Серым выделена "сегодняшняя" дата. Буквами FF (Far Future) обозначена дата 2099-12-31 в далеком будущем.
Загрузка измененного снэпшота на дату между 2025-11-15 и 2025-11-21 приведет к одному из результатов, изображенных на рисунке:
В верхней части рисунка показано состояние таблицы facts перед тем, как в нее загружается измененный снэпшот за одну из дат. А ниже показаны измененные остатки на дату в прошлом, и справа от них — новая история остатка в таблице facts, возникшая в результате загрузки измененного остатка.
Главное правило при загрузке снэпшота за прошлую дату в ДТФ состоит в том, что снэпшот влияет на факты только на дату снэпшота. Отсюда и соответствующие разбиения диапазонов в facts, на которые приходится прошлая дата снэпшота.
И только загрузка снэпшота за сегодня меняет остаток не на одну дату, а на весь текущий диапазон. В том числе, если текущий диапазон начался сегодня:
А следующий рисунок показывает, к каким изменениям в таблице facts приводит загрузка дельты за прошлую и "сегодняшнюю" даты. Сравните его с приведенным выше рисунком На входе ежедневные снэпшоты.
Главное правило при загрузке дельты за прошлую дату в ДТФ состоит в том, что дельта влияет на диапазон дат в ДТФ, начиная с даты дельты и до конца диапазона, на который пришлась дата дельты. Отсюда и разбиения диапазонов в facts не более чем на два диапазона.
Итак, загрузка снэпшота за прошлую дату и загрузка дельты за прошлую дату приводят к разным изменениям в ДТФ. Это объясняется тем, что, в случае снэпшотов на входе, для каждой даты уже завершенного диапазона имелся снэпшот, подтверждавший значение остатка на дату. А в случае дельт на входе, значение остатка было получено только для первой даты диапазона и в последующие дни данные по остатку не поступали — ведь приходят только изменения.
Следовательно, для загрузки изменений на прошлые даты в ДТФ нужны два разных алгоритма: для случая дельт и для случая ежедневных снэпшотов на входе.
Начну со случая дельт, и реализую алгоритм для загрузки дельт за прошлые и сегодняшнюю даты в процедуре process_delta с единственным параметром p_date — датой, за которую загружаются данные из таблицы src в таблицу facts.
Код процедуры я приведу позднее, а пока посмотрим на результаты ее работы.
Для быстрой проверки корректности наполнения таблицы facts из источника src создам следующее вью:
create or replace view src_vs_facts as
with src_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, qty
from (
select fact_date first_date,
coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
item_code,
qty
from src
) t
), facts_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, qty
from facts
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;
Для начала очищу и наполню таблицу facts, загрузив в нее данные из src в хронологическом порядке:
do language plpgsql $$
declare
r record;
begin
call init_delta();
truncate table facts;
for r in
select distinct fact_date from src order by 1
loop
call process_delta(r.fact_date);
end loop;
end;
$$
Вот результат загрузки:
select * from facts order by first_date, item_code
;
|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19| 12333| 5|
|2025-11-15|2025-11-15| 12345| 90|
|2025-11-16|2025-11-18| 12345| 80|
|2025-11-19|2099-12-31| 12345| 60|
|2025-11-20|2099-12-31| 12333| 0|
Проверка с помощью вью src_vs_facts показывает, что остатки на даты, полученные из таблицы src, и остатки на даты в facts совпадают:
select * from src_vs_facts;
-- no rows
Теперь загружу все дельты повторно в произвольном порядке и проверю результат на корректность:
do language plpgsql $$
declare
r record;
begin
--truncate table facts;
for r in
select *
from (select distinct fact_date from src) t
order by random()
loop
call process_delta(r.fact_date);
end loop;
end;
$$
select * from src_vs_facts;
-- no rows
Да! Повторная загрузка дельт в произвольном порядке ничего не сломала.
Теперь воспользуюсь данными с рисунка На входе только изменения чтобы протестировать загрузку изменившихся дельт за прошлую и "сегодняшнюю" дату, и проверю корректность загруженных данных после каждого изменения. Перед применением каждого изменения я буду восстанавливать исходное состояние таблиц src и facts. Итак:
do language plpgsql $$
declare
r record;
r2 record;
begin
for r in
with test(fact_date, item_code, new_qty) as (
values
(date '2025-11-15', 12345, 85),
(date '2025-11-16', 12345, 85),
(date '2025-11-17', 12345, 85),
(date '2025-11-18', 12345, 85),
(date '2025-11-19', 12345, 85),
(date '2025-11-20', 12345, 85),
(date '2025-11-21', 12345, 85)
)
select * from test order by random()
loop
-- создать исходное состояние источника и таблицы фактов
call init_delta();
truncate table facts;
for r2 in
select distinct fact_date from src order by 1
loop
call process_delta(r2.fact_date);
end loop;
-- изменить данные в источнике и загрузить изменение
merge into src tgt
using (select r.fact_date, r.item_code, r.new_qty) as src
on (src.fact_date = tgt.fact_date and src.item_code = tgt.item_code)
when matched then
update set qty = src.new_qty
when not matched then
insert (fact_date, item_code, qty)
values (src.fact_date, src.item_code, src.new_qty)
;
call process_delta(r.fact_date);
-- найти расхождения между источником и таблицей фактов
for r2 in select * from src_vs_facts loop
raise notice '%', r2;
end loop;
end loop;
end;
$$
Никаких сообщений на выходе. Следовательно, имеем основания предположить, что реализация алгоритма работает корректно.
При создании процедуры я исходил из двух соображений:
- Поскольку реальная дельта (изменения на дату) содержит много строк содержит много строк, то выгодно обрабатывать их массово предложениями SQL, а не построчно в цикле, имея в виду потенциальный выигрыш в производительности.
- В общем случае изменение данных в прошлом влечет пересчет данных в таблице фактов, начиная с даты изменения до настоящего времени. Сделав такой пересчет процедурным образом, в цикле, а не массово предложениями SQL, можно снять все ограничения, связанные с агрегирующими и аналитическими (не)возможностями SQL. Кроме гибкости, другим преимуществом пересчета в цикле может оказаться простота, или даже очевидность, реализации, в отличие от сложных аналитических запросов.
Вот код процедуры process_delta, который несложно адаптировать для любой диапазонной таблицы фактов:
create or replace procedure process_delta(in p_date date default current_date)
language plpgsql
as $procedure$
declare
FF date := date '2099-12-31';
begin
merge into facts tgt
using (
select *
from src
where fact_date = p_date
) src
on (tgt.first_date, tgt.item_code) = (src.fact_date, src.item_code)
when matched and tgt.qty != src.qty then
update set
qty = src.qty
when not matched then
insert (
first_date,
last_date,
item_code,
qty
) values (
src.fact_date,
FF,
src.item_code,
src.qty
)
;
-- обновить строки с учетом предыдущих строк
declare
r record;
c record;
p record;
begin
p := null;
for r in
select *
from facts
where last_date >= p_date - 1
order by item_code, first_date
loop
c := r;
if not p is null then
if c.item_code = p.item_code then
-- если свойства пред-щей и текущей строк равны
if p.qty = c.qty then
-- то слить диапазоны предыдущей и текущей строк
update facts set
last_date = c.last_date
where item_code = p.item_code
and first_date = p.first_date
;
delete from facts
where item_code = c.item_code
and first_date = c.first_date
;
p.last_date := c.last_date;
continue;
elsif p.last_date != c.first_date - 1 then
-- обновить дату закрытия предыдущей строки
update facts set
last_date = c.first_date - 1
where item_code = p.item_code
and first_date = p.first_date
;
end if;
p := c;
else
p := r;
end if;
else
p := c;
end if;
end loop;
end;
end;
$procedure$
;
В заключение, прокомментирую вторую часть процедуры, где в цикле обновляются строки таблицы фактов, зависящие от предыдущих с тем же самым item_code. Эта часть кода в процедуре process_delta отвечает за слияние смежных диапазонов с одинаковым остатком в один и за обновление даты завершения диапазона last_date. В общем случае, однако, здесь можно делать и другие вещи. Например, расчитывать и обновлять значения столбцов, зависимых от других столбцов той же строки.
Следующий фрагмент обобщенно представляет эту часть процедуры. Здесь <fact_table> это таблица фактов, <entity_id> — ключ экземпляра сущности, факты о которой представлены в таблице, <column_a> — столбец, чье значение зависит от других столбцов той же строки, <column_x> — столбец, чье значение зависит от значений столбцов предыдущей строки с тем же <entity_id>.
-- обновить строки с учетом текущей и предыдущей строк
declare
r record;
c record; -- current row
p record; -- previous row
begin
p := null;
for r in
select *
from <fact_table>
where ...
order by <entity_id>, first_date
loop
c := r;
if not p is null then
if c.<entity_id> = p.<entity_id> then
-- обновить столбцы в строке p, зависимые от строки c
/*
update <fact_table> set
last_date = c.first_date - 1
where <entity_code> = p.<entity_code>
and first_date = p.first_date
and last_date != c.first_date - 1
;*/
-- обновить функционально зависимые столбцы в строке p, если есть
/*
update <fact_table> set
<column_a> = <func1(p)>
where <entity_code> = p.<entity_code>
and first_date = p.first_date
and <column_a> != <func1(p)>
;*/
-- обновить столбцы в строке c, зависимые от строки p
/*
update <fact_table> set
<column_x> = <func2(p)>
where <entity_code> = c.<entity_code>
and first_date = c.first_date
and <column_x> != <func2(p)>
;*/
p := c;
else
-- обновить функционально зависимые столбцы в строке p, если есть,
-- так как это может быть единственная строка с данным <entity_id>
/*
update <fact_table> set
<column_a> = <func1(p)>
where <entity_code> = p.<entity_code>
and first_date = p.first_date
and <column_a> != <func1(p)>
;*/
p := r;
end if;
else
p := c;
end if;
end loop;
-- обновить функционально зависимые столбцы в строке p, если есть,
-- так как это может быть единственная строка с данным <entity_id>
/*
update <fact_table> set
<column_a> = <func1(p)>
where <entity_code> = p.<entity_code>
and first_date = p.first_date
and <column_a> != <func1(p)>
;*/
На этом все! Салют.
Комментариев нет:
Отправить комментарий