feat: log drains rules, extends rules implementation (#2059)
* feat: log drains rules extending initial implementation

* feat: add/remove rules

* feat: edit backend

* chore: remove dbg

* chore: add testing for delete backend with rules

* feat: remove the Remove backend button

* feat: more styling

* feat: edit form enhancements

* feat: dynamically start/stop drains

* fix: multiple rules for same source and same backend

* chore: fix failing test, add backend type

* docs: drain rules, postgres, webhook

* chore: fix failing test

* chore: formatting

* docs: fix bad linking

* chore: version bump to v1.7.0

* docs: @doc and @spec tags

* docs: add more specs and docs

* docs: more and more docs!

* chore: fix typespecs

* chore: PR review refactoring

* chore: PR review, remove apply
Ziinc authored May 10, 2024
1 parent a4042a3 commit b92d760
Showing 31 changed files with 1,345 additions and 324 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.6.6
1.7.0
3 changes: 2 additions & 1 deletion docs/docs.logflare.com/docs/backends/_category_.json
@@ -4,6 +4,7 @@
"collapsed": false,
"position": 5,
"link": {
"type": "generated-index"
"type": "doc",
"id": "backends/index"
}
}
21 changes: 21 additions & 0 deletions docs/docs.logflare.com/docs/backends/index.mdx
@@ -0,0 +1,21 @@
# Backends

By default, the Logflare service automatically provisions managed BigQuery as a backend.

On a BYOB plan, providing your own BigQuery configuration disables ingestion into Logflare's managed BigQuery.

Logflare's multi-backend capabilities allow 3rd-party services and other data storage engines to be used together for purposes such as data redundancy, integrations, and more.

:::info
Multi-backend support is a private alpha feature. Contact us if you wish to preview these features.
:::

When multiple backends are attached, the Logflare service will always insert into the managed BigQuery (or a configured BYOB backend), as well as into any other backends attached to that source.

A backend can also be attached to multiple sources, and Logflare automatically manages the underlying data storage mechanisms. Details are further explained in each backend's documentation.

A backend can be any one of the following types:

- **Ingest-only**: The backend only ingests data, and cannot be queried through any supported query languages.
- **Query-only**: The backend can only be queried. Data storage is handled externally, outside of Logflare.
- **Fully-featured**: The backend can ingest data, and can be queried by supported query languages.
25 changes: 23 additions & 2 deletions docs/docs.logflare.com/docs/backends/postgres/index.md
@@ -7,7 +7,7 @@ toc_max_heading_level: 3
Logflare has experimental support for storing and querying log events in a PostgreSQL server. Ingested logs are inserted directly into tables, and each source maps to a Postgres table within a given schema.

:::warning
PostgreSQL as a backend is only available for the **V2 ingestion** and currently has limited functionality.
PostgreSQL as a backend is only available for the **V2 ingestion** and currently has limited functionality and support. Ingestion architecture and underlying storage mechanisms are subject to **breaking changes**.
:::

## Behavior and Configuration
@@ -29,5 +29,26 @@ Where possible, storage and querying behavior will follow the [BigQuery](../bigq

The following values can be provided to the backend:

- `url` (`string`, required): A PostgreSQL connection string, for connecting to the server.
- `url` (`string`, optional): A PostgreSQL connection string, for connecting to the server.
- `schema` (`string`, optional): The schema to scope all Logflare-related operations to.
- `username` (`string`, optional): Username to connect as. Cannot be used with `url`.
- `password` (`string`, optional): Password for user authentication. Cannot be used with `url`.
- `port` (`string`, optional): Port of the database server. Cannot be used with `url`.
- `hostname` (`string`, optional): Hostname of the database server. Cannot be used with `url`.
- `pool_size` (`integer`), optional: Sets the number of connections in the connection pool to the database server. Defaults to 1.

Either `url` or the username/password connection parameters must be provided; the two cannot be used together.

Increase `pool_size` to raise ingestion throughput and reduce the chance of the connection pool being exhausted during ingest.
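
For illustration, a connection-string configuration might look like the following sketch (placeholder values; the exact shape of the config map is an assumption):

```elixir
%{
  # `url` cannot be combined with the username/password parameters
  url: "postgresql://user:password@db.example.com:5432/logs",
  # scope Logflare-managed tables to a dedicated schema
  schema: "analytics",
  # a larger pool reduces the chance of an empty pool during ingest
  pool_size: 4
}
```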

### API Querying
It is possible to query the PostgreSQL backend using the PostgreSQL dialect, via the management API querying endpoint:

```
GET https://api.logflare.app/api/
```

A valid private access token must be used.
17 changes: 17 additions & 0 deletions docs/docs.logflare.com/docs/backends/webhook.mdx
@@ -0,0 +1,17 @@
# Webhook

The Webhook backend will send batched events to a desired HTTP(S) destination.

## Behaviour and configurations
### Configuration

The following values are required when creating a webhook backend:

- `url` (`string`, required): a valid HTTP(S) endpoint.

### Request
The request made will be a `POST`.

### Batching

Events will be sent in batches of up to 250, with intervals of up to 1 second.
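
The sketch below illustrates these batching semantics with a size-or-interval flush. It is illustrative only, not Logflare's actual implementation, and assumes `Req` as an HTTP client:

```elixir
defmodule WebhookBatcherSketch do
  # Illustrative sketch of the "up to 250 events or up to 1 second" batching
  # semantics described above. Not Logflare's implementation; `Req` is an
  # assumed HTTP client dependency.
  use GenServer

  @max_batch 250
  @flush_interval_ms 1_000

  def start_link(url), do: GenServer.start_link(__MODULE__, url)
  def push(pid, events), do: GenServer.cast(pid, {:push, events})

  @impl true
  def init(url) do
    schedule_flush()
    {:ok, %{url: url, buffer: []}}
  end

  @impl true
  def handle_cast({:push, events}, state) do
    # buffer is kept newest-first; reverse before sending
    buffer = Enum.reverse(events, state.buffer)

    if length(buffer) >= @max_batch do
      buffer
      |> Enum.reverse()
      |> Enum.chunk_every(@max_batch)
      |> Enum.each(&send_batch(state.url, &1))

      {:noreply, %{state | buffer: []}}
    else
      {:noreply, %{state | buffer: buffer}}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    if state.buffer != [], do: send_batch(state.url, Enum.reverse(state.buffer))
    schedule_flush()
    {:noreply, %{state | buffer: []}}
  end

  defp schedule_flush, do: Process.send_after(self(), :flush, @flush_interval_ms)

  # POST the batch as a JSON array to the destination
  defp send_batch(url, batch), do: Req.post!(url, json: batch)
end
```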
21 changes: 21 additions & 0 deletions docs/docs.logflare.com/docs/concepts/rules.md
@@ -0,0 +1,21 @@
# Rules

Rules direct ingested data to one or more sources or [backends](../backends). This allows subsets of ingested data to be copied to multiple sources, or forwarded to external backends.

Filtering uses the [Logflare Query Language](../concepts/lql) syntax.

## Source Rules

Source rules allow for source-to-source filtering, where a source will send data to a given destination source.

## Drain Rules

Drain rules allow for source-to-backend filtering, where a source will send data to a given destination [backend](../backends). The backend may be ingest-only, such as a 3rd-party service.

:::info Private Alpha Only
Drain rules and multi-backends are a private-alpha feature of the Logflare service. Please contact us if this interests you.
:::

On drain creation, data ingested into the source will automatically be routed to the selected backend.

If the backend is a supported fully-featured managed backend (such as BigQuery or Postgres), tables will automatically be created and managed as if the source had been attached to the backend. If an associated source table already exists, data will be inserted into the existing table.
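
As a hypothetical sketch, creating a drain rule might look like the following (the `Logflare.Rules.create_rule/1` function and its field names are assumptions, not the documented interface):

```elixir
# Route only error-level events from `source` into `backend`.
# Function and field names are illustrative assumptions.
{:ok, _rule} =
  Logflare.Rules.create_rule(%{
    source_id: source.id,
    backend_id: backend.id,
    lql_string: "metadata.level:error"
  })
```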
1 change: 1 addition & 0 deletions lib/logflare/application.ex
@@ -102,6 +102,7 @@ defmodule Logflare.Application do
ContextCache,
Partners.Cache,
Users.Cache,
Backends.Cache,
Sources.Cache,
Billing.Cache,
SourceSchemas.Cache,
93 changes: 79 additions & 14 deletions lib/logflare/backends.ex
@@ -50,6 +50,24 @@ defmodule Logflare.Backends do
|> Enum.map(fn sb -> typecast_config_string_map_to_atom_map(sb) end)
end

@doc """
Returns all backends set as a rule destination for a given source.
### Example
iex> list_backends_with_rules(source)
[%Backend{...}, ...]
"""
@spec list_backends_with_rules(Source.t()) :: [Backend.t()]
def list_backends_with_rules(%Source{id: source_id}) do
from(b in Backend, join: r in assoc(b, :rules), where: r.source_id == ^source_id)
|> Repo.all()
|> Enum.map(fn sb -> typecast_config_string_map_to_atom_map(sb) end)
end

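@doc """
Preloads each backend's rules, with each rule's source preloaded as well.
"""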
def preload_rules(backends) do
Repo.preload(backends, rules: [:source])
end

@doc """
Creates a Backend for a given source.
"""
@@ -92,11 +110,29 @@
@spec update_source_backends(Source.t(), [Backend.t()]) ::
{:ok, Source.t()} | {:error, Ecto.Changeset.t()}
def update_source_backends(%Source{} = source, backends) do
source
|> Repo.preload(:backends)
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_assoc(:backends, backends)
|> Repo.update()
changeset =
source
|> Repo.preload(:backends)
|> Ecto.Changeset.change()
|> Ecto.Changeset.put_assoc(:backends, backends)

with {:ok, _} = result <- Repo.update(changeset) do
changes = Ecto.Changeset.get_change(changeset, :backends)

if source_sup_started?(source) do
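# Assumption: put_assoc marks newly attached backends with the :update action
# and detached backends with :replace, so pipeline children are started or
# stopped accordingly.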
for %Ecto.Changeset{action: action, data: backend} <- changes do
case action do
:update ->
SourceSup.start_backend_child(source, backend)

:replace ->
SourceSup.stop_backend_child(source, backend)
end
end
end

result
end
end

# common config validation function
@@ -181,20 +217,21 @@
Adds log events to the source event buffer.
The ingestion pipeline then pulls from the buffer and dispatches log events to the correct backends.
TODO: Perform synchronous parsing and validation of log event params.
Events are conditionally dispatched to backends based on whether they are registered for ingest dispatching.
Once this function returns, the events are dispatched to the respective backend adaptor portions of the pipeline for further processing.
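### Example
iex> ingest_logs([%{"event_message" => "hello"}], source)
{:ok, 1}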
"""
@type log_param :: map()
@spec ingest_logs([log_param()], Source.t()) :: :ok
def ingest_logs(event_params, source) do
@spec ingest_logs([log_param()], Source.t(), Backend.t() | nil) :: {:ok, non_neg_integer()} | {:error, [term()]}
def ingest_logs(event_params, source, backend \\ nil) do
:ok = Source.Supervisor.ensure_started(source)
{log_events, errors} = split_valid_events(source, event_params)
count = Enum.count(log_events)
increment_counters(source, count)
maybe_broadcast_and_route(source, log_events)
RecentLogsServer.push(source, log_events)
dispatch_to_backends(source, log_events)
dispatch_to_backends(source, backend, log_events)
if Enum.empty?(errors), do: {:ok, count}, else: {:error, errors}
end

@@ -204,9 +241,12 @@
le =
param
|> case do
%LogEvent{} = le ->
%LogEvent{source: %Source{}} = le ->
le

%LogEvent{} = le ->
%{le | source: source}

param ->
LogEvent.make(param, %{source: source})
end
@@ -249,7 +289,13 @@
:ok
end

defp dispatch_to_backends(source, log_events) do
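# When a specific backend is given, dispatch directly to its adaptor module,
# bypassing the SourceDispatcher registry.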
defp dispatch_to_backends(source, %Backend{} = backend, log_events) do
adaptor_module = Adaptor.get_adaptor(backend)
adaptor_module.ingest(nil, log_events, default_ingest_opts(source, backend))
:ok
end

defp dispatch_to_backends(source, nil, log_events) do
Registry.dispatch(SourceDispatcher, source.id, fn entries ->
for {pid, {adaptor_module, :ingest, opts}} <- entries do
# TODO: spawn tasks to do this concurrently
@@ -263,25 +309,42 @@
@doc """
Registers a backend for ingestion dispatching. Any opts that are provided are stored in the registry.
If the `:register_for_ingest` field is `true`, the backend will be registered for ingest dispatching.
If it is `false`, the backend will not receive any dispatched events and this function is a no-op.
Auto-populated options:
- `:backend_id`
- `:source_id`
"""
@spec register_backend_for_ingest_dispatch(Source.t(), Backend.t(), keyword()) ::
:ok
def register_backend_for_ingest_dispatch(source, backend, opts \\ []) do
def register_backend_for_ingest_dispatch(source, backend, opts \\ [])

def register_backend_for_ingest_dispatch(
source,
%Backend{register_for_ingest: true} = backend,
opts
) do
mod = Adaptor.get_adaptor(backend)

{:ok, _pid} =
Registry.register(
SourceDispatcher,
source.id,
{mod, :ingest, [backend_id: backend.id, source_id: source.id] ++ opts}
{mod, :ingest, default_ingest_opts(source, backend) ++ opts}
)

:ok
end

def register_backend_for_ingest_dispatch(_source, _backend, _opts), do: :ok

defp default_ingest_opts(source, backend) do
[backend_id: backend.id, source_id: source.id]
end

@doc """
Registers a unique source-related process on the source registry.
For internal use only, should not be called outside of the `Logflare` namespace.
@@ -320,8 +383,10 @@
@doc """
Checks whether the SourceSup for a given source has been started.
"""
@spec source_sup_started?(Source.t()) :: boolean()
def source_sup_started?(%Source{id: id}) do
@spec source_sup_started?(Source.t() | non_neg_integer()) :: boolean()
def source_sup_started?(%Source{id: id}), do: source_sup_started?(id)

def source_sup_started?(id) when is_number(id) do
Registry.lookup(SourceRegistry, {id, SourceSup}) != []
end

3 changes: 3 additions & 0 deletions lib/logflare/backends/backend.ex
@@ -8,6 +8,7 @@ defmodule Logflare.Backends.Backend do
alias Logflare.Backends.Backend
alias Logflare.Source
alias Logflare.User
alias Logflare.Rule

@adaptor_types [:bigquery, :webhook, :postgres]

@@ -20,6 +21,8 @@
field(:config, :map)
many_to_many(:sources, Source, join_through: "sources_backends")
belongs_to(:user, User)
has_many(:rules, Rule)
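# when false, register_backend_for_ingest_dispatch/3 is a no-op for this backend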
field(:register_for_ingest, :boolean, virtual: true, default: true)
timestamps()
end

1 change: 1 addition & 0 deletions lib/logflare/backends/cache.ex
@@ -15,6 +15,7 @@ defmodule Logflare.Backends.Cache do
}
end

def list_backends(source), do: apply_repo_fun(:list_backends, [source])
def get_backend_by(kv), do: apply_repo_fun(:get_backend_by, [kv])
def get_backend(arg), do: apply_repo_fun(:get_backend, [arg])

Expand Down