Skip to content

Latest commit

 

History

History
87 lines (65 loc) · 3.04 KB

README.md

File metadata and controls

87 lines (65 loc) · 3.04 KB

Spawn Federated Example

Certain use cases require more complex and careful modeling of Actors. Here we exemplify the splitting and data aggregation scenario using Actors.

Targets:

Data Processing

Federated Machine Learning

Complex integration scenarios

Run

First up the Elixir application:

make run

NOTE: This example uses the MySQL database as persistent storage for its actors. And it is also expected that you have previously created a database called eigr-functions-db in the MySQL instance.

Second execute call for sending Task to Coordinator Actor:

iex(federated_01@127.0.0.1)1>list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
iex(federated_01@127.0.0.1)4> task = %Federated.Domain.TaskRequest{
...(federated_01@127.0.0.1)4>      id: Uniq.UUID.uuid4(),
...(federated_01@127.0.0.1)4>      workers: 2,
...(federated_01@127.0.0.1)4>      task_strategy: :SUM,
...(federated_01@127.0.0.1)4>      aggregation_strategy: :SUM_TASKS,
...(federated_01@127.0.0.1)4>      data: %Federated.Domain.Data{numbers: list}
...(federated_01@127.0.0.1)4>    }

%Federated.Domain.TaskRequest{
  id: "15391a75-716d-4e2f-a9fa-3c9900d710b2",
  data: %Federated.Domain.Data{
    numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
     20],
    __unknown_fields__: []
  },
  workers: 2,
  __unknown_fields__: []
}
iex(federated_01@127.0.0.1)6> {:ok, task_id} = SpawnFederatedExample.Client.push(task)
{:ok, "86255146-8561-404a-b958-423d7c159ed2"}

In the logs you will see some messages like these:

2022-10-14 19:39:41.667 [federated_01@127.0.0.1]:[pid=<0.939.0> ]:[debug]:TaskCoordinator Received Aggregate Results to Join. Response: [%Federated.Domain.FederatedTaskResult{id: "331131c9-14f2-4a73-9d56-bc24c42f28f2", worker_id: "worker-0", data: %Federated.Domain.Result{data: 55, __unknown_fields__: []}, status: :DONE, __unknown_fields__: []}]

2022-10-14 19:39:41.669 [federated_01@127.0.0.1]:[pid=<0.939.0> ]:[debug]:TaskCoordinator Received Aggregate Results to Join. Response: [%Federated.Domain.FederatedTaskResult{id: "50bb96bb-886e-4656-b896-202dbaa2b279", worker_id: "worker-1", data: %Federated.Domain.Result{data: 155, __unknown_fields__: []}, status: :DONE, __unknown_fields__: []}]

This means that the Coordinator generated the subtasks, generated two workers worker-0 and worker-1 sent the subtasks to them and received the results back for aggregation.

Checking the results

Use fetch operation for get aggregated task result:

iex(federated_01@127.0.0.1)7>SpawnFederatedExample.Client.fetch(task_id)
{:ok,
 %Federated.Domain.Coordinator.Summary{
   task_id: "59cb2271-d88d-4a21-a240-d4a0b76f9020",
   sub_tasks: 2,
   response: %Federated.Domain.TaskResponse{
     id: "59cb2271-d88d-4a21-a240-d4a0b76f9020",
     result: %Federated.Domain.Result{data: 210, __unknown_fields__: []},
     __unknown_fields__: []
   },
   status: :DONE,
   __unknown_fields__: []
 }}
iex(federated_01@127.0.0.1)8>