Skip to content

Commit

Permalink
feat: watchdog and keepalive task
Browse files Browse the repository at this point in the history
  • Loading branch information
RockChinQ committed Oct 5, 2023
1 parent 7256874 commit a979392
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 9 deletions.
24 changes: 24 additions & 0 deletions free_one_api/entities/channel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import json
import time

import tiktoken

Expand All @@ -21,6 +23,9 @@ class Channel:
enabled: bool

latency: int

fail_count: int
"""Amount of sequential failures. Only in memory."""

def __init__(self, id: int, name: str, adapter: llm.LLMLibAdapter, model_mapping: dict, enabled: bool, latency: int):
self.id = id
Expand All @@ -29,6 +34,7 @@ def __init__(self, id: int, name: str, adapter: llm.LLMLibAdapter, model_mapping
self.model_mapping = model_mapping
self.enabled = enabled
self.latency = latency
self.fail_count = 0

@classmethod
def dump_channel(cls, chan: 'Channel') -> dict:
Expand Down Expand Up @@ -68,3 +74,21 @@ def count_tokens(
num_tokens += len(encoding.encode(value))
num_tokens += 3 # every reply is primed with <|start|>assistant<|message|>
return num_tokens

async def heartbeat(self, timeout: int=300) -> int:
    """Call adapter test, returns fail count.

    Awaits ``self.adapter.test()`` for at most ``timeout`` seconds and updates
    the in-memory health counters of this channel.

    Args:
        timeout: Maximum number of seconds to wait for the adapter test.

    Returns:
        0 when the test succeeded (``fail_count`` is reset and ``latency`` is
        refreshed); otherwise the updated number of sequential failures.
    """
    start = time.time()

    try:
        succ = await asyncio.wait_for(self.adapter.test(), timeout=timeout)
    except Exception:
        # A timeout or an adapter error is one failed heartbeat. This must be
        # `except`, not `finally`: a `finally` clause containing `return` also
        # runs after a successful test, overriding its result and counting a
        # failure every time. Cancellation is deliberately not caught here.
        succ = False

    if succ:
        # Elapsed seconds, truncated to two decimal places.
        self.latency = int((time.time() - start)*100)/100
        self.fail_count = 0
        return 0

    self.fail_count += 1
    return self.fail_count
4 changes: 2 additions & 2 deletions free_one_api/impls/adapter/hugchat.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def chatbot(self) -> hugchat.ChatBot:
sign = login.Login(self.config['email'], self.config['passwd'])
cookie: requests.sessions.RequestsCookieJar = None
try:
cookie = sign.loadCookiesFromDir("hugchatCookies")
cookie = sign.loadCookiesFromDir("data/hugchatCookies")
except:
cookie = sign.login()
sign.saveCookiesToDir("hugchatCookies")
sign.saveCookiesToDir("data/hugchatCookies")

self._chatbot = hugchat.ChatBot(cookies=cookie.get_dict())
return self._chatbot
Expand Down
31 changes: 29 additions & 2 deletions free_one_api/impls/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import asyncio

import yaml

Expand All @@ -11,6 +12,7 @@
from ..models.channel import mgr as chanmgr
from ..models.key import mgr as keymgr
from ..models.router import group as routergroup
from ..models.watchdog import wd as wdmgr

from .adapter import revChatGPT
from .adapter import claude
Expand All @@ -35,21 +37,29 @@ class Application:
key: keymgr.AbsAPIKeyManager
"""API Key manager."""

watchdog: wdmgr.AbsWatchDog

def __init__(
    self,
    dbmgr: db.DatabaseInterface,
    router: routermgr.RouterManager,
    channel: chanmgr.AbsChannelManager,
    key: keymgr.AbsAPIKeyManager,
    watchdog: wdmgr.AbsWatchDog,
):
    """Keep references to the components this application is composed of.

    Each argument is stored unchanged on the attribute of the same name.
    """
    self.dbmgr, self.router = dbmgr, router
    self.channel, self.key = channel, key
    self.watchdog = watchdog

async def run(self):
    """Run application.

    Schedules the watchdog on the running event loop as a background task,
    then awaits the router's ``serve`` coroutine. Must be awaited from inside
    a running event loop.
    """
    loop = asyncio.get_running_loop()

    # Hold a reference to the task: the event loop itself only keeps weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    self._watchdog_task = loop.create_task(self.watchdog.run())

    await self.router.serve(loop)

default_config = {
"database": {
Expand All @@ -59,6 +69,7 @@ def run(self):
"watchdog": {
"heartbeat": {
"interval": 1800,
"timeout": 300,
"fail_limit": 3,
},
},
Expand Down Expand Up @@ -137,11 +148,27 @@ async def make_application(config_path: str) -> Application:
config=config['router'],
)

# watchdog and tasks
from .watchdog import wd as watchdog

wdmgr = watchdog.WatchDog()

# tasks
from .watchdog.tasks import heartbeat

hbtask = heartbeat.HeartBeatTask(
channelmgr,
config['watchdog']['heartbeat'],
)

wdmgr.add_task(hbtask)

app = Application(
dbmgr=dbmgr,
router=routermgr,
channel=channelmgr,
key=apikeymgr,
watchdog=wdmgr,
)

return app
Expand Down
4 changes: 2 additions & 2 deletions free_one_api/impls/router/mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ def __init__(self, routes: list[tuple[str, list[str], callable, dict]], config:
for method in methods:
self._app.route(route, methods=[method], **kwargs)(handler)

async def serve(self, loop):
    """Serve API.

    Awaits the app's ``run_task`` coroutine, listening on all interfaces at
    ``self.port``, instead of calling the blocking ``run`` from inside a
    coroutine.

    Args:
        loop: Event loop supplied by the caller. It is accepted for interface
            compatibility and is not used here.

    Returns:
        Whatever ``self._app.run_task`` returns.
    """
    return await self._app.run_task(host="0.0.0.0", port=self.port)


if __name__ == "__main__":
Expand Down
Empty file.
Empty file.
45 changes: 45 additions & 0 deletions free_one_api/impls/watchdog/tasks/heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import asyncio
import traceback
import logging

from ....models.watchdog import task
from ....models.channel import mgr as chanmgr
from ....entities import channel


class HeartBeatTask(task.AbsTask):
    """HeartBeat task.

    On every trigger, runs a heartbeat against each enabled channel and asks
    the channel manager to disable channels whose failure count exceeds
    ``cfg["fail_limit"]``.
    """

    channel: chanmgr.AbsChannelManager

    cfg: dict

    def __init__(self, chan: chanmgr.AbsChannelManager, cfg: dict):
        """Store the channel manager and heartbeat config.

        Args:
            chan: Channel manager whose channels are probed.
            cfg: Heartbeat settings; reads ``interval`` here and ``timeout``
                and ``fail_limit`` when triggered.
        """
        self.channel = chan
        self.cfg = cfg

        self.delay = 10
        self.interval = cfg['interval']

    async def trigger(self):
        """Trigger this task."""

        async def process(ch: channel.Channel):
            """Probe one channel, disable it if over the limit, then save it."""
            fail_count = await ch.heartbeat(timeout=self.cfg["timeout"])
            if fail_count > self.cfg["fail_limit"]:
                try:
                    # NOTE(review): not awaited, as in the original; if
                    # disable_channel is a coroutine function this call has no
                    # effect - confirm against the channel manager.
                    self.channel.disable_channel(ch.id)
                    logging.info(f"Disabled channel {ch.id} due to heartbeat failed {fail_count} times")
                except Exception:
                    logging.warning(f"Failed to disable channel {ch.id}, traceback: {traceback.format_exc()}")
            # NOTE(review): assumed to run for every probed channel, not only
            # disabled ones - confirm against the original indentation.
            await self.channel.update_channel(ch)

        process_task = [
            process(chan) for chan in self.channel.channels if chan.enabled
        ]

        logging.info(f"Start heartbeat task, {len(process_task)} channels to process")
        await asyncio.gather(*process_task)

23 changes: 23 additions & 0 deletions free_one_api/impls/watchdog/wd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import asyncio

from ...models.watchdog import wd
from ...models.watchdog import task


class WatchDog(wd.AbsWatchDog):
    """Watchdog that runs the loops of all added tasks concurrently."""

    def __init__(self):
        self.tasks = []

    async def run(self):
        """Start ``loop()`` of every added task and wait on all of them."""
        await asyncio.gather(*[job.loop() for job in self.tasks])

    def add_task(self, task: task.AbsTask):
        """Append a task to the list run by this watchdog."""
        self.tasks.append(task)
Empty file.
33 changes: 33 additions & 0 deletions free_one_api/models/watchdog/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import abc
import asyncio
import traceback
import logging


class AbsTask(metaclass=abc.ABCMeta):
    """Task of WatchDog.
    A task may need instances of other modules to work. These dependencies should be set by outer constructor.
    Task's delay and interval may be set by outer constructor also, and be scheduled by WatchDog implementation.
    """

    delay: int = 0
    """Delay before first trigger."""

    interval: int = 60
    """Interval between two triggers."""

    @abc.abstractmethod
    async def trigger(self):
        """Trigger this task."""
        raise NotImplementedError

    async def loop(self):
        """Loop this task.

        Sleeps ``delay`` seconds once, then calls ``trigger()`` forever with
        ``interval`` seconds between calls. An ``Exception`` raised by
        ``trigger()`` is logged as a warning and does not stop the loop.
        """
        await asyncio.sleep(self.delay)
        while True:
            try:
                await self.trigger()
            except Exception:
                # logging.warn is a deprecated alias; logging.warning is the
                # supported call and emits the same record.
                logging.warning(f"Failed to trigger task {self.__class__.__name__}, traceback: {traceback.format_exc()}")
            await asyncio.sleep(self.interval)
14 changes: 14 additions & 0 deletions free_one_api/models/watchdog/wd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import abc

from . import task

class AbsWatchDog(metaclass=abc.ABCMeta):
    """Abstract interface every WatchDog implementation has to fulfil."""

    tasks: list[task.AbsTask]
    """Tasks that have been added to this watchdog."""

    @abc.abstractmethod
    async def run(self):
        """Start the WatchDog system; subclasses must override this."""
        raise NotImplementedError
9 changes: 7 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import os
import sys
import asyncio
import logging

logging.basicConfig(level=logging.INFO)

# Create the data directory before anything reads from it. exist_ok=True makes
# this a single race-free call instead of a separate exists() check + mkdir().
os.makedirs('./data', exist_ok=True)

from free_one_api.impls import app

def main():
    """Build the application from ./data/config.yaml and run it.

    A single, explicitly created event loop drives both steps, so objects
    created while building the application and the coroutine returned by
    ``application.run()`` share one loop. The application is built once and
    ``application.run()`` is awaited once.
    """
    # asyncio.get_event_loop() without a current loop is deprecated, and it
    # raises once asyncio.run() has cleared the loop - create one explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    application = loop.run_until_complete(app.make_application("./data/config.yaml"))

    loop.run_until_complete(application.run())

if __name__ == "__main__":
    main()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ claude-api
bardapi
hugchat
g4f
revTongYi
revTongYi
colorlog

0 comments on commit a979392

Please sign in to comment.