суббота, 21 февраля 2026 г.

Загрузка изменений за прошлые даты в ДТФ

В посте о диапазонных таблицах фактов (ДТФ) я привел алгоритм их заполнения из ежедневных снэпшотов, содержащих данные на текущую дату. Алгоритм также годится для заполнения ДТФ в случае, когда на входе не регулярные (ежедневные) снэпшоты, а только нерегулярные изменения (дельта).

Но время от времени возникает задача добавления в непустую ДТФ новых или измененных данных за прошлые даты, и упомянутый алгоритм для этого не годится. Поэтому рассмотрим универсальный алгоритм заполнения ДТФ из источника, откуда поступают данные как на текущую дату, так и на прошлые даты — если данные за прошлую дату изменились.

Для разработки и тестирования нового алгоритма воспользуюсь примером из упомянутого поста, слегка модифицировав его. Данные на входе представляют собой нерегулярные изменения (дельту) остатков двух товаров 12333 и 12345 на складе за неделю с 2025-11-15 по 2025-11-21.

Загрузим изменения остатков за неделю в таблицу src:

create table src (
    fact_date date,
    item_code int,
    qty int,
    primary key (fact_date, item_code)
);

create or replace procedure init_delta()
language sql as
$$
    truncate table src;
    insert into src
    values
    (date '2025-11-15', 12333, 5),
    (date '2025-11-15', 12345, 90),
    (date '2025-11-16', 12345, 80),
    (date '2025-11-19', 12345, 60),
    (date '2025-11-20', 12333, 0)
    ;
$$

call init_delta();

select * from src order by fact_date, item_code;

|fact_date |item_code|qty|
|----------|---------|---|
|2025-11-15|    12333|  5|
|2025-11-15|    12345| 90|
|2025-11-16|    12345| 80|
|2025-11-19|    12345| 60|
|2025-11-20|    12333|  0|

Для целей тестирования будем считать последний день недели, 2025-11-21, "сегодняшним".

Создадим диапазонную таблицу фактов facts:

create table facts (
    first_date date,
    last_date date,
    item_code int,
    qty int,
    primary key (item_code, first_date)
);

Вот как выглядят данные в таблице facts после загрузки всех данных из src в хронологическом порядке:

|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19|    12333|  5|
|2025-11-15|2025-11-15|    12345| 90|
|2025-11-16|2025-11-18|    12345| 80|
|2025-11-19|2099-12-31|    12345| 60|
|2025-11-20|2099-12-31|    12333|  0|

Здесь дата 2099-12-31 — это произвольно выбранная дата в неблизком будущем, закрывающая текущий диапазон, то есть, диапазон, содержащий сегодняшнюю дату.

История изменения остатков товаров 12333 и 12345 в таблице facts является непрерывной с даты 2025-11-15, когда товары впервые появились на складе, по дату 2099-12-31.

Алгоритм, предложенный в первом посте о ДТФ, ежедневно загружает в ДТФ текущие данные из источника. При этом из источника могут поступать как ежедневные снэпшоты, содержащие остатки всех товаров, так и только строки с товарами, остаток которых изменился — дельта, — и результат обработки одинаково корректен. Но алгоритм, загружающий снэпшот за прошлую дату, и алгоритм, загружающий дельту за прошлую дату, будут разными. И вот почему.

Представим остаток товара 12345 из таблицы facts на рисунке, где сверху приведены даты (дни месяца для краткости), под ними остаток в штуках, а диапазоны отчеркнуты вертикальными разделительными чертами:

Серым выделена "сегодняшняя" дата. Буквами FF (Far Future) обозначена дата 2099-12-31 в далеком будущем.

Загрузка измененного снэпшота на дату между 2025-11-15 и 2025-11-21 приведет к одному из результатов, изображенных на рисунке:

В верхней части рисунка показано состояние таблицы facts перед тем, как в нее загружается измененный снэпшот за одну из дат. А ниже показаны измененные остатки на дату в прошлом, и справа от них — новая история остатка в таблице facts, возникшая в результате загрузки измененного остатка.

Главное правило при загрузке снэпшота за прошлую дату в ДТФ состоит в том, что снэпшот влияет на факты только на дату снэпшота. Отсюда и соответствующие разбиения диапазонов в facts, на которые приходится прошлая дата снэпшота.

И только загрузка снэпшота за сегодня меняет остаток не на одну дату, а на весь текущий диапазон. В том числе, если текущий диапазон начался сегодня:

А следующий рисунок показывает, к каким изменениям в таблице facts приводит загрузка дельты за прошлую и "сегодняшнюю" даты. Сравните его с приведенным выше рисунком На входе ежедневные снэпшоты.

Главное правило при загрузке дельты за прошлую дату в ДТФ состоит в том, что дельта влияет на диапазон дат в ДТФ, начиная с даты дельты и до конца диапазона, на который пришлась дата дельты. Отсюда и разбиения диапазонов в facts не более чем на два диапазона.

