Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network Tunneling in Batch SQL Connectors #2190

Merged
merged 6 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions go/network-tunnel/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package networkTunnel

import (
"context"
"fmt"
"net"
)

type SSHForwardingConfig struct {
SSHEndpoint string `json:"sshEndpoint" jsonschema:"title=SSH Endpoint,description=Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])" jsonschema_extras:"pattern=^ssh://.+@.+$"`
PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"`
}

type TunnelConfig struct {
SSHForwarding *SSHForwardingConfig `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"`
}

func (cfg *TunnelConfig) InUse() bool {
return cfg != nil && cfg.SSHForwarding != nil && cfg.SSHForwarding.SSHEndpoint != ""
}

func (cfg *TunnelConfig) Start(ctx context.Context, address string, localPort string) (*SshTunnel, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil, fmt.Errorf("error splitting address %q into host and port: %w", address, err)
}

var sshConfig = &SshConfig{
SshEndpoint: cfg.SSHForwarding.SSHEndpoint,
PrivateKey: []byte(cfg.SSHForwarding.PrivateKey),
ForwardHost: host,
ForwardPort: port,
LocalPort: localPort,
}
var tunnel = sshConfig.CreateTunnel()

if err := tunnel.Start(); err != nil {
return nil, err
}
return tunnel, nil
}
2 changes: 1 addition & 1 deletion go/network-tunnel/network_tunnel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package network_tunnel
package networkTunnel

import (
"bytes"
Expand Down
1 change: 0 additions & 1 deletion source-bigquery-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type Config struct {
Dataset string `json:"dataset" jsonschema:"title=Dataset,description=BigQuery dataset to discover tables within." jsonschema_extras:"order=2"`

Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"`
// TODO(wgd): Add network tunnel support
}

type advancedConfig struct {
Expand Down
32 changes: 32 additions & 0 deletions source-mysql-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,38 @@
"type": "object",
"title": "Advanced Options",
"description": "Options for advanced users. You should not typically need to modify these."
},
"networkTunnel": {
"properties": {
"sshForwarding": {
"properties": {
"sshEndpoint": {
"type": "string",
"title": "SSH Endpoint",
"description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])",
"pattern": "^ssh://.+@.+$"
},
"privateKey": {
"type": "string",
"title": "SSH Private Key",
"description": "Private key to connect to the remote SSH server.",
"multiline": true,
"secret": true
}
},
"additionalProperties": false,
"type": "object",
"required": [
"sshEndpoint",
"privateKey"
],
"title": "SSH Forwarding"
}
},
"additionalProperties": false,
"type": "object",
"title": "Network Tunnel",
"description": "Connect to your system through an SSH server that acts as a bastion host for your network."
}
},
"type": "object",
Expand Down
17 changes: 14 additions & 3 deletions source-mysql-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

cerrors "github.com/estuary/connectors/go/connector-errors"
networkTunnel "github.com/estuary/connectors/go/network-tunnel"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
Expand All @@ -23,7 +24,8 @@ type Config struct {
User string `json:"user" jsonschema:"default=flow_capture,description=The database user to authenticate as." jsonschema_extras:"order=1"`
Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"`
Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"`
// TODO(wgd): Add network tunnel support

NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."`
}

type advancedConfig struct {
Expand Down Expand Up @@ -79,6 +81,15 @@ func connectMySQL(ctx context.Context, cfg *Config) (*client.Conn, error) {
"user": cfg.User,
}).Info("connecting to database")

// If a network tunnel is configured, then try to start it before establishing connections.
var address = cfg.Address
if cfg.NetworkTunnel.InUse() {
if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "3306"); err != nil {
return nil, err
}
address = "localhost:3306"
}

var conn *client.Conn

