diff --git a/tools/astarte_dev_tool/config/config.ex b/tools/astarte_dev_tool/config/config.ex deleted file mode 100644 index 8450bf75d..000000000 --- a/tools/astarte_dev_tool/config/config.ex +++ /dev/null @@ -1,21 +0,0 @@ -import Config - -config :logger, :console, - format: {PrettyLog.LogfmtFormatter, :format}, - metadata: [:realm, :datacenter, :replication_factor, :module, :function, :tag] - -# Configure your database -config :astarte_dev_tool, Astarte.DataAccess.Repo, - # Overrides the default type `bigserial` used for version attribute in schema migration - migration_primary_key: [name: :id, type: :binary_id], - contact_points: [System.get_env("CASSANDRA_DB_HOST") || "localhost"], - keyspace: "astarte", - port: System.get_env("CASSANDRA_DB_PORT") || 9042, - # Waiting time in milliseconds for the database connection - sync_connect: 5000, - log: :info, - stacktrace: true, - show_sensitive_data_on_connection_error: true, - pool_size: 10 - -config :astarte_dev_tool, ecto_repos: [Astarte.DataAccess.Repo] diff --git a/tools/astarte_dev_tool/config/config.exs b/tools/astarte_dev_tool/config/config.exs new file mode 100644 index 000000000..488affe74 --- /dev/null +++ b/tools/astarte_dev_tool/config/config.exs @@ -0,0 +1,14 @@ +import Config + +config :logger, :console, + format: {PrettyLog.LogfmtFormatter, :format}, + metadata: [:realm, :datacenter, :replication_factor, :module, :function, :tag] + +config :astarte_dev_tool, :xandra, + nodes: [ + "#{System.get_env("CASSANDRA_DB_HOST") || "localhost"}:#{System.get_env("CASSANDRA_DB_PORT") || 9042}" + ], + sync_connect: 5000, + log: :info, + stacktrace: true, + pool_size: 10 diff --git a/tools/astarte_dev_tool/lib/commands/realm/create.ex b/tools/astarte_dev_tool/lib/commands/realm/create.ex index 49151e36c..a40d2c412 100644 --- a/tools/astarte_dev_tool/lib/commands/realm/create.ex +++ b/tools/astarte_dev_tool/lib/commands/realm/create.ex @@ -18,18 +18,37 @@ defmodule AstarteDevTool.Commands.Realm.Create do @moduledoc false - alias AstarteDevTool.Constants.System, as: Constants + alias Astarte.DataAccess.Database + alias Astarte.DataAccess.Realm, as: RealmDataAccess - def exec(path, volumes \\ false) do - args = - if volumes, - do: Constants.command_down_args() ++ ["-v"], - else: Constants.command_down_args() - - case System.cmd(Constants.command(), args, Constants.base_opts() ++ [cd: path]) do - {_result, 0} -> :ok - {:error, reason} -> {:error, "System is not up and running: #{reason}"} - {result, exit_code} -> {:error, "Cannot exec system.down: #{result}, #{exit_code}"} + @start_apps [ + :logger, + :crypto, + :ssl, + :xandra, + :astarte_data_access + ] + def exec( + [{_, _} | _] = nodes, + realm_name, + replication \\ 1, + max_retention \\ 1, + public_key_pem \\ "@@@@", + device_registration_limit \\ nil, + realm_schema_version \\ 10 + ) do + with :ok <- Enum.each(@start_apps, &Application.ensure_all_started/1), + {:ok, _client} <- Database.connect(cassandra_nodes: nodes), + :ok <- + RealmDataAccess.create_realm( + realm_name, + replication, + max_retention, + public_key_pem, + device_registration_limit, + realm_schema_version + ) do + :ok end end end diff --git a/tools/astarte_dev_tool/lib/data/realm.ex b/tools/astarte_dev_tool/lib/data/realm.ex deleted file mode 100644 index 4cd842565..000000000 --- a/tools/astarte_dev_tool/lib/data/realm.ex +++ /dev/null @@ -1,1636 +0,0 @@ -# -# This file is part of Astarte. -# -# Copyright 2024 SECO Mind Srl -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -defmodule AstarteDevTool.Data.Realm do - require Logger - alias Astarte.Core.Realm - alias Astarte.Core.CQLUtils - alias Astarte.Housekeeping.Config - alias Astarte.Housekeeping.Migrator - - @default_replication_factor 1 - - def create( - realm_name, - public_key_pem, - nil = _replication_factor, - device_limit, - max_retention, - opts - ) do - create( - realm_name, - public_key_pem, - @default_replication_factor, - device_limit, - max_retention, - opts - ) - end - - def create(realm_name, public_key_pem, replication, device_limit, max_retention, opts) do - with :ok <- validate_realm_name(realm_name), - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()), - :ok <- Xandra.Cluster.run(:xandra, &check_replication(&1, replication)), - {:ok, replication_map_str} <- build_replication_map_str(replication) do - if opts[:async] do - {:ok, _pid} = - Task.start(fn -> - do_create( - realm_name, - keyspace_name, - public_key_pem, - replication_map_str, - device_limit, - max_retention - ) - end) - - :ok - else - do_create( - realm_name, - keyspace_name, - public_key_pem, - replication_map_str, - device_limit, - max_retention - ) - end - end - end - - def delete(realm_name, opts \\ []) do - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()) - - if opts[:async] do - {:ok, _pid} = Task.start(fn -> do_delete(realm_name, keyspace_name) end) - - :ok - else - do_delete(realm_name, keyspace_name) - end - end - - def update_public_key(realm_name, new_public_key) do - with :ok <- validate_realm_name(realm_name), - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()) do - Xandra.Cluster.run(:xandra, fn conn -> - do_update_public_key(conn, keyspace_name, new_public_key) - end) - end - end - - def delete_device_registration_limit(realm_name) do - with :ok <- validate_realm_name(realm_name) do - Xandra.Cluster.run(:xandra, fn conn -> - do_delete_device_registration_limit(conn, realm_name) - end) - end - end - - def set_device_registration_limit(realm_name, new_limit) do - with :ok <- validate_realm_name(realm_name) do - Xandra.Cluster.run(:xandra, fn conn -> - do_set_device_registration_limit(conn, realm_name, new_limit) - end) - end - end - - defp build_replication_map_str(replication_factor) - when is_integer(replication_factor) and replication_factor > 0 do - replication_map_str = - "{'class': 'SimpleStrategy', 'replication_factor': #{replication_factor}}" - - {:ok, replication_map_str} - end - - defp build_replication_map_str(datacenter_replication_factors) - when is_map(datacenter_replication_factors) do - datacenter_replications_str = - Enum.map(datacenter_replication_factors, fn {datacenter, replication_factor} -> - "'#{datacenter}': #{replication_factor}" - end) - |> Enum.join(",") - - replication_map_str = "{'class': 'NetworkTopologyStrategy', #{datacenter_replications_str}}" - - {:ok, replication_map_str} - end - - defp build_replication_map_str(_invalid_replication) do - {:error, :invalid_replication} - end - - defp validate_realm_name(realm_name) do - if Realm.valid_name?(realm_name) do - :ok - else - _ = - Logger.warning("Invalid realm name.", - tag: "invalid_realm_name", - realm: realm_name - ) - - {:error, :realm_not_allowed} - end - end - - defp do_delete(realm_name, keyspace_name) do - Xandra.Cluster.run(:xandra, [timeout: 60_000], fn conn -> - with :ok <- verify_realm_deletion_preconditions(conn, keyspace_name), - :ok <- execute_realm_deletion(conn, realm_name, keyspace_name) do - :ok - else - {:error, reason} -> - _ = - Logger.warning("Cannot delete realm: #{inspect(reason)}.", - tag: "realm_deletion_failed", - realm: realm_name - ) - - {:error, reason} - end - end) - end - - defp verify_realm_deletion_preconditions(conn, keyspace_name) do - with :ok <- check_no_connected_devices(conn, keyspace_name) do - :ok - else - {:error, reason} -> - _ = - Logger.warning("Realm deletion preconditions are not satisfied: #{inspect(reason)}.", - tag: "realm_deletion_preconditions_rejected", - realm: keyspace_name - ) - - {:error, reason} - end - end - - defp execute_realm_deletion(conn, realm_name, keyspace_name) do - with :ok <- delete_keyspace(conn, keyspace_name), - :ok <- remove_realm(conn, realm_name) do - :ok - else - {:error, reason} -> - _ = - Logger.warning("Cannot delete realm: #{inspect(reason)}.", - tag: "realm_deletion_failed", - realm: realm_name - ) - - {:error, reason} - end - end - - defp do_create( - realm_name, - keyspace_name, - public_key_pem, - replication_map_str, - device_limit, - max_retention - ) do - Xandra.Cluster.run(:xandra, [timeout: 60_000], fn conn -> - with :ok <- validate_realm_name(realm_name), - :ok <- create_keyspace(conn, keyspace_name, replication_map_str), - {:ok, keyspace_conn} <- build_keyspace_conn(conn, keyspace_name), - :ok <- create_kv_store(keyspace_conn), - :ok <- create_names_table(keyspace_conn), - :ok <- create_devices_table(keyspace_conn), - :ok <- create_endpoints_table(keyspace_conn), - :ok <- create_interfaces_table(keyspace_conn), - :ok <- create_individual_properties_table(keyspace_conn), - :ok <- create_simple_triggers_table(keyspace_conn), - :ok <- create_grouped_devices_table(keyspace_conn), - :ok <- create_deletion_in_progress_table(keyspace_conn), - :ok <- insert_realm_public_key(keyspace_conn, public_key_pem), - :ok <- insert_realm_astarte_schema_version(keyspace_conn), - :ok <- insert_realm(conn, realm_name, device_limit), - :ok <- insert_datastream_max_retention(keyspace_conn, max_retention) do - :ok - else - {:error, reason} -> - _ = - Logger.warning("Cannot create realm: #{inspect(reason)}.", - tag: "realm_creation_failed", - realm: realm_name - ) - - {:error, reason} - end - end) - end - - defp build_keyspace_conn(conn, realm_name) do - case validate_realm_name(realm_name) do - :ok -> - {:ok, {conn, realm_name}} - - {:error, reason} -> - _ = - Logger.warning("Cannot build realm conn: #{inspect(reason)}.", - tag: "build_keyspace_conn_error", - realm: realm_name - ) - - {:error, reason} - end - end - - defp check_no_connected_devices(conn, realm_name) do - query = """ - SELECT * FROM #{realm_name}.devices WHERE connected = true LIMIT 1 ALLOW FILTERING; - """ - - with {:ok, %Xandra.Page{} = page} <- - Xandra.execute(conn, query, %{}, consistency: :one) do - if Enum.empty?(page) do - :ok - else - _ = - Logger.warning("Realm #{realm_name} still has connected devices.", - tag: "connected_devices_present" - ) - - {:error, :connected_devices_present} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp delete_keyspace(conn, realm_name) do - query = """ - DROP KEYSPACE #{realm_name} - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_keyspace(conn, realm_name, replication_map_str) do - query = """ - CREATE KEYSPACE #{realm_name} - WITH replication = #{replication_map_str} - AND durable_writes = true; - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_kv_store({conn, realm}) do - query = """ - CREATE TABLE #{realm}.kv_store ( - group varchar, - key varchar, - value blob, - - PRIMARY KEY ((group), key) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_names_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.names ( - object_name varchar, - object_type int, - object_uuid uuid, - PRIMARY KEY ((object_name), object_type) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_devices_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.devices ( - device_id uuid, - aliases map, - introspection map, - introspection_minor map, - old_introspection map>, int>, - protocol_revision int, - first_registration timestamp, - credentials_secret ascii, - inhibit_credentials_request boolean, - cert_serial ascii, - cert_aki ascii, - first_credentials_request timestamp, - last_connection timestamp, - last_disconnection timestamp, - connected boolean, - pending_empty_cache boolean, - total_received_msgs bigint, - total_received_bytes bigint, - exchanged_bytes_by_interface map>, bigint>, - exchanged_msgs_by_interface map>, bigint>, - last_credentials_request_ip inet, - last_seen_ip inet, - attributes map, - - groups map, - - PRIMARY KEY (device_id) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_endpoints_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.endpoints ( - interface_id uuid, - endpoint_id uuid, - interface_name ascii, - interface_major_version int, - interface_minor_version int, - interface_type int, - endpoint ascii, - value_type int, - reliability int, - retention int, - expiry int, - database_retention_ttl int, - database_retention_policy int, - allow_unset boolean, - explicit_timestamp boolean, - description varchar, - doc varchar, - - PRIMARY KEY ((interface_id), endpoint_id) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_interfaces_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.interfaces ( - name ascii, - major_version int, - minor_version int, - interface_id uuid, - storage_type int, - storage ascii, - type int, - ownership int, - aggregation int, - automaton_transitions blob, - automaton_accepting_states blob, - description varchar, - doc varchar, - - PRIMARY KEY (name, major_version) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_individual_properties_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.individual_properties ( - device_id uuid, - interface_id uuid, - endpoint_id uuid, - path varchar, - reception_timestamp timestamp, - reception_timestamp_submillis smallint, - - double_value double, - integer_value int, - boolean_value boolean, - longinteger_value bigint, - string_value varchar, - binaryblob_value blob, - datetime_value timestamp, - doublearray_value list, - integerarray_value list, - booleanarray_value list, - longintegerarray_value list, - stringarray_value list, - binaryblobarray_value list, - datetimearray_value list, - - PRIMARY KEY((device_id, interface_id), endpoint_id, path) - ) - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_simple_triggers_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.simple_triggers ( - object_id uuid, - object_type int, - parent_trigger_id uuid, - simple_trigger_id uuid, - trigger_data blob, - trigger_target blob, - - PRIMARY KEY ((object_id, object_type), parent_trigger_id, simple_trigger_id) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_grouped_devices_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.grouped_devices ( - group_name varchar, - insertion_uuid timeuuid, - device_id uuid, - - PRIMARY KEY ((group_name), insertion_uuid, device_id) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- CSystem.execute_schema_change(conn, query) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_deletion_in_progress_table({conn, realm}) do - query = """ - CREATE TABLE #{realm}.deletion_in_progress ( - device_id uuid, - vmq_ack boolean, - dup_start_ack boolean, - dup_end_ack boolean, - PRIMARY KEY (device_id) - ); - """ - - case CSystem.execute_schema_change(conn, query) do - {:ok, %Xandra.SchemaChange{}} -> - :ok - - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp insert_realm_public_key({conn, realm}, public_key_pem) do - query = """ - INSERT INTO #{realm}.kv_store (group, key, value) - VALUES ('auth', 'jwt_public_key_pem', varcharAsBlob(:public_key_pem)); - """ - - params = %{"public_key_pem" => public_key_pem} - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Void{}} <- - Xandra.execute(conn, prepared, params, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp insert_realm_astarte_schema_version({conn, realm}) do - query = """ - INSERT INTO #{realm}.kv_store - (group, key, value) - VALUES ('astarte', 'schema_version', bigintAsBlob(#{Migrator.latest_realm_schema_version()})); - """ - - with {:ok, %Xandra.Void{}} <- - Xandra.execute(conn, query, %{}, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp remove_realm(conn, realm_name) do - # undecoded realm name - query = """ - DELETE FROM #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - WHERE realm_name = :realm_name; - """ - - params = %{"realm_name" => realm_name} - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Void{}} <- - Xandra.execute(conn, prepared, params, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp insert_realm(conn, realm_name, device_limit) do - query = """ - INSERT INTO #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms (realm_name, device_registration_limit) - VALUES (:realm_name, :device_registration_limit); - """ - - device_registration_limit = if device_limit == 0, do: nil, else: device_limit - - params = %{ - "realm_name" => realm_name, - "device_registration_limit" => device_registration_limit - } - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Void{}} <- - Xandra.execute(conn, prepared, params, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - # ScyllaDB considers TTL=0 as unset, see - # https://opensource.docs.scylladb.com/stable/cql/time-to-live.html#notes - defp insert_datastream_max_retention(_conn_realm, 0) do - :ok - end - - defp insert_datastream_max_retention({conn, keyspace_name}, max_retention) do - statement = """ - INSERT INTO :keyspace_name.kv_store (group, key, value) - VALUES ('realm_config', 'datastream_maximum_storage_retention', intAsBlob(:max_retention)); - """ - - params = %{ - "max_retention" => max_retention - } - - # This is safe since we checked the realm name in the caller - query = String.replace(statement, ":keyspace_name", keyspace_name) - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Void{}} <- - Xandra.execute(conn, prepared, params, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - def initialize_database do - Xandra.Cluster.run(:xandra, [timeout: 60_000], fn conn -> - with :ok <- create_astarte_keyspace(conn), - :ok <- creates_table(conn), - :ok <- create_astarte_kv_store(conn), - :ok <- insert_astarte_schema_version(conn) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = - Logger.error( - "Database error while initializing database: #{inspect(err)}. ASTARTE WILL NOT WORK.", - tag: "init_database_error" - ) - - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.error( - "Database connection error while initializing database: #{inspect(err)}. ASTARTE WILL NOT WORK.", - tag: "init_database_connection_error" - ) - - {:error, :database_connection_error} - - {:error, reason} -> - _ = - Logger.error( - "Error while initializing database: #{inspect(reason)}. ASTARTE WILL NOT WORK.", - tag: "init_error" - ) - - {:error, reason} - end - end) - end - - defp create_astarte_keyspace(conn) do - # TODO: add support for creating the astarte keyspace with NetworkTopologyStrategy, - # right now the replication factor is an integer so SimpleStrategy is always used - astarte_keyspace_replication = Config.astarte_keyspace_replication_factor!() - - with {:ok, replication_map_str} <- build_replication_map_str(astarte_keyspace_replication), - query = """ - CREATE KEYSPACE #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())} - WITH replication = #{replication_map_str} - AND durable_writes = true; - """, - :ok <- check_replication(conn, astarte_keyspace_replication), - {:ok, %Xandra.SchemaChange{}} <- - Xandra.execute(conn, query, %{}, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - - {:error, reason} -> - _ = - Logger.warning("Cannot create Astarte Keyspace: #{inspect(reason)}.", - tag: "astarte_keyspace_creation_failed" - ) - - {:error, reason} - end - end - - defp creates_table(conn) do - query = """ - CREATE TABLE #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms ( - realm_name varchar, - device_registration_limit bigint, - PRIMARY KEY (realm_name) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- - Xandra.execute(conn, query, %{}, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp create_astarte_kv_store(conn) do - query = """ - CREATE TABLE #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.kv_store ( - group varchar, - key varchar, - value blob, - - PRIMARY KEY ((group), key) - ); - """ - - with {:ok, %Xandra.SchemaChange{}} <- - Xandra.execute(conn, query, %{}, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp insert_astarte_schema_version(conn) do - query = """ - INSERT INTO #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.kv_store - (group, key, value) - VALUES ('astarte', 'schema_version', bigintAsBlob(#{Migrator.latest_astarte_schema_version()})); - """ - - with {:ok, %Xandra.Void{}} <- Xandra.execute(conn, query, %{}, consistency: :each_quorum) do - :ok - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - def is_realm_existing(realm_name) do - Xandra.Cluster.run(:xandra, &is_realm_existing(&1, realm_name)) - end - - def is_astarte_keyspace_existing do - query = """ - SELECT keyspace_name - FROM system_schema.keyspaces - WHERE keyspace_name='#{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}' - """ - - case Xandra.Cluster.execute(:xandra, query) do - {:ok, %Xandra.Page{} = page} -> - if Enum.count(page) > 0 do - {:ok, true} - else - {:ok, false} - end - - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - def check_astarte_health(consistency) do - query = """ - SELECT COUNT(*) - FROM #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - """ - - with {:ok, %Xandra.Page{} = page} <- - Xandra.Cluster.execute(:xandra, query, %{}, consistency: consistency), - {:ok, _} <- Enum.fetch(page, 0) do - :ok - else - :error -> - _ = - Logger.warning( - "Cannot retrieve count for #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms table.", - tag: "health_check_error" - ) - - {:error, :health_check_bad} - - {:error, %Xandra.Error{} = err} -> - _ = - Logger.warning("Database error, health is not good: #{inspect(err)}.", - tag: "health_check_database_error" - ) - - {:error, :health_check_bad} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database error, health is not good: #{inspect(err)}.", - tag: "health_check_database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - def list_realms do - query = """ - SELECT realm_name - FROM #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms; - """ - - case Xandra.Cluster.execute(:xandra, query, %{}, consistency: :quorum) do - {:ok, %Xandra.Page{} = page} -> - {:ok, Enum.map(page, fn %{"realm_name" => realm_name} -> realm_name end)} - - {:error, %Xandra.Error{} = err} -> - _ = - Logger.warning("Database error while listing realms: #{inspect(err)}.", - tag: "database_error" - ) - - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error while listing realms: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - def get_realm(realm_name) do - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()) - - Xandra.Cluster.run(:xandra, fn conn -> - with {:ok, true} <- is_realm_existing(conn, realm_name), - {:ok, public_key} <- get_public_key(conn, keyspace_name), - {:ok, replication_map} <- get_realm_replication(conn, keyspace_name), - {:ok, device_registration_limit} <- get_device_registration_limit(conn, realm_name), - {:ok, max_retention} <- - get_datastream_maximum_storage_retention(conn, keyspace_name) do - case replication_map do - %{ - "class" => "org.apache.cassandra.locator.SimpleStrategy", - "replication_factor" => replication_factor_string - } -> - {replication_factor, ""} = Integer.parse(replication_factor_string) - - %{ - realm_name: realm_name, - jwt_public_key_pem: public_key, - replication_class: "SimpleStrategy", - replication_factor: replication_factor, - device_registration_limit: device_registration_limit, - datastream_maximum_storage_retention: max_retention - } - - %{"class" => "org.apache.cassandra.locator.NetworkTopologyStrategy"} -> - datacenter_replication_factors = - Enum.reduce(replication_map, %{}, fn - {"class", _}, acc -> - acc - - {datacenter, replication_factor_string}, acc -> - {replication_factor, ""} = Integer.parse(replication_factor_string) - Map.put(acc, datacenter, replication_factor) - end) - - %{ - realm_name: realm_name, - jwt_public_key_pem: public_key, - replication_class: "NetworkTopologyStrategy", - datacenter_replication_factors: datacenter_replication_factors, - device_registration_limit: device_registration_limit, - datastream_maximum_storage_retention: max_retention - } - end - else - # Returned by is_realm_existing - {:ok, false} -> - {:error, :realm_not_found} - - {:error, reason} -> - _ = - Logger.warning("Error while getting realm: #{inspect(reason)}.", - tag: "get_realm_error", - realm: realm_name - ) - - {:error, reason} - end - end) - end - - def set_datastream_maximum_storage_retention(realm_name, new_retention) do - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()) - - with :ok <- validate_realm_name(realm_name) do - Xandra.Cluster.run( - :xandra, - &do_set_datastream_maximum_storage_retention(&1, keyspace_name, new_retention) - ) - end - end - - def delete_datastream_maximum_storage_retention(realm_name) do - keyspace_name = - CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!()) - - with :ok <- validate_realm_name(realm_name) do - Xandra.Cluster.run( - :xandra, - &do_delete_datastream_maximum_storage_retention(&1, keyspace_name) - ) - end - end - - defp is_realm_existing(conn, realm_name) do - query = """ - SELECT realm_name from #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - WHERE realm_name=:realm_name; - """ - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Page{} = page} <- - Xandra.execute(conn, prepared, %{"realm_name" => realm_name}, consistency: :quorum) do - if Enum.count(page) > 0 do - {:ok, true} - else - {:ok, false} - end - else - {:error, reason} -> - _ = - Logger.warning("Cannot check if realm exists: #{inspect(reason)}.", - tag: "is_realm_existing_error", - realm: realm_name - ) - - {:error, reason} - end - end - - defp get_public_key(conn, realm_name) do - statement = """ - SELECT blobAsVarchar(value) - FROM :realm_name.kv_store - WHERE group='auth' AND key='jwt_public_key_pem'; - """ - - with :ok <- validate_realm_name(realm_name), - query = String.replace(statement, ":realm_name", realm_name), - {:ok, %Xandra.Page{} = page} <- Xandra.execute(conn, query, %{}, consistency: :quorum) do - case Enum.fetch(page, 0) do - {:ok, %{"system.blobasvarchar(value)" => public_key}} -> - {:ok, public_key} - - :error -> - {:error, :public_key_not_found} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - - {:error, reason} -> - _ = - Logger.warning("Cannot get public key: #{inspect(reason)}.", - tag: "get_public_key_error", - realm: realm_name - ) - - {:error, reason} - end - end - - defp do_update_public_key(conn, keyspace_name, new_public_key) do - statement = """ - INSERT INTO :keyspace_name.kv_store (group, key, value) - VALUES('auth','jwt_public_key_pem', varcharAsBlob(:new_public_key)) - """ - - # TODO move away from this when NoaccOS' PR is merged - query = String.replace(statement, ":keyspace_name", keyspace_name) - # TODO refactor when NoaccOS' PR is merged - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, result} <- - Xandra.execute(conn, prepared, %{"new_public_key" => new_public_key}, - consistency: :quorum - ) do - {:ok, result} - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp do_set_device_registration_limit(conn, realm_name, new_device_registration_limit) do - statement = """ - UPDATE #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - SET device_registration_limit = :new_device_registration_limit - WHERE realm_name = :realm_name - """ - - params = %{ - "new_device_registration_limit" => new_device_registration_limit, - "realm_name" => realm_name - } - - with {:ok, prepared} <- Xandra.prepare(conn, statement) do - case Xandra.execute(conn, prepared, params, consistency: :quorum) do - {:ok, result} -> - {:ok, result} - - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - end - - defp do_set_datastream_maximum_storage_retention(conn, realm_name, new_retention) do - statement = """ - UPDATE :realm_name.kv_store - SET value = intAsBlob(:new_retention) - WHERE group='realm_config' AND key='datastream_maximum_storage_retention' - """ - - params = %{ - "new_retention" => new_retention - } - - # TODO move away from this when NoaccOS' PR is merged - query = String.replace(statement, ":realm_name", realm_name) - - # TODO refactor when NoaccOS' PR is merged - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, result} <- Xandra.execute(conn, prepared, params, consistency: :quorum) do - {:ok, result} - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp do_delete_device_registration_limit(conn, realm_name) do - statement = """ - DELETE device_registration_limit - FROM #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - WHERE realm_name = :realm_name - """ - - params = %{ - "realm_name" => realm_name - } - - with {:ok, prepared} <- Xandra.prepare(conn, statement) do - case Xandra.execute(conn, prepared, params, consistency: :quorum) do - {:ok, result} -> - {:ok, result} - - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - end - - defp do_delete_datastream_maximum_storage_retention(conn, realm_name) do - statement = """ - DELETE FROM :realm_name.kv_store - WHERE group='realm_config' AND key='datastream_maximum_storage_retention' - """ - - # TODO move away from this when NoaccOS' PR is merged - query = String.replace(statement, ":realm_name", realm_name) - - # TODO refactor when NoaccOS' PR is merged - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, result} <- Xandra.execute(conn, prepared, %{}, consistency: :quorum) do - {:ok, result} - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{Exception.message(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{Exception.message(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp get_realm_replication(conn, realm_name) do - query = """ - SELECT replication - FROM system_schema.keyspaces - WHERE keyspace_name=:realm_name - """ - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, page} <- Xandra.execute(conn, prepared, %{"realm_name" => realm_name}) do - case Enum.fetch(page, 0) do - {:ok, %{"replication" => replication_map}} -> - {:ok, replication_map} - - :error -> - # Something really wrong here, but we still cover this - _ = - Logger.error("Cannot find realm replication.", - tag: "realm_replication_not_found", - realm: realm_name - ) - - {:error, :realm_replication_not_found} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - # Replication factor of 1 is always ok - defp check_replication(_conn, 1) do - :ok - end - - # If replication factor is an integer, we're using SimpleStrategy - # Check that the replication factor is <= the number of nodes in the same datacenter - defp check_replication(conn, replication_factor) - when is_integer(replication_factor) and replication_factor > 1 do - with {:ok, local_datacenter} <- get_local_datacenter(conn) do - check_replication_for_datacenter(conn, local_datacenter, replication_factor, local: true) - end - end - - defp check_replication(conn, datacenter_replication_factors) - when is_map(datacenter_replication_factors) do - with {:ok, local_datacenter} <- get_local_datacenter(conn) do - Enum.reduce_while(datacenter_replication_factors, :ok, fn - {datacenter, replication_factor}, _acc -> - opts = - if datacenter == local_datacenter do - [local: true] - else - [] - end - - case check_replication_for_datacenter(conn, datacenter, replication_factor, opts) do - :ok -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} - end - end) - end - end - - defp get_device_registration_limit(conn, realm_name) do - query = """ - SELECT device_registration_limit - FROM #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.realms - WHERE realm_name=:realm_name - """ - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, page} <- Xandra.execute(conn, prepared, %{"realm_name" => realm_name}) do - case Enum.fetch(page, 0) do - {:ok, %{"device_registration_limit" => value}} -> - {:ok, value} - - :error -> - # Something really wrong here, but we still cover this - _ = - Logger.error("Cannot find realm device_registration_limit.", - tag: "realm_device_registration_limit_not_found", - realm: realm_name - ) - - {:error, :realm_device_registration_limit_not_found} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp get_datastream_maximum_storage_retention(conn, realm_name) do - statement = """ - SELECT blobAsInt(value) - FROM :realm_name.kv_store - WHERE group='realm_config' AND key='datastream_maximum_storage_retention' - """ - - # TODO change this once NoaccOS' PR is merged - with :ok <- validate_realm_name(realm_name), - query = String.replace(statement, ":realm_name", realm_name), - {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, page} <- Xandra.execute(conn, prepared, %{}) do - case Enum.fetch(page, 0) do - {:ok, %{"system.blobasint(value)" => value}} -> - {:ok, value} - - :error -> - {:ok, nil} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp get_local_datacenter(conn) do - query = """ - SELECT data_center - FROM system.local; - """ - - with {:ok, %Xandra.Page{} = page} <- Xandra.execute(conn, query) do - case Enum.fetch(page, 0) do - {:ok, %{"data_center" => datacenter}} -> - {:ok, datacenter} - - :error -> - _ = - Logger.error( - "Empty dataset while getting local datacenter, something is really wrong.", - tag: "get_local_datacenter_error" - ) - - {:error, :local_datacenter_not_found} - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end - - defp check_replication_for_datacenter(conn, datacenter, replication_factor, opts) do - query = """ - SELECT COUNT(*) - FROM system.peers - WHERE data_center=:data_center - ALLOW FILTERING; - """ - - with {:ok, prepared} <- Xandra.prepare(conn, query), - {:ok, %Xandra.Page{} = page} <- - Xandra.execute(conn, prepared, %{"data_center" => datacenter}) do - case Enum.fetch(page, 0) do - :error -> - _ = - Logger.warning("Cannot retrieve node count for datacenter #{datacenter}.", - tag: "datacenter_not_found", - datacenter: datacenter - ) - - {:error, :datacenter_not_found} - - {:ok, %{"count" => dc_node_count}} -> - # If we're querying the datacenter of the local node, add 1 (itself) to the count - actual_node_count = - if opts[:local] do - dc_node_count + 1 - else - dc_node_count - end - - if replication_factor <= actual_node_count do - :ok - else - _ = - Logger.warning( - "Trying to set replication_factor #{replication_factor} " <> - "in datacenter #{datacenter} that has #{actual_node_count} nodes.", - tag: "invalid_replication_factor", - datacenter: datacenter, - replication_factor: replication_factor - ) - - error_message = - "replication_factor #{replication_factor} is >= #{actual_node_count} nodes " <> - "in datacenter #{datacenter}" - - {:error, {:invalid_replication, error_message}} - end - end - else - {:error, %Xandra.Error{} = err} -> - _ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error") - {:error, :database_error} - - {:error, %Xandra.ConnectionError{} = err} -> - _ = - Logger.warning("Database connection error: #{inspect(err)}.", - tag: "database_connection_error" - ) - - {:error, :database_connection_error} - end - end -end - -# # -# # This file is part of Astarte. -# # -# # Copyright 2017-2023 SECO Mind Srl -# # -# # Licensed under the Apache License, Version 2.0 (the "License"); -# # you may not use this file except in compliance with the License. -# # You may obtain a copy of the License at -# # -# # http://www.apache.org/licenses/LICENSE-2.0 -# # -# # Unless required by applicable law or agreed to in writing, software -# # distributed under the License is distributed on an "AS IS" BASIS, -# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# # See the License for the specific language governing permissions and -# # limitations under the License. -# # - -# defmodule Astarte.Housekeeping.Queries do - -# end diff --git a/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/realm/create.ex b/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/realm/create.ex index 778ff0509..747f1cf6b 100644 --- a/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/realm/create.ex +++ b/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/realm/create.ex @@ -18,33 +18,37 @@ defmodule Mix.Tasks.AstarteDevTool.Realm.Create do use Mix.Task - alias AstarteDevTool.Commands.System.Down + alias Astarte.Core.Realm + alias AstarteDevTool.Utilities.Node + alias AstarteDevTool.Commands.Realm.Create alias AstarteDevTool.Utilities.Path @shortdoc "Create realm/s into the running Astarte" @aliases [ - # p: :path, - # v: :volumes + p: :path, + n: :node ] @switches [ - # path: :string, - # volumes: :boolean, - # log_level: :string + path: :string, + node: :keep, + log_level: :string ] @moduledoc """ - Create realm/s into the running Astarte platform. + Create realm into the running Astarte platform. ## Examples - $ mix astarte_dev_tool.realm.create realm_1 realm_a + $ mix astarte_dev_tool.realm.create -n localhost:12345 realm1 + $ mix astarte_dev_tool.realm.create -n localhost:12345 -n cassandra:54321 realm1 ## Command line options * `-p` `--path` - (required) working Astarte project directory - * `-v` `--volumes` - remove volumes after switching off + * `-n` `--node` - (at least one is required) Cassandra/Scylla cluster node. + Every node has format **host/ip:port** * `--log-level` - the level to set for `Logger`. This task does not start your application, so whatever level you have configured in @@ -55,22 +59,39 @@ defmodule Mix.Tasks.AstarteDevTool.Realm.Create do @impl true def run(args) do - {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases) + {opts, args} = OptionParser.parse!(args, strict: @switches, aliases: @aliases) unless Keyword.has_key?(opts, :path), do: Mix.raise("The --path argument is required") + unless Keyword.has_key?(opts, :node), + do: Mix.raise("At least one --node argument is required") + + unless Enum.count(args) === 1, + do: Mix.raise("The command required one argument - the Realm name") + + realm_name = Enum.at(args, 0) + + unless Realm.valid_name?(realm_name), + do: Mix.raise("Invalid Realm name provided") + + nodes = + case(opts |> Keyword.get_values(:node) |> Node.parse_nodes()) do + {:ok, nodes} -> nodes + {:error, _} -> Mix.raise("--node argument must be in : format") + end + if log_level = opts[:log_level], do: Logger.configure(level: String.to_existing_atom(log_level)) with path <- opts[:path], {:ok, abs_path} <- Path.directory_path_from(path), - _ = Mix.shell().info("Stopping astarte system..."), - :ok <- Down.exec(abs_path, opts[:volumes]) do - Mix.shell().info("Astarte's system stopped successfully.") + :ok <- Mix.Tasks.AstarteDevTool.System.Check.run(["-p", abs_path]), + :ok <- Create.exec(nodes, realm_name) do + Mix.shell().info("Realms created successfully.") :ok else {:error, output} -> - Mix.raise("Failed to stop Astarte's system. Output: #{output}") + Mix.raise("Failed to create Astarte's realms. Output: #{output}") end end end diff --git a/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/system/check.ex b/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/system/check.ex index 507aa58e5..b2cec3549 100644 --- a/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/system/check.ex +++ b/tools/astarte_dev_tool/lib/mix/tasks/astarte_dev_tool/system/check.ex @@ -60,14 +60,17 @@ defmodule Mix.Tasks.AstarteDevTool.System.Check do do: Logger.configure(level: String.to_existing_atom(log_level)) with path <- opts[:path], - {:ok, abs_path} <- Path.directory_path_from(path), - {:list, :ok} <- {:list, Check.exec(abs_path)} do - Mix.shell().info("All Astarte's services are up & ready") - :ok - else - {:list, {:error, list}} -> - Mix.raise("Some Astarte's services do not seem to be working: #{list}") + {:ok, abs_path} <- Path.directory_path_from(path) do + case Check.exec(abs_path) do + :ok -> + Mix.shell().info("All Astarte's services are up & ready") + :ok + {:error, list} = data -> + Mix.raise("Some Astarte's services do not seem to be working: #{list}") + data + end + else {:error, output} -> Mix.raise("Failed to check Astarte's system. Output: #{output}") end diff --git a/tools/astarte_dev_tool/lib/utilities/node.ex b/tools/astarte_dev_tool/lib/utilities/node.ex new file mode 100644 index 000000000..130b10e81 --- /dev/null +++ b/tools/astarte_dev_tool/lib/utilities/node.ex @@ -0,0 +1,36 @@ +# +# This file is part of Astarte. +# +# Copyright 2024 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +defmodule AstarteDevTool.Utilities.Node do + @host_regex ~r/^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$/ + @ip_regex ~r/^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$/ + @port_regex ~r/^([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$/ + + def parse_nodes(list), do: parse_nodes(list, []) + + defp parse_nodes([], acc), do: {:ok, acc} + + defp parse_nodes([str | t], acc) do + with [host, port] <- String.split(str, ":"), + true <- (host =~ @host_regex or host =~ @ip_regex) and port =~ @port_regex do + parse_nodes(t, [{host, port} | acc]) + else + _ -> {:error, "Invalid format"} + end + end +end diff --git a/tools/astarte_dev_tool/lib/utilities/system.ex b/tools/astarte_dev_tool/lib/utilities/system.ex index aad28256f..229254297 100644 --- a/tools/astarte_dev_tool/lib/utilities/system.ex +++ b/tools/astarte_dev_tool/lib/utilities/system.ex @@ -24,7 +24,7 @@ defmodule AstarteDevTool.Utilities.System do command = "docker" args = - ~w(ps -a --no-trunc --format {{.ID}}#{@field_separator}{{.Names}} -f label=com.docker.compose.project.working_dir=#{path}) + ~w(ps -a --no-trunc --format {{.ID}}#{@field_separator}{{.Names}} -f status=running -f label=com.docker.compose.project.working_dir=#{path}) {pids_str, 0} = System.cmd(command, args, cd: path) diff --git a/tools/astarte_dev_tool/mix.lock b/tools/astarte_dev_tool/mix.lock index 6c98d0d64..e68a352a4 100644 --- a/tools/astarte_dev_tool/mix.lock +++ b/tools/astarte_dev_tool/mix.lock @@ -5,6 +5,7 @@ "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "cqerl": {:hex, :cqerl, "2.0.1", "c92929e885adc50cda1f11b73eb0121363e8418533312f8a44defb5f14853445", [:rebar3], [{:lz4, "~> 0.2.4", [hex: :lz4_erl, repo: "hexpm", optional: false]}, {:re2, "1.9.5", [hex: :re2, repo: "hexpm", optional: false]}, {:semver, "~> 0.0.1", [hex: :semver_erl, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}, {:uuid, "~> 2.0.0", [hex: :uuid_erl, repo: "hexpm", optional: false]}], "hexpm", "96e9ee407830508187a5edff9fc49983a7122b5c4127c640320a226b59ae12fe"}, "cqex": {:hex, :cqex, "1.0.1", "bc9980ac3b82d039879f8d6ca589deab799fe08f80ff449d60ad709f2524718f", [:mix], [{:cqerl, "~> 2.0.1", [hex: :cqerl, repo: "hexpm", optional: false]}], "hexpm", "1bbf2079c044cbf0f747f60dcf0409a951eaa8f1a2447cd6d80d6ff1b7c4dc6b"}, + "curvy": {:hex, :curvy, "0.3.1", "2645a11452743a37de2393da4d2e60700632498b166413b4f73bc34c57a911e1", [:mix], [], "hexpm", "82df293452f7b751becabc29e8aad0f7d88ffdcd790ac7a2ea16ea1544681d8a"}, "cyanide": {:hex, :cyanide, "2.0.0", "f97b700b87f9b0679ae812f0c4b7fe35ea6541a4121a096cf10287941b7a6d55", [:mix], [], "hexpm", "7f9748251804c2a2115b539202568e1117ab2f0ae09875853fb89cc94ae19dd1"}, "db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, @@ -12,6 +13,7 @@ "ecto_morph": {:hex, :ecto_morph, "0.1.29", "bc0b915779636bd2d30c54cad6922b3cb40f85b1d4ad59bdffd3c788d9d1f972", [:mix], [{:ecto, ">= 3.0.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "814bed72e3d03b278c1dfb3fbc4da37f478a37518ee54f010c1ad9254f1ca0e3"}, "ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "ex_crypto": {:hex, :ex_crypto, "0.10.0", "af600a89b784b36613a989da6e998c1b200ff1214c3cfbaf8deca4aa2f0a1739", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm", "ccc7472cfe8a0f4565f97dce7e9280119bf15a5ea51c6535e5b65f00660cde1c"}, "exandra": {:hex, :exandra, "0.10.2", "e95dca77501df9ae48f23854224e91712e64d65cd7157e2fe46232ea97918ec6", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.10", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:xandra, "~> 0.18.0", [hex: :xandra, repo: "hexpm", optional: false]}], "hexpm", "334616b170233828f2acac0b060c3c6c91051848972218d2b159e3a455b07c84"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, @@ -23,6 +25,7 @@ "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, + "poison": {:hex, :poison, "6.0.0", "9bbe86722355e36ffb62c51a552719534257ba53f3271dacd20fbbd6621a583a", [:mix], [{:decimal, "~> 2.1", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "bb9064632b94775a3964642d6a78281c07b7be1319e0016e1643790704e739a2"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, "quickrand": {:hex, :quickrand, "2.0.7", "d2bd76676a446e6a058d678444b7fda1387b813710d1af6d6e29bb92186c8820", [:rebar3], [], "hexpm", "b8acbf89a224bc217c3070ca8bebc6eb236dbe7f9767993b274084ea044d35f0"}, "re2": {:hex, :re2, "1.9.5", "3c419527fb2cc75eda1657f04dc7e8cea9864899b43ff6cc3250fa7525431e83", [:rebar3], [], "hexpm", "4861336271ac565224e79e133cbaf90af77739cda3fff25f6965b24a1146a4d6"},