Итак, загрузка снэпшота за прошлую дату и загрузка дельты за прошлую дату приводят к разным изменениям в ДТФ. Это объясняется тем, что, в случае снэпшотов на входе, для каждой даты уже завершенного диапазона имелся снэпшот, подтверждавший значение остатка на дату. А в случае дельт на входе, значение остатка было получено только для первой даты диапазона и в последующие дни данные по остатку не поступали — ведь приходят только изменения.

Следовательно, для загрузки изменений на прошлые даты в ДТФ нужны два разных алгоритма: для случая дельт и для случая ежедневных снэпшотов на входе.

Начну со случая дельт, и реализую алгоритм для загрузки дельт за прошлые и сегодняшнюю даты в процедуре process_delta с единственным параметром p_date — датой, за которую загружаются данные из таблицы src в таблицу facts.

Код процедуры я приведу позднее, а пока посмотрим на результаты ее работы.

Для быстрой проверки корректности наполнения таблицы facts из источника src создам следующее вью:

create or replace view src_vs_facts as
with src_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, qty
    from (
        select fact_date first_date,
            coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
            item_code,
            qty
        from src
        ) t
), facts_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, qty
    from facts
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;

Для начала очищу и наполню таблицу facts, загрузив в нее данные из src в хронологическом порядке:

do language plpgsql $$
declare
    r record;
begin
    call init_delta();
    truncate table facts;
    for r in 
        select distinct fact_date from src order by 1
    loop
        call process_delta(r.fact_date);
    end loop;
end;
$$

Вот результат загрузки:

select * from facts order by first_date, item_code
;

|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19|    12333|  5|
|2025-11-15|2025-11-15|    12345| 90|
|2025-11-16|2025-11-18|    12345| 80|
|2025-11-19|2099-12-31|    12345| 60|
|2025-11-20|2099-12-31|    12333|  0|

Проверка с помощью вью src_vs_facts показывает, что остатки на даты, полученные из таблицы src, и остатки на даты в facts совпадают:

select * from src_vs_facts;
-- no rows

Теперь загружу все дельты повторно в произвольном порядке и проверю результат на корректность:

do language plpgsql $$
declare
    r record;
begin
    --truncate table facts;
    for r in 
        select *
        from (select distinct fact_date from src) t
        order by random()
    loop
        call process_delta(r.fact_date);
    end loop;
end;
$$

select * from src_vs_facts;
-- no rows

Да! Повторная загрузка дельт в произвольном порядке ничего не сломала.

Теперь воспользуюсь данными с рисунка На входе только изменения чтобы протестировать загрузку изменившихся дельт за прошлую и "сегодняшнюю" дату, и проверю корректность загруженных данных после каждого изменения. Перед применением каждого изменения я буду восстанавливать исходное состояние таблиц src и facts. Итак:

do language plpgsql $$
declare
    r record;
    r2 record;
begin
    for r in
        with test(fact_date, item_code, new_qty) as (
            values
            (date '2025-11-15', 12345, 85),
            (date '2025-11-16', 12345, 85),
            (date '2025-11-17', 12345, 85),
            (date '2025-11-18', 12345, 85),
            (date '2025-11-19', 12345, 85),
            (date '2025-11-20', 12345, 85),
            (date '2025-11-21', 12345, 85)
        )
        select * from test order by random()
    loop
        -- создать исходное состояние источника и таблицы фактов
        call init_delta();
        truncate table facts;
        for r2 in 
            select distinct fact_date from src order by 1
        loop
            call process_delta(r2.fact_date);
        end loop;

        -- изменить данные в источнике и загрузить изменение
        merge into src tgt
        using (select r.fact_date, r.item_code, r.new_qty) as src
        on (src.fact_date = tgt.fact_date and src.item_code = tgt.item_code)
        when matched then
            update set qty = src.new_qty
        when not matched then
            insert (fact_date, item_code, qty)
            values (src.fact_date, src.item_code, src.new_qty)
        ;
        call process_delta(r.fact_date);
        -- найти расхождения между источником и таблицей фактов
        for r2 in select * from src_vs_facts loop
            raise notice '%', r2;
        end loop;
    end loop;
end;
$$

Никаких сообщений на выходе. Следовательно, имеем основания предположить, что реализация алгоритма работает корректно.

При создании процедуры я исходил из двух соображений:

  1. Поскольку реальная дельта (изменения на дату) содержит много строк содержит много строк, то выгодно обрабатывать их массово предложениями SQL, а не построчно в цикле, имея в виду потенциальный выигрыш в производительности.
  2. В общем случае изменение данных в прошлом влечет пересчет данных в таблице фактов, начиная с даты изменения до настоящего времени. Сделав такой пересчет процедурным образом, в цикле, а не массово предложениями SQL, можно снять все ограничения, связанные с агрегирующими и аналитическими (не)возможностями SQL. Кроме гибкости, другим преимуществом пересчета в цикле может оказаться простота, или даже очевидность, реализации, в отличие от сложных аналитических запросов.

