суббота, 21 февраля 2026 г.

Загрузка изменений за прошлые даты в ДТФ

В посте о диапазонных таблицах фактов (ДТФ) я привел алгоритм их заполнения из ежедневных снэпшотов, содержащих данные на текущую дату. Алгоритм также годится для заполнения ДТФ в случае, когда на входе не регулярные (ежедневные) снэпшоты, а только нерегулярные изменения (дельта).

Но время от времени возникает задача добавления в непустую ДТФ новых или измененных данных за прошлые даты, и упомянутый алгоритм для этого не годится. Поэтому рассмотрим универсальный алгоритм заполнения ДТФ из источника, откуда поступают данные как на текущую дату, так и на прошлые даты — если данные за прошлую дату изменились.

Для разработки и тестирования нового алгоритма воспользуюсь примером из упомянутого поста, слегка модифицировав его. Для простоты каждый снэпшот с остатками товаров на складе на дату содержит всего две строки с товарами 12333 и 12345.

Загрузим ежедневные снэпшоты за неделю в таблицу src:

create table src (
    fact_date date,
    item_code int,
    qty int,
    primary key (fact_date, item_code)
);

create or replace procedure init_snap()
language sql as
$$
    truncate table src;
    insert into src
    values
    (date '2025-11-15', 12333, 5),
    (date '2025-11-15', 12345, 90),
    (date '2025-11-16', 12333, 5),
    (date '2025-11-16', 12345, 80),
    (date '2025-11-17', 12333, 5),
    (date '2025-11-17', 12345, 80),
    (date '2025-11-18', 12333, 5),
    (date '2025-11-18', 12345, 80),
    (date '2025-11-19', 12333, 5),
    (date '2025-11-19', 12345, 60),
    (date '2025-11-20', 12333, 0),
    (date '2025-11-20', 12345, 60),
    (date '2025-11-21', 12333, 0),
    (date '2025-11-21', 12345, 60)
    ;
$$

call init_snap();

select * from src order by fact_date, item_code;

|fact_date |item_code|qty|
|----------|---------|---|
|2025-11-15|    12333|  5|
|2025-11-15|    12345| 90|
|2025-11-16|    12333|  5|
|2025-11-16|    12345| 80|
|2025-11-17|    12333|  5|
|2025-11-17|    12345| 80|
|2025-11-18|    12333|  5|
|2025-11-18|    12345| 80|
|2025-11-19|    12333|  5|
|2025-11-19|    12345| 60|
|2025-11-20|    12333|  0|
|2025-11-20|    12345| 60|
|2025-11-21|    12333|  0|
|2025-11-21|    12345| 60|

Последний снэпшот от 2025-11-21, поэтому для целей тестирования будем считать эту дату сегодняшней.

Создадим диапазонную таблицу фактов facts:

create table facts (
    first_date date,
    last_date date,
    item_code int,
    qty int,
    primary key (item_code, first_date)
);

Вот как выглядят данные в таблице facts после загрузки всех снэпшотов в хронологическом порядке:

|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19|    12333|  5|
|2025-11-15|2025-11-15|    12345| 90|
|2025-11-16|2025-11-18|    12345| 80|
|2025-11-19|2025-11-21|    12345| 60|
|2025-11-20|2025-11-21|    12333|  0|

Здесь дата 2099-12-31 — это произвольно выбранная дата в неблизком будущем, использующаяся в качестве последней даты текущего диапазона, то есть, диапазона, в который попадает сегодняшняя дата.

История изменения остатков товаров 12333 и 12345 в таблице facts является непрерывной с даты 2025-11-15, когда товар впервые появился на складе, по дату 2099-12-31.

Алгоритм, предложенный в первом посте о ДТФ, ежедневно загружает в ДТФ текущие данные из источника. При этом из источника могут поступать как ежедневные снэпшоты, содержащие остатки всех товаров, так и только строки с товарами, остаток которых изменился — дельта, — и результат обработки одинаково корректен. Но алгоритм, загружающий снэпшот за прошлую дату, и алгоритм, загружающий дельту за прошлую дату, будут разными. И вот почему.

Представим остаток товара 12345 из таблицы facts на рисунке, где сверху приведены даты (дни месяца для краткости), под ними остаток в штуках, а диапазоны отчеркнуты вертикальными разделительными чертами:

Серым выделена "сегодняшняя" дата. Буквами FF (Far Future) обозначена дата 2099-12-31 в далеком будущем.

Загрузка измененного снэпшота на дату между 2025-11-15 и 2025-11-21 (сегодня) приведет к одному из результатов, изображенных на рисунке:

В верхней части рисунка показано состояние таблицы facts перед тем, как в нее загружается измененный снэпшот за одну из дат. А ниже показаны измененные остатки на дату в прошлом, и справа от них — новая история остатка в таблице facts, возникшая в результате загрузки измененного остатка.

Главное правило при загрузке снэпшота за прошлую дату в ДТФ состоит в том, что каждый снэпшот за прошлую дату влияет на факты только на дату снэпшота. Отсюда и соответствующие разбиения диапазонов в facts, на которые приходится прошлая дата снэпшота.

И только загрузка снэпшота за сегодня меняет остаток не на одну дату, а на весь текущий диапазон. В том числе, если текущий диапазон начался сегодня:

А следующий рисунок показывает, к каким изменениям в таблице facts приводит загрузка дельты за прошлую и "сегодняшнюю" даты. Сравните его с приведенным выше рисунком На входе ежедневные снэпшоты.

Главное правило при загрузке дельты за прошлую дату в ДТФ состоит в том, что дельта за прошлую дату влияет на диапазон дат в ДТФ, начиная с даты изменения и до конца диапазона, на который пришлась дата изменения. Отсюда и разбиения диапазонов в facts не более чем на два диапазона.

Итак, загрузка снэпшота за прошлую дату и загрузка дельты за прошлую дату приводят к разным изменениям в ДТФ. Это объясняется тем, что диапазоны в ДТФ формируются по-разному при загрузке ежедневных снэпшотов и дельт. В случае снэпшотов, для каждой даты диапазона из прошлого имелся снэпшот на дату, подтверждавший значение остатка на дату. А в случае дельт, значение остатка было получено только для первой даты диапазона и в последующие дни данные по остатку не поступали — ведь приходят только изменения.

Следовательно, для загрузки изменений на прошлые даты в ДТФ нужны два разных алгоритма: для случая ежедневных снэпшотов и для случая дельт на входе.

Начну со случая снэпшотов, и реализую алгоритм для загрузки снэпшотов за прошлые и сегодняшнюю даты в процедуре process_snap с двумя параметрами:

p_date date default current_date
p_today date default current_date

Параметр p_date задает дату загружаемого снэпшота (из таблицы srс), а параметр p_today задает "сегодняшнюю" дату — дату, когда выполняется загрузка, — которая необходима для корректной работы алгоритма.

Код процедуры я приведу позднее, а пока посмотрим на результаты ее работы.

Для быстрой проверки корректности наполнения таблицы facts из источника src создам следующее вью:

create or replace view src_vs_facts as
with src_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, qty
    from (
        select fact_date first_date,
            coalesce(lead(fact_date) over (partition by item_code order by fact_date) - 1, date '2099-12-31') last_date,
            item_code,
            qty
        from src
        ) t
), facts_ as (
    select generate_series(
            first_date::timestamp,
            least(last_date, '2025-11-21')::timestamp,
            interval '1' day
        )::date fact_date, item_code, qty
    from facts
)
(select * from src_ except select * from facts_)
union all
(select * from facts_ except select * from src_)
;

Для начала очищу и наполню таблицу facts, загрузив в нее все снэпшоты из src в хронологическом порядке:

do language plpgsql $$
declare
    r record;
begin
    call init_snap();
    truncate table facts;
    for r in 
        select distinct fact_date from src order by 1
    loop
        call process_snap(r.fact_date, r.fact_date);
    end loop;
end;
$$

Вот результат загрузки:

select * from facts order by first_date, item_code
;

|first_date|last_date |item_code|qty|
|----------|----------|---------|---|
|2025-11-15|2025-11-19|    12333|  5|
|2025-11-15|2025-11-15|    12345| 90|
|2025-11-16|2025-11-18|    12345| 80|
|2025-11-19|2025-11-21|    12345| 60|
|2025-11-20|2025-11-21|    12333|  0|

Проверка с помощью вью src_vs_facts показывает, что остатки во снэпшотах на каждую дату совпадают с остатками в facts:

select * from src_vs_facts;
-- no rows

Теперь загружу все снэпшоты повторно в произвольном порядке и проверю результат на корректность:

do language plpgsql $$
declare
    TODAY date := date '2025-11-21';
    r record;
begin
    for r in 
        select *
        from (select distinct fact_date from src) t
        order by random()
    loop
        call process_snap(r.fact_date, TODAY);
    end loop;
end;
$$

select * from src_vs_facts;
-- no rows

Да! Повторная загрузка снэпшотов в произвольном порядке ничего не сломала.

Теперь воспользуюсь данными с рисунка На входе ежедневные снэпшоты чтобы протестировать загрузку измененных снэпшотов за прошлую и "сегодняшнюю" дату, и проверю корректность загруженных данных после каждого изменения. Более того, после применения каждого измененного спэпшота я буду отменять это изменение, возвращая прежнее значение остатка на дату, и вновь обрабатывая снэпшот. Итак:

do language plpgsql $$
declare
    TODAY date := date '2025-11-21';
    r record;
    r2 record;
begin
    for r in
        with test(fact_date, item_code, old_qty, new_qty) as (
            values
            (date '2025-11-15', 12345, 90, 85),
            (date '2025-11-16', 12345, 80, 85),
            (date '2025-11-17', 12345, 80, 85),
            (date '2025-11-18', 12345, 80, 85),
            (date '2025-11-19', 12345, 60, 85),
            (date '2025-11-20', 12345, 60, 85),
            (date '2025-11-21', 12345, 60, 85)
        )
        select * from test order by random()
    loop
        -- изменить снэпшот на дату и загрузить
        update src s set qty = r.new_qty
        where s.fact_date = r.fact_date and s.item_code = r.item_code;
        call process_snap(r.fact_date, TODAY);
        for r2 in select * from src_vs_facts loop
            raise notice 'new_qty: %', r2;
        end loop;
        -- восстановить снэпшот и загрузить
        update src s set qty = r.old_qty
        where s.fact_date = r.fact_date and s.item_code = r.item_code;
        call process_snap(r.fact_date, TODAY);
        for r2 in select * from src_vs_facts loop
            raise notice 'old_qty: %', r2;
        end loop;
    end loop;
end;
$$

Никаких сообщений на выходе. Следовательно, имеем основания предположить, что реализация алгоритма работает корректно.

При создании процедуры я исходил из двух соображений:

  1. Поскольку реальный снэпшот содержит много строк, то выгодно обрабатывать их массово предложениями SQL, а не построчно в цикле, имея в виду потенциальный выигрыш в производительности.
  2. В общем случае изменение данных в прошлом влечет пересчет данных в таблице фактов, начиная с даты изменения до настоящего времени. Сделав такой пересчет процедурным образом, в цикле, а не массово предложениями SQL, можно снять все ограничения, связанные с агрегирующими и аналитическими (не)возможностями SQL. Кроме гибкости, другим преимуществом пересчета в цикле может оказаться простота, или даже очевидность, реализации, в отличие от сложных аналитических запросов.

Вот код процедуры process_snap, который несложно адаптировать для любой диапазонной таблицы фактов:

create or replace procedure process_snap(
    p_date date default current_date,
    p_today date default current_date
)
language plpgsql as
$$
declare
    FF date := date '2099-12-31';
begin
    -- обновить диапазон [p_date, p_date] или [p_date, FF]
    update facts tgt set
        qty = src.qty
    from src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and tgt.first_date = src.fact_date
        and (
            tgt.last_date = tgt.first_date
            or
            (tgt.last_date = FF and p_date = p_today)
            )
        and tgt.qty != src.qty
    ;
    -- создать диапазон [p_date+1, last_date] со старым значением
    insert into facts (
        first_date,
        last_date,
        item_code,
        qty
    )
    select
        src.fact_date + 1,
        tgt.last_date,
        tgt.item_code,
        tgt.qty
    from src
        join facts tgt on
            tgt.item_code = src.item_code
            and src.fact_date between tgt.first_date + 1 and tgt.last_date - 1
            and tgt.qty != src.qty
    where src.fact_date = p_date
    ;
    -- изменить диапазон [p_date, last_date] на [p_date+1, last_date]
    update facts tgt set
        first_date = src.fact_date + 1
    from src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and tgt.first_date = src.fact_date and tgt.first_date < tgt.last_date
        and tgt.qty != src.qty
    ;
    -- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
    update facts tgt set
        last_date = src.fact_date - 1
    from src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and src.fact_date between tgt.first_date + 1 and tgt.last_date 
        and tgt.qty != src.qty
    ;
    -- создать диапазон [p_date, FF] с новым значением
    insert into facts (
        first_date,
        last_date,
        item_code,
        qty
    )
    select
        src.fact_date,
        FF,
        src.item_code,
        src.qty
    from src
    where src.fact_date = p_date
        and not exists (
            select 1
            from facts tgt
            where tgt.item_code = src.item_code
                and src.fact_date between tgt.first_date and tgt.last_date
        )
    ;
    -- обновить диапазоны с учетом данных предыдущих диапазонов
    declare
        r record;
        c record;
        p record;
    begin
        p := null;
        for r in 
            select * 
            from facts 
            where last_date >= p_date - 1
            order by item_code, first_date
        loop
            c := r;
            if not p is null then
                if c.item_code = p.item_code then
                    -- если свойства пред-щей и текущей строк равны
                    if p.qty = c.qty then
                        -- то слить диапазоны предыдущей и текущей строк
                        update facts set
                            last_date = c.last_date
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                        delete from facts
                        where item_code = c.item_code
                            and first_date = c.first_date
                        ;
                        p.last_date := c.last_date;
                        continue;
                    elsif p.last_date != c.first_date - 1 then
                        -- обновить дату закрытия предыдущей строки
                        update facts set
                            last_date = c.first_date - 1
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                    end if; 
                    p := c;
                else
                    p := r;
                end if;
            else
                p := c;
            end if;
        end loop;
    end;
end;
$$

Что касается ежедневных снэпшотов на входе, то на этом все. Переходим к дельте.

Загружу в таблицу src только изменения остатков (дельты):

create or replace procedure init_delta()
language sql as
$$
    truncate table src;
    insert into src
    values
    (date '2025-11-15', 12333, 5),
    (date '2025-11-15', 12345, 90),
    (date '2025-11-16', 12345, 80),
    (date '2025-11-19', 12345, 60),
    (date '2025-11-20', 12333, 0)
    ;
$$

call init_delta();

select * from src order by fact_date, item_code;

|fact_date |item_code|qty|
|----------|---------|---|
|2025-11-15|    12333|  5|
|2025-11-15|    12345| 90|
|2025-11-16|    12345| 80|
|2025-11-19|    12345| 60|
|2025-11-20|    12333|  0|

Ниже код процедуры process_delta, загружающей дельту на дату p_date в таблицу facts. Обратите внимание, что вторая часть процедуры, где в цикле обновляются строки, зависящие от предыдущих, такая же, как в process_snap:

create or replace procedure process_delta(in p_date date default current_date)
language plpgsql
as $procedure$
declare
    FF date := date '2099-12-31';
begin
    -- обновить диапазона [p_date, last_date]
    update facts tgt set
        qty = src.qty
    from src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and tgt.first_date = src.fact_date
        and tgt.qty != src.qty
    ;
    -- изменить диапазон [first_date, last_date] на [first_date, p_date-1]
    update facts tgt set
        last_date = src.fact_date - 1
    from src
    where src.fact_date = p_date
        and tgt.item_code = src.item_code
        and src.fact_date between tgt.first_date + 1 and tgt.last_date 
        and tgt.qty != src.qty
    ;
    -- создать диапазон [p_date, FF] с новым значением
    insert into facts (
        first_date,
        last_date,
        item_code,
        qty
    )
    select
        src.fact_date,
        FF,
        src.item_code,
        src.qty
    from src
    where src.fact_date = p_date
        and not exists (
            select 1
            from facts tgt
            where tgt.item_code = src.item_code
                and src.fact_date between tgt.first_date and tgt.last_date
        )
    ;
    -- обновить диапазоны с учетом данных предыдущих диапазонов
    declare
        r record;
        c record;
        p record;
    begin
        p := null;
        for r in 
            select * 
            from facts 
            where last_date >= p_date - 1
            order by item_code, first_date
        loop
            c := r;
            if not p is null then
                if c.item_code = p.item_code then
                    -- если свойства пред-щей и текущей строк равны
                    if p.qty = c.qty then
                        -- то слить диапазоны предыдущей и текущей строк
                        update facts set
                            last_date = c.last_date
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                        delete from facts
                        where item_code = c.item_code
                            and first_date = c.first_date
                        ;
                        p.last_date := c.last_date;
                        continue;
                    elsif p.last_date != c.first_date - 1 then
                        -- обновить дату закрытия предыдущей строки
                        update facts set
                            last_date = c.first_date - 1
                        where item_code = p.item_code
                            and first_date = p.first_date
                        ;
                    end if; 
                    p := c;
                else
                    p := r;
                end if;
            else
                p := c;
            end if;
        end loop;
    end;
end;
$procedure$
;

Я протестировал процедуру process_delta аналогично тому, как выше тестировал process_snap, сделав поправки на природу дельты. Желающие могут сделать это самостоятельно, проверяя корректность загрузки тем же вью src_vs_facts.

В заключение замечу, что если речь идет не о применении точечных изменений за прошлые даты к уже заполненной таблице фактов, а о загрузке всех исторических данных, то более легковесную процедуру process_delta можно использовать для загрузки как снэпшотов, так и дельт из источника.

Комментариев нет:

Отправить комментарий