Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update postgres integration #799

Merged
merged 7 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ test("error result", async () => {
status: "error",
metadata: {
error: "Run Error",
message: "Error Message",
},
connection: {
id: "1",
Expand All @@ -59,8 +60,62 @@ test("error result", async () => {
)

await waitFor(() => {
expect(screen.getByText("Run Error")).toBeInTheDocument()
expect(
screen.getByText("Validation Failed - Run Error"),
).toBeInTheDocument()
})

expect(screen.getByText("Error Message")).toBeInTheDocument()
})

test("error result unknown", async () => {
const mocks = [
{
request: {
query: GET_RUN,
variables: {
workspaceId: "1",
runId: "1",
},
},
result: {
data: {
workspace: {
id: "1",
run: {
id: "1",
status: "error",
metadata: {
error: "Unknown",
message: "Error Message",
},
connection: {
id: "1",
validated: false,
},
},
},
},
},
},
]

render(
<ValidateConnection
workspaceId="1"
run={{
id: "1",
}}
onValidate={onValidate}
/>,
{ mocks },
)

await waitFor(() => {
expect(screen.getByText("Validation Failed")).toBeInTheDocument()
})

expect(screen.getByText("Error Message")).toBeInTheDocument()
})

test("error", async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ const ValidateConnection: React.FC<ValidateConnectionProps> = ({
if (runError)
return (
<Alert severity="error">
<AlertTitle>Validation Failed</AlertTitle>
{data.workspace.run.metadata.error}
<AlertTitle>
Validation Failed
{data.workspace.run.metadata.error !== "Unknown"
? ` - ${data.workspace.run.metadata.error}`
: ""}
</AlertTitle>
{data.workspace.run.metadata.message}
</Alert>
)

Expand Down
239 changes: 119 additions & 120 deletions grai-integrations/source-postgres/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions grai-integrations/source-postgres/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "grai_source_postgres"
version = "0.2.2"
version = "0.2.4"
description = ""
authors = ["Ian Eaves <ian@grai.io>"]
license = "Elastic-2.0"
Expand All @@ -15,7 +15,7 @@ documentation = "https://docs.grai.io/"
[tool.poetry.dependencies]
python = "^3.8"
pydantic = "^1.9.1"
grai-schemas = "^0.2.6"
grai-schemas = "^0.2.10"
PyYAML = "^6.0"
multimethod = "^1.8"
psycopg2 = "^2.9.5"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from grai_source_postgres import adapters, base, loader, models, package_definitions
from grai_source_postgres.package_definitions import config

__version__ = "0.2.2-alpha1"
__version__ = "0.2.4"
40 changes: 35 additions & 5 deletions grai-integrations/source-postgres/src/grai_source_postgres/base.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import re
from functools import cache
from typing import List, Optional, Tuple, Union

from grai_schemas.base import SourcedEdge, SourcedNode
from grai_schemas.integrations.base import GraiIntegrationImplementation
from grai_schemas.integrations.errors import (
IncorrectPasswordError,
MissingPermissionError,
NoConnectionError,
)
from grai_schemas.v1.source import SourceV1
from psycopg2.errors import OperationalError

from grai_source_postgres.adapters import adapt_to_client
from grai_source_postgres.loader import PostgresConnector
Expand Down Expand Up @@ -51,21 +58,44 @@ def __init__(
namespace=namespace,
)

def handle_error(self, err: OperationalError):
string = str(err)

if string.startswith("could not translate host name"):
raise NoConnectionError(string)

if string.startswith("connection to server at"):
if re.search(r"FATAL: password authentication failed for user", string):
raise IncorrectPasswordError(string)

if re.search(r"FATAL: database \".*\" does not exist", string):
raise MissingPermissionError(string)

raise err

raise err

@cache
def get_nodes_and_edges(self) -> Tuple[List[SourcedNode], List[SourcedEdge]]:
"""Returns a tuple of lists of SourcedNode and SourcedEdge objects"""
with self.connector.connect() as conn:
nodes, edges = conn.get_nodes_and_edges()
try:
with self.connector.connect() as conn:
nodes, edges = conn.get_nodes_and_edges()
except OperationalError as e:
self.handle_error(e)

nodes = adapt_to_client(nodes, self.source, self.version)
edges = adapt_to_client(edges, self.source, self.version)
return nodes, edges

def ready(self) -> bool:
"""Returns True if the integration is ready to run"""
with self.connector.connect() as _:
pass
return True
try:
with self.connector.connect() as _:
pass
return True
except OperationalError as e:
self.handle_error(e)

def nodes(self) -> List[SourcedNode]:
"""Returns a list of SourcedNode objects"""
Expand Down
20 changes: 12 additions & 8 deletions grai-server/app/connections/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
from typing import Type

from django.utils import timezone
from grai_schemas.integrations.errors import (
IncorrectPasswordError,
MissingPermissionError,
NoConnectionError,
)

from celery import shared_task
from grai_schemas.integrations.errors import NoConnectionError, IncorrectPasswordError, MissingPermissionError
from connections.adapters.base import BaseAdapter
from connections.adapters.bigquery import BigqueryAdapter
from connections.adapters.dbt import DbtAdapter
Expand Down Expand Up @@ -149,17 +153,17 @@
if run.commit.pull_request:
github.post_comment(run.commit.pull_request.reference, message)

except NoConnectionError:
error_run(run, {"error": "No connection"})
except NoConnectionError as e:
error_run(run, {"error": "No connection", "message": str(e)})

except IncorrectPasswordError:
error_run(run, {"error": "Incorrect password"})
except IncorrectPasswordError as e:
error_run(run, {"error": "Incorrect password", "message": str(e)})

Check warning on line 160 in grai-server/app/connections/tasks.py

View check run for this annotation

Codecov / codecov/patch

grai-server/app/connections/tasks.py#L160

Added line #L160 was not covered by tests

except MissingPermissionError:
error_run(run, {"error": "Missing permission"})
except MissingPermissionError as e:
error_run(run, {"error": "Missing permission", "message": str(e)})

Check warning on line 163 in grai-server/app/connections/tasks.py

View check run for this annotation

Codecov / codecov/patch

grai-server/app/connections/tasks.py#L163

Added line #L163 was not covered by tests

except Exception as e:
error_run(run, {"error": str(e), "traceback": traceback.format_exc()})
error_run(run, {"error": "Unknown", "message": str(e), "traceback": traceback.format_exc()})

raise e

Expand Down
86 changes: 77 additions & 9 deletions grai-server/app/connections/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import uuid
from datetime import date
from unittest import mock
from grai_schemas.integrations.errors import NoConnectionError

import pytest
from decouple import config
Expand Down Expand Up @@ -206,6 +206,10 @@ def test_run_update_server_postgres(self, test_workspace, test_postgres_connecto

process_run(str(run.id))

run.refresh_from_db()

assert run.status == "success"

def test_run_update_server_postgres_no_host(self, test_workspace, test_postgres_connector, test_source):
connection = Connection.objects.create(
name=str(uuid.uuid4()),
Expand All @@ -217,16 +221,77 @@ def test_run_update_server_postgres_no_host(self, test_workspace, test_postgres_
)
run = Run.objects.create(connection=connection, workspace=test_workspace, source=test_source)

with pytest.raises(Exception) as e_info:
process_run(str(run.id))
process_run(str(run.id))

run.refresh_from_db()

assert run.status == "error"
assert run.metadata["error"] == "No connection"
assert (
str(e_info.value)
run.metadata["message"]
== 'could not translate host name "a" to address: nodename nor servname provided, or not known\n'
or str(e_info.value)
or run.metadata["message"]
== 'could not translate host name "a" to address: Temporary failure in name resolution\n'
)

def test_run_update_server_postgres_wrong_password(self, test_workspace, test_postgres_connector, test_source):
connection = Connection.objects.create(
name=str(uuid.uuid4()),
connector=test_postgres_connector,
workspace=test_workspace,
source=test_source,
metadata={
"host": config("DB_HOST", "localhost"),
"port": 5432,
"dbname": "wrong",
"user": "grai",
},
secrets={"password": "wrong"},
)
run = Run.objects.create(connection=connection, workspace=test_workspace, source=test_source)

process_run(str(run.id))

run.refresh_from_db()

assert run.status == "error"
assert run.metadata["error"] == "Incorrect password"
assert (
run.metadata["message"]
== 'connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL: password authentication failed for user "grai"\n'
or run.metadata["message"]
== 'connection to server at "127.0.0.1", port 5432 failed: FATAL: password authentication failed for user "grai"\n'
)

def test_run_update_server_postgres_no_database(self, test_workspace, test_postgres_connector, test_source):
connection = Connection.objects.create(
name=str(uuid.uuid4()),
connector=test_postgres_connector,
workspace=test_workspace,
source=test_source,
metadata={
"host": config("DB_HOST", "localhost"),
"port": 5432,
"dbname": "wrong",
"user": "grai",
},
secrets={"password": "grai"},
)
run = Run.objects.create(connection=connection, workspace=test_workspace, source=test_source)

process_run(str(run.id))

run.refresh_from_db()

assert run.status == "error"
assert run.metadata["error"] == "Missing permission"
assert (
run.metadata["message"]
== 'connection to server at "localhost" (127.0.0.1), port 5432 failed: FATAL: database "wrong" does not exist\n'
or run.metadata["message"]
== 'connection to server at "127.0.0.1", port 5432 failed: FATAL: database "wrong" does not exist\n'
)

def test_run_update_server_no_connector(self, test_workspace, test_connector, test_source):
connection = Connection.objects.create(
name=str(uuid.uuid4()),
Expand Down Expand Up @@ -784,13 +849,16 @@ def test_run_connection_schedule_postgres(self, test_workspace, test_postgres_co
source=test_source,
)

with pytest.raises(Exception) as e_info:
run_connection_schedule(str(connection.id))
run_connection_schedule(str(connection.id))

run = connection.runs.last()

assert run.status == "error"
assert run.metadata["error"] == "No connection"
assert (
str(e_info.value)
run.metadata["message"]
== 'could not translate host name "a" to address: nodename nor servname provided, or not known\n'
or str(e_info.value)
or run.metadata["message"]
== 'could not translate host name "a" to address: Temporary failure in name resolution\n'
)

Expand Down
Loading
Loading