fix: s3 table upload location (#1376)
* fix: s3 table upload location

* fix linter

* sanitize s3 path

* disable redirect

* fix linter

* add break change & bump version

* typing

* remove `

* breaking change
jczhong84 authored Nov 29, 2023
1 parent e332fc5 commit 0b5699a
Showing 5 changed files with 63 additions and 33 deletions.
10 changes: 10 additions & 0 deletions docs_website/docs/changelog/breaking_change.md
@@ -7,12 +7,22 @@ slug: /changelog

Here is the list of breaking changes that you should be aware of when updating Querybook:

## v3.29.0

The following changes were made to `S3BaseExporter` (the CSV table uploader feature):

- Both `s3_path` and `use_schema_location` are now optional.
- If neither is provided, or if `use_schema_location=False`, the table will be created as a managed table, whose location is determined by the query engine.
- Previously `use_schema_location=True` created a managed table; now it creates an external table.

## v3.27.0

Updated the properties of the `QueryValidationResult` object: `line` and `ch` have been replaced with `start_line` and `start_ch`, respectively.

## v3.22.0

Updated the charset of the `data_element` table to `utf8mb4`. If your MySQL database's default charset is not utf8, run the SQL below to update it if needed.

```sql
ALTER TABLE data_element CONVERT TO CHARACTER SET utf8mb4
```
9 changes: 5 additions & 4 deletions docs_website/docs/integrations/add_table_upload.md
@@ -42,16 +42,17 @@ Included by default: No

Available options:

-Either s3_path or use_schema_location must be supplied.
-
-- s3_path (str): if supplied, will use it as the root path for upload. Must be the full s3 path like s3://bucket/key, the trailing / is optional.
-- use_schema_location (boolean):
+- s3_path (str, optional): if supplied, will use it as the root path for upload. Must be the full s3 path like s3://bucket/key; the trailing / is optional.
+- use_schema_location (boolean, optional):
    if true, the upload root path is inferred from the locationUri specified by the schema/database in HMS. To use this option, the engine must be connected to a metastore that uses
    HMSMetastoreLoader (or its derived class).
+    if false, the table will be created as a managed table, whose location is determined automatically by the query engine.
 - table_properties (List[str]): list of table properties passed; this must be query engine specific.
    Checkout here for examples in SparkSQL: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
    For Trino/Presto, it would be the WITH statement: https://trino.io/docs/current/sql/create-table.html

+If neither s3_path nor use_schema_location is supplied, it is treated the same as `use_schema_location=False`, and the table will be created as a managed table.
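
For illustration, a minimal sketch of what this exporter's option dict might look like (the key names come from the exporter code in this commit; the bucket, prefix, and `table_properties` value are placeholders, and how the dict is supplied to the exporter depends on your deployment's configuration):

```python
# Hypothetical options for an S3BaseExporter-derived table uploader.
# Set at most one of "s3_path" / "use_schema_location"; if both are omitted,
# the table is created as a managed table and its location is read back
# from the metastore after creation.
exporter_config = {
    # Root path for uploads; data lands under s3://my-bucket/uploads/<schema>/<table>
    "s3_path": "s3://my-bucket/uploads",
    # Alternative: infer the root from the schema's locationUri in HMS
    # "use_schema_location": True,
    # Engine-specific table properties (example value is an assumption)
    "table_properties": ["parquet.compression=SNAPPY"],
}
```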

### S3 Parquet exporter

This would upload a Parquet file instead of a CSV file. In addition to dependencies such as boto3, `pyarrow` must also be installed.
@@ -1,4 +1,4 @@
-from typing import Dict, List, Tuple
+from typing import Dict, List, Tuple, Union

from clients.hms_client import HiveMetastoreClient
from const.metastore import DataColumn, DataTable
@@ -101,6 +101,12 @@ def get_partitions(
    def get_schema_location(self, schema_name: str) -> str:
        return self.hmc.get_database(schema_name).locationUri

+    def get_table_location(self, schema_name: str, table_name: str) -> Union[None, str]:
+        try:
+            return self.hmc.get_table(schema_name, table_name).sd.location
+        except NoSuchObjectException:
+            return None

    def _get_hmc(self, metastore_dict):
        return HiveMetastoreClient(
            hmss_ro_addrs=metastore_dict["metastore_params"]["hms_connection"]
59 changes: 32 additions & 27 deletions querybook/server/lib/table_upload/exporter/s3_exporter.py
@@ -32,9 +32,13 @@
        if true, the upload root path is inferred by locationUri specified in hms
            to use this option, the engine must be connected to a metastore that uses
            HMSMetastoreLoader (or its derived class)
+        if false, the table will be created as a managed table, whose location is determined automatically by the query engine.
    - table_properties (List[str]): list of table properties passed, this must be query engine specific.
        Checkout here for examples in SparkSQL: https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-create-table-hiveformat.html#examples
        For Trino/Presto, it would be the WITH statement: https://trino.io/docs/current/sql/create-table.html
+
+    If neither s3_path nor use_schema_location is provided, it is treated the same as `use_schema_location=False`,
+    and the table will be created as a managed table.
"""


@@ -61,40 +65,39 @@ def destination_s3_path(self, session=None) -> str:
        return self.destination_s3_folder(session=session) + "/" + "0000"

    @with_session
-    def destination_s3_root(self, session=None) -> str:
-        """Generate the bucket name + prefix before
-        the table specific folder
+    def destination_s3_folder(self, session=None) -> str:
+        """Generate the s3 folder path for the table
        Returns:
-            str: s3 path consisting bucket + prefix + schema name
+            str: s3 path consisting bucket + prefix + schema name + table name
        """
+        schema_name, table_name = self._fq_table_name
        if "s3_path" in self._exporter_config:
-            schema_name, _ = self._fq_table_name
            s3_path: str = self._exporter_config["s3_path"]
-            return sanitize_s3_url_with_trailing_slash(s3_path) + schema_name + "/"
+            return sanitize_s3_url(s3_path) + "/" + schema_name + "/" + table_name

-        if self._exporter_config.get("use_schema_location", False):
-            query_engine = get_query_engine_by_id(self._engine_id, session=session)
-            metastore = get_metastore_loader(query_engine.metastore_id, session=session)
+        query_engine = get_query_engine_by_id(self._engine_id, session=session)
+        metastore = get_metastore_loader(query_engine.metastore_id, session=session)

-            if metastore is None:
-                raise Exception("Invalid metastore")
+        if metastore is None:
+            raise Exception("Invalid metastore for table upload")

-            schema_location_uri = metastore.get_schema_location(
-                self._table_config["schema_name"]
-            )
+        if self._exporter_config.get("use_schema_location", False):
+            schema_location_uri = metastore.get_schema_location(schema_name)
            if not schema_location_uri:
                raise Exception("Invalid metastore to use use_schema_location option")

-            return sanitize_s3_url_with_trailing_slash(schema_location_uri)
+            return sanitize_s3_url(schema_location_uri) + "/" + table_name

-        raise Exception("Must specify s3_path or set use_schema_location=True")
+        # Use its actual location for managed tables
+        table_location = metastore.get_table_location(schema_name, table_name)

-    @with_session
-    def destination_s3_folder(self, session=None) -> str:
-        _, table_name = self._fq_table_name
-        return self.destination_s3_root(session=session) + table_name
+        if not table_location:
+            raise Exception(
+                "Cant get the table location from metastore. Please make sure the query engine supports managed table with default location."
+            )
+        return sanitize_s3_url(table_location)

    @with_session
    def _handle_if_table_exists(self, session=None):
@@ -118,13 +121,15 @@ def _handle_if_table_exists(self, session=None):
    def _get_table_create_query(self, session=None) -> str:
        query_engine = get_query_engine_by_id(self._engine_id, session=session)
        schema_name, table_name = self._fq_table_name
-        is_external = not self._exporter_config.get("use_schema_location", False)
+        is_external = "s3_path" in self._exporter_config or self._exporter_config.get(
+            "use_schema_location"
+        )
        return get_create_table_statement(
            language=query_engine.language,
            table_name=table_name,
            schema_name=schema_name,
            column_name_types=self._table_config["column_name_types"],
-            # if use schema location, then no table location is needed for creation
+            # table location is only needed for external (non managed) table creation
            file_location=self.destination_s3_folder() if is_external else None,
            file_format=self.UPLOAD_FILE_TYPE(),
            table_properties=self._exporter_config.get("table_properties", []),
@@ -203,13 +208,13 @@ def _upload_to_s3(self) -> None:
            S3FileCopier.from_local_file(f).copy_to(self.destination_s3_path())


-def sanitize_s3_url_with_trailing_slash(uri: str) -> str:
+def sanitize_s3_url(uri: str) -> str:
    """
    This function does two things:
    1. if the uri is s3a:// or s3n://, change it to s3://
-    2. if there is no trailing slash, add it
+    2. remove the trailing slash if it has one
    """
    uri = re.sub(r"^s3[a-z]:", "s3:", uri)
-    if not uri.endswith("/"):
-        uri += "/"
+    if uri.endswith("/"):
+        uri = uri[:-1]
    return uri
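
For a quick sense of the renamed helper's behavior, here is a small usage sketch based solely on the function body above:

```python
# sanitize_s3_url normalizes the scheme to s3:// and strips any trailing slash.
assert sanitize_s3_url("s3a://bucket/prefix/") == "s3://bucket/prefix"
assert sanitize_s3_url("s3n://bucket/prefix") == "s3://bucket/prefix"
assert sanitize_s3_url("s3://bucket/prefix") == "s3://bucket/prefix"
```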
10 changes: 9 additions & 1 deletion querybook/webapp/components/TableUploader/TableUploaderForm.tsx
@@ -77,7 +77,15 @@ export const TableUploaderForm: React.FC<ITableUploaderFormProps> = ({
                error: 'Fail to create table',
            });

-            navigateWithinEnv(`/table/${tableId}`);
+            // sometimes there will be a sync delay between the metastore and querybook;
+            // skip the redirection if the table has not been synced over.
+            if (tableId) {
+                navigateWithinEnv(`/table/${tableId}`);
+            } else {
+                toast(
+                    'Waiting for the table to be synced over from the metastore.'
+                );
+            }
            onHide();
        },
        [onHide]