Вот код процедуры process_delta, который несложно адаптировать для любой диапазонной таблицы фактов:

create or replace procedure process_delta(in p_date date default current_date)
language plpgsql
as $procedure$
declare
    FF date := date '2099-12-31';
begin
    merge into facts tgt
    using (
        select *
        from src
        where fact_date = p_date
        ) src
    on (tgt.first_date, tgt.item_code) = (src.fact_date, src.item_code)
    when matched and tgt.qty != src.qty then
        update set
            qty = src.qty
    when not matched then
        insert (
            first_date,
            last_date,
            item_code,
            qty
        ) values (
            src.fact_date,
            FF,
            src.item_code,
            src.qty
        )
    ;

    -- обновить строки с учетом предыдущих строк
    declare
        r record;
        c record;
        p record;
    begin
        p := null;
        for r in 
            select * 
            from facts 
            where last_date >= p_date - 1
            order by item_code, first_date
        loop
            c := r;
            if not p is null then
                if c.item_code = p.item_code then
                    -- если свойства пред-щей и текущей строк равны
                    if p.qty = c.qty then
                        -- то слить диапазоны предыдущей и текущей строк
                        update facts set
                            last_date = c.last_date
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                        delete from facts
                        where item_code = c.item_code
                            and first_date = c.first_date
                        ;
                        p.last_date := c.last_date;
                        continue;
                    elsif p.last_date != c.first_date - 1 then
                        -- обновить дату закрытия предыдущей строки
                        update facts set
                            last_date = c.first_date - 1
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                    end if; 
                    p := c;
                else
                    p := r;
                end if;
            else
                p := c;
            end if;
        end loop;
    end;
end;
$procedure$
;

В заключение, прокомментирую вторую часть процедуры, где в цикле обновляются строки таблицы фактов, зависящие от предыдущих с тем же самым item_code. Эта часть кода в процедуре process_delta отвечает за слияние смежных диапазонов с одинаковым остатком в один и за обновление даты завершения диапазона last_date. В общем случае, однако, здесь можно делать и другие вещи. Например, расчитывать и обновлять значения столбцов, зависимых от других столбцов той же строки.

Следующий фрагмент обобщенно представляет эту часть процедуры. Здесь <fact_table> это таблица фактов, <entity_id> — ключ экземпляра сущности, факты о которой представлены в таблице, <column_a> — столбец, чье значение зависит от других столбцов той же строки, <column_x> — столбец, чье значение зависит от значений столбцов предыдущей строки с тем же <entity_id>.

-- обновить строки с учетом текущей и предыдущей строк
declare
    r record;
    c record; -- current row
    p record; -- previous row
begin
    p := null;
    for r in 
        select *
        from <fact_table>
        where ...
        order by <entity_id>, first_date
    loop
        c := r;
        if not p is null then

            if c.<entity_id> = p.<entity_id> then
                -- обновить столбцы в строке p, зависимые от строки c
                /*
                update <fact_table> set
                    last_date = c.first_date - 1
                where <entity_code> = p.<entity_code>
                    and first_date = p.first_date
                    and last_date != c.first_date - 1
                ;*/
                -- обновить функционально зависимые столбцы в строке p, если есть
                /*
                update <fact_table> set
                    <column_a> = <func1(p)>
                where <entity_code> = p.<entity_code>
                    and first_date = p.first_date
                    and <column_a> != <func1(p)>
                ;*/
                -- обновить столбцы в строке c, зависимые от строки p
                /*
                update <fact_table> set
                    <column_x> = <func2(p)>
                where <entity_code> = c.<entity_code>
                    and first_date = c.first_date
                    and <column_x> != <func2(p)>
                ;*/

                p := c;
            else
                -- обновить функционально зависимые столбцы в строке p, если есть,
                -- так как это может быть единственная строка с данным <entity_id>
                /*
                update <fact_table> set
                    <column_a> = <func1(p)>
                where <entity_code> = p.<entity_code>
                    and first_date = p.first_date
                    and <column_a> != <func1(p)>
                ;*/

                p := r;
            end if;

        else
            p := c;
        end if;
    end loop;
    -- обновить функционально зависимые столбцы в строке p, если есть,
    -- так как это может быть единственная строка с данным <entity_id>
    /*
    update <fact_table> set
        <column_a> = <func1(p)>
    where <entity_code> = p.<entity_code>
        and first_date = p.first_date
        and <column_a> != <func1(p)>
    ;*/

На этом все! Салют.

Комментариев нет:

Отправить комментарий