Skip to content

Commit

Permalink
Use protocol level prepare instead of PREPARE sql commands
Browse files Browse the repository at this point in the history
Introduce a new pg_conn_t::prepare() function which is now used in
several places instead of pg_conn_t::exec() with a SQL PREPARE command.

This does not yet replace all places where PREPARE is used, the rest
will come in a later commit.

This is to make osm2pgsql work with some connection poolers that have
problems with prepared statements. For some background see
#2118 and
https://www.crunchydata.com/blog/prepared-statements-in-transaction-mode-for-pgbouncer
  • Loading branch information
joto committed Dec 12, 2024
1 parent cc0ea60 commit f4ace45
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 27 deletions.
20 changes: 11 additions & 9 deletions src/expire-output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,19 @@ std::size_t expire_output_t::output_tiles_to_table(

if (result.num_fields() == 3) {
// old format with fields: zoom, x, y
db_connection.exec("PREPARE insert_tiles(int4, int4, int4) AS"
" INSERT INTO {} (zoom, x, y) VALUES ($1, $2, $3)"
" ON CONFLICT DO NOTHING",
qn);
db_connection.prepare("insert_tiles",
"INSERT INTO {} (zoom, x, y)"
" VALUES ($1::int4, $2::int4, $3::int4)"
" ON CONFLICT DO NOTHING",
qn);
} else {
// new format with fields: zoom, x, y, first, last
db_connection.exec("PREPARE insert_tiles(int4, int4, int4) AS"
" INSERT INTO {} (zoom, x, y) VALUES ($1, $2, $3)"
" ON CONFLICT (zoom, x, y)"
" DO UPDATE SET last = CURRENT_TIMESTAMP(0)",
qn);
db_connection.prepare("insert_tiles",
"INSERT INTO {} (zoom, x, y)"
" VALUES ($1::int4, $2::int4, $3::int4)"
" ON CONFLICT (zoom, x, y)"
" DO UPDATE SET last = CURRENT_TIMESTAMP(0)",
qn);
}

auto const count = for_each_tile(
Expand Down
14 changes: 6 additions & 8 deletions src/flex-table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,12 @@ std::string flex_table_t::build_sql_prepare_get_wkb() const

if (has_multicolumn_id_index()) {
return fmt::format(
R"(PREPARE get_wkb_{}(char(1), bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1 AND "{}" = $2)",
m_table_num, columns, full_name(), m_columns[0].name(),
m_columns[1].name());
R"(SELECT {} FROM {} WHERE "{}" = $1::char(1) AND "{}" = $2::bigint)",
columns, full_name(), m_columns[0].name(), m_columns[1].name());
}

return fmt::format(R"(PREPARE get_wkb_{}(bigint) AS)"
R"( SELECT {} FROM {} WHERE "{}" = $1)",
m_table_num, columns, full_name(), id_column_names());
return fmt::format(R"(SELECT {} FROM {} WHERE "{}" = $1::bigint)", columns,
full_name(), id_column_names());
}

std::string
Expand Down Expand Up @@ -246,7 +243,8 @@ bool flex_table_t::has_columns_with_expire() const noexcept
void flex_table_t::prepare(pg_conn_t const &db_connection) const
{
if (has_id_column() && has_columns_with_expire()) {
db_connection.exec(build_sql_prepare_get_wkb());
auto const stmt = fmt::format("get_wkb_{}", m_table_num);
db_connection.prepare(stmt, fmt::runtime(build_sql_prepare_get_wkb()));
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/middle-pgsql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,10 +1105,11 @@ void middle_pgsql_t::update_users_table()
log_info("Writing {} entries to table '{}'...", m_users.size(),
m_users_table.name());

m_db_connection.exec("PREPARE insert_user(int8, text) AS"
" INSERT INTO {}.\"{}\" (id, name) VALUES ($1, $2)"
" ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id",
m_users_table.schema(), m_users_table.name());
m_db_connection.prepare(
"insert_user",
"INSERT INTO {}.\"{}\" (id, name) VALUES ($1::int8, $2::text)"
" ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id",
m_users_table.schema(), m_users_table.name());

for (auto const &[id, name] : m_users) {
m_db_connection.exec_prepared("insert_user", id, name);
Expand Down
14 changes: 14 additions & 0 deletions src/pgsql.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ void pg_conn_t::copy_end(std::string_view context) const
}
}

void pg_conn_t::prepare_internal(std::string_view stmt,
std::string_view sql) const
{
if (get_logger().log_sql()) {
log_sql("(C{}) PREPARE {} AS {}", m_connection_id, stmt, sql);
}

pg_result_t const res{
PQprepare(m_conn.get(), stmt.data(), sql.data(), 0, nullptr)};
if (res.status() != PGRES_COMMAND_OK) {
throw fmt_error("Prepare failed for '{}': {}.", sql, error_msg());
}
}

pg_result_t pg_conn_t::exec_prepared_internal(char const *stmt, int num_params,
char const *const *param_values,
int *param_lengths,
Expand Down
20 changes: 20 additions & 0 deletions src/pgsql.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,24 @@ class pg_conn_t
return exec(fmt::format(sql, std::forward<TArgs>(params)...));
}

/**
* Prepare SQL query.
*
* \param stmt Name of the prepared query.
* \param sql SQL query.
* \param params Any number of arguments for the fmt lib.
* \throws std::runtime_exception If the command failed (didn't return
* status code PGRES_COMMAND_OK).
*/
template <typename... TArgs>
void prepare(std::string_view stmt, fmt::format_string<TArgs...> sql,
TArgs... params) const
{
std::string const query =
fmt::format(sql, std::forward<TArgs>(params)...);
prepare_internal(stmt, query);
}

/**
* Run the named prepared SQL statement and return the results in text
* format.
Expand Down Expand Up @@ -228,6 +246,8 @@ class pg_conn_t
void close();

private:
void prepare_internal(std::string_view stmt, std::string_view sql) const;

pg_result_t exec_prepared_internal(char const *stmt, int num_params,
char const *const *param_values,
int *param_lengths, int *param_formats,
Expand Down
6 changes: 3 additions & 3 deletions src/properties.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ void properties_t::store()

pg_conn_t const db_connection{m_connection_params, "prop.store"};

db_connection.exec(
"PREPARE set_property(text, text) AS"
" INSERT INTO {} (property, value) VALUES ($1, $2)"
db_connection.prepare(
"set_property",
"INSERT INTO {} (property, value) VALUES ($1::text, $2::text)"
" ON CONFLICT (property) DO UPDATE SET value = EXCLUDED.value",
table);

Expand Down
5 changes: 2 additions & 3 deletions src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ void table_t::prepare()
{
//let postgres cache this query as it will presumably happen a lot
auto const qual_name = qualified_name(m_target->schema(), m_target->name());
m_db_connection->exec("PREPARE get_wkb(int8) AS"
" SELECT way FROM {} WHERE osm_id = $1",
qual_name);
m_db_connection->prepare(
"get_wkb", "SELECT way FROM {} WHERE osm_id = $1::int8", qual_name);
}

void table_t::generate_copy_column_list()
Expand Down

0 comments on commit f4ace45

Please sign in to comment.