В статье о соединении диапазонных таблиц фактов (ДТФ) я рассмотрел запрос, формирующий из двух ДТФ корректный набор диапазонов с фактами из обеих таблиц.
Заметим, что
- составные первичные ключи в двух таблицах, включающие дату начала диапазона, могут занимать не меньше, а, возможно, и больше места, чем столбцы фактов,
- запрос, соединяющий миллионы (десятки, сотни миллионов) строк из двух ДТФ в любом случае будет ресурсоемким.
Если такой запрос требуется выполнять часто, то оказывается выгодно хранить факты, даже загружаемые из разных источников, не в двух, а в одной ДТФ.
Диапазоны в этой таблице будут представлять собой пересечение диапазонов фактов, получаемых из двух источников. Количество диапазонов, то есть, строк, в этой таблице будет меньше или (в худшем случае) равно количеству диапазонов в двух отдельных ДТФ, заполняемых каждая из своего источника.
Для примера рассмотрим показатели a1 и a2 некоторого объекта с кодом 12345, значения которых нерегулярно изменяются со сменой дат, причем изменения поступают к нам из разных источников. Загрузим изменения за неделю с 2025-11-15 по 2025-11-21 из источников 1 и 2 в таблицы src1 и src2, соответственно:
create table src1 (fact_date date, item_code int, a1 int)
;
create table src2 (fact_date date, item_code int, a2 char)
;
create or replace procedure init_deltas()
language sql as
$$
truncate table src1
;
insert into src1
values
(date '2025-11-15', 12345, 90),
(date '2025-11-16', 12345, 80),
(date '2025-11-19', 12345, 60)
;
truncate table src2
;
insert into src2
values
(date '2025-11-17', 12345, '+'),
(date '2025-11-18', 12345, '-'),
(date '2025-11-19', 12345, '+'),
(date '2025-11-21', 12345, '-')
;
$$
call init_deltas();
select * from src1 order by fact_date, item_code
;
|fact_date |item_code|a1 |
|----------|---------|---|
|2025-11-15| 12345| 90|
|2025-11-16| 12345| 80|
|2025-11-19| 12345| 60|
select * from src2 order by fact_date, item_code
;
|fact_date |item_code|a2 |
|----------|---------|---|
|2025-11-17| 12345|+ |
|2025-11-18| 12345|- |
|2025-11-19| 12345|+ |
|2025-11-21| 12345|- |
Если загрузить эти входящие данные в две ДТФ, по одной для каждого показателя, то получим следующие диапазоны:
-- из источника 1
|first_date|last_date |item_code|a1 |
|----------|----------|---------|---|
|2025-11-15|2025-11-15| 12345| 90|
|2025-11-16|2025-11-18| 12345| 80|
|2025-11-19|2099-12-31| 12345| 60|
-- из источника 2
|first_date|last_date |item_code|a2 |
|----------|----------|---------|---|
|2025-11-17|2025-11-17| 12345|+ |
|2025-11-18|2025-11-18| 12345|- |
|2025-11-19|2025-11-20| 12345|+ |
|2025-11-21|2099-12-31| 12345|- |
Если соединить полученные таблицы, то получим следующие диапазоны с a1 и a2:
|first_date|last_date |item_code|a1 |a2 |
|----------|----------|---------|---|---|
|2025-11-15|2025-11-15| 12345| 90| |
|2025-11-16|2025-11-16| 12345| 80| |
|2025-11-17|2025-11-17| 12345| 80|+ |
|2025-11-18|2025-11-18| 12345| 80|- |
|2025-11-19|2025-11-20| 12345| 60|+ |
|2025-11-21|2099-12-31| 12345| 60|- |
Всего 6 строк. Что на 1 меньше суммы строк в отдельных таблицах (3 + 4 = 7) за счет того, что в обеих есть диапазон, начинающийся 2025-11-19.
Теперь, вместо того чтобы загружать изменения из источников 1 и 2 в отдельные ДТФ и затем соединять их запросом, сделаем загрузку из обоих источников в одну ДТФ, содержащую показатели a1 и a2. Для этого создадим ДТФ dual_facts с двумя показателями:
create table dual_facts (
first_date date,
last_date date,
item_code int,
a1 int,
a2 char,
upddate1 timestamptz,
upddate2 timestamptz,
primary key (item_code, first_date)
);
Здесь столбцы upddate1 и upddate2 содержат время загрузки данных в строку из источников 1 и 2, соответственно.
Для удобства демонстрации реализую загрузки из каждого источника в виде отдельных процедур с единственным параметром p_date - датой загружаемых из источника изменений. Код процедур process_delta1 и process_delta2 приведу позже, а сейчас посмотрим, как они работают.
Для быстрой проверки корректности наполнения таблицы dual_facts из источников src1 и src2 создам следующие вью:
create or replace view src1_vs_dual_facts as
with src_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, a1
from (
select fact_date first_date,
coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
item_code,
a1
from src1
) t
), facts_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, a1
from dual_facts
where a1 is not null
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;
create or replace view src2_vs_dual_facts as
with src_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, a2
from (
select fact_date first_date,
coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
item_code,
a2
from src2
) t
), facts_ as (
select generate_series(
first_date::timestamp,
least(last_date, '2025-11-21')::timestamp,
interval '1' day
)::date fact_date, item_code, a2
from dual_facts
where a2 is not null
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;
Для начала, загрузим данные из двух источников дата за датой, имитируя работу с ежедневными дельтами день за днем:
do language plpgsql $$
declare
r record;
begin
truncate table dual_facts;
for r in
select fact_date from src1
union
select fact_date from src2
order by 1
loop
call process_delta1(r.fact_date);
call process_delta2(r.fact_date);
end loop;
end;
$$
И проверим результат:
select * from dual_facts order by first_date, item_code
;
|first_date|last_date |item_code|a1 |a2 |upddate1 |upddate2 |
|----------|----------|---------|---|---|-----------------------|-----------------------|
|2025-11-15|2025-11-15| 12345| 90| |2026-03-10 18:46:46.752| |
|2025-11-16|2025-11-16| 12345| 80| |2026-03-10 18:46:46.752| |
|2025-11-17|2025-11-17| 12345| 80|+ | |2026-03-10 18:46:46.752|
|2025-11-18|2025-11-18| 12345| 80|- | |2026-03-10 18:46:46.752|
|2025-11-19|2025-11-20| 12345| 60|+ |2026-03-10 18:46:46.752|2026-03-10 18:46:46.752|
|2025-11-21|2099-12-31| 12345| 60|- | |2026-03-10 18:46:46.752|
Результат корректен! Что подтверждается также запросами к проверочным вью:
select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK
Загрузка данных из двух источников день за днем, как мы только что сделали, это основной вариант использования процедур. Но существует также задача загрузки исторических данных из одного источника в таблицу, где уже есть исторические данные из другого источника. Продемонстрирую, что процедуры справляются и с этой задачей.
Вначале загрузим данные из источника 1 в хронологическом порядке, затем — из источника 2:
do language plpgsql $$do language plpgsql $$
declare
r record;
begin
truncate table dual_facts;
for r in
select distinct fact_date from src1 order by 1
loop
call process_delta1(r.fact_date);
end loop;
for r in
select distinct fact_date from src2 order by 1
loop
call process_delta2(r.fact_date);
end loop;
end;
$$
select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK
И наоборот:
do language plpgsql $$
declare
r record;
begin
truncate table dual_facts;
for r in
select distinct fact_date from src2 order by 1
loop
call process_delta2(r.fact_date);
end loop;
for r in
select distinct fact_date from src1 order by 1
loop
call process_delta1(r.fact_date);
end loop;
end;
$$
select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK
Итак, процедуры можно использовать как для ежедневного обновления таблицы фактов из двух источников, так и для загрузки в таблицу фактов исторических данных из источника, данные которого не были ранее загружены в dual_facts.
Проверим, что повторная загрузка всех изменений из двух источников в произвольном порядке (!) оставляет данные в таблице фактов корректными:
do language plpgsql $$
declare
r record;
begin
--truncate table dual_facts;
for r in
select *
from (select distinct fact_date from src1)
order by random()
loop
call process_delta1(r.fact_date);
end loop;
for r in
select *
from (select distinct fact_date from src2)
order by random()
loop
call process_delta2(r.fact_date);
end loop;
end;
$$
select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK
Таким образом, процедуры успешно загружают изменения из двух источников не только в хронологическом порядке. Но что если из источника будут поступать измененные данные за прошлые даты?
На следующем рисунке из статьи Загрузка изменений за прошлые даты в ДТФ показано, как сказываются изменения дельты за прошлые даты на таблице фактов:
Воспроизведу эти изменения по одному в src1 и загружу в таблицу dual_facts, проверяя результат с помощью вью src1_vs_dual_facts и src2_vs_dual_facts:
do language plpgsql $$
declare
TODAY date := date '2025-11-21';
r record;
r2 record;
begin
for r in
with test(fact_date, item_code, a1) as (
values
(date '2025-11-15', 12345, 85),
(date '2025-11-16', 12345, 85),
(date '2025-11-17', 12345, 85),
(date '2025-11-18', 12345, 85),
(date '2025-11-19', 12345, 85),
(date '2025-11-20', 12345, 85),
(date '2025-11-21', 12345, 85)
)
select * from test order by random()
loop
-- создать исходное состояние источников и таблицы фактов
call init_deltas();
truncate table dual_facts;
for r2 in
select fact_date from src1
union
select fact_date from src2
order by 1
loop
call process_delta1(r2.fact_date);
call process_delta2(r2.fact_date);
end loop;
-- изменить данные в источнике и загрузить изменение
merge into src1 tgt
using (select r.fact_date, r.item_code, r.a1) as src
on (src.fact_date = tgt.fact_date and src.item_code = tgt.item_code)
when matched then
update set a1 = src.a1
when not matched then
insert (fact_date, item_code, a1)
values (src.fact_date, src.item_code, src.a1)
;
call process_delta1(r.fact_date);
-- найти расхождения между источниками и таблицей фактов
for r2 in select * from src1_vs_dual_facts loop
raise notice '1: %', r2;
end loop;
for r2 in select * from src2_vs_dual_facts loop
raise notice '2: %', r2;
end loop;
end loop;
end;
$$
Никаких сообщений на выходе. Следовательно, процедура process_delta1 успешно справляется с загрузкой в таблицу фактов изменений задним числом.
Процедуры process_delta1 и process_delta2 созданы исходя из соображений, приведенных в статье Загрузка изменений за прошлые даты в ДТФ. Приведу их еще раз:
- Поскольку реальная дельта (изменения за день) содержит много строк, то выгодно обрабатывать их массово предложениями SQL, а не построчно в цикле, имея в виду потенциальный выигрыш в производительности.
- В общем случае изменение данных в прошлом влечет пересчет данных в таблице фактов, начиная с даты изменения до настоящего времени. Сделав такой пересчет процедурным образом, в цикле, а не массово предложениями SQL, можно снять все ограничения, связанные с агрегирующими и аналитическими (не)возможностями SQL. Кроме гибкости, другим преимуществом пересчета в цикле может оказаться простота, или даже очевидность, реализации, в отличие от сложных аналитических запросов.
Вот код процедур process_delta1 и process_delta2, который несложно адаптировать при необходимости:
create or replace procedure process_delta1(
p_date date default current_date
)
language plpgsql as
$$
declare
FF date := date '2099-12-31';
begin
-- обновить диапазона [p_date, last_date]
update dual_facts tgt set
a1 = src.a1,
upddate1 = now()
from src1 src
where src.fact_date = p_date
and tgt.item_code = src.item_code
and tgt.first_date = src.fact_date
and coalesce(tgt.a1::text, ' ') != src.a1::text
;
-- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
update dual_facts tgt set
last_date = src.fact_date - 1
from src1 src
where src.fact_date = p_date
and tgt.item_code = src.item_code
and src.fact_date between tgt.first_date + 1 and tgt.last_date
and coalesce(tgt.a1::text, ' ') != src.a1::text
;
-- создать диапазон [p_date, FF] с новым значением
insert into dual_facts (
first_date,
last_date,
item_code,
a1,
upddate1
)
select
src.fact_date,
FF,
src.item_code,
src.a1,
now()
from src1 src
where src.fact_date = p_date
and not exists (
select 1
from dual_facts tgt
where tgt.item_code = src.item_code
and src.fact_date between tgt.first_date and tgt.last_date
)
;
-- обновить измененные и позднейшие строки с учетом данных предыдущих строк
call process_dual_facts(p_date);
end;
$$
create or replace procedure process_delta2(
p_date date default current_date
)
language plpgsql as
$$
declare
FF date := date '2099-12-31';
begin
-- обновить диапазона [p_date, last_date]
update dual_facts tgt set
a2 = src.a2,
upddate2 = now()
from src2 src
where src.fact_date = p_date
and tgt.item_code = src.item_code
and tgt.first_date = src.fact_date
and coalesce(tgt.a2::text, ' ') != src.a2::text
;
-- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
update dual_facts tgt set
last_date = src.fact_date - 1
from src2 src
where src.fact_date = p_date
and tgt.item_code = src.item_code
and src.fact_date between tgt.first_date + 1 and tgt.last_date
and coalesce(tgt.a2::text, ' ') != src.a2::text
;
-- создать диапазон [p_date, FF] с новым значением
insert into dual_facts (
first_date,
last_date,
item_code,
a2,
upddate2
)
select
src.fact_date,
FF,
src.item_code,
src.a2,
now()
from src2 src
where src.fact_date = p_date
and not exists (
select 1
from dual_facts tgt
where tgt.item_code = src.item_code
and src.fact_date between tgt.first_date and tgt.last_date
)
;
-- обновить измененные и позднейшие строки с учетом данных предыдущих строк
call process_dual_facts(p_date);
end;
$$
create or replace procedure process_dual_facts(p_date date default current_date)
language plpgsql as
$$
declare
r record;
c record;
p record;
begin
p := null;
for r in
select *
from dual_facts
where last_date >= p_date - 1
order by item_code, first_date
loop
c := r;
if not p is null then
if c.item_code = p.item_code then
if c.upddate1 is not null and c.upddate2 is null then
c.a2 = p.a2;
elsif c.upddate1 is null and c.upddate2 is not null then
c.a1 = p.a1;
end if;
-- если свойства пред-щей и текущей строк равны
if coalesce(p.a1::text, ' ') = coalesce(c.a1::text, ' ')
and coalesce(p.a2::text, ' ') = coalesce(c.a2::text, ' ')
then
-- то слить диапазоны предыдущей и текущей строк
update dual_facts set
last_date = c.last_date,
upddate1 = greatest(upddate1, c.upddate1),
upddate2 = greatest(upddate2, c.upddate2)
where
item_code = p.item_code
and first_date = p.first_date
;
delete from dual_facts
where
item_code = c.item_code
and first_date = c.first_date
;
p.last_date := c.last_date;
continue;
else
-- иначе обновить текущую и предыдущую строки
if c != r then
-- обновить изменившиеся свойства текущей строки
update dual_facts set
a1 = c.a1,
a2 = c.a2
where
item_code = c.item_code
and first_date = c.first_date
;
end if;
if p.last_date != c.first_date - 1 then
-- обновить дату закрытия предыдущей строки
update dual_facts set
last_date = c.first_date - 1
where
item_code = c.item_code
and first_date = p.first_date
;
end if;
end if;
p := c;
else
p := r;
end if;
else
p := c;
end if;
end loop;
end;
$$
Общая часть для процедур process_delta1 и process_delta2, где в цикле обновляются строки таблицы фактов, зависящие от предыдущих, вынесена в процедуру process_dual_facts.
Салют.
Комментариев нет:
Отправить комментарий