вторник, 10 марта 2026 г.

Загрузка изменений из двух источников в одну ДТФ

В статье о соединении диапазонных таблиц фактов (ДТФ) я рассмотрел запрос, формирующий из двух ДТФ корректный набор диапазонов с фактами из обеих таблиц.

Заметим, что

  1. составные первичные ключи в двух таблицах, включающие дату начала диапазона, могут занимать не меньше, а, возможно, и больше места, чем столбцы фактов,
  2. запрос, соединяющий миллионы (десятки, сотни миллионов) строк из двух ДТФ в любом случае будет ресурсоемким.

Если такой запрос требуется выполнять часто, то оказывается выгодно хранить факты, даже загружаемые из разных источников, не в двух, а в одной ДТФ.

Диапазоны в этой таблице будут представлять собой пересечение диапазонов фактов, получаемых из двух источников. Количество диапазонов, то есть, строк, в этой таблице будет меньше или (в худшем случае) равно количеству диапазонов в двух отдельных ДТФ, заполняемых каждая из своего источника.

Для примера рассмотрим показатели a1 и a2 некоторого объекта с кодом 12345, значения которых нерегулярно изменяются со сменой дат, причем изменения поступают к нам из разных источников. Загрузим изменения за неделю с 2025-11-15 по 2025-11-21 из источников 1 и 2 в таблицы src1 и src2, соответственно:

create table src1 (fact_date date, item_code int, a1 int)
;

create table src2 (fact_date date, item_code int, a2 char)
;

create or replace procedure init_deltas()
language sql as
$$
    truncate table src1
    ;
    insert into src1
    values
    (date '2025-11-15', 12345, 90),
    (date '2025-11-16', 12345, 80),
    (date '2025-11-19', 12345, 60)
    ;
    truncate table src2
    ;
    insert into src2
    values
    (date '2025-11-17', 12345, '+'),
    (date '2025-11-18', 12345, '-'),
    (date '2025-11-19', 12345, '+'),
    (date '2025-11-21', 12345, '-')
    ;
$$

call init_deltas();

select * from src1 order by fact_date, item_code
;

|fact_date |item_code|a1 |
|----------|---------|---|
|2025-11-15|    12345| 90|
|2025-11-16|    12345| 80|
|2025-11-19|    12345| 60|

select * from src2 order by fact_date, item_code
;

|fact_date |item_code|a2 |
|----------|---------|---|
|2025-11-17|    12345|+  |
|2025-11-18|    12345|-  |
|2025-11-19|    12345|+  |
|2025-11-21|    12345|-  |

Если загрузить эти входящие данные в две ДТФ, по одной для каждого показателя, то получим следующие диапазоны:

-- из источника 1
|first_date|last_date |item_code|a1 |
|----------|----------|---------|---|
|2025-11-15|2025-11-15|    12345| 90|
|2025-11-16|2025-11-18|    12345| 80|
|2025-11-19|2099-12-31|    12345| 60|

-- из источника 2
|first_date|last_date |item_code|a2 |
|----------|----------|---------|---|
|2025-11-17|2025-11-17|    12345|+  |
|2025-11-18|2025-11-18|    12345|-  |
|2025-11-19|2025-11-20|    12345|+  |
|2025-11-21|2099-12-31|    12345|-  |

Если соединить полученные таблицы, то получим следующие диапазоны с a1 и a2:

|first_date|last_date |item_code|a1 |a2 |
|----------|----------|---------|---|---|
|2025-11-15|2025-11-15|    12345| 90|   |
|2025-11-16|2025-11-16|    12345| 80|   |
|2025-11-17|2025-11-17|    12345| 80|+  |
|2025-11-18|2025-11-18|    12345| 80|-  |
|2025-11-19|2025-11-20|    12345| 60|+  |
|2025-11-21|2099-12-31|    12345| 60|-  |

Всего 6 строк. Что на 1 меньше суммы строк в отдельных таблицах (3 + 4 = 7) за счет того, что в обеих есть диапазон, начинающийся 2025-11-19.

Теперь, вместо того чтобы загружать изменения из источников 1 и 2 в отдельные ДТФ и затем соединять их запросом, сделаем загрузку из обоих источников в одну ДТФ, содержащую показатели a1 и a2. Для этого создадим ДТФ dual_facts с двумя показателями:

