diff --git a/go/network-tunnel/interface.go b/go/network-tunnel/interface.go new file mode 100644 index 0000000000..17cbfe6b6d --- /dev/null +++ b/go/network-tunnel/interface.go @@ -0,0 +1,41 @@ +package networkTunnel + +import ( + "context" + "fmt" + "net" +) + +type SSHForwardingConfig struct { + SSHEndpoint string `json:"sshEndpoint" jsonschema:"title=SSH Endpoint,description=Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])" jsonschema_extras:"pattern=^ssh://.+@.+$"` + PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"` +} + +type TunnelConfig struct { + SSHForwarding *SSHForwardingConfig `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"` +} + +func (cfg *TunnelConfig) InUse() bool { + return cfg != nil && cfg.SSHForwarding != nil && cfg.SSHForwarding.SSHEndpoint != "" +} + +func (cfg *TunnelConfig) Start(ctx context.Context, address string, localPort string) (*SshTunnel, error) { + host, port, err := net.SplitHostPort(address) + if err != nil { + return nil, fmt.Errorf("error splitting address %q into host and port: %w", address, err) + } + + var sshConfig = &SshConfig{ + SshEndpoint: cfg.SSHForwarding.SSHEndpoint, + PrivateKey: []byte(cfg.SSHForwarding.PrivateKey), + ForwardHost: host, + ForwardPort: port, + LocalPort: localPort, + } + var tunnel = sshConfig.CreateTunnel() + + if err := tunnel.Start(); err != nil { + return nil, err + } + return tunnel, nil +} diff --git a/go/network-tunnel/network_tunnel.go b/go/network-tunnel/network_tunnel.go index 488aa08fd6..af9a1e38d3 100644 --- a/go/network-tunnel/network_tunnel.go +++ b/go/network-tunnel/network_tunnel.go @@ -1,4 +1,4 @@ -package network_tunnel +package networkTunnel import ( "bytes" diff --git a/source-bigquery-batch/main.go b/source-bigquery-batch/main.go index 7c79b757bf..333b1fcbaf 100644 --- a/source-bigquery-batch/main.go +++ b/source-bigquery-batch/main.go @@ -24,7 +24,6 @@ type Config struct { Dataset string `json:"dataset" jsonschema:"title=Dataset,description=BigQuery dataset to discover tables within." jsonschema_extras:"order=2"` Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"` - // TODO(wgd): Add network tunnel support } type advancedConfig struct { diff --git a/source-mysql-batch/.snapshots/TestSpec b/source-mysql-batch/.snapshots/TestSpec index 529d7aef4a..bb9d3c26d1 100644 --- a/source-mysql-batch/.snapshots/TestSpec +++ b/source-mysql-batch/.snapshots/TestSpec @@ -47,6 +47,38 @@ "type": "object", "title": "Advanced Options", "description": "Options for advanced users. You should not typically need to modify these." + }, + "networkTunnel": { + "properties": { + "sshForwarding": { + "properties": { + "sshEndpoint": { + "type": "string", + "title": "SSH Endpoint", + "description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])", + "pattern": "^ssh://.+@.+$" + }, + "privateKey": { + "type": "string", + "title": "SSH Private Key", + "description": "Private key to connect to the remote SSH server.", + "multiline": true, + "secret": true + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "sshEndpoint", + "privateKey" + ], + "title": "SSH Forwarding" + } + }, + "additionalProperties": false, + "type": "object", + "title": "Network Tunnel", + "description": "Connect to your system through an SSH server that acts as a bastion host for your network." } }, "type": "object", diff --git a/source-mysql-batch/main.go b/source-mysql-batch/main.go index 366aaf8e3c..2b1f14f748 100644 --- a/source-mysql-batch/main.go +++ b/source-mysql-batch/main.go @@ -9,6 +9,7 @@ import ( "strings" cerrors "github.com/estuary/connectors/go/connector-errors" + networkTunnel "github.com/estuary/connectors/go/network-tunnel" "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" @@ -23,7 +24,8 @@ type Config struct { User string `json:"user" jsonschema:"default=flow_capture,description=The database user to authenticate as." jsonschema_extras:"order=1"` Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"` Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"` - // TODO(wgd): Add network tunnel support + + NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."` } type advancedConfig struct { @@ -79,6 +81,15 @@ func connectMySQL(ctx context.Context, cfg *Config) (*client.Conn, error) { "user": cfg.User, }).Info("connecting to database") + // If a network tunnel is configured, then try to start it before establishing connections. + var address = cfg.Address + if cfg.NetworkTunnel.InUse() { + if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "3306"); err != nil { + return nil, err + } + address = "localhost:3306" + } + var conn *client.Conn const mysqlErrorCodeSecureTransportRequired = 3159 // From https://dev.mysql.com/doc/mysql-errors/8.4/en/server-error-reference.html @@ -94,12 +105,12 @@ func connectMySQL(ctx context.Context, cfg *Config) (*client.Conn, error) { // * Otherwise we report both errors because it's better to be clear what failed and how. // * Except if the non-TLS connection specifically failed because TLS is required then // we don't need to mention that and just return the with-TLS error. - if connWithTLS, errWithTLS := client.Connect(cfg.Address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTLS); errWithTLS == nil { + if connWithTLS, errWithTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName, withTLS); errWithTLS == nil { log.WithField("addr", cfg.Address).Info("connected with TLS") conn = connWithTLS } else if errors.As(errWithTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR { return nil, cerrors.NewUserError(mysqlErr, "incorrect username or password") - } else if connWithoutTLS, errWithoutTLS := client.Connect(cfg.Address, cfg.User, cfg.Password, cfg.Advanced.DBName); errWithoutTLS == nil { + } else if connWithoutTLS, errWithoutTLS := client.Connect(address, cfg.User, cfg.Password, cfg.Advanced.DBName); errWithoutTLS == nil { log.WithField("addr", cfg.Address).Info("connected without TLS") conn = connWithoutTLS } else if errors.As(errWithoutTLS, &mysqlErr) && mysqlErr.Code == mysql.ER_ACCESS_DENIED_ERROR { diff --git a/source-postgres-batch/.snapshots/TestSpec b/source-postgres-batch/.snapshots/TestSpec index f538baec2d..a2473bae58 100644 --- a/source-postgres-batch/.snapshots/TestSpec +++ b/source-postgres-batch/.snapshots/TestSpec @@ -61,6 +61,38 @@ "type": "object", "title": "Advanced Options", "description": "Options for advanced users. You should not typically need to modify these." + }, + "networkTunnel": { + "properties": { + "sshForwarding": { + "properties": { + "sshEndpoint": { + "type": "string", + "title": "SSH Endpoint", + "description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])", + "pattern": "^ssh://.+@.+$" + }, + "privateKey": { + "type": "string", + "title": "SSH Private Key", + "description": "Private key to connect to the remote SSH server.", + "multiline": true, + "secret": true + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "sshEndpoint", + "privateKey" + ], + "title": "SSH Forwarding" + } + }, + "additionalProperties": false, + "type": "object", + "title": "Network Tunnel", + "description": "Connect to your system through an SSH server that acts as a bastion host for your network." } }, "type": "object", diff --git a/source-postgres-batch/main.go b/source-postgres-batch/main.go index dd5b74fdef..6a0020622a 100644 --- a/source-postgres-batch/main.go +++ b/source-postgres-batch/main.go @@ -10,6 +10,7 @@ import ( "strings" "text/template" + networkTunnel "github.com/estuary/connectors/go/network-tunnel" "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" @@ -25,7 +26,8 @@ type Config struct { Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"` Database string `json:"database" jsonschema:"default=postgres,description=Logical database name to capture from." jsonschema_extras:"order=3"` Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"` - // TODO(wgd): Add network tunnel support + + NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."` } type advancedConfig struct { @@ -71,6 +73,9 @@ func (c *Config) SetDefaults() { // ToURI converts the Config to a DSN string. func (c *Config) ToURI() string { var address = c.Address + if c.NetworkTunnel.InUse() { + address = "localhost:5432" + } var uri = url.URL{ Scheme: "postgres", Host: address, @@ -96,6 +101,13 @@ func connectPostgres(ctx context.Context, cfg *Config) (*sql.DB, error) { "database": cfg.Database, }).Info("connecting to database") + // If a network tunnel is configured, then try to start it before establishing connections. + if cfg.NetworkTunnel.InUse() { + if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "5432"); err != nil { + return nil, err + } + } + var db, err = sql.Open("pgx", cfg.ToURI()) if err != nil { return nil, fmt.Errorf("error opening database connection: %w", err) diff --git a/source-redshift-batch/.snapshots/TestSpec b/source-redshift-batch/.snapshots/TestSpec index e6d4343aed..786e0c9db4 100644 --- a/source-redshift-batch/.snapshots/TestSpec +++ b/source-redshift-batch/.snapshots/TestSpec @@ -61,6 +61,38 @@ "type": "object", "title": "Advanced Options", "description": "Options for advanced users. You should not typically need to modify these." + }, + "networkTunnel": { + "properties": { + "sshForwarding": { + "properties": { + "sshEndpoint": { + "type": "string", + "title": "SSH Endpoint", + "description": "Endpoint of the remote SSH server that supports tunneling (in the form of ssh://user@hostname[:port])", + "pattern": "^ssh://.+@.+$" + }, + "privateKey": { + "type": "string", + "title": "SSH Private Key", + "description": "Private key to connect to the remote SSH server.", + "multiline": true, + "secret": true + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "sshEndpoint", + "privateKey" + ], + "title": "SSH Forwarding" + } + }, + "additionalProperties": false, + "type": "object", + "title": "Network Tunnel", + "description": "Connect to your system through an SSH server that acts as a bastion host for your network." } }, "type": "object", diff --git a/source-redshift-batch/main.go b/source-redshift-batch/main.go index 2e3d0c2bad..860aca2959 100644 --- a/source-redshift-batch/main.go +++ b/source-redshift-batch/main.go @@ -9,6 +9,7 @@ import ( "net/url" "strings" + networkTunnel "github.com/estuary/connectors/go/network-tunnel" "github.com/estuary/connectors/go/schedule" schemagen "github.com/estuary/connectors/go/schema-gen" boilerplate "github.com/estuary/connectors/source-boilerplate" @@ -24,7 +25,8 @@ type Config struct { Password string `json:"password" jsonschema:"description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"` Database string `json:"database" jsonschema:"default=dev,description=Logical database name to capture from." jsonschema_extras:"order=3"` Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"` - // TODO(wgd): Add network tunnel support + + NetworkTunnel *networkTunnel.TunnelConfig `json:"networkTunnel,omitempty" jsonschema:"title=Network Tunnel,description=Connect to your system through an SSH server that acts as a bastion host for your network."` } type advancedConfig struct { @@ -70,6 +72,9 @@ func (c *Config) SetDefaults() { // ToURI converts the Config to a DSN string. func (c *Config) ToURI() string { var address = c.Address + if c.NetworkTunnel.InUse() { + address = "localhost:5432" + } var uri = url.URL{ Scheme: "postgres", Host: address, @@ -95,6 +100,13 @@ func connectRedshift(ctx context.Context, cfg *Config) (*sql.DB, error) { "database": cfg.Database, }).Info("connecting to database") + // If a network tunnel is configured, then try to start it before establishing connections. + if cfg.NetworkTunnel.InUse() { + if _, err := cfg.NetworkTunnel.Start(ctx, cfg.Address, "5432"); err != nil { + return nil, err + } + } + var db, err = sql.Open("pgx", cfg.ToURI()) if err != nil { return nil, fmt.Errorf("error opening database connection: %w", err)