Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
Refresh token on 401 error in LLM gateway, and quota requests (#1215)
Browse files Browse the repository at this point in the history
  • Loading branch information
calyptobai authored Jan 26, 2024
1 parent c712a3f commit 2e2cd7b
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 35 deletions.
29 changes: 24 additions & 5 deletions server/bleep/src/llm_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use futures::{Stream, StreamExt};
use reqwest_eventsource::EventSource;
use tracing::{debug, error, warn};

use crate::{periodic::sync_github_status_once, Application};

use self::api::FunctionCall;

pub mod api {
Expand Down Expand Up @@ -213,13 +215,15 @@ impl From<&api::Message> for tiktoken_rs::ChatCompletionRequestMessage {
/// Failure categories for a single chat request to the LLM gateway.
///
/// The retry loop in `Client` matches on these variants to decide whether a
/// failed request is retried or aborted.
enum ChatError {
/// The request was rejected as a bad request. Carries the `body` string that
/// is also logged in the "bad request to LLM" warning; the retry loop bails
/// on this variant ("not eligible for retry").
BadRequest(String),
/// NOTE(review): presumably produced when the event source reports HTTP 429
/// (`StatusCode::TOO_MANY_REQUESTS`); the contents of the carried string are
/// not visible in this hunk, so confirm against the full file.
TooManyRequests(String),
/// The event source reported HTTP 401 (`StatusCode::UNAUTHORIZED`). The retry
/// loop responds by logging "invalid token, retrying LLM request" and awaiting
/// `sync_github_status_once` before the next attempt.
InvalidToken,
/// Any other failure, wrapped as an `anyhow::Error`.
Other(anyhow::Error),
}

#[derive(Clone)]
pub struct Client {
http: reqwest::Client,
pub base_url: String,
app: Application,

pub max_retries: u32,

pub bearer_token: Option<String>,
Expand All @@ -234,10 +238,11 @@ pub struct Client {
}

impl Client {
pub fn new(base_url: &str) -> Self {
pub fn new(app: Application) -> Self {
Self {
app,
http: reqwest::Client::new(),
base_url: base_url.to_owned(),

max_retries: 5,

bearer_token: None,
Expand Down Expand Up @@ -305,7 +310,10 @@ impl Client {
version: semver::Version,
) -> Result<reqwest::Response, reqwest::Error> {
self.http
.get(format!("{}/v1/compatibility", self.base_url))
.get(format!(
"{}/v1/compatibility",
self.app.config.answer_api_url
))
.query(&[("version", version)])
.send()
.await
Expand Down Expand Up @@ -365,6 +373,10 @@ impl Client {
error!("LLM request failed, request not eligible for retry: {body}");
bail!("request failed (not eligible for retry): {body}");
}
Err(ChatError::InvalidToken) => {
warn!("invalid token, retrying LLM request");
sync_github_status_once(&self.app).await;
}
Err(ChatError::Other(e)) => {
// We log the messages in a separate `debug!` statement so that they can be
// filtered out, due to their verbosity.
Expand All @@ -387,7 +399,9 @@ impl Client {
) -> Result<impl Stream<Item = anyhow::Result<String>>, ChatError> {
let mut event_source = Box::pin(
EventSource::new({
let mut builder = self.http.post(format!("{}/v2/q", self.base_url));
let mut builder = self
.http
.post(format!("{}/v2/q", self.app.config.answer_api_url));

if let Some(bearer) = &self.bearer_token {
builder = builder.bearer_auth(bearer);
Expand Down Expand Up @@ -433,6 +447,11 @@ impl Client {
warn!("bad request to LLM: {body}");
return Err(ChatError::BadRequest(body));
}
Some(Err(reqwest_eventsource::Error::InvalidStatusCode(status, _)))
if status == StatusCode::UNAUTHORIZED =>
{
return Err(ChatError::InvalidToken);
}
Some(Err(reqwest_eventsource::Error::InvalidStatusCode(status, response)))
if status == StatusCode::TOO_MANY_REQUESTS =>
{
Expand Down
8 changes: 6 additions & 2 deletions server/bleep/src/periodic/remotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,16 @@ pub(crate) async fn sync_github_status(app: Application) {
// credentials from CLI/config
loop {
// then retrieve username & other maintenance
update_credentials(&app).await;
update_repo_list(&app).await;
sync_github_status_once(&app).await;
sleep_systime(POLL_PERIOD).await;
}
}

/// Performs a single round of GitHub status synchronisation.
///
/// Awaits `update_credentials` and then `update_repo_list`, in that order, on
/// the given application. This is the body of one iteration of the periodic
/// `sync_github_status` loop, exposed separately so that callers can trigger
/// a refresh on demand (for example after receiving an HTTP 401).
///
/// The function returns nothing; neither awaited call yields a value that is
/// propagated to the caller here.
pub async fn sync_github_status_once(app: &Application) {
// Credentials are refreshed first, before the repo list is updated.
update_credentials(app).await;
update_repo_list(app).await;
}

pub(crate) async fn update_repo_list(app: &Application) {
if let Some(gh) = app.credentials.github() {
let repos = match gh.current_repo_list().await {
Expand Down
2 changes: 1 addition & 1 deletion server/bleep/src/webserver/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl User {
}

let access_token = self.access_token().map(str::to_owned);
Ok(llm_gateway::Client::new(&app.config.answer_api_url).bearer(access_token))
Ok(llm_gateway::Client::new(app.clone()).bearer(access_token))
}

pub(crate) async fn paid_features(&self, app: &Application) -> bool {
Expand Down
66 changes: 39 additions & 27 deletions server/bleep/src/webserver/quota.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use axum::{Extension, Json};
use chrono::{DateTime, Utc};
use reqwest::StatusCode;
use serde::Deserialize;
use tracing::error;

use crate::Application;
use crate::{periodic::sync_github_status_once, Application};

use super::{middleware::User, Error};

Expand Down Expand Up @@ -53,37 +54,48 @@ async fn get_request<T: for<'a> Deserialize<'a>>(
Extension(user): Extension<User>,
endpoint: &str,
) -> super::Result<Json<T>> {
const MAX_RETRIES: usize = 5;

let Some(api_token) = user.access_token() else {
return Err(Error::unauthorized("answer API token was not present"));
};

let response = reqwest::Client::new()
.get(format!("{}{}", app.config.answer_api_url, endpoint))
.bearer_auth(api_token)
.send()
.await
.map_err(Error::internal)?;
for _ in 0..MAX_RETRIES {
let response = reqwest::Client::new()
.get(format!("{}{}", app.config.answer_api_url, endpoint))
.bearer_auth(api_token)
.send()
.await
.map_err(Error::internal)?;

if response.status().is_success() {
let body = response.text().await.map_err(Error::internal)?;
match serde_json::from_str::<T>(&body) {
Ok(t) => Ok(Json(t)),
Err(_) => Err(Error::internal(format!(
"quota call return invalid JSON: {body}"
))),
}
} else {
let status = response.status();
match response.text().await {
Ok(body) if !body.is_empty() => Err(Error::internal(format!(
"request failed with status code {status}: {body}",
))),
Ok(_) => Err(Error::internal(format!(
"request failed with status code {status}, response had no body",
))),
Err(_) => Err(Error::internal(format!(
"request failed with status code {status}, failed to retrieve response body",
))),
if response.status().is_success() {
let body = response.text().await.map_err(Error::internal)?;
return match serde_json::from_str::<T>(&body) {
Ok(t) => Ok(Json(t)),
Err(_) => Err(Error::internal(format!(
"quota call return invalid JSON: {body}"
))),
};
} else if response.status() == StatusCode::UNAUTHORIZED {
sync_github_status_once(&app).await;
continue;
} else {
let status = response.status();
return match response.text().await {
Ok(body) if !body.is_empty() => Err(Error::internal(format!(
"request failed with status code {status}: {body}",
))),
Ok(_) => Err(Error::internal(format!(
"request failed with status code {status}, response had no body",
))),
Err(_) => Err(Error::internal(format!(
"request failed with status code {status}, failed to retrieve response body",
))),
};
}
}

Err(Error::internal(
"failed to make quota request, potentially failed authorization?",
))
}

0 comments on commit 2e2cd7b

Please sign in to comment.