Skip to content

Commit

Permalink
Merge pull request #46 from Logflare/feat/randomized-batch-size
Browse files Browse the repository at this point in the history
feat: randomized batch size and schema size
  • Loading branch information
Ziinc authored Dec 29, 2023
2 parents ac7a33f + 70a0cf0 commit c00d0ba
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
10 changes: 7 additions & 3 deletions lib/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ defmodule Loadfest.Pipeline do
end

def handle_demand(demand, state) when demand > 0 do
events = Loadfest.Worker.make_batch(Enum.random(50..100))
# events = Loadfest.Worker.make_batch_stream() |> Enum.to_list() |> dbg()
{:noreply, [%Broadway.Message{data: events, acknowledger: {__MODULE__, :ack, 3}}], state}
messages =
for batch_size <- Loadfest.Worker.stream_batch_sizes() |> Enum.take(2),
events = Loadfest.Worker.stream_batch() |> Enum.take(batch_size) do
%Broadway.Message{data: events, acknowledger: {__MODULE__, :ack, 3}}
end

{:noreply, messages, state}
end

def ack(_, _, _) do
Expand Down
46 changes: 45 additions & 1 deletion lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,52 @@ defmodule Loadfest.Worker do
end)
end

def stream_batch_sizes() do
StreamData.frequency([
{8, StreamData.constant(1)},
{2, StreamData.integer(1..10)},
{2, StreamData.integer(50..60)},
{1, StreamData.integer(80..100)},
{3, StreamData.integer(100..150)},
{1, StreamData.integer(150..250)}
])
end

def stream_batch() do
StreamData.optional_map(%{
StreamData.fixed_map(%{
context:
StreamData.fixed_map(%{
tags: StreamData.list_of(gen_string(), max_length: 50),
value: StreamData.integer(),
property_a: gen_string(),
property_b: gen_string(),
property_c: gen_string(),
property_d: gen_string(),
property_e: gen_string(),
property_f: gen_string(),
property_g: gen_string(),
generated: StreamData.map_of(StreamData.integer(1..100), gen_string(), min_length: 5, max_length: 10),
nested:
StreamData.fixed_map(%{
property_a: gen_string(),
property_b: gen_string(),
property_c: gen_string(),
property_d: gen_string(),
property_e: gen_string(),
property_f: gen_string(),
property_g: gen_string(),
nested_twice:
StreamData.optional_map(%{
property_a: gen_string(),
property_b: gen_string(),
property_c: gen_string(),
property_d: gen_string(),
property_e: gen_string(),
property_f: gen_string(),
property_g: gen_string()
})
})
}),
custom_user_data:
StreamData.optional_map(%{
address:
Expand Down

0 comments on commit c00d0ba

Please sign in to comment.