среда, 13 декабря 2023 г.

Имитация ассоциативных массивов в PL/pgSQL

В языке PL/pgSQL нет ассоциативных массивов, которые позволяют получить значение по ключу. И это досадно. Но ладно.

Первый способ имитировать ассоциативный массив

Создайте два массива, упорядоченных по одному и тому же критерию. Первый из них должен содержать ключи, второй - значения. После этого для получения значения по ключу нужно

  1. найти индекс элемента первого массива, содержащего значение ключа,
  2. по найденому индексу получить значение из второго массива.

Пример:

do language plpgsql $$
declare
    keys char[] := array['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z'];
    values_ char[] := array['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];
    l_key char := 'w';
    l_idx int;
begin
    l_idx := array_position(keys, l_key);
    raise notice 'values_[%] = %', l_key, values_[l_idx];
end;
$$;

Результат:

values_[w] = W

А что с производительностью? Возьмем массивы по 17576 элементов, полученные как декартово произведение трех латинских алфавитов:

do language plpgsql $$
declare
    keys varchar[] := 
        array(
            with abc as (
                select chr(i) c from generate_series(ascii('a'), ascii('z')) as s(i)
            )
            select t1.c||t2.c||t3.c from abc t1, abc t2, abc t3 order by 1
        );
    values_ varchar[] :=
        array(
            with abc as (
                select chr(i) c from generate_series(ascii('A'), ascii('Z')) as s(i)
            )
            select t1.c||t2.c||t3.c from abc t1, abc t2, abc t3 order by 1
        );
    l_key varchar;
    l_idx int;

    t timestamptz;
    ivl interval := interval '0' sec;
    times int := 10000;
begin
    for i in 1..times loop
        -- get random key
        l_key := keys[floor(random() * array_length(keys, 1) + 1)::int];
        -- set stopwatch
        t := clock_timestamp();
        -- get index for the key
        l_idx := array_position(keys, l_key);
        -- accumulate stopwatch results
        ivl := ivl + (clock_timestamp() - t);
        -- test we found the right value
        if values_[l_idx] != upper(l_key) then
            raise exception 'Ooops: % != %', upper(l_key), values[i_idx];
        end if;
    end loop;
    -- show the average
    raise notice 'avg time spent=%', ivl / times;
end;
$$;

На одну операцию получения значения по ключу требуется около 300 микросекунд:

avg time spent=00:00:00.000299
avg time spent=00:00:00.000311
avg time spent=00:00:00.000304

Слабым местом здесь является линейный поиск с помощью array_position – тогда как наш массив ключей упорядочен и двоичный поиск даст выигрыш в производительности. (Не ожидал, что доведется самому реализовывать двоичный поиск с достаточными к тому основаниями.)

create or replace function at_saposition(arr anyarray, elm anyelement) returns int
language plpgsql as $$
declare
    idx int;
    left_ int := 1;
    right_ int := array_length(arr, 1);
begin
    while left_ <= right_ loop
        idx := div((left_ + right_), 2);
        if arr[idx] = elm then
            return idx;
        elsif arr[idx] < elm then
            left_ := idx + 1;
        else
            right_ := idx - 1;
        end if;
    end loop;
    return 0;
end;
$$;

Замена вызова array_position на вызов at_saposition дает выигрыш в быстродействии в 30 раз:

avg time spent=00:00:00.000011
avg time spent=00:00:00.000012
avg time spent=00:00:00.000011

Само собой, массив ключей и массив значений могут содержать элементы составных типов, например, record.

Второй способ имитировать ассоциативный массив

Можно обойтись и одним массивом записей, упорядоченным по значению ключа. Для этого придется написать функцию двоичного поиска для конкретного типа записи, которые содержит массив.

Пример с массивом записей и составным ключом:

create type stock as (
    store_code int,
    goods_code varchar(10),
    qty int,
    amt numeric(15,5)
);

create or replace function stock_sapos(arr stock[], store_code int, goods_code varchar) returns int
language plpgsql as $$
declare
    idx int;
    left_ int := 1;
    right_ int := array_length(arr, 1);
begin
    while left_ <= right_ loop
        idx := div((left_ + right_), 2);
        if (arr[idx].store_code, arr[idx].goods_code) = (store_code, goods_code) then
            return idx;
        elsif (arr[idx].store_code, arr[idx].goods_code) < (store_code, goods_code) then
            left_ := idx + 1;
        else
            right_ := idx - 1;
        end if;
    end loop;
    return 0;
end;
$$;

select
    stock_sapos(array[(111, 'QW103456', 3, 21.00)]::stock[], 111, 'AS321098') zero,
    stock_sapos(array[(111, 'AS321098', 5, 50.50), (111, 'QW103456', 3, 21.00)]::stock[], 111, 'AS321098') one
;

zero one
---- ----
   0    1

Можно, конечно, пренебречь двоичным поиском в пользу линейного – если массив небольшой, в этом случае и упорядочивать его не нужно. Но функцию поиска для конкретного типа записи все равно придется писать. А можно задействовать SQL для поиска в массиве – но в этом случае не сводим ли мы на нет преимущества PL/pgSQL?

Убедимся, кто быстрее – запрос SQL или функция двоичного поиска в упорядоченном массиве:

do language plpgsql $$
declare
    values_ stock[] := 
        array(
            with stores as (
                select i code from generate_series(101, 200) as s(i)
            ), goods as (
                select 'ZX'||i code from generate_series(5000001, 5000100) as s(i)
            )
            select (stores.code, goods.code, floor(random() * 100 + 1)::int, floor(random() * 1000 + 1)::numeric)::stock
            from stores, goods 
            order by stores.code, goods.code
        );
    l_store int;
    l_goods varchar(10);
    l_idx int;

    t timestamptz;
    ivl interval := interval '0' sec;
    times int := 10000;
begin
    for i in 1..times loop
        -- get random key
        l_store := 100 + floor(random() * 100 + 1)::int;
        l_goods := 'ZX'||(5000000 + floor(random() * 100 + 1)::int);
        -- set stopwatch
        t := clock_timestamp();
        -- get index for the key
        select idx
        into l_idx
        from (
                select row_number() over () idx, store_code, goods_code
                from unnest(values_) t
            ) u
        where u.store_code = l_store
            and u.goods_code = l_goods
        ;
        --l_idx := stock_sapos(values_, l_store, l_goods);
        -- accumulate stopwatch results
        ivl := ivl + (clock_timestamp() - t);
        -- test we found the right value
        if (values_[l_idx].store_code, values_[l_idx].goods_code) != (l_store, l_goods) then
            raise exception 'Ooops: % != %', (l_store, l_goods), i_idx;
        end if;
    end loop;
    -- show the average
    raise notice 'avg time spent=%', ivl / times;
end;
$$;

Поиск в масиве из 10000 элементов с помощью SQL занимает около 2 миллисекунд:

avg time spent=00:00:00.002095
avg time spent=00:00:00.002115
avg time spent=00:00:00.002117

Вызов функции stock_sapos вместо select приводит к росту производительности в 200 раз:

avg time spent=00:00:00.000012
avg time spent=00:00:00.00001
avg time spent=00:00:00.000015

Само собой, вместо индекса функция могла бы возвращать непосредственно найденный элемент массива.

Третий способ имитировать ассоциативный массив

Пусть у нас есть массив записей, доступ к которым по ключу необходимо получать в каждой итерации цикла, а цикл организован на базе курсора, строки которого содержат значение ключа. Это достаточно типичная ситуация. В таком случае в курсоре можно получить и значение индекса для элемента массива с данным ключом, – и массив не обязан быть упорядоченным по ключу.

Пример:

do language plpgsql $$
declare
    values_ stock[] := 
        array(
            with stores as (
                select i code from generate_series(101, 200) as s(i)
            ), goods as (
                select 'ZX'||i code from generate_series(5000001, 5000100) as s(i)
            )
            select (stores.code, goods.code, floor(random() * 100 + 1)::int, floor(random() * 1000 + 1)::numeric)::stock
            from stores, goods 
            order by random()
        );
    r record;
    l_count int := 0;
begin
    for r in
        with asst as (
            -- matrix stores x goods
            select 100 + floor(random() * 100 + 1)::int store_code,
                'ZX'||(5000000 + floor(random() * 100 + 1)::int) goods_code
            from generate_series(1, 1000)
            order by 1, 2
        )
        select a.*, idx
        from asst a
            join (
                select row_number() over () idx, store_code, goods_code
                from unnest(values_) t
            ) u
            on a.store_code = u.store_code and a.goods_code = u.goods_code
    loop
        -- test we found the right value
        if (values_[r.idx].store_code, values_[r.idx].goods_code) != (r.store_code, r.goods_code) then
            raise exception 'Ooops: %: % != %',
                r.idx,
                (values_[r.idx].store_code, values_[r.idx].goods_code),
                (r.store_code, r.goods_code)
            ;
        end if;
        l_count := l_count + 1;
    end loop;
    raise notice 'Traversed % rows', l_count;
end;
$$;

Заключение

По запросу "plpgsql associative array" вы найдете много советов использовать типы json(b) или hstore для имитации ассоциативных массивов. Стрельба из пушки по воробьям?

Удаляю следы экспериментирования:

drop function stock_sapos(stock[], int, varchar);
drop type stock;

Комментариев нет:

Отправить комментарий