forked from fwaris/FsOpenAI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMonitoring.fs
160 lines (138 loc) · 4.76 KB
/
Monitoring.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
namespace FsOpenAI.GenAI
open System
open Azure
open FSharp.Control
open Azure.AI.OpenAI
open System.Threading.Channels
open FsOpenAI.Shared
open FSharp.CosmosDb
open Microsoft.Azure.Cosmos
/// A single chat message as recorded in the diagnostics log.
type ChatLogMsg =
    {
        /// Role label of the message author, stored as plain text.
        Role : string
        /// Text content of the message.
        Content : string
    }
/// Snapshot of a chat prompt: the system message, the message list and
/// the sampling parameters that were recorded with it.
type ChatLog =
    {
        /// System message text.
        /// NOTE(review): the field name is misspelled ("Messge"); it is kept
        /// as-is because renaming a record field changes the public shape.
        SystemMessge: string
        /// Logged chat messages.
        Messages : ChatLogMsg list
        /// Temperature value recorded for the request.
        Temperature : float
        /// Max-tokens value recorded for the request.
        MaxTokens : int
    }
/// The prompt attached to a diagnostics entry.
type PromptLog =
    /// An embedding request, carried as a string.
    | Embedding of string
    /// A chat request, carried as a `ChatLog`.
    | Chat of ChatLog
/// User feedback attached to a logged interaction.
type MFeedback =
    {
        /// Integer thumbs up/down indicator (the encoding is not defined in this file).
        ThumbsUpDn : int
        /// Optional free-text comment.
        Comment : string option
    }
/// A feedback submission addressed to a previously logged diagnostics entry.
type FeedbackEntry =
    {
        /// Identifier of the diagnostics entry to update (used as the document id on update).
        LogId : string
        /// User id; used as the partition key when the entry is updated.
        UserId : string
        /// The feedback to attach.
        Feedback : MFeedback
    }
/// Reference to an index, identified by backend and name.
type MIndexRef =
    {
        /// Backend label of the index.
        Backend: string
        /// Name of the index.
        Name: string
    }
/// One diagnostics record, as written to the backing container.
/// `id` carries the `[<Id>]` attribute and `UserId` the `[<PartitionKey>]`
/// attribute (both come from the opened Cosmos helper library).
type DiagEntry =
    {
        /// Document id.
        [<Id>] id : string
        /// Application identifier.
        AppId : string
        /// User id; doubles as the partition key.
        [<PartitionKey>] UserId : string
        /// The prompt that was sent.
        Prompt : PromptLog
        /// Feedback, if any has been attached to this entry.
        Feedback : MFeedback option
        /// Question text.
        Question : string
        /// Response text.
        Response : string
        /// Input token count.
        InputTokens : int
        /// Output token count.
        OutputTokens : int
        /// Error text.
        Error : string
        /// Backend label.
        Backend : string
        /// Resource label.
        Resource : string
        /// Model label.
        Model : string
        /// Indexes referenced by this interaction.
        IndexRefs : MIndexRef list
        /// Time associated with the entry.
        Timestamp : DateTime
    }
/// Item placed on the logging channel: either a new diagnostics record
/// or feedback for an existing one.
type LogEntry =
    | Diag of DiagEntry
    | Feedback of FeedbackEntry
[<AutoOpen>]
module Monitoring =
/// Capacity of the bounded channel that queues log entries.
let BUFFER_SIZE = 1_000
/// Timeout handed to `AsyncSeq.bufferByCountAndTime` when batching entries
/// (milliseconds, per that API).
let BUFFER_WAIT = 10_000
// Current backend connection info, wrapped in a lazy option.
// Starts as None and is replaced by `init` once a connection can be created;
// the writer functions read `.Value` and do nothing while it is None.
let mutable private _cnctnInfo = lazy None
/// Tries to create a connection for `DiagEntry` documents from the given
/// connection string, database and container, and stores it as the current
/// connection when creation succeeds. Leaves the current state untouched
/// when `Connection.tryCreate` returns None.
let init (ccstr,database,container) =
    Connection.tryCreate<DiagEntry>(ccstr,database,container)
    |> Option.iter (fun conn -> _cnctnInfo <- lazy (Some conn))
/// Reads the logging connection parameters from configuration.
/// Returns `Some (connectionString, databaseName, containerName)` when the
/// app config is present, it names a diagnostics table, and the settings
/// contain a log connection string; otherwise `None`.
/// Any exception is logged and turned into `None`.
let getConnectionFromConfig() =
    try
        match Env.appConfig.Value with
        | None -> None
        | Some cfg ->
            // Logged whenever a config exists, even if no table is configured.
            Env.logInfo $"{cfg.DatabaseName},{cfg.DiagTableName}"
            match cfg.DiagTableName with
            | None -> None
            | Some container ->
                // Settings are only consulted once a container name is known.
                match Settings.getSettings().Value.LOG_CONN_STR with
                | None -> None
                | Some cstr ->
                    // Only a shortened form of the connection string is logged.
                    Env.logInfo $"{Utils.shorten 30 cstr}"
                    Some (cstr, cfg.DatabaseName, container)
    with ex ->
        Env.logException (ex,"Monitoring.getConnectionFromConfig")
        None
/// Makes sure a connection has been set up: when none is stored yet, reads
/// the parameters from configuration and calls `init` with them. Does
/// nothing if a connection already exists or configuration yields nothing.
let ensureConnection() =
    if Option.isNone _cnctnInfo.Value then
        getConnectionFromConfig() |> Option.iter init
/// Upserts a batch of diagnostics entries into the configured container.
/// Does nothing when no connection is set up or when the batch is empty
/// (the consumer loop calls this for every batch, including batches that
/// contain only feedback entries).
/// Failures are logged and swallowed so that logging never breaks callers.
let private writeDiagAsync (diagEntries:DiagEntry[]) =
    async {
        match _cnctnInfo.Value with
        | Some c when not (Array.isEmpty diagEntries) ->
            try
                do!
                    Cosmos.fromConnectionString c.ConnectionString
                    |> Cosmos.database c.DatabaseName
                    |> Cosmos.container c.ContainerName
                    |> Cosmos.upsertMany (Array.toList diagEntries)
                    |> Cosmos.execAsync
                    // Results are not needed; drain the sequence to run the operation.
                    |> AsyncSeq.iter (fun _ -> ())
            with ex ->
                Env.logException (ex,"writeLog")
        | _ -> ()
    }
/// Returns a copy of the diagnostics entry with its feedback set to `fb`.
let private updateDiagEntry (fb:MFeedback) (de:DiagEntry) : DiagEntry =
    { de with Feedback = Some fb }
/// Applies each feedback entry to its diagnostics document, one at a time,
/// using `LogId` as the document id and `UserId` as the partition key.
/// Does nothing when no connection is set up or when the batch is empty.
/// Each update is guarded separately: a failure for one entry is logged and
/// the remaining entries in the batch are still processed.
let private updateWithFeedbackAsync (fbEntries:FeedbackEntry[]) =
    async {
        match _cnctnInfo.Value with
        | Some c when not (Array.isEmpty fbEntries) ->
            try
                let container =
                    Cosmos.fromConnectionString c.ConnectionString
                    |> Cosmos.database c.DatabaseName
                    |> Cosmos.container c.ContainerName
                for fb in fbEntries do
                    try
                        do!
                            container
                            |> Cosmos.update fb.LogId fb.UserId (updateDiagEntry fb.Feedback)
                            |> Cosmos.execAsync
                            // Results are not needed; drain the sequence to run the operation.
                            |> AsyncSeq.iter (fun _ -> ())
                    with ex ->
                        // Per-entry guard so one bad update does not drop the rest.
                        Env.logException (ex,"updateWithFeedback")
            with ex ->
                // Covers failures while building the container operation itself.
                Env.logException (ex,"updateWithFeedback")
        | _ -> ()
    }
/// Bounded in-memory queue of pending log entries. Holds at most
/// `BUFFER_SIZE` items; when full, the oldest queued item is dropped to
/// make room for the new one.
let private channel =
    let options = BoundedChannelOptions(BUFFER_SIZE)
    options.FullMode <- BoundedChannelFullMode.DropOldest
    Channel.CreateBounded<LogEntry>(options)
// Background loop: reads entries from the channel, groups them into batches
// and writes each batch to the backend.
// This is a value, not a function: evaluating it starts the loop via Async.Start.
let private consumerLoop =
    // Endless sequence of entries pulled from the channel reader.
    let incoming =
        asyncSeq {
            while true do
                let! entry = channel.Reader.ReadAsync().AsTask() |> Async.AwaitTask
                yield entry
        }
    // Splits a batch by kind; diagnostics are written before feedback is applied.
    let flush (batch: LogEntry[]) =
        async {
            let diags =
                batch |> Array.choose (fun e -> match e with Diag d -> Some d | Feedback _ -> None)
            let feedback =
                batch |> Array.choose (fun e -> match e with Feedback f -> Some f | Diag _ -> None)
            do! writeDiagAsync diags
            do! updateWithFeedbackAsync feedback
        }
    incoming
    // Batches are bounded by count (10) and by the BUFFER_WAIT timeout.
    |> AsyncSeq.bufferByCountAndTime 10 BUFFER_WAIT
    |> AsyncSeq.iterAsync flush
    |> Async.Start
/// Queues a log entry for the background writer without blocking the caller.
/// Uses the synchronous `TryWrite` rather than discarding the `ValueTask`
/// returned by `WriteAsync`. The channel is created with
/// `BoundedChannelFullMode.DropOldest`, so a write on a full channel makes
/// room by dropping the oldest queued entry instead of waiting.
/// NOTE(review): if the channel's full mode is ever changed to `Wait`,
/// `TryWrite` would return false on a full channel and the entry would be
/// dropped; revisit this function in that case.
let write logEntry = channel.Writer.TryWrite(logEntry) |> ignore