Skip to content

Commit

Permalink
Secure clickhouse container
Browse files Browse the repository at this point in the history
* Adds TLS support for native clickhouse connections.
* Adds col Names for signal table.
  • Loading branch information
KevinJoiner committed May 16, 2024
1 parent 1ad6096 commit 0566b44
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 72 deletions.
2 changes: 1 addition & 1 deletion cmd/clickhouse-container/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func run(ctx context.Context) error {
defer chcontainer.Terminate(ctx)

if *migrate {
db, err := clickhouseinfra.GetClickhouseAsDB(ctx, chcontainer.ClickHouseContainer)
db, err := chcontainer.GetClickhouseAsDB(ctx)
if err != nil {
return fmt.Errorf("failed to get clickhouse db: %w", err)
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
github.com/ClickHouse/clickhouse-go/v2 v2.23.1
github.com/docker/go-connections v0.5.0
github.com/mdelapenya/tlscert v0.1.0
github.com/pressly/goose/v3 v3.20.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.30.0
Expand Down Expand Up @@ -41,6 +42,7 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/madflojo/testcerts v1.1.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mfridman/interpolate v0.0.2 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,14 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/madflojo/testcerts v1.1.1 h1:YsSHWV79nMNZK0mJtwXjKoYHjJEbLPFefR8TxmmWupY=
github.com/madflojo/testcerts v1.1.1/go.mod h1:MW8sh39gLnkKh4K0Nc55AyHEDl9l/FBLDUsQhpmkuo0=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mdelapenya/tlscert v0.1.0 h1:YTpF579PYUX475eOL+6zyEO3ngLTOUWck78NBuJVXaM=
github.com/mdelapenya/tlscert v0.1.0/go.mod h1:wrbyM/DwbFCeCeqdPX/8c6hNOqQgbf0rUDErE1uD+64=
github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY=
github.com/mfridman/interpolate v0.0.2/go.mod h1:p+7uk6oE07mpE/Ik1b8EckO0O4ZXiGAfshKBWLUM9Xg=
github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
Expand Down
192 changes: 123 additions & 69 deletions pkg/clickhouseinfra/clickhouseinfra.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,46 @@
package clickhouseinfra

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"database/sql"
"fmt"
"os"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/docker/go-connections/nat"
"github.com/mdelapenya/tlscert"
"github.com/testcontainers/testcontainers-go"
chmodule "github.com/testcontainers/testcontainers-go/modules/clickhouse"
"github.com/testcontainers/testcontainers-go/wait"
)

const (
defaultUser = "default"
defaultDB = "dimo"
defaultUser = "default"
defaultDB = "dimo"
SecureNativePort = nat.Port("9440/tcp")
)

// ColInfo is a struct that holds the column meta information.
type ColInfo struct {
Name string
Type string
Comment string
}
var secureConfigXML = []byte(`
<clickhouse>
<tcp_port_secure>9440</tcp_port_secure>
<openSSL>
<server>
<certificateFile>/etc/clickhouse-server/certs/client.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/certs/client.key</privateKeyFile>
<verificationMode>relaxed</verificationMode>
<caConfig>/etc/clickhouse-server/certs/ca.crt</caConfig>
</server>
</openSSL>
</clickhouse>
`)

// Container is a struct that holds the clickhouse and zookeeper containers.
type Container struct {
*chmodule.ClickHouseContainer
ZooKeeperContainer testcontainers.Container
RootCAs *x509.CertPool
}

// CreateClickHouseContainer function starts and testcontainer for clickhouse.
Expand All @@ -39,41 +50,52 @@ func CreateClickHouseContainer(ctx context.Context, userName, password string) (
if userName == "" {
userName = defaultUser
}
zkcontainer, zkPort, err := StartZooKeeperContainer(ctx)
if err != nil {
return nil, fmt.Errorf("failed to start zookeeper container: %w", err)
}
ipaddr, err := zkcontainer.ContainerIP(ctx)
caCert, clientCerts, err := createCert()
if err != nil {
_ = zkcontainer.Terminate(ctx)
return nil, fmt.Errorf("failed to get zookeeper container IP: %w", err)
return nil, fmt.Errorf("failed to create certs: %w", err)
}
clickHouseContainer, err := chmodule.RunContainer(ctx,
testcontainers.WithImage("clickhouse/clickhouse-server:23.3.8.21-alpine"),
chmodule.WithDatabase(defaultDB),
chmodule.WithUsername(userName),
chmodule.WithPassword(password),
chmodule.WithZookeeper(ipaddr, zkPort),
WithCerts(caCert, clientCerts),
)
if err != nil {
_ = zkcontainer.Terminate(ctx)
return nil, fmt.Errorf("failed to start clickhouse container: %w", err)
}
return &Container{clickHouseContainer, zkcontainer}, nil
rootCAs, _ := x509.SystemCertPool()
if rootCAs == nil {
rootCAs = x509.NewCertPool()
}

// add our cert to the system pool
rootCAs.AppendCertsFromPEM(caCert.Bytes)
rootCAs.AppendCertsFromPEM(clientCerts.Bytes)
rootCAs.AppendCertsFromPEM(clientCerts.KeyBytes)
rootCAs.AppendCertsFromPEM(caCert.Cert.AuthorityKeyId)
return &Container{
ClickHouseContainer: clickHouseContainer,
RootCAs: rootCAs,
}, nil
}

// GetClickHouseAsConn function returns a clickhouse.Conn connection which uses native ClickHouse protocol.
func GetClickHouseAsConn(container *chmodule.ClickHouseContainer) (clickhouse.Conn, error) {
host, err := container.ConnectionHost(context.TODO())
func (c *Container) GetClickHouseAsConn() (clickhouse.Conn, error) {
host, err := c.ConnectionHost(context.TODO())
if err != nil {
return nil, fmt.Errorf("failed to get clickhouse host: %w", err)
}
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{host},
Protocol: clickhouse.Native,
Addr: []string{host},
Auth: clickhouse.Auth{
Username: container.User,
Password: container.Password,
Database: container.DbName,
Username: c.User,
Password: c.Password,
Database: c.DbName,
},
TLS: &tls.Config{
RootCAs: c.RootCAs,
},
})
if err != nil {
Expand All @@ -83,17 +105,21 @@ func GetClickHouseAsConn(container *chmodule.ClickHouseContainer) (clickhouse.Co
}

// GetClickhouseAsDB function returns a sql.DB connection which allows interfaceing with the stdlib database/sql package.
func GetClickhouseAsDB(ctx context.Context, container *chmodule.ClickHouseContainer) (*sql.DB, error) {
host, err := container.ConnectionHost(ctx)
func (c *Container) GetClickhouseAsDB(ctx context.Context) (*sql.DB, error) {
host, err := c.ConnectionHost(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get clickhouse host: %w", err)
}
dbConn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{host},
Protocol: clickhouse.Native,
Addr: []string{host},
Auth: clickhouse.Auth{
Username: container.User,
Password: container.Password,
Database: container.DbName,
Username: c.User,
Password: c.Password,
Database: c.DbName,
},
TLS: &tls.Config{
RootCAs: c.RootCAs,
},
})
const retries = 3
Expand All @@ -109,53 +135,81 @@ func GetClickhouseAsDB(ctx context.Context, container *chmodule.ClickHouseContai
return nil, fmt.Errorf("failed to ping clickhouse after %d retries: %w", retries, err)
}

// Terminate function terminates the clickhouse and zookeeper containers.
// ConnectionHost returns the host and port of the clickhouse container, using the default, native 9000 port, and
// obtaining the host and exposed port from the container
func (c *Container) ConnectionHost(ctx context.Context) (string, error) {
host, err := c.Host(ctx)
if err != nil {
return "", err
}

port, err := c.MappedPort(ctx, SecureNativePort)
if err != nil {
return "", err
}

return host + ":" + port.Port(), nil
}

// Terminate function terminates the clickhouse containers.
// If an error occurs, it will be printed to stderr.
func (c *Container) Terminate(ctx context.Context) {
if err := c.ClickHouseContainer.Terminate(ctx); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "failed to terminate clickhouse container: %v", err)
}
if err := c.ZooKeeperContainer.Terminate(ctx); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "failed to terminate clickhouse container: %v", err)
}
}

