Библиотека реализует комплексное решение для взаимодействия эрланг проекта с базой PostgreSQL на базе асинхронной версии драйвера epgsql и пула процессов pooler.
Она позволяет создать несколько пулов процессов, каждый со своими настройками соединения с базой. Так что разные пулы могут работать с разными базами.
Внутри пула есть несколько процессов, и каждый из них устанавливает свое соединение с базой и посылает запросы к ней, независимо от остальных процессов.
Чтобы выполнить запрос к базе, берется свободный процесс из пула, который сериализует запрос, посылает его базе, получает и десериализует ответ, и возвращается в пул.
Для запуска пула нужно вызывать функцию epgsql_pool:start/4 с аргументами:
- имя пула atom() | string() | binary()
- число соединений integer()
- максимальное число соединений integer()
- настройки соединения map()
Params = #{host => "localhost",
port => 5432,
username => "someuser",
password => "pass",
database => "main_db"},
{ok, _} = epgsql_pool:start(main_pool, 10, 20, Params),
Params2 = #{host => "localhost",
port => 5432,
username => "someuser",
password => "pass",
database => "other_db"},
{ok, _} = epgsql_pool:start(other_pool, 10, 20, Params2),
Настройки соединения должны быть map() или #epgsql_connection_params{} (определена в include/epgsql_pool.hrl).
Для остановки пула нужно вызывать epgsql_pool:stop(PoolName).
Каждый процесс в пуле пытается установить соединение с базой. Если по каким-то причинам это не удается, то процесс логирует ошибку, и через короткий промежуток времени снова пытается соединиться. Если настройки соединения указаны неправильно, то такие попытки повторяются бесконечно. И если процессов в пуле много, то генерируется много сообщений об ошибках, которые создают большую нагрузку на IO и записывают много логов.
Поэтому, перед запуском пула рекомендуется проверить правильность настроек. Это можно сделать вызовом epgsql_pool:validate_connection_params/1
1> Params = #{host => "localhost",
1> port => 5432,
1> username => "test",
1> password => "test",
1> database => "testdb"}.
2> epgsql_pool:validate_connection_params(Params).
ok
3> epgsql_pool:validate_connection_params(Params#{password := "123"}).
{error,invalid_password}
4> epgsql_pool:validate_connection_params(Params#{database := "some"}).
{error,{error,fatal,<<"3D000">>,
<<"database \"some\" does not exist">>,[]}}
Здесь также настройки должны быть map() или #epgsql_connection_params{}.
Если настройки оказались неправильными, то, вероятно, вы захотите сообщить об ошибке и остановить ноду.
Для отправки запроса нужно вызывать одну из функций epgsql_pool:query/2, /3, /4 с аргументами:
- имя пула atom() | string() | binary()
- SQL-запрос io_list()
- опционально, параметры запроса [term()]
- опционально, дополнительные настройки [proplists:option()]
Формат SQL-запроса, параметров к нему, возможные форматы ответа такие же, как требует драйвер epgsql. Подробности смотрите в документации драйвера.
Напрямую работать с пулом соединений не нужно, об этом заботится библиотека.
5> epgsql_pool:query(my_pool, "INSERT INTO category (id, title) VALUES (1, 'My Category'), (2, 'Other Category')").
{ok,2}
6> epgsql_pool:query(my_pool, "INSERT INTO category (id, title) VALUES (3, 'Next Category') RETURNING id").
{ok,1,[{column,<<"id">>,int8,8,-1,1}],[{3}]}
7> epgsql_pool:query(my_pool, "SELECT * FROM category").
{ok,[{column,<<"id">>,int8,8,-1,1},
{column,<<"title">>,text,-1,-1,1}],
[{1,<<"My Category">>},
{2,<<"Other Category">>},
{3,<<"Next Category">>}]}
Есть ограничение на время выполнения запроса, по умолчанию оно 10 секунд. Если за это время библиотека не получает ответ от базы, то запрос отменяется, и возвращается {error, timeout}.
8> epgsql_pool:query(my_pool, "select pg_sleep(100)").
{error,timeout}
Процесс из пула блокируется, пока не получит ответ от базы. Если запрос выполняется долго, то процесс долго не возвращается в пул. Если послать много таких долгих запросов, то можно исчерпать весь пул. Для защиты от такой ситуации введен timeout.
Вы можете изменить timeout для конкретного запроса.
9> epgsql_pool:query(my_pool, "select pg_sleep(10)", [], [{timeout, 15000}]).
{ok,[{column,<<"b">>,void,4,-1,0}],[{<<>>}]}
Или даже указать {timeout, infinity}, если вообще не хотите ограничивать время запроса.
10> epgsql_pool:query(my_pool, "select pg_sleep(10)", [], [{timeout, infinity}]).
{ok,[{column,<<"b">>,void,4,-1,0}],[{<<>>}]}
timeout задается в миллисекундах. И это пока единственная настройка запроса, которая поддерживается библиотекой. Возможно, в следующих версиях появятся и другие настройки. (Поэтому 4-й аргумент -- proplist, а не атомарное значение).
Можно изменить и timeout по умолчанию, что повлияет на все запросы. Смотрите ниже раздел Настройки.
Чтобы выполнить несколько запросов в транзакции, нужно вызывать функцию epgsql_pool:transaction/2 с аргументами:
- имя пула atom() | string() | binary()
- ваша функция fun()
В функцию передается процесс из пула, и запросы к базе данных нужно выполнять, используя этот процесс вместо имени пула.
epgsql_pool:transaction(my_pool,
fun(Worker) ->
Res1 = epgsql_pool:query(Worker, Query1),
...
ResN = epgsql_pool:query(Worker, QueryN)
end).
Любое исключение, возникающее внутри функции, отменяет транзакцию.
epgsql_pool:transaction(my_pool,
fun(Worker) ->
Res1 = epgsql_pool:query(Worker, Query1),
...
case SomeData of
GoodResult -> do_something;
BadResult -> throw(cancel_transaction)
end,
ResN = epgsql_pool:query(Worker, QueryN)
end).
Иногда соединение с базой данных может оборваться, причем клиентская сторона (epgsql драйвер), может об этом и не знать. В этом случае запросы, проходящие через это соединение, будут завершаться с ошибкой {error, timeout}. Если не восстановить соединение, то в пуле окажется процесс (или несколько процессов), неспособный выполнять запросы.
Поэтому каждый процесс в пуле мониторит состояние своего соединения, периодически отправляя keep-alive запросы (на самом деле это запросы "SELECT 1"). Если в течение определенного времени на такой запрос не приходит ответ от базы данных, то соединение считается потерянным, и процесс устанавливает его заново.
Все это происходит внутри библиотеки, незаметно для пользователя. Но может потребоваться настройка интервала, через которые посылаются запросы (по умолчанию 60 секунд).
В текущей версии библиотеки не предусмотрена возможность отключить keep-alive. Но можно установить очень долгий интервал. Планируется добавить такую настройку.
В некоторых случаях epgsql_pool:query может вернуть {error, reconnecting}. Это значит, процесс потерял соединение с базой и пытается его восстановить.
Попытки установить соединение реализованы с exponential backoff, с увеличением интервала между попытками от 100 миллисекунд до 5 секунд. Число попыток не ограничено. Эти интервалы настраиваются.
Можно попытаться повторить вызов epgsql_pool:query. Скорее всего из пула будет получен другой процесс, и он сможет выполнить запрос.
Библиотека позволяет конфигурировать ряд параметров, касающихся работы пула, и различных интервалов.
Все доступные параметры можно получить вызовом epgsql_pool:get_settings/0:
3> epgsql_pool:get_settings().
#{connection_timeout => 10000,
keep_alive_timeout => 60000,
max_reconnect_timeout => 5000,
min_reconnect_timeout => 100,
pooler_get_worker_timeout => 10000,
pooler_max_queue => 100,
query_timeout => 10000}
И можно установить новые значения вызовом epgsql_pool:set_settings/1:
3> epgsql_pool:set_settings(
#{keep_alive_timeout => 24 * 3600 * 1000,
query_timeout => 60000}).
При этом в map можно указать только те параметры, которые вы ходите поменять.
Доступны следующие настройки:
connection_timeout -- timeout на установку соединения для драйвера epgsql. По умолчанию 10000 миллисекунд.
keep_alive_timeout -- с таким интервалом посылаются запросы keep alive. По умолчанию 60000 миллисекунд
max_reconnect_timeout и min_reconnect_timeout -- exponential backoff параметры для восстановления соединения. Интервал между попытками восстановления начинается с min_reconnect_timeout и экспоненциально растет после каждой попытки до max_reconnect_timeout. По умолчанию 5000 и 100 миллисекунд.
pooler_get_worker_timeout -- timeout на получение процесса из пула. Если в пуле нет свободных процессов, и они не появились за это время, то epgsql_pool:query вернет {error, pool_overload} По умолчанию 10000 миллисекунд.
pooler_max_queue -- размер очереди запросов на получение процесса из пула. Если запросов будет больше, то epgsql_pool:query вернет {error, pool_overload}. По умолчанию 100 запросов.
query_timeout -- timeout на время выполнения запроса к базе. Действует для всех запросов, кроме тех, где timeout явно переопределен. Смотрите раздел Запрос к базе данных выше. По умолчанию 10000 миллисекунд.
Значения по умолчанию могут измениться в новых версиях библиотеки.