Skip to content

Commit

Permalink
v1.15.0 is released
Browse files Browse the repository at this point in the history
  • Loading branch information
dialogflowchatbot committed Aug 12, 2024
1 parent 4022e00 commit 72e24e0
Show file tree
Hide file tree
Showing 10 changed files with 519 additions and 441 deletions.
3 changes: 3 additions & 0 deletions src/flow/rt/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub(crate) struct Response {
pub(crate) next_action: NextActionType,
#[serde(rename = "extraData")]
pub(crate) extra_data: ExtraData,
#[serde(rename = "sseReceiverTicket")]
pub(crate) sse_receiver_ticket: String,
}

impl Response {
Expand All @@ -71,6 +73,7 @@ impl Response {
extra_data: ExtraData {
external_link: String::new(),
},
sse_receiver_ticket: String::new(),
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/flow/rt/facade.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use std::collections::HashMap;
use std::sync::{LazyLock, Mutex};

use axum::response::IntoResponse;
use axum::Json;
use tokio::sync::mpsc::Sender;

use super::dto::Request;
use super::executor;
use crate::result::Result;
use crate::web::server::to_res;

static ANSWER_SSE_SESSIONS: LazyLock<Mutex<HashMap<String, Sender<String>>>> =
LazyLock::new(|| Mutex::new(HashMap::with_capacity(128)));

pub(crate) async fn answer(Json(mut req): Json<Request>) -> impl IntoResponse {
let now = std::time::Instant::now();
let r = executor::process(&mut req).await;
Expand All @@ -13,3 +21,21 @@ pub(crate) async fn answer(Json(mut req): Json<Request>) -> impl IntoResponse {
log::info!("Response used time:{:?}", now.elapsed());
res
}

pub(crate) async fn answer_sse(Json(req): Json<Request>) -> impl IntoResponse {
let now = std::time::Instant::now();
let (s, r) = tokio::sync::mpsc::channel::<String>(1);
let mut l = ANSWER_SSE_SESSIONS.lock().unwrap();
l.insert(String::from(req.session_id), s);
log::info!("Response used time:{:?}", now.elapsed());
""
}

pub(super) fn get_sender(session_id: &str) -> Result<Option<Sender<String>>> {
let l = ANSWER_SSE_SESSIONS.lock()?;
if l.contains_key(session_id) {
let s = l.get(session_id).unwrap();
return Ok(Some(s.clone()));
}
return Ok(None);
}
60 changes: 56 additions & 4 deletions src/flow/rt/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rkyv::{Archive, Deserialize, Serialize};
use super::condition::ConditionData;
use super::context::Context;
use super::dto::{AnswerData, AnswerType, CollectData, Request, Response};
use crate::ai::chat::ResultReceiver;
use crate::external::http::client as http;
use crate::flow::rt::collector;
use crate::flow::subflow::dto::NextActionType;
Expand Down Expand Up @@ -360,7 +361,7 @@ impl RuntimeNode for SendEmailNode {

#[derive(Archive, Deserialize, Serialize)]
#[archive(compare(PartialEq), check_bytes)]
enum LlmChatNodeExitCondition {
pub(crate) enum LlmChatNodeExitCondition {
Intent(String),
SpecialInputs(String),
MaxChatTimes(u32),
Expand All @@ -372,14 +373,65 @@ pub(crate) struct LlmChatNode {
pub(super) prompt: String,
pub(super) context_len: u8,
pub(super) exit_condition: LlmChatNodeExitCondition,
pub(super) streaming: bool,
pub(super) next_node_id: String,
}

impl RuntimeNode for LlmChatNode {
fn exec(&self, req: &Request, ctx: &mut Context, _response: &mut Response) -> bool {
fn exec(&self, req: &Request, ctx: &mut Context, response: &mut Response) -> bool {
// println!("Into LlmChatNode");
// crate::ai::chat::chat(robot_id, prompt, sender)
false
match &self.exit_condition {
LlmChatNodeExitCondition::Intent(i) => {
if req.user_input_intent.is_some() && req.user_input_intent.as_ref().unwrap().eq(i)
{
add_next_node(ctx, &self.next_node_id);
return false;
}
}
LlmChatNodeExitCondition::SpecialInputs(s) => {
if req.user_input.eq(s) {
add_next_node(ctx, &self.next_node_id);
return false;
}
}
LlmChatNodeExitCondition::MaxChatTimes(t) => todo!(),
}
if self.streaming {
let r = super::facade::get_sender(&req.session_id);
if r.is_err() {
return false;
}
let s_op = r.unwrap();
if s_op.is_none() {
return false;
}
let s = s_op.unwrap();
let ticket = String::new();
let robot_id = req.robot_id.clone();
let prompt = self.prompt.clone();
tokio::task::spawn(async move {
if let Err(e) =
crate::ai::chat::chat(&robot_id, &prompt, ResultReceiver::SseSender(&s)).await
{
log::info!("LlmChatNode response failed, err: {:?}", &e);
}
});
false
} else {
let mut s = String::with_capacity(1024);
if let Err(e) = tokio::runtime::Handle::current().block_on(async {
crate::ai::chat::chat(&req.robot_id, &self.prompt, ResultReceiver::StrBuf(&mut s))
.await
}) {
log::info!("LlmChatNode response failed, err: {:?}", &e);
} else {
response.answers.push(AnswerData {
text: s,
answer_type: AnswerType::TextPlain,
});
}
true
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use dialogflow::web::server::start_app;

fn main() {
// dialogflow::web::t1();
env::set_var("RUST_LOG", "TRACE");
env::set_var("RUST_LOG", "INFO");
let mut builder = LoggerBuilder::from_default_env();
builder
.target(Target::Stdout)
Expand Down
4 changes: 2 additions & 2 deletions src/man/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl Default for Settings {
model: String::new(),
connect_timeout_millis: 5000,
read_timeout_millis: 10000,
max_response_token_length: 10,
max_response_token_length: 1000,
proxy_url: String::new(),
},
text_generation_provider: TextGenerationProvider {
Expand All @@ -192,7 +192,7 @@ impl Default for Settings {
model: String::new(),
connect_timeout_millis: 5000,
read_timeout_millis: 10000,
max_response_token_length: 10,
max_response_token_length: 1000,
proxy_url: String::new(),
},
sentence_embedding_provider: SentenceEmbeddingProvider {
Expand Down
Loading

0 comments on commit 72e24e0

Please sign in to comment.