// GetCurrentCols returns the current columns of the table.
func GetCurrentCols(ctx context.Context, chConn clickhouse.Conn, tableName string) ([]ColInfo, error) {
selectStm := fmt.Sprintf("SELECT name, type, comment FROM system.columns where table='%s'", tableName)
rows, err := chConn.Query(ctx, selectStm)
if err != nil {
return nil, fmt.Errorf("failed to show table: %w", err)
func createCert() (*tlscert.Certificate, *tlscert.Certificate, error) {
// Generate a certificate for localhost and save it to disk.
caCert := tlscert.SelfSignedFromRequest(tlscert.Request{
Name: "ca-cert",
Host: "localhost",
SubjectCommonName: "localhost",
IsCA: true,
ValidFor: time.Hour * 24 * 365 * 10,
})
if caCert == nil {
return nil, nil, fmt.Errorf("failed to generate CA certificate")
}
defer rows.Close() //nolint // we are not interested in the error here
colInfos := []ColInfo{}
count := 0
for rows.Next() {
count++
var info ColInfo
err := rows.Scan(&info.Name, &info.Type, &info.Comment)
if err != nil {
return nil, fmt.Errorf("failed to scan table: %w", err)
}
colInfos = append(colInfos, info)

cert := tlscert.SelfSignedFromRequest(tlscert.Request{
Name: "test-cert",
SubjectCommonName: "test-cert",
Host: "localhost",
Parent: caCert,
ValidFor: time.Hour * 24 * 365,
})
if cert == nil {
return nil, nil, fmt.Errorf("failed to generate client certificate")
}
return colInfos, nil
}

// StartZooKeeperContainer function starts a zookeeper container. The caller is responsible for terminating the container.
func StartZooKeeperContainer(ctx context.Context) (testcontainers.Container, string, error) {
zkPort := nat.Port("2181/tcp")
return caCert, cert, nil
}

zkcontainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
ExposedPorts: []string{zkPort.Port()},
Image: "zookeeper:3.7",
WaitingFor: wait.ForListeningPort(zkPort),
},
Started: true,
})
if err != nil {
return zkcontainer, "", fmt.Errorf("failed to start zookeeper container: %w", err)
// WithCerts is a customize request option that adds the certificates to the clickhouse container.
func WithCerts(caCert, clientCerts *tlscert.Certificate) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) {
req.ExposedPorts = append(req.ExposedPorts, SecureNativePort.Port())
ca := testcontainers.ContainerFile{
Reader: bytes.NewReader(caCert.Bytes),
ContainerFilePath: "/etc/clickhouse-server/certs/ca.crt",
FileMode: 0o755,
}
cert := testcontainers.ContainerFile{
Reader: bytes.NewReader(clientCerts.Bytes),
ContainerFilePath: "/etc/clickhouse-server/certs/client.crt",
FileMode: 0o755,
}
key := testcontainers.ContainerFile{
Reader: bytes.NewReader(clientCerts.KeyBytes),
ContainerFilePath: "/etc/clickhouse-server/certs/client.key",
FileMode: 0o755,
}
config := testcontainers.ContainerFile{
Reader: bytes.NewReader(secureConfigXML),
ContainerFilePath: "/etc/clickhouse-server/config.d/aconfig.xml",
FileMode: 0o755,
}
req.Files = append(req.Files, ca, cert, key, config)
}
return zkcontainer, zkPort.Port(), nil
}
37 changes: 37 additions & 0 deletions pkg/clickhouseinfra/colinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package clickhouseinfra

