From 70a0cf0bf6221ef95dbb56e1d0d577bc0c7b1a6a Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Fri, 29 Dec 2023 22:13:25 +0800 Subject: [PATCH] feat: randomized batch size and schema size --- lib/pipeline.ex | 10 +++++++--- lib/worker.ex | 46 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/lib/pipeline.ex b/lib/pipeline.ex index 395b068..116eb54 100644 --- a/lib/pipeline.ex +++ b/lib/pipeline.ex @@ -17,9 +17,13 @@ defmodule Loadfest.Pipeline do end def handle_demand(demand, state) when demand > 0 do - events = Loadfest.Worker.make_batch(Enum.random(50..100)) - # events = Loadfest.Worker.make_batch_stream() |> Enum.to_list() |> dbg() - {:noreply, [%Broadway.Message{data: events, acknowledger: {__MODULE__, :ack, 3}}], state} + messages = + for batch_size <- Loadfest.Worker.stream_batch_sizes() |> Enum.take(2), + events = Loadfest.Worker.stream_batch() |> Enum.take(batch_size) do + %Broadway.Message{data: events, acknowledger: {__MODULE__, :ack, 3}} + end + + {:noreply, messages, state} end def ack(_, _, _) do diff --git a/lib/worker.ex b/lib/worker.ex index e2d0e91..d59805b 100644 --- a/lib/worker.ex +++ b/lib/worker.ex @@ -94,8 +94,52 @@ defmodule Loadfest.Worker do end) end + def stream_batch_sizes() do + StreamData.frequency([ + {8, StreamData.constant(1)}, + {2, StreamData.integer(1..10)}, + {2, StreamData.integer(50..60)}, + {1, StreamData.integer(80..100)}, + {3, StreamData.integer(100..150)}, + {1, StreamData.integer(150..250)} + ]) + end + def stream_batch() do - StreamData.optional_map(%{ + StreamData.fixed_map(%{ + context: + StreamData.fixed_map(%{ + tags: StreamData.list_of(gen_string(), max_length: 50), + value: StreamData.integer(), + property_a: gen_string(), + property_b: gen_string(), + property_c: gen_string(), + property_d: gen_string(), + property_e: gen_string(), + property_f: gen_string(), + property_g: gen_string(), + generated: StreamData.map_of(StreamData.integer(1..100), gen_string(), min_length: 5, max_length: 10), + nested: + StreamData.fixed_map(%{ + property_a: gen_string(), + property_b: gen_string(), + property_c: gen_string(), + property_d: gen_string(), + property_e: gen_string(), + property_f: gen_string(), + property_g: gen_string(), + nested_twice: + StreamData.optional_map(%{ + property_a: gen_string(), + property_b: gen_string(), + property_c: gen_string(), + property_d: gen_string(), + property_e: gen_string(), + property_f: gen_string(), + property_g: gen_string() + }) + }) + }), custom_user_data: StreamData.optional_map(%{ address: