diff --git a/kit/src/behaviors/allocate/mod.rs b/kit/src/behaviors/allocate/mod.rs index 0d305210..8f40eac9 100644 --- a/kit/src/behaviors/allocate/mod.rs +++ b/kit/src/behaviors/allocate/mod.rs @@ -1,74 +1,145 @@ +use self::pool::AllocateOrDeallocate; use super::*; -pub trait AllocateType +pub trait AllocateType where + P: PoolType, E: Send + 'static, { - // TODO: This should probably be how we do it, but this generic `P` gets - // annoying fn change_allocation_amount(&mut self, event: E) -> - // Option; - fn change_allocation_amount(&mut self, event: E) -> Option>; + fn change_allocation_amount( + &mut self, + event: E, + ) -> Option<(AllocateOrDeallocate, P::AllocationData)>; } #[derive(Clone, Debug, Serialize, Deserialize)] -struct Allocate +struct Allocate where - A: AllocateType, - E: Send + 'static, S: State, + A: AllocateType, + P: PoolType, + E: Send + 'static, { pub data: S::Data, pub allocate_type: A, - _phantom_a: PhantomData, _phantom_e: PhantomData, + _phantom_p: PhantomData

, } +// TODO: This is actually the exact same as the `swap::Config` so... maybe they +// can be combined. #[derive(Debug, Serialize, Deserialize, State)] pub struct Config { - pub allocation_data: P::AllocationData, + pub token_admin: String, + pub _phantom: PhantomData

, } -#[derive(State)] -pub struct Processing -where - P: PoolType, - E: Send + 'static, -{ - pub pool: Pool

, - pub client: Arc, - pub messager: Messager, - _phantom: PhantomData, -} - -#[allow(unused_variables)] +// TODO: This start up is also very much the same as the `Swap` start up. More +// than likely, `Update`, `Swap`, and `Allocate` can all be combined into one +// type of behavior like a `Interact` behavior that just specializes to do +// different things. #[async_trait::async_trait] -impl Behavior for Allocate> +impl Behavior for Allocate, A, P, E> where - A: AllocateType + Send, - P: PoolType + Send, + A: AllocateType + Send, + P: PoolType + Send + Sync, E: Send + 'static, { - type Processor = Allocate>; + type Processor = Allocate, A, P, E>; async fn startup( mut self, client: Arc, - messager: Messager, + mut messager: Messager, ) -> Result { - todo!(); + // TODO: Here we probably need to filter on the `PoolCreation` so that we get + // the correct pool. + let completed_todo = GetPoolTodo::

::complete(&mut messager).await; + let (deployment_data, pool_creation) = ( + completed_todo.deployment_data.unwrap(), + completed_todo.pool_creation.unwrap(), + ); + + let (strategy_contract, solver_contract) = + P::get_contracts(&deployment_data, client.clone()); + let dfmm = DFMM::new(deployment_data.dfmm, client.clone()); + + // TODO: This sort of approval and token loop is also repeated in other places + // like `Swap` and `Allocate`. + // Get the intended tokens for the pool and do approvals. + let mut tokens: Vec> = Vec::new(); + for token_address in pool_creation.tokens.into_iter() { + let token = ArbiterToken::new(token_address, client.clone()); + let name = token.name().call().await?; + messager + .send( + To::Agent(self.data.token_admin.clone()), + TokenAdminQuery::MintRequest(MintRequest { + token: name, + mint_to: client.address(), + mint_amount: parse_ether(100)?, + }), + ) + .await + .unwrap(); + assert_eq!( + messager.get_next::().await.unwrap().data, + Response::Success + ); + token + .approve(dfmm.address(), MAX) + .send() + .await + .unwrap() + .await + .unwrap(); + + tokens.push(token); + } + + // build pool for processor and stream + let pool = Pool::