import (
"context"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
)

// ColInfo is a struct that holds the column meta information.
type ColInfo struct {
Name string
Type string
Comment string
}

// GetCurrentCols returns the current columns of the table.
func GetCurrentCols(ctx context.Context, chConn clickhouse.Conn, tableName string) ([]ColInfo, error) {
selectStm := fmt.Sprintf("SELECT name, type, comment FROM system.columns where table='%s'", tableName)
rows, err := chConn.Query(ctx, selectStm)
if err != nil {
return nil, fmt.Errorf("failed to show table: %w", err)
}
defer rows.Close() //nolint // we are not interested in the error here
colInfos := []ColInfo{}
count := 0
for rows.Next() {
count++
var info ColInfo
err := rows.Scan(&info.Name, &info.Type, &info.Comment)
if err != nil {
return nil, fmt.Errorf("failed to scan table: %w", err)
}
colInfos = append(colInfos, info)
}
return colInfos, nil
}
4 changes: 2 additions & 2 deletions pkg/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ func TestMigration(t *testing.T) {

defer chcontainer.Terminate(ctx)

db, err := clickhouseinfra.GetClickhouseAsDB(ctx, chcontainer.ClickHouseContainer)
db, err := chcontainer.GetClickhouseAsDB(ctx)
require.NoError(t, err, "Failed to get clickhouse db")

err = migrations.RunGoose(ctx, []string{"up", "-v"}, db)
require.NoError(t, err, "Failed to run migration")

conn, err := clickhouseinfra.GetClickHouseAsConn(chcontainer.ClickHouseContainer)
conn, err := chcontainer.GetClickHouseAsConn()
require.NoError(t, err, "Failed to get clickhouse connection")

// Iterate over the rows and check the column names
Expand Down
13 changes: 13 additions & 0 deletions pkg/vss/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ import (
const (
// TableName is the name of the distributed table in Clickhouse.
TableName = "signal"

// TokenIDCol is the name of the token_id column in Clickhouse.
TokenIDCol = "token_id"
// TimestampCol is the name of the timestamp column in Clickhouse.
TimestampCol = "timestamp"
// SourceCol is the name of the source column in Clickhouse.
SourceCol = "source"
// NameCol is the name of the name column in Clickhouse.
NameCol = "name"
// ValueNumberCol is the name of the value_number column in Clickhouse.
ValueNumberCol = "value_number"
// ValueStringCol is the name of the value_string column in Clickhouse.
ValueStringCol = "value_string"
)

// Signal represents a single signal collected from a device.
Expand Down

0 comments on commit 0566b44

Please sign in to comment.