const mysqlErrorCodeSecureTransportRequired = 3159 // From https://dev.mysql.com/doc/mysql-errors/8.4/en/server-error-reference.html
Expand All @@ -94,12 +105,12 @@ func connectMySQL(ctx context.Context, cfg *Config) (*client.Conn, error) {
// * Otherwise we report both errors because it's better to be clear what failed and how.
// * Except if the non-TLS connection specifically failed because TLS is required then
// we don't need to mention that and just return the with-TLS error.
if connWithTLS, errWithTLS := client.Connect(cfg.Address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTLS); errWithTLS == nil {
if connWithTLS, errWithTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTLS); errWithTLS == nil {
log.WithField("addr", cfg.Address).Info("connected with TLS")
conn = connWithTLS
} else if errors.As(errWithTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR {
return nil, cerrors.NewUserError(mysqlErr, "incorrect username or password")
} else if connWithoutTLS, errWithoutTLS := client.Connect(cfg.Address, cfg.User, cfg.Password, cfg.Advanced.DBName); errWithoutTLS == nil {
} else if connWithoutTLS, errWithoutTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName); errWithoutTLS == nil {
log.WithField("addr", cfg.Address).Info("connected without TLS")
conn = connWithoutTLS
} else if errors.As(errWithoutTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR {
Expand Down
32 changes: 32 additions & 0 deletions source-postgres-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,38 @@
"type": "object",
"title": "Advanced Options",
"description": "Options for advanced users. You should not typically need to modify these."
},
"networkTunnel": {
"properties": {
"sshForwarding": {
"properties": {
"sshEndpoint": {
"type": "string",
"title": "SSH Endpoint",
"description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])",
"pattern": "^ssh://.+@.+$"
},
"privateKey": {
"type": "string",
"title": "SSH Private Key",
"description": "Private key to connect to the remote SSH server.",
"multiline": true,
"secret": true
}
},
"additionalProperties": false,
"type": "object",
"required": [
"sshEndpoint",
"privateKey"
],
"title": "SSH Forwarding"
}
},
"additionalProperties": false,
"type": "object",
"title": "Network Tunnel",
"description": "Connect to your system through an SSH server that acts as a bastion host for your network."
}
},
"type": "object",
Expand Down
14 changes: 13 additions & 1 deletion source-postgres-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"text/template"

networkTunnel "github.com/estuary/connectors/go/network-tunnel"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
Expand All @@ -25,7 +26,8 @@ type Config struct {
Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"`
Database string `json:"database" jsonschema:"default=postgres,description=Logical database name to capture from." jsonschema_extras:"order=3"`
Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"`
// TODO(wgd): Add network tunnel support

NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."`
}

type advancedConfig struct {
Expand Down Expand Up @@ -71,6 +73,9 @@ func (c *Config) SetDefaults() {
// ToURI converts the Config to a DSN string.
func (c *Config) ToURI() string {
var address = c.Address
if c.NetworkTunnel.InUse() {
address = "localhost:5432"
}
var uri = url.URL{
Scheme: "postgres",
Host: address,
Expand All @@ -96,6 +101,13 @@ func connectPostgres(ctx context.Context, cfg *Config) (*sql.DB, error) {
"database": cfg.Database,
}).Info("connecting to database")

// If a network tunnel is configured, then try to start it before establishing connections.
if cfg.NetworkTunnel.InUse() {
if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "5432"); err != nil {
return nil, err
}
}

var db, err = sql.Open("pgx", cfg.ToURI())
if err != nil {
return nil, fmt.Errorf("error opening database connection: %w", err)
Expand Down
32 changes: 32 additions & 0 deletions source-redshift-batch/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,38 @@
"type": "object",
"title": "Advanced Options",
"description": "Options for advanced users. You should not typically need to modify these."
},
"networkTunnel": {
"properties": {
"sshForwarding": {
"properties": {
"sshEndpoint": {
"type": "string",
"title": "SSH Endpoint",
"description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])",
"pattern": "^ssh://.+@.+$"
},
"privateKey": {
"type": "string",
"title": "SSH Private Key",
"description": "Private key to connect to the remote SSH server.",
"multiline": true,
"secret": true
}
},
"additionalProperties": false,
"type": "object",
"required": [
"sshEndpoint",
"privateKey"
],
"title": "SSH Forwarding"
}
},
"additionalProperties": false,
"type": "object",
"title": "Network Tunnel",
"description": "Connect to your system through an SSH server that acts as a bastion host for your network."
}
},
"type": "object",
Expand Down
14 changes: 13 additions & 1 deletion source-redshift-batch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strings"

networkTunnel "github.com/estuary/connectors/go/network-tunnel"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
boilerplate "github.com/estuary/connectors/source-boilerplate"
Expand All @@ -24,7 +25,8 @@ type Config struct {
Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"`
Database string `json:"database" jsonschema:"default=dev,description=Logical database name to capture from." jsonschema_extras:"order=3"`
Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"`
// TODO(wgd): Add network tunnel support

NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."`
}

type advancedConfig struct {
Expand Down Expand Up @@ -70,6 +72,9 @@ func (c *Config) SetDefaults() {
// ToURI converts the Config to a DSN string.
func (c *Config) ToURI() string {
var address = c.Address
if c.NetworkTunnel.InUse() {
address = "localhost:5432"
}
var uri = url.URL{
Scheme: "postgres",
Host: address,
Expand All @@ -95,6 +100,13 @@ func connectRedshift(ctx context.Context, cfg *Config) (*sql.DB, error) {
"database": cfg.Database,
}).Info("connecting to database")

// If a network tunnel is configured, then try to start it before establishing connections.
if cfg.NetworkTunnel.InUse() {
if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "5432"); err != nil {
return nil, err
}
}

var db, err = sql.Open("pgx", cfg.ToURI())
if err != nil {
return nil, fmt.Errorf("error opening database connection: %w", err)
Expand Down
Loading