create table dual_facts (
    first_date date,
    last_date date,
    item_code int,
    a1 int,
    a2 char,
    upddate1 timestamptz,
    upddate2 timestamptz,
    primary key (item_code, first_date)
);

Здесь столбцы upddate1 и upddate2 содержат время загрузки данных в строку из источников 1 и 2, соответственно.

Для удобства демонстрации реализую загрузки из каждого источника в виде отдельных процедур с единственным параметром p_date - датой загружаемых из источника изменений. Код процедур process_delta1 и process_delta2 приведу позже, а сейчас посмотрим, как они работают.

Для быстрой проверки корректности наполнения таблицы dual_facts из источников src1 и src2 создам следующие вью:

create or replace view src1_vs_dual_facts as
with src_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, a1
    from (
        select fact_date first_date,
            coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
            item_code,
            a1
        from src1
        ) t
), facts_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, a1
    from dual_facts
    where a1 is not null
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;

create or replace view src2_vs_dual_facts as
with src_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, a2
    from (
        select fact_date first_date,
            coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
            item_code,
            a2
        from src2
        ) t
), facts_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, a2
    from dual_facts
    where a2 is not null
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;

Для начала, загрузим данные из двух источников дата за датой, имитируя работу с ежедневными дельтами день за днем:

do language plpgsql $$
declare
    r record;
begin
    truncate table dual_facts;
    for r in
        select fact_date from src1
        union
        select fact_date from src2
        order by 1
    loop
        call process_delta1(r.fact_date);
        call process_delta2(r.fact_date);
    end loop;
end;
$$

И проверим результат:

select * from dual_facts order by first_date, item_code
;

|first_date|last_date |item_code|a1 |a2 |upddate1               |upddate2               |
|----------|----------|---------|---|---|-----------------------|-----------------------|
|2025-11-15|2025-11-15|    12345| 90|   |2026-03-10 18:46:46.752|                       |
|2025-11-16|2025-11-16|    12345| 80|   |2026-03-10 18:46:46.752|                       |
|2025-11-17|2025-11-17|    12345| 80|+  |                       |2026-03-10 18:46:46.752|
|2025-11-18|2025-11-18|    12345| 80|-  |                       |2026-03-10 18:46:46.752|
|2025-11-19|2025-11-20|    12345| 60|+  |2026-03-10 18:46:46.752|2026-03-10 18:46:46.752|
|2025-11-21|2099-12-31|    12345| 60|-  |                       |2026-03-10 18:46:46.752|

Результат корректен! Что подтверждается также запросами к проверочным вью:

select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK

Загрузка данных из двух источников день за днем, как мы только что сделали, это основной вариант использования процедур. Но существует также задача загрузки исторических данных из одного источника в таблицу, где уже есть исторические данные из другого источника. Продемонстрирую, что процедуры справляются и с этой задачей.

Вначале загрузим данные из источника 1 в хронологическом порядке, затем — из источника 2:

do language plpgsql $$do language plpgsql $$
declare
    r record;
begin
    truncate table dual_facts;
    for r in
        select distinct fact_date from src1 order by 1
    loop
        call process_delta1(r.fact_date);
    end loop;
    for r in
        select distinct fact_date from src2 order by 1
    loop
        call process_delta2(r.fact_date);
    end loop;
end;
$$

select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK

И наоборот:

do language plpgsql $$
declare
    r record;
begin
    truncate table dual_facts;
    for r in
        select distinct fact_date from src2 order by 1
    loop
        call process_delta2(r.fact_date);
    end loop;
    for r in
        select distinct fact_date from src1 order by 1
    loop
        call process_delta1(r.fact_date);
    end loop;
end;
$$

select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK

Итак, процедуры можно использовать как для ежедневного обновления таблицы фактов из двух источников, так и для загрузки в таблицу фактов исторических данных из источника, данные которого не были ранее загружены в dual_facts.

Проверим, что повторная загрузка всех изменений из двух источников в произвольном порядке (!) оставляет данные в таблице фактов корректными:

do language plpgsql $$
declare
    r record;
