From baa0e65aef491fc706a3fd2fff4b1be2450e7339 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Mon, 26 Sep 2022 15:44:36 +0100 Subject: [PATCH] Fix missing application integrations migration. Closes #5. --- main.go | 396 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 395 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 3eb69a0..a19f594 100644 --- a/main.go +++ b/main.go @@ -101,6 +101,7 @@ func run(cmd *cobra.Command, args []string) error { migrateOrganizations() migrateOrganizationUsers() migrateApplications() + migrateApplicationIntegrations() for _, nsConfigFile := range nsConfigFiles { log.Printf("Reading NS configuration file: %s", nsConfigFile) @@ -403,7 +404,400 @@ func migrateApplications() { } } -func migrateApplicationIntegrations() +func migrateApplicationIntegrations() { + log.Println("Migrating application integrations") + + type Intergration struct { + ID int64 `db:"id"` + CreatedAt time.Time `db:"created_at"` + UpdatedAt time.Time `db:"updated_at"` + ApplicationID int64 `db:"application_id"` + Kind string `db:"kind"` + Settings json.RawMessage `db:"settings"` + } + + ints := []Intergration{} + if err := asDB.Select(&ints, "select id, created_at, updated_at, application_id, kind, settings from integration"); err != nil { + panic(err) + } + + for _, i := range ints { + _, err := csDB.Exec(` + insert into application_integration ( + application_id, + kind, + created_at, + updated_at, + configuration + ) values ($1, $2, $3, $4, $5)`, + intToUUID(i.ApplicationID), + getIntegrationKind(i.Kind), + i.CreatedAt, + i.UpdatedAt, + getIntegrationConfiguration(i.Kind, i.Settings), + ) + if err != nil { + panic(err) + } + } + +} + +func getIntegrationKind(k string) string { + switch k { + case "HTTP": + return "Http" + case "INFLUXDB": + return "InfluxDb" + case "THINGSBOARD": + return "ThingsBoard" + case "MYDEVICES": + return "MyDevices" + case "LORACLOUD": + return "LoraCloud" + case "GCP_PUBSUB": + return "GcpPubSub" + case "AWS_SNS": + return "AwsSns" + case "AZURE_SERVICE_BUS": + return "AzureServiceBus" + case "PILOT_THINGS": + return "PilotThings" + default: + log.Fatalf("Unknown integration kind: %s", k) + } + + return "" +} + +func getIntegrationConfiguration(kind string, raw json.RawMessage) json.RawMessage { + out := make(map[string]interface{}) + + // HTTP + if kind == "HTTP" { + type ASHttpConfiguration struct { + Headers map[string]string `json:"headers"` + EventEndpointURL string `json:"eventEndpointURL"` + Marshaler string `json:"marshaler"` + } + + type CSHttpConfiguration struct { + Headers map[string]string `json:"headers"` + EventEndpointURL string `json:"event_endpoint_url"` + Json bool `json:"json"` + } + + var as ASHttpConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["Http"] = CSHttpConfiguration{ + Headers: as.Headers, + EventEndpointURL: as.EventEndpointURL, + Json: as.Marshaler == "JSON", + } + } + + // InfluxDB + if kind == "INFLUXDB" { + type ASInfluxDBConfiguration struct { + Endpoint string `json:"endpoint"` + Version int `json:"version"` + DB string `json:"db"` + Username string `json:"username"` + Password string `json:"password"` + RetentionPolicyName string `json:"retentionPolicyName"` + Precision string `json:"precision"` + Token string `json:"token"` + Organization string `json:"org"` + Bucket string `json:"bucket"` + } + + type CSInfluxDBConfiguration struct { + Endpoint string `json:"endpoint"` + Version int `json:"version"` + DB string `json:"db"` + Username string `json:"username"` + Password string `json:"password"` + RetentionPolicyName string `json:"retention_policy_name"` + Precision int `json:"precision"` + Token string `json:"token"` + Organization string `json:"organization"` + Bucket string `json:"bucket"` + } + + var as ASInfluxDBConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["InfluxDb"] = CSInfluxDBConfiguration{ + Endpoint: as.Endpoint, + Version: as.Version - 1, + DB: as.DB, + Username: as.Username, + Password: as.Password, + RetentionPolicyName: as.RetentionPolicyName, + Precision: map[string]int{ + "ns": 0, + "u": 1, + "ms": 2, + "s": 3, + "m": 4, + "h": 5, + }[as.Precision], + Token: as.Token, + Organization: as.Organization, + Bucket: as.Bucket, + } + } + + // ThingsBoard + if kind == "THINGSBOARD" { + type ASThingsBoardConfiguration struct { + Server string `json:"server"` + } + + type CSThingsBoardConfiguration struct { + Server string `json:"server"` + } + + var as ASThingsBoardConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["ThingsBoard"] = CSThingsBoardConfiguration{ + Server: as.Server, + } + } + + // myDevices + if kind == "MYDEVICES" { + type ASMyDevicesConfiguration struct { + Endpoint string `json:"endpoint"` + } + + type CSMyDevicesConfiguration struct { + Endpoint string `json:"endpoint"` + } + + var as ASMyDevicesConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["MyDevices"] = CSMyDevicesConfiguration{ + Endpoint: as.Endpoint, + } + } + + // LoRa Cloud + if kind == "LORACLOUD" { + type ASLoRaCloudConfiguration struct { + // Geolocation. + Geolocation bool `json:"geolocation"` + GeolocationToken string `json:"geolocationToken"` + GeolocationBufferTTL int `json:"geolocationBufferTTL"` + GeolocationMinBufferSize int `json:"geolocationMinBufferSize"` + GeolocationTDOA bool `json:"geolocationTDOA"` + GeolocationRSSI bool `json:"geolocationRSSI"` + GeolocationGNSS bool `json:"geolocationGNSS"` + GeolocationGNSSPayloadField string `json:"geolocationGNSSPayloadField"` + GeolocationGNSSUseRxTime bool `json:"geolicationGNSSUseRxTime"` + GeolocationWifi bool `json:"geolocationWifi"` + GeolocationWifiPayloadField string `json:"geolocationWifiPayloadField"` + + // Device Application Services. + DAS bool `json:"das"` + DASToken string `json:"dasToken"` + DASModemPort uint8 `json:"dasModemPort"` + DASGNSSPort uint8 `json:"dasGNSSPort"` + DASGNSSUseRxTime bool `json:"dasGNSSUseRxTime"` + DASStreamingGeolocWorkaround bool `json:"dasStreamingGeolocWorkaround"` + } + + type ModemGeolocationServices struct { + Token string `json:"token"` + ModemEnabled bool `json:"modem_enabled"` + ModemPort int `json:"modem_port"` + GnssPort int `json:"gnss_port"` + GnssUseRxTime bool `json:"gnss_use_rx_time"` + ParseTlv bool `json:"parse_tlv"` + GeolocationBufferTTL int `json:"geolocation_buffer_ttl"` + GeolocationMinBufferSize int `json:"geolocation_min_buffer_size"` + GeolocationTDOA bool `json:"geolocation_tdoa"` + GeolocationRSSI bool `json:"geolocation_rssi"` + GeolocationGNSS bool `json:"geolocation_gnss"` + GeolocationGNSSPayloadField string `json:"geolocation_gnss_payload_field"` + GeolocationGNSSUseRxTime bool `json:"geolocation_gnss_use_rx_time"` + GeolocationWifi bool `json:"geolocation_wifi"` + GeolocationWifiPayloadField string `json:"geolocation_wifi_payload_field"` + } + + type CSLoRaCloudConfiguration struct { + ModemGeolocationServices ModemGeolocationServices `json:"modem_geolocation_services"` + } + + var as ASLoRaCloudConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["LoraCloud"] = CSLoRaCloudConfiguration{ + ModemGeolocationServices: ModemGeolocationServices{ + Token: as.DASToken, + ModemEnabled: as.DAS, + ModemPort: int(as.DASModemPort), + GnssPort: int(as.DASGNSSPort), + GnssUseRxTime: as.DASGNSSUseRxTime, + ParseTlv: as.DASStreamingGeolocWorkaround, + GeolocationBufferTTL: as.GeolocationBufferTTL, + GeolocationMinBufferSize: as.GeolocationMinBufferSize, + GeolocationTDOA: as.GeolocationTDOA, + GeolocationRSSI: as.GeolocationRSSI, + GeolocationGNSS: as.GeolocationGNSS, + GeolocationGNSSPayloadField: as.GeolocationGNSSPayloadField, + GeolocationGNSSUseRxTime: as.GeolocationGNSSUseRxTime, + GeolocationWifi: as.GeolocationWifi, + GeolocationWifiPayloadField: as.GeolocationWifiPayloadField, + }, + } + } + + // GCP Pub/Sub + if kind == "GCP_PUBSUB" { + type ASGCPConfiguration struct { + Marshaler string `mapstructure:"marshaler" json:"marshaler"` + CredentialsFileBytes []byte `mapstructure:"-" json:"credentialsFile"` + ProjectID string `mapstructure:"project_id" json:"projectID"` + TopicName string `mapstructure:"topic_name" json:"topicName"` + } + + type CSGCPConfiguration struct { + Encoding int `json:"encoding"` + CredentialsFile string `json:"credentials_file"` + ProjectID string `json:"project_id"` + TopicName string `json:"topic_name"` + } + + var as ASGCPConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + encoding := 0 + if as.Marshaler == "PROTOBUF" { + encoding = 1 + } + + out["GcpPubSub"] = CSGCPConfiguration{ + Encoding: encoding, + CredentialsFile: string(as.CredentialsFileBytes), + ProjectID: as.ProjectID, + TopicName: as.TopicName, + } + } + + // AWS SNS + if kind == "AWS_SNS" { + type ASAWSConfiguration struct { + Marshaler string `mapstructure:"marshaler" json:"marshaler"` + AWSRegion string `mapstructure:"aws_region" json:"region"` + AWSAccessKeyID string `mapstructure:"aws_access_key_id" json:"accessKeyID"` + AWSSecretAccessKey string `mapstructure:"aws_secret_access_key" json:"secretAccessKey"` + TopicARN string `mapstructure:"topic_arn" json:"topicARN"` + } + + type CSAWSConfiguration struct { + Encoding int `json:"encoding"` + Region string `json:"region"` + AccessKeyID string `json:"access_key_id"` + SecretAccessKey string `json:"secret_access_key"` + TopicARN string `json:"topic_arn"` + } + + var as ASAWSConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + encoding := 0 + if as.Marshaler == "PROTOBUF" { + encoding = 1 + } + + out["AwsSns"] = CSAWSConfiguration{ + Encoding: encoding, + Region: as.AWSRegion, + AccessKeyID: as.AWSAccessKeyID, + SecretAccessKey: as.AWSSecretAccessKey, + TopicARN: as.TopicARN, + } + } + + // Azure service-bus + if kind == "AZURE_SERVICE_BUS" { + type ASAzureConfiguration struct { + Marshaler string `mapstructure:"marshaler" json:"marshaler"` + ConnectionString string `mapstructure:"connection_string" json:"connectionString"` + PublishName string `mapstructure:"publish_name" json:"publishName"` + } + + type CSAzureConfiguration struct { + Encoding int `json:"encoding"` + ConnectionString string `json:"connection_string"` + PublishName string `json:"publish_name"` + } + + var as ASAzureConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + encoding := 0 + if as.Marshaler == "PROTOBUF" { + encoding = 1 + } + + out["AzureServiceBus"] = CSAzureConfiguration{ + Encoding: encoding, + ConnectionString: as.ConnectionString, + PublishName: as.PublishName, + } + } + + // Pilot Things + if kind == "PILOT_THINGS" { + type ASPilotThingsConfiguration struct { + Server string `json:"server"` + Token string `json:"token"` + } + + type CSPilotThingsConfiguration struct { + Server string `json:"server"` + Token string `json:"token"` + } + + var as ASPilotThingsConfiguration + if err := json.Unmarshal(raw[:], &as); err != nil { + panic(err) + } + + out["PilotThings"] = CSPilotThingsConfiguration{ + Server: as.Server, + Token: as.Token, + } + } + + b, err := json.Marshal(out) + if err != nil { + panic(err) + } + + return b +} func migrateGateways() { log.Println("Migrating gateways")