{ + id: pool_creation.id, + dfmm, + instance: P::create_instance(strategy_contract, solver_contract, pool_creation.params), + tokens, + liquidity_token: ERC20::new(pool_creation.liquidity_token, client.clone()), + }; + + let processor = Self::Processor { + data: PoolProcessing { + messager, + client, + pool, + }, + allocate_type: self.allocate_type, + _phantom_e: PhantomData, + _phantom_p: PhantomData, + }; + + Ok(processor) } } #[async_trait::async_trait] -impl Processor for Allocate> +impl Processor for Allocate, A, P, E> where - A: AllocateType + Send, - P: PoolType + Send, + A: AllocateType + Send, + P: PoolType + Send + Sync, E: Send + 'static, { - async fn get_stream(&mut self) -> Result>> { - todo!("We have not implemented the 'get_stream' method yet for the 'Allocate' behavior."); + default async fn get_stream(&mut self) -> Result>> { + Ok(None) } - async fn process(&mut self, _event: E) -> Result { - Ok(ControlFlow::Halt) + default async fn process(&mut self, event: E) -> Result { + if let Some((allocate_or_deallocate, allocation_data)) = + self.allocate_type.change_allocation_amount(event) + { + self.data + .pool + .change_allocation(allocate_or_deallocate, allocation_data) + .await?; + } + + Ok(ControlFlow::Continue) } } diff --git a/kit/src/behaviors/creator.rs b/kit/src/behaviors/create.rs similarity index 88% rename from kit/src/behaviors/creator.rs rename to kit/src/behaviors/create.rs index 61f92b71..711e5a13 100644 --- a/kit/src/behaviors/creator.rs +++ b/kit/src/behaviors/create.rs @@ -1,5 +1,4 @@ use super::*; -use crate::behaviors::token::Response; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Create { @@ -28,14 +27,14 @@ where ) -> Result { // Receive the `DeploymentData` from the `Deployer` agent and use it to get the // contracts. - debug!("Starting the creator"); let deployment_data = messager.get_next::().await?.data; - debug!("Creator: Received deployment data {:?}", deployment_data); let (strategy_contract, solver_contract) = P::get_contracts(&deployment_data, client.clone()); let dfmm = DFMM::new(deployment_data.dfmm, client.clone()); + // TODO: This sort of approval and token loop is also repeated in other places + // like `Swap` and `Allocate`. // Get the intended tokens for the pool and do approvals. let mut tokens = Vec::new(); for tkn in self.data.token_list.drain(..) { @@ -75,8 +74,12 @@ where tokens.push(token); } + // TODO: There is a bit of a misleading thing happening here. The pool + // controller does indeed get set correctly, however, there are some other + // structs that may show incorrect data. This should be consolidated and + // fixed!!! debug!( - "Setting Controller Address to self address: {:?}", + "Setting controller Address to self address: {:?}", client.address() ); if self.data.base_config.controller == eAddress::zero() { @@ -84,7 +87,6 @@ where } let params = P::set_controller(self.data.params.clone(), client.address()); - debug!("creating pool..."); let pool = Pool::

::new( self.data.base_config.clone(), params.clone(), diff --git a/kit/src/behaviors/mod.rs b/kit/src/behaviors/mod.rs index d763c041..ab197f4f 100644 --- a/kit/src/behaviors/mod.rs +++ b/kit/src/behaviors/mod.rs @@ -6,12 +6,12 @@ use arbiter_engine::{ }; #[allow(unused)] use arbiter_macros::{Behaviors, State}; -use bindings::{arbiter_token::ArbiterToken, dfmm::DFMM}; +use bindings::{arbiter_token::ArbiterToken, dfmm::DFMM, erc20::ERC20}; use ethers::utils::parse_ether; -pub use token::{MintRequest, TokenAdminQuery}; +pub use token::{MintRequest, Response, TokenAdminQuery}; use self::{ - creator::Create, + create::Create, deploy::{Deploy, DeploymentData}, pool::{PoolCreation, PoolType}, token::TokenAdmin, @@ -21,7 +21,7 @@ use super::*; pub const MAX: eU256 = eU256::MAX; pub mod allocate; -pub mod creator; +pub mod create; pub mod deploy; pub mod swap; pub mod token; @@ -29,16 +29,12 @@ pub mod update; #[derive(Debug, Deserialize, Serialize)] pub enum Behaviors { - Create(Create>), + Create(Create>), Deployer(Deploy), TokenAdmin(TokenAdmin), Swap(swap::Config

), } -pub trait Configurable Deserialize<'a>> { - fn configure(data: T) -> Self; -} - #[derive(Debug, Deserialize, Serialize)] #[serde(bound = "P: PoolType")] pub enum MessageTypes