begin
    --truncate table dual_facts;
    for r in
        select *
        from (select distinct fact_date from src1)
        order by random()
    loop
        call process_delta1(r.fact_date);
    end loop;
    for r in
        select *
        from (select distinct fact_date from src2)
        order by random()
    loop
        call process_delta2(r.fact_date);
    end loop;
end;
$$

select * from src1_vs_dual_facts;
-- no rows OK
select * from src2_vs_dual_facts;
-- no rows OK

Таким образом, процедуры успешно загружают изменения из двух источников не только в хронологическом порядке. Но что если из источника будут поступать измененные данные за прошлые даты?

На следующем рисунке из статьи Загрузка изменений за прошлые даты в ДТФ показано, как сказываются изменения дельты за прошлые даты на таблице фактов:

Воспроизведу эти изменения по одному в src1 и загружу в таблицу dual_facts, проверяя результат с помощью вью src1_vs_dual_facts и src2_vs_dual_facts:

do language plpgsql $$
declare
    TODAY date := date '2025-11-21';
    r record;
    r2 record;
begin
    for r in
        with test(fact_date, item_code, a1) as (
            values
            (date '2025-11-15', 12345, 85),
            (date '2025-11-16', 12345, 85),
            (date '2025-11-17', 12345, 85),
            (date '2025-11-18', 12345, 85),
            (date '2025-11-19', 12345, 85),
            (date '2025-11-20', 12345, 85),
            (date '2025-11-21', 12345, 85)
        )
        select * from test order by random()
    loop
        -- создать исходное состояние источников и таблицы фактов
        call init_deltas();
        truncate table dual_facts;
        for r2 in
            select fact_date from src1
            union
            select fact_date from src2
            order by 1
        loop
            call process_delta1(r2.fact_date);
            call process_delta2(r2.fact_date);
        end loop;

        -- изменить данные в источнике и загрузить изменение
        merge into src1 tgt
        using (select r.fact_date, r.item_code, r.a1) as src
        on (src.fact_date = tgt.fact_date and src.item_code = tgt.item_code)
        when matched then
            update set a1 = src.a1
        when not matched then
            insert (fact_date, item_code, a1)
            values (src.fact_date, src.item_code, src.a1)
        ;
        call process_delta1(r.fact_date);
        -- найти расхождения между источниками и таблицей фактов
        for r2 in select * from src1_vs_dual_facts loop
            raise notice '1: %', r2;
        end loop;
        for r2 in select * from src2_vs_dual_facts loop
            raise notice '2: %', r2;
        end loop;
    end loop;
end;
$$

Никаких сообщений на выходе. Следовательно, процедура process_delta1 успешно справляется с загрузкой в таблицу фактов изменений задним числом.

Процедуры process_delta1 и process_delta2 созданы исходя из соображений, приведенных в статье Загрузка изменений за прошлые даты в ДТФ. Приведу их еще раз:

  1. Поскольку реальная дельта (изменения за день) содержит много строк, то выгодно обрабатывать их массово предложениями SQL, а не построчно в цикле, имея в виду потенциальный выигрыш в производительности.
  2. В общем случае изменение данных в прошлом влечет пересчет данных в таблице фактов, начиная с даты изменения до настоящего времени. Сделав такой пересчет процедурным образом, в цикле, а не массово предложениями SQL, можно снять все ограничения, связанные с агрегирующими и аналитическими (не)возможностями SQL. Кроме гибкости, другим преимуществом пересчета в цикле может оказаться простота, или даже очевидность, реализации, в отличие от сложных аналитических запросов.

Вот код процедур process_delta1 и process_delta2, который несложно адаптировать при необходимости:

create or replace procedure process_delta1(
    p_date date default current_date
)
language plpgsql as
$$
declare
    FF date := date '2099-12-31';
begin
    -- обновить диапазона [p_date, last_date]
    update dual_facts tgt set
        a1 = src.a1,
        upddate1 = now()
    from src1 src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and tgt.first_date = src.fact_date
        and coalesce(tgt.a1::text, ' ') != src.a1::text
    ;
    -- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
    update dual_facts tgt set
        last_date = src.fact_date - 1
    from src1 src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and src.fact_date between tgt.first_date + 1 and tgt.last_date 
        and coalesce(tgt.a1::text, ' ') != src.a1::text
    ;
    -- создать диапазон [p_date, FF] с новым значением
    insert into dual_facts (
        first_date,
        last_date,
        item_code,
        a1,
        upddate1
    )
    select
        src.fact_date,
        FF,
        src.item_code,
        src.a1,
        now()
    from src1 src
    where src.fact_date = p_date
        and not exists (
            select 1
            from dual_facts tgt
            where tgt.item_code = src.item_code
                and src.fact_date between tgt.first_date and tgt.last_date
        )
    ;
    -- обновить измененные и позднейшие строки с учетом данных предыдущих строк 
    call process_dual_facts(p_date);
end;
$$

create or replace procedure process_delta2(
    p_date date default current_date
)
language plpgsql as
$$
declare
    FF date := date '2099-12-31';
begin
    -- обновить диапазона [p_date, last_date]
    update dual_facts tgt set
        a2 = src.a2,
        upddate2 = now()
    from src2 src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and tgt.first_date = src.fact_date
        and coalesce(tgt.a2::text, ' ') != src.a2::text
    ;
    -- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
    update dual_facts tgt set
        last_date = src.fact_date - 1
    from src2 src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and src.fact_date between tgt.first_date + 1 and tgt.last_date 
        and coalesce(tgt.a2::text, ' ') != src.a2::text
    ;
    -- создать диапазон [p_date, FF] с новым значением
    insert into dual_facts (
        first_date,
        last_date,
        item_code,
        a2,
        upddate2
    )
    select
        src.fact_date,
        FF,
        src.item_code,
        src.a2,
        now()
    from src2 src
    where src.fact_date = p_date
        and not exists (
            select 1
            from dual_facts tgt
            where tgt.item_code = src.item_code
                and src.fact_date between tgt.first_date and tgt.last_date
        )
    ;
    -- обновить измененные и позднейшие строки с учетом данных предыдущих строк 
    call process_dual_facts(p_date);
end;
$$

create or replace procedure process_dual_facts(p_date date default current_date)
language plpgsql as
$$
declare
    r record;
    c record;
    p record;
begin
    p := null;
    for r in 
        select * 
        from dual_facts 
        where last_date >= p_date - 1
        order by item_code, first_date
    loop
        c := r;
        if not p is null then
            if c.item_code = p.item_code then
                if c.upddate1 is not null and c.upddate2 is null then
                    c.a2 = p.a2;
                elsif c.upddate1 is null and c.upddate2 is not null then
                    c.a1 = p.a1;
                end if;
                -- если свойства пред-щей и текущей строк равны
                if coalesce(p.a1::text, ' ') = coalesce(c.a1::text, ' ')
                    and coalesce(p.a2::text, ' ') = coalesce(c.a2::text, ' ')
                then
                    -- то слить диапазоны предыдущей и текущей строк
                    update dual_facts set
                        last_date = c.last_date,
                        upddate1 = greatest(upddate1, c.upddate1),
                        upddate2 = greatest(upddate2, c.upddate2)
                    where
                        item_code = p.item_code
                        and first_date = p.first_date
                    ;
                    delete from dual_facts
                    where
                        item_code = c.item_code
                        and first_date = c.first_date
                    ;
                    p.last_date := c.last_date;
                    continue;
                else
                    -- иначе обновить текущую и предыдущую строки
                    if c != r then
                        -- обновить изменившиеся свойства текущей строки 
                        update dual_facts set
                            a1 = c.a1,
                            a2 = c.a2
                        where
                            item_code = c.item_code
                            and first_date = c.first_date
                        ;
                    end if; 
                    if p.last_date != c.first_date - 1 then
                        -- обновить дату закрытия предыдущей строки
                        update dual_facts set
                            last_date = c.first_date - 1
                        where
                            item_code = c.item_code
                            and first_date = p.first_date
                        ;
                    end if;
                end if; 
                p := c;
            else
                p := r;
            end if;
        else
            p := c;
        end if;
    end loop;
end;
$$

Общая часть для процедур process_delta1 и process_delta2, где в цикле обновляются строки таблицы фактов, зависящие от предыдущих, вынесена в процедуру process_dual_facts.

Салют.

Комментариев нет:

Отправить комментарий