вторник, 27 октября 2015 г.

Загрузка данных в БД Oracle из csv-файла

Когда-то наиболее естественным способом загрузки структурированных данных в БД Oracle была загрузка при помощи SQL*Loader. Требовалось подготовить файл с параметрами, управляющими загрузкой, и вызвать утилиту sqlloader, передав ей управляющий файл и файл с данными.

Затем производители Oracle предоставили новый механизм для загрузки данных из внешних файлов - external tables. Нужно создать внешнюю таблицу командой DDL CREATE TABLE, указав внешний файл в качестве источника данных и описав его структуру. Средства для описания структуры очень похожи на средства описания структуры для SQL*Loader.

Если вам приходится часто загружать данные из структурированных текстовых файлов, то и SQL*Loader и внешние таблицы начинают казаться слишком громоздкими. Оба инструмента позволяют достаточно гибко описывать структуры загружаемых данных, определять проверки и преобразования данных в ходе загрузки. Это плюс. Минус в том, что для загрузки в БД данных из очередного внешнего файла нужно специально создавать для этого файла таблицу, будь то обычная таблица, в которую загрузит данные SQL*Loader, или внешняя таблица, связанная с внешним файлом при ее создании.

В этой статье я расскажу о еще одном подходе к загрузке данных из внешних csv-файлов (которым уже давно успешно пользуюсь).

Для того, чтобы данные из csv-файла стали доступны для манипуляций при помощи языка SQL, прежде всего, для запроса SELECT, нужно решить две задачи:

  1. прочитать содержимое внешнего файла,
  2. представить данные из файла в виде таблицы.

Прочитать содержимое внешнего файла можно при помощи пакета UTL_FILE. Альтернативно, можно загрузить внешний файл в CLOB при помощи пакета DBMS_LOB. Оба эти способа предполагают, что файл помещен в директорию на сервере, для которой в БД создан объект directory. Есть и третий способ - загрузить файл в БД через web-интерфейс при помощи PL/SQL Gateway, после чего содержимое файла становится доступным внутри БД как LOB.

Когда csv-файл загружен в БД и его содержимое доступно для PL/SQL, представить его данные в виде таблицы - это вопрос разбиения данных на строки, строк - на поля, и возврата строк из конвейерной (pipelined) функции.

Таким образом, нам нужна конвейерная функция, которая будет принимать в качестве параметра имя csv-файла и возвращать его содержимое в виде таблицы. Конвейерная функция возвращает таблицу, тип которой должен быть определен на уровне схемы данных, как и тип строки данной таблицы. Для универсальности, объектный тип, представляющий строку таблицы, будет содержать атрибуты типа VARCHAR2, где каждый атрибут будет представлять отдельное поле (столбец) таблицы.

Создам тип t_varchar_row для представления строки таблицы с 5-ю столбцами (при необходимости количество столбцов можно увеличить):

SQL> create type t_varchar_row as object (
  2   c1 varchar2(4000),
  3   c2 varchar2(4000),
  4   c3 varchar2(4000),
  5   c4 varchar2(4000),
  6   c5 varchar2(4000)
  7  );
  8  /
Type created

Значения типа t_varchar_row создаются с помощью конструктора:

SQL> select t_varchar_row(42,'answer','to','the','Question') from dual;

T_VARCHAR_ROW(42,'ANSWER','TO','THE','QUESTION')(C1, C2, C3, C4, C5)
--------------------------------------------------------------------------------
T_VARCHAR_ROW('42', 'answer', 'to', 'the', 'Question')

SQL> select t_varchar_row('Hello', 'world', null, null, null) from dual;

T_VARCHAR_ROW('HELLO','WORLD',NULL,NULL,NULL)(C1, C2, C3, C4, C5)
--------------------------------------------------------------------------------
T_VARCHAR_ROW('Hello', 'world', NULL, NULL, NULL)

Создам табличный тип t_varchar_row_table и проверю новые типы с помощью тестовой конвейерной функции:

SQL> create type t_varchar_row_table as table of t_varchar_row;
  2  /
Type created

SQL> create or replace function testfun
  2  return t_varchar_row_table pipelined
  3  is
  4  begin
  5   for r in (select * from all_users where rownum <= 3) loop
  6    pipe row(t_varchar_row(r.username, r.user_id, r.created, null, null));
  7   end loop;
  8  end testfun;
  9  /
Function created

SQL> select * from table(testfun);
C1             C2             C3             C4             C5
-------------- -------------- -------------- -------------- -------------- 
SYS            0              05-SEP-10
SYSTEM         5              05-SEP-10
OUTLN          9              05-SEP-10

SQL> drop function testfun;
Function dropped

Теперь, когда идея оформилась и ее реализуемость не вызывает сомнений, осталось собрать вместе код, загружающий данные из внешнего файла, и код для представления этих данных в виде таблицы t_varchar_row_table.

Сделаю это в функции csv:

create or replace function csv(
    p_filename varchar2,
    p_dir varchar2,
    p_charset varchar2 default 'CL8MSWIN1251',
    p_sep varchar2 default ';',
    p_newline varchar2 default chr(13)||chr(10)
) return t_varchar_row_table pipelined
is
    MAXCOLS constant pls_integer := 5;
    -- для загрузки файла в clob
    l_clob clob;
    l_bfile bfile;
    l_dest_offset number;
    l_src_offset number;
    l_lang_ctx number := 0; -- the default
    l_warning number;
    -- для обработки строк
    l_here pls_integer;
    l_there pls_integer;
    l_line varchar2(32767);
    -- для обработки полей
    l_here1 pls_integer;
    l_there1 pls_integer;
    l_cols dbms_sql.varchar2_table;
begin
    --
    -- загрузка файла в clob
    --
    l_bfile := bfilename(p_dir, p_filename);
    dbms_lob.fileopen(l_bfile);

    dbms_lob.createtemporary(l_clob, true);

    l_dest_offset := 1; -- с начала
    l_src_offset := 1;  -- с начала
    dbms_lob.loadclobfromfile(
        dest_lob    => l_clob,
        src_bfile   => l_bfile,
        amount      => dbms_lob.lobmaxsize,
        dest_offset => l_dest_offset,
        src_offset  => l_src_offset,
        bfile_csid  => nls_charset_id(p_charset),
        lang_context => l_lang_ctx,
        warning     => l_warning
    );
    dbms_lob.fileclose(l_bfile);

    if l_warning != 0 then
        raise_application_error(-21001, 'Error loading file to clob');
    end if;

    --
    -- разбор содержимого файла
    --
    l_here := 1;
    -- для всех строк
    loop
        l_there := instr(l_clob, p_newline, l_here);
        if l_there = 0 then
            l_there := dbms_lob.getlength(l_clob) + 1;
        end if;
        l_line := substr(l_clob, l_here, l_there - l_here);

        l_here1 := 1;
        l_cols.delete;
        -- для всех полей
        loop
            l_there1 := instr(l_line, p_sep, l_here1);
            if l_there1 = 0 then
                l_there1 := length(l_line) + 1;
            end if;
            l_cols(l_cols.count+1) := substr(l_line, l_here1, l_there1 - l_here1);
            l_here1 := l_there1 + length(p_sep);
            exit when l_here1 > length(l_line);
        end loop;
        for i in l_cols.count+1 .. MAXCOLS loop
            l_cols(i) := NULL;
        end loop;
        -- вернуть строку таблицы
        pipe row(t_varchar_row(l_cols(1), l_cols(2), l_cols(3), l_cols(4), l_cols(5)));

        l_here := l_there + length(p_newline);
        exit when l_here > dbms_lob.getlength(l_clob);
    end loop;
    dbms_lob.freetemporary(l_clob);
    
exception
when no_data_needed then
    dbms_lob.freetemporary(l_clob);
    raise;
end;
/

Помимо указания файла и директории для загрзуки, параметры функции позволяют указать

  • p_charset - кодировку csv-файла,
  • p_sep - символ-разделитель полей csv-файла,
  • p_newline - символ(ы) завершения строки csv-файла.

Для разбора содержимого файла, загруженного в CLOB переменную l_clob в функции используется SQL семантика для LOB, позволяющая работать с большим объектом (N)CLOB как со строкой VARCHAR2. (Об SQL семантике для LOB я упоминал в статье О работе с большими объектами в СУБД Oracle.)

Для тестирования создам файл Hemingway.csv со списком романов Хэмингуэя и годами публикации:

1926;Вешние воды;The Torrents of Spring
1926;И восходит солнце (Фиеста);The Sun Also Rises
1929;Прощай, оружие!;A Farewell to Arms
1937;Иметь и не иметь;To Have and Have Not
1940;По ком звонит колокол;For Whom the Bell Tolls
1950;За рекой, в тени деревьев;Across the River and Into the Trees
1970;Острова в океане;Islands in the Stream
1986;Райский сад;The Garden of Eden
1999;Проблеск истины;True at First Light

Помещу файл в директорию файловой системы, доступную через объект directory FILES_DIR:

[oracle@tsuki files]$ pwd
/home/oracle/files
[oracle@tsuki files]$ ls -l Hemingway.csv
-rw-r--r-- 1 oracle oinstall 422 Oct 18 10:47 Hemingway.csv

Если у вас нет подходящего объекта directory, создать его можно командой:

SQL> create directory files_dir as '/home/oracle/files';
Directory created

Получу данные из файла Hemingway.csv с помощью SELECT и функции csv:

SQL> select * from table(csv('Hemingway.csv', 'FILES_DIR'));

C1         C2                            C3                            C4         C5
---------- ----------------------------- ----------------------------- ---------- ----------
1926       Вешние воды                   The Torrents of Spring
1926       И восходит солнце (Фиеста)    The Sun Also Rises
1929       Прощай, оружие!               A Farewell to Arms
1937       Иметь и не иметь              To Have and Have Not
1940       По ком звонит колокол         For Whom the Bell Tolls
1950       За рекой, в тени деревьев     Across the River and Into the Trees
1970       Острова в океане              Islands in the Stream
1986       Райский сад                   The Garden of Eden
1999       Проблеск истины               True at First Light

9 rows selected

SQL> select * from table(csv('Hemingway.csv', 'FILES_DIR')) where rownum <= 3;

C1         C2                            C3                            C4         C5
---------- ----------------------------- ----------------------------- ---------- ----------
1926       Вешние воды                   The Torrents of Spring
1926       И восходит солнце (Фиеста)    The Sun Also Rises
1929       Прощай, оружие!               A Farewell to Arms

Обратите внимание, что в последнем случае, когда SELECT выбирает не все строки, освобождение временного CLOB происходит в обработчике исключения NO_DATA_NEEDED функции csv.

Ещё один пример. Скопирую в каталог /home/oracle/files файл со словами английского языка, имеющийся во многих ОС семейстав Unix. Каждая строка файла содержит 1 слово, что позволяет рассматривать этот файл как как csv-файл с одним полем в строке:

[oracle@tsuki files]$ cp /usr/share/dict/words words
[oracle@tsuki files]$ ls -l words
-rw-r--r-- 1 oracle oinstall 4950996 Oct 18 13:09 words

[oracle@tsuki files]$ wc -w words
479623 words

[oracle@tsuki files]$ head words
1080
10-point
10th
11-point
12-point
16-point
18-point
1st
2
20-point

Как видим, в файле 479623 слова. Получу его содержимое с помощью функции csv:

SQL> select * from table(csv('words', 'FILES_DIR', p_newline => chr(10))) where rownum <=10;

C1                            C2         C3         C4         C5
----------------------------- ---------- ---------- ---------- ----------
1080
10-point
10th
11-point
12-point
16-point
18-point
1st
2
20-point

10 rows selected

SQL> select count(*) from table(csv('words', 'FILES_DIR', p_newline => chr(10)));

  COUNT(*)
----------
    479623

На последний запрос понадобилось больше 80 секунд!

Очевидно, что SQL семантика для LOB - не самый эффективный способ работы с содержанием больших объектов. Ведь при выполнении функций, перегруженных для типа (N)CLOB, неявно создаются временные (N)CLOB'ы.

Создам оптимизированную по быстродействию функцию csv2, переписав фрагмент, ответственный за выделение строк из CLOB'а:

create or replace function csv2(
    p_filename varchar2,
    p_dir varchar2,
    p_charset varchar2 default 'CL8MSWIN1251',
    p_sep varchar2 default ';',
    p_newline varchar2 default chr(13)||chr(10)
) return t_varchar_row_table pipelined
is
    MAXCOLS constant pls_integer := 5;
    -- для загрузки файла в clob
    l_clob clob;
    l_bfile bfile;
    l_dest_offset number;
    l_src_offset number;
    l_lang_ctx number := 0; -- the default
    l_warning number;
    -- для обработки строк
    l_here pls_integer;
    l_there pls_integer;
    l_line varchar2(32767);
    -- для обработки полей
    l_here1 pls_integer;
    l_there1 pls_integer;
    l_cols dbms_sql.varchar2_table;
    -- для чтения фрагментов clob
    l_buffer varchar2(32767);
    l_amount pls_integer;
    l_pointer pls_integer;
    l_clob_length pls_integer;
begin
    --
    -- загрузка файла в clob
    --
    l_bfile := bfilename(p_dir, p_filename);
    dbms_lob.fileopen(l_bfile);

    dbms_lob.createtemporary(l_clob, true);

    l_dest_offset := 1; -- с начала
    l_src_offset := 1;  -- с начала
    dbms_lob.loadclobfromfile(
        dest_lob    => l_clob,
        src_bfile   => l_bfile,
        amount      => dbms_lob.lobmaxsize,
        dest_offset => l_dest_offset,
        src_offset  => l_src_offset,
        bfile_csid  => nls_charset_id(p_charset),
        lang_context => l_lang_ctx,
        warning     => l_warning
    );
    dbms_lob.fileclose(l_bfile);

    if l_warning != 0 then
        raise_application_error(-21001, 'Error loading file to clob');
    end if;

    --
    -- разбор содержимого clob
    --
    l_pointer := 1;
    l_clob_length := dbms_lob.getlength(l_clob);
    l_amount := least(dbms_lob.getchunksize(l_clob), floor(32767 / 4) /*utf-8*/);
    -- пока не прочитан весь clob
    while l_pointer < l_clob_length loop
        dbms_lob.read(l_clob, l_amount, l_pointer, l_buffer);
        if l_line is not null then
            l_buffer := l_line || l_buffer;
        end if;

        l_here := 1;
        -- для всех строк
        loop
            l_there := instr(l_buffer, p_newline, l_here);
            if l_there = 0 then
                l_there := length(l_buffer) + 1;
                l_line := substr(l_buffer, l_here, l_there - l_here);
                exit when l_pointer < l_clob_length;
            else
                l_line := substr(l_buffer, l_here, l_there - l_here);
            end if;

            l_here1 := 1;
            l_cols.delete;
            -- для всех полей
            loop
                l_there1 := instr(l_line, p_sep, l_here1);
                if l_there1 = 0 then
                    l_there1 := length(l_line) + 1;
                end if;
                l_cols(l_cols.count+1) := substr(l_line, l_here1, l_there1 - l_here1);
                l_here1 := l_there1 + length(p_sep);
                exit when l_here1 > length(l_line);
            end loop;
            for i in l_cols.count+1 .. MAXCOLS loop
                l_cols(i) := NULL;
            end loop;
            -- вернуть строку таблицы
            pipe row(t_varchar_row(l_cols(1), l_cols(2), l_cols(3), l_cols(4), l_cols(5)));

            l_line := null;
            l_here := l_there + length(p_newline);
            exit when l_here > length(l_buffer);
        end loop;
        
        l_pointer := l_pointer + l_amount;
    end loop;

    
    dbms_lob.freetemporary(l_clob);
    
exception
when no_data_needed then
    dbms_lob.freetemporary(l_clob);
    raise;
end;
/

Проверим, насколько новая версия эффективней прежней:

SQL> set timing on
SQL> select count(*) from table(csv2('words', 'FILES_DIR', p_newline => chr(10)));

  COUNT(*)
----------
    479623

Elapsed: 00:00:01.32

SQL> select count(*) from table(csv('words', 'FILES_DIR', p_newline => chr(10)));

  COUNT(*)
----------
    479623

Elapsed: 00:01:22.54

Что ж, выбор из двух версий очевиден.

Отмечу ограничение функции сsv2: строки в csv-файле не могут быть слишком длинными (максимальная длина строки зависит от кодировки, но не длиннее 32767). Это ограничение можно снять, отказавшись от предварительного выделения строк из содержимого СLOB. Кроме того, функция не допускает символов завершения строки и символов-разделителей в полях файла csv, так как не предусматривает их экранирования с помощью \ (как принято в Unix) или заключения значений полей в кавычки (как принято у Microsoft). При желании, можно реализовать тот и другой вариант, переработав фрагмент функции csv2, отвечающий за разбор содержания CLOB.

В заключение, удалю демонстрационные функции и типы:

SQL> drop function csv;
Function dropped
SQL> drop function csv2;
Function dropped
SQL> drop type t_varchar_row_table;
Type dropped
SQL> drop type t_varchar_row;
Type dropped

Продемонстрированный подход к загрузке данных из csv-файлов имеет преимущество перед SQL*Loader и external tables, не требуя создания таблиц специально для загрузки данных.

1 комментарий:

  1. Сайт хороший , материал безупречный . С Уважением к Вам, ГиперИнфо.

    ОтветитьУдалить