@@ -55,6 +51,9 @@ where Update(P::Parameters), } +// TODO: This is used by `Allocate` and `Swap` at the moment, so it was moved +// here since it was more central. However, there is likely a better way to +// combine all of these things. #[derive(Debug)] struct GetPoolTodo { deployment_data: Option, diff --git a/kit/src/behaviors/swap/mod.rs b/kit/src/behaviors/swap/mod.rs index 52ccc74d..73e63c2f 100644 --- a/kit/src/behaviors/swap/mod.rs +++ b/kit/src/behaviors/swap/mod.rs @@ -1,7 +1,7 @@ -use self::{bindings::erc20::ERC20, pool::InputToken}; use super::*; -use crate::behaviors::token::Response; +use crate::pool::InputToken; +// TODO: This could depend on the `PoolType` as the `AllocateType` does. pub trait SwapType { fn compute_swap_amount(&self, event: E) -> Option<(eU256, InputToken)>; } @@ -14,7 +14,7 @@ where { pub data: S::Data, pub swap_type: T, - pub _phantom: PhantomData, + _phantom: PhantomData, } // Should also get some data necessary for mint amounts and what not. @@ -27,20 +27,10 @@ where pub _phantom: PhantomData

, } -#[derive(Debug, Clone, State)] -pub struct Processing

-where - P: PoolType, - // T: SwapType, - // E: Send + 'static, -{ - pub messager: Messager, - pub client: Arc, - pub pool: Pool

, - // pub swap_type: T, - // _phantom: PhantomData, -} - +// TODO: This start up is also very much the same as the `Allocate` start up. +// More than likely, `Update`, `Swap`, and `Allocate` can all be combined into +// one type of behavior like a `Interact` behavior that just specializes to do +// different things. #[async_trait::async_trait] impl Behavior for Swap, T, E> where @@ -48,7 +38,7 @@ where T: SwapType + Send + Clone, E: Send + 'static, { - type Processor = Swap, T, E>; + type Processor = Swap, T, E>; async fn startup( mut self, client: Arc, @@ -66,6 +56,8 @@ where P::get_contracts(&deployment_data, client.clone()); let dfmm = DFMM::new(deployment_data.dfmm, client.clone()); + // TODO: This sort of approval and token loop is also repeated in other places + // like `Allocate` and `Create`. // Get the intended tokens for the pool and do approvals. let mut tokens: Vec> = Vec::new(); for token_address in pool_creation.tokens.into_iter() { @@ -107,7 +99,7 @@ where }; let processor = Self::Processor { - data: Processing { + data: PoolProcessing { messager, client, pool, @@ -123,7 +115,7 @@ where /// This is the default implementation for any processor that takes in some /// event E and will work for the `Swap` struct. #[async_trait::async_trait] -impl Processor for Swap, T, E> +impl Processor for Swap, T, E> where P: PoolType + Send + Sync, T: SwapType + Send + Clone, @@ -154,7 +146,7 @@ where /// `SwapType` as long as it streams `Message`s. If you need to stream something /// else, just copy this specialization and use whatever stream item you'd like! #[async_trait::async_trait] -impl Processor for Swap, T, Message> +impl Processor for Swap, T, Message> where P: PoolType + Send + Sync, T: SwapType + Send + Clone, diff --git a/kit/src/behaviors/update/mod.rs b/kit/src/behaviors/update/mod.rs index e1744f57..e1d7153d 100644 --- a/kit/src/behaviors/update/mod.rs +++ b/kit/src/behaviors/update/mod.rs @@ -1,38 +1,27 @@ use std::collections::VecDeque; -use tracing::warn; - use super::*; -use crate::bindings::erc20::ERC20; #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Update { +pub struct Update { pub token_admin: String, - pub data: S::Data, -} - -#[derive(Clone, Debug, Serialize, Deserialize, State)] -pub struct Config { - pub base_config: BaseConfig, - pub allocation_data: P::AllocationData, - pub token_list: Vec, pub params: VecDeque, + pub data: S::Data, } +// TODO: One could add in an `UpdateType` as we have with `Allocate` and `Swap`. +// This `Update` only acts with messages for now. -#[derive(Debug, Clone, State)] -pub struct Processing { - pub messager: Messager, - pub client: Arc, - pub pool: Pool

, - pub pool_params: VecDeque, -} +// TODO: This could be set up in another way if more state dependent data is +// needed! Right now it is just a place holder. +#[derive(Debug, Serialize, Deserialize, State)] +pub struct Config; #[async_trait::async_trait] -impl

Behavior for Update> +impl

Behavior for Update where P: PoolType + Send + Sync, { - type Processor = Update>; + type Processor = Update, P>; async fn startup( mut self, client: Arc, @@ -61,12 +50,12 @@ where }; let processor = Self::Processor { - token_admin: self.token_admin.clone(), - data: Processing { + token_admin: self.token_admin, + params: self.params, + data: PoolProcessing { messager, client, pool, - pool_params: self.data.params.clone(), }, }; Ok(processor) @@ -74,7 +63,7 @@ where } #[async_trait::async_trait] -impl

Processor for Update> +impl

Processor for Update, P> where P: PoolType + Send + Sync, { @@ -84,7 +73,7 @@ where async fn process(&mut self, event: Message) -> Result { match serde_json::from_str(&event.data) { Ok(UpdateRequest::ApplyUpdate) => { - let params = self.data.pool_params.pop_front().unwrap(); + let params = self.params.pop_front().unwrap(); self.data.pool.update(params.clone()).await?; let _ = self .data @@ -94,7 +83,11 @@ where info!("Successfully updated!"); } Err(e) => { - warn!("Failed to parse message: {}", e); + debug!( + "Received message in `Update::process` that agent {:#?} cannot process: {}", + self.data.messager.id.clone(), + e + ); } } Ok(ControlFlow::Continue) diff --git a/kit/src/lib.rs b/kit/src/lib.rs index 8bed88d3..a15d4aa3 100644 --- a/kit/src/lib.rs +++ b/kit/src/lib.rs @@ -10,8 +10,10 @@ use std::fmt::Debug; use anyhow::Result; use arbiter_core::middleware::ArbiterMiddleware; +use arbiter_engine::{machine::State, messager::Messager}; +use arbiter_macros::State; pub use behaviors::token::TokenData; use ethers::types::{Address as eAddress, I256 as eI256, U256 as eU256}; -pub use pool::{BaseConfig, Pool, PoolType}; +pub use pool::{BaseConfig, Pool, PoolProcessing, PoolType}; use serde::{Deserialize, Serialize}; use tracing::{debug, info}; diff --git a/kit/src/pool/mod.rs b/kit/src/pool/mod.rs index 3c9815da..be08ae9d 100644 --- a/kit/src/pool/mod.rs +++ b/kit/src/pool/mod.rs @@ -214,7 +214,7 @@ impl Pool

{ /// /// Returns `Ok(())` if the allocation or deallocation is successful, /// otherwise returns an error. - pub async fn allocate_or_deallocate( + pub async fn change_allocation( &self, action: AllocateOrDeallocate, allocation_data: P::AllocationData, @@ -277,57 +277,12 @@ pub struct PoolCreation { pub allocation_data: P::AllocationData, } -// impl<'de, P: PoolType> Deserialize<'de> for PoolCreation

{ -// fn deserialize(deserializer: D) -> Result -// where -// D: serde::de::Deserializer<'de>, -// { -// let mut map = deserializer.deserialize_map(None)?; -// let mut id = None; -// let mut tokens = None; -// let mut liquidity_token = None; -// let mut params = None; -// let mut allocation_data = None; - -// while let Some(key) = map.next_key()? { -// match key { -// "id" => { -// id = Some(map.next_value()?); -// } -// "tokens" => { -// tokens = Some(map.next_value()?); -// } -// "liquidity_token" => { -// liquidity_token = Some(map.next_value()?); -// } -// "params" => { -// params = Some(map.next_value()?); -// } -// "allocation_data" => { -// allocation_data = Some(map.next_value()?); -// } -// _ => { -// // Ignore unknown fields -// let _ = map.next_value::(); -// } -// } -// } - -// let id = id.ok_or_else(|| serde::de::Error::missing_field("id"))?; -// let tokens = tokens.ok_or_else(|| -// serde::de::Error::missing_field("tokens"))?; let liquidity_token = -// liquidity_token.ok_or_else(|| -// serde::de::Error::missing_field("liquidity_token"))?; let params = -// params.ok_or_else(|| serde::de::Error::missing_field("params"))?; let -// allocation_data = allocation_data.ok_or_else(|| -// serde::de::Error::missing_field("allocation_data"))?; - -// Ok(PoolCreation { -// id, -// tokens, -// liquidity_token, -// params, -// allocation_data, -// }) -// } -// } +#[derive(Debug, Clone, State)] +pub struct PoolProcessing

+where + P: PoolType, +{ + pub pool: Pool

, + pub client: Arc, + pub messager: Messager, +}