diff --git a/CHANGELOG.md b/CHANGELOG.md index 848a2125d..0cc5e22b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ The `Unreleased` section name is replaced by the expected version of next releas - `eqx stats`: `-I` flag; relabel Documents as Items, retaining existing `-D` flag [#464](https://github.com/jet/equinox/pull/464) - `eqx`: `-Q` flag omits timestamps from console output logging [#459](https://github.com/jet/equinox/pull/459) - `Equinox.CosmosStore.Linq`: Add LINQ querying support for Indexed `u`nfolds (`AccessStrategy.Custom`+`CosmosStoreCategory.shouldCompress`) [#450](https://github.com/jet/equinox/pull/450) +- `eqx dump`, `eqx query`: `-sl` Support for specifying streams to dump via a [CosmosDB `LIKE` expression](https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/query/keywords#like) [#450](https://github.com/jet/equinox/pull/450) +- `eqx dump`: `-Q` strips intervals, regularizes snapshots, logs stream names [#450](https://github.com/jet/equinox/pull/450) - `eqx top`: Support for analyzing space usage for event and view containers by category and/or stream [#450](https://github.com/jet/equinox/pull/450) - `eqx destroy`: Support for deleting the items(documents) underlying a category/stream/arbitrary `WHERE` clause [#450](https://github.com/jet/equinox/pull/450) diff --git a/src/Equinox.CosmosStore/CosmosStoreLinq.fs b/src/Equinox.CosmosStore/CosmosStoreLinq.fs index 69100c8f7..f292411e0 100644 --- a/src/Equinox.CosmosStore/CosmosStoreLinq.fs +++ b/src/Equinox.CosmosStore/CosmosStoreLinq.fs @@ -100,11 +100,16 @@ module Internal = action, items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) } /// Runs a query that can be hydrated as 'T let enum log container cat = enum_ log container "Index" cat Events.LogEventLevel.Information + let exec__<'R> (log: ILogger) (container: Container) cat logLevel (queryDefinition: QueryDefinition): TaskSeq<'R> = + if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.run {cat} {query}", cat, queryDefinition.QueryText) + container.GetItemQueryIterator<'R> queryDefinition |> enum_ log container "Query" cat logLevel /// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc) let enumAs<'T, 'P> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>): TaskSeq<'P> = let queryDefinition = query.ToQueryDefinition() - if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText) - container.GetItemQueryIterator<'P> queryDefinition |> enum log container cat + exec__<'P> log container cat logLevel queryDefinition + /// Execute a query, hydrating as 'R + let exec<'R> (log: ILogger) (container: Container) logLevel (queryDefinition: QueryDefinition): TaskSeq<'R> = + exec__<'R> log container "%" logLevel queryDefinition module AggregateOp = /// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs let [] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task { diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index e1ac09871..4e80c3c23 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -126,6 +126,7 @@ and [] StatsParameters = | Dynamo _ -> "Dynamo Connection parameters." and [] QueryParameters = | [] StreamName of string + | [] StreamLike of string | [] CategoryName of string | [] CategoryLike of string | [] UnfoldName of string @@ -138,6 +139,7 @@ and [] QueryParameters = interface IArgParserTemplate with member a.Usage = a |> function | StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`." + | StreamLike _ -> "Specify stream name to match against `p`, e.g. `%-f7c1ce63389a45bdbea1cccebb1b3c8a`." | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`." | CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)." | UnfoldName _ -> "Specify unfold Name to match against `u.c`, e.g. `Snapshotted`" @@ -154,11 +156,12 @@ and [] QueryParameters = | Cosmos _ -> "Parameters for CosmosDB." and [] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw and [] Criteria = - | SingleStream of string | CatName of string | CatLike of string | Custom of sql: string | Unfiltered + | SingleStream of string | StreamLike of string | CatName of string | CatLike of string | Custom of sql: string | Unfiltered member x.Sql = x |> function | Criteria.SingleStream sn -> $"c.p = \"{sn}\"" + | Criteria.StreamLike pat -> $"c.p LIKE \"{pat}\"" | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\"" - | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\"" + | Criteria.CatLike pat -> $"c.p LIKE \"{pat}-%%\"" | Criteria.Custom filter -> filter | Criteria.Unfiltered -> "1=1" and QueryArguments(p: ParseResults) = @@ -166,14 +169,14 @@ and QueryArguments(p: ParseResults) = member val Pretty = p.Contains QueryParameters.Pretty member val TeeConsole = p.Contains QueryParameters.Console member val Criteria = - match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with - | Some sn, None, None -> Criteria.SingleStream sn - | Some _, Some _, _ - | Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive" - | None, Some cn, None -> Criteria.CatName cn - | None, None, Some cl -> Criteria.CatLike cl - | None, None, None -> Criteria.Unfiltered - | None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive" + match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.StreamLike, + p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with + | Some sn, None, None, None -> Criteria.SingleStream sn + | None, Some sl, None, None -> Criteria.StreamLike sl + | None, None, Some cn, None -> Criteria.CatName cn + | None, None, None, Some cl -> Criteria.CatLike cl + | None, None, None, None -> Criteria.Unfiltered + | _ -> p.Raise "StreamName/StreamLike and CategoryLike/CategoryName are mutually exclusive" member val Filepath = p.TryGetResult QueryParameters.File member val UnfoldName = p.TryGetResult QueryParameters.UnfoldName member val UnfoldCriteria = p.TryGetResult QueryParameters.UnfoldCriteria @@ -229,6 +232,7 @@ and TopArguments(p: ParseResults) = container.GetItemQueryIterator(qd, requestOptions = qo) and [] DestroyParameters = | [] StreamName of string + | [] StreamLike of string | [] CategoryName of string | [] CategoryLike of string | [] CustomFilter of sql: string @@ -238,6 +242,7 @@ and [] DestroyParameters = interface IArgParserTemplate with member a.Usage = a |> function | StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`." + | StreamLike _ -> "Specify stream name to match against `p`, e.g. `%-f7c1ce63389a45bdbea1cccebb1b3c8a`." | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`." | CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)." | CustomFilter _ -> "Specify a custom filter, referencing the document as `c.` (e.g. `'c.p LIKE \"test-%\" AND c._ts < 1717138092'`)" @@ -246,13 +251,14 @@ and [] DestroyParameters = | Cosmos _ -> "Parameters for CosmosDB." and DestroyArguments(p: ParseResults) = member val Criteria = - match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike, p.TryGetResult CustomFilter with - | None, None, None, None -> p.Raise "Category, stream name, or custom SQL must be supplied" - | Some sn, None, None, None -> Criteria.SingleStream sn - | None, Some cn, None, None -> Criteria.CatName cn - | None, None, Some cl, None -> Criteria.CatLike cl - | None, None, None, Some filter -> Criteria.Custom filter - | _ -> p.Raise "StreamName/CategoryLike/CategoryName/CustomFilter are mutually exclusive" + match p.TryGetResult StreamName, p.TryGetResult DestroyParameters.StreamLike, p.TryGetResult CategoryName, p.TryGetResult CategoryLike, p.TryGetResult CustomFilter with + | Some sn, None, None, None, None -> Criteria.SingleStream sn + | None, Some sl, None, None, None -> Criteria.StreamLike sl + | None, None, Some cn, None, None -> Criteria.CatName cn + | None, None, None, Some cl, None -> Criteria.CatLike cl + | None, None, None, None, Some filter -> Criteria.Custom filter + | None, None, None, None, None -> p.Raise "Category or stream name/pattern, or custom SQL must be supplied" + | _ -> p.Raise "StreamName/SteamLike/CategoryLike/CategoryName/CustomFilter are mutually exclusive" member val CosmosArgs = p.GetResult DestroyParameters.Cosmos |> Store.Cosmos.Arguments member val DryRun = p.Contains Force |> not member val Dop = p.GetResult(Parallelism, 32) @@ -263,12 +269,15 @@ and DestroyArguments(p: ParseResults) = and SnEventsUnfolds = { p: string; id: string; es: int; us: int } and [] DumpParameters = | [] Stream of FsCodec.StreamName + | [] StreamLike of string | [] Correlation | [] Blobs | [] JsonSkip | [] Pretty | [] FlattenUnfolds | [] TimeRegular + | [] Intervals + | [] Names | [] UnfoldsOnly | [] EventsOnly | [] Cosmos of ParseResults @@ -281,12 +290,15 @@ and [] DumpParameters = interface IArgParserTemplate with member a.Usage = a |> function | Stream _ -> "Specify stream(s) to dump." + | StreamLike _ -> "(CosmosDB only) Specify stream name pattern to dump: LIKE expression with `%` and `_` tokens etc." | Correlation -> "Include Correlation/Causation identifiers" | Blobs -> "Don't assume Data/Metadata is UTF-8 text" | JsonSkip -> "Don't assume Data/Metadata is JSON" | Pretty -> "Pretty print the JSON over multiple lines" - | FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds" + | FlattenUnfolds -> "Don't pretty print the JSON over multiple lines for Unfolds. Quiet mode: Pretty print" | TimeRegular -> "Don't humanize time intervals between events" + | Intervals -> "Omit intervals between events. Quiet mode: Include intervals" + | Names -> "Emit StreamName prior to events/unfolds instead of adding log context. Quiet mode: exclude stream names" | UnfoldsOnly -> "Exclude Events. Default: show both Events and Unfolds" | EventsOnly -> "Exclude Unfolds/Snapshots. Default: show both Events and Unfolds." | Es _ -> "Parameters for EventStore." @@ -322,6 +334,19 @@ and DumpArguments(p: ParseResults) = let storeLog = createStoreLog false storeLog, Store.MessageDb.config log None p | x -> p.Raise $"unexpected subcommand %A{x}" + member val CosmosArgs = p.GetResult DumpParameters.Cosmos |> Store.Cosmos.Arguments + member x.Connect() = + match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with + | Store.Config.Cosmos (cc, _, _) -> cc.Container + | _ -> p.Raise "Dump StreamLike option requires Cosmos" + member x.Streams(infoLogLevel) = + let streams = p.GetResults DumpParameters.Stream + match p.TryGetResult DumpParameters.StreamLike with + | None -> streams + | Some pattern -> + let container = x.Connect() + let q = Microsoft.Azure.Cosmos.QueryDefinition($"SELECT DISTINCT VALUE c.p from c where c.p LIKE \"{pattern}\"") + Equinox.CosmosStore.Linq.Internal.Query.exec Log.Logger container infoLogLevel q |> FSharp.Control.TaskSeq.toList let writeToStatsSinks (c : LoggerConfiguration) = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) .WriteTo.Sink(Equinox.DynamoStore.Core.Log.InternalMetrics.Stats.LogSink()) @@ -484,7 +509,7 @@ module CosmosQuery = let sql = composeSql a Log.Information("Querying {mode}: {q}", a.Mode, sql) Microsoft.Azure.Cosmos.QueryDefinition sql - let run quiet (a: QueryArguments) = task { + let run ill (a: QueryArguments) = task { let sw = System.Diagnostics.Stopwatch.StartNew() let serdes = if a.Pretty then prettySerdes.Value else FsCodec.SystemTextJson.Serdes.Default let maybeFileStream = a.Filepath |> Option.map (fun p -> @@ -502,9 +527,8 @@ module CosmosQuery = let inline arrayLen x = if isNull x then 0 else Array.length x pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore let pageI, pageE, pageU = items.Length, items |> Seq.sumBy (_.e >> arrayLen), items |> Seq.sumBy (_.u >> arrayLen) - let ll = if quiet then LogEventLevel.Debug else LogEventLevel.Information - Log.Write(ll, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}", - rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs) + Log.Write(ill, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}", + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs) maybeFileStream |> Option.iter (fun stream -> for x in items do serdes.SerializeToStream(x, stream) @@ -575,7 +599,7 @@ module CosmosTop = bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui } let [] OrderByTs = " ORDER BY c._ts" let private sql (a: TopArguments) = $"SELECT * FROM c WHERE {a.Criteria.Sql}{if a.TsOrder then OrderByTs else null}" - let run quiet (a: TopArguments) = task { + let run ill (a: TopArguments) = task { let sw = System.Diagnostics.Stopwatch.StartNew() let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet() let mutable accI, accE, accU, accRus, accRds, accOds, accBytes, accParse = 0L, 0L, 0L, 0., 0L, 0L, 0L, TimeSpan.Zero @@ -592,9 +616,8 @@ module CosmosTop = s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes - let ll = if quiet then LogEventLevel.Debug else LogEventLevel.Information - Log.Write(ll, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}", - rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs) + Log.Write(ill, "Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}", + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs) pageStreams.Clear() accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB @@ -764,12 +787,14 @@ module Dump = let private prettifyJson (json: string) = use parsed = System.Text.Json.JsonDocument.Parse json prettySerdes.Value.Serialize parsed - let run (log : ILogger, verboseConsole, maybeSeq) (p : ParseResults) = async { + let run ill (log : ILogger, verboseConsole, maybeSeq) (p : ParseResults) = async { let a = DumpArguments p let createStoreLog storeVerbose = createStoreLog storeVerbose verboseConsole maybeSeq let storeLog, storeConfig = a.ConfigureStore(log, createStoreLog) let doU, doE = not (p.Contains EventsOnly), not (p.Contains UnfoldsOnly) - let doC, doJ, doS, doT = p.Contains Correlation, not (p.Contains JsonSkip), not (p.Contains Blobs), not (p.Contains TimeRegular) + let quietMode = ill <> LogEventLevel.Debug + let doN = p.Contains Names = quietMode + let doI, doC, doJ, doS, doT = p.Contains Intervals <> quietMode, p.Contains Correlation, not (p.Contains JsonSkip), not (p.Contains Blobs), not (p.Contains TimeRegular) let store = Services.Store(storeConfig) let initial = List.empty @@ -778,7 +803,7 @@ module Dump = let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ _ -> failwith "No mapCausation")) let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold), fun _state -> failwith "no snapshot required" let formatUnfolds, formatEvents = - if p.Contains FlattenUnfolds then id else prettifyJson + if p.Contains FlattenUnfolds = quietMode then id else prettifyJson , if p.Contains Pretty then prettifyJson else id let mutable payloadBytes = 0 let render format (data : ReadOnlyMemory) = @@ -796,6 +821,8 @@ module Dump = | x when x.TotalMinutes >= 1. -> x.ToString "m\mss\.ff\s" | x -> x.ToString("s\.fff\s") let dumpEvents (streamName: FsCodec.StreamName) = async { + let log = if doN then Log.Information("Dumping {sn}", streamName); log + else log.ForContext("sn", streamName) let struct (categoryName, sid) = FsCodec.StreamName.split streamName let cat = store.Category(categoryName, idCodec, fold, initial, isOriginAndSnapshot) let decider = Equinox.Decider.forStream storeLog cat sid @@ -808,20 +835,25 @@ module Dump = | Some p when not x.IsUnfold -> let ts = x.Timestamp - p in if doT then humanize ts else ts.ToString() | _ -> if doT then "n/a" else "0" prevTs <- Some x.Timestamp - if not doC then log.Information("{i,4}@{t:u}+{d,9} {u:l} {e:l} {data:l} {meta:l}", - x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta) - else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}", - x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta) - match streamBytes with ValueNone -> () | ValueSome x -> log.Information("ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) } + if doC then + log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}", + x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta) + elif doI then + log.Information("{i,4}@{t:u}+{d,9:u} {u:l} {e:l} {data:l} {meta:l}", + x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta) + else + log.Information("{i,4}@{t:u} {u:l} {e:l} {data:l} {meta:l}", + x.Index, x.Timestamp, ty, x.EventType, render x.Data, render x.Meta) + match streamBytes with ValueNone -> () | ValueSome x -> log.Write(ill, "ISyncContext.StreamEventBytes {kib:n1}KiB", float x / 1024.) } resetStats () - let streams = p.GetResults DumpParameters.Stream - log.ForContext("streams",streams).Information("Reading...") + let streams = a.Streams(ill) + log.ForContext("streams",streams).Write(ill, "Reading...") do! streams |> Seq.map dumpEvents - |> Async.Parallel + |> Async.Sequential |> Async.Ignore - log.Information("Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.) + log.Write(ill, "Total Event Bodies Payload {kib:n1}KiB", float payloadBytes / 1024.) if verboseConsole then dumpStats log storeConfig } @@ -830,13 +862,14 @@ type Arguments(p: ParseResults) = let quiet, verbose, verboseConsole = p.Contains Quiet, p.Contains Verbose, p.Contains VerboseConsole member _.CreateDomainLog() = createDomainLog quiet verbose verboseConsole maybeSeq member _.ExecuteSubCommand() = async { + let ill = if quiet then LogEventLevel.Debug else LogEventLevel.Information match p.GetSubCommand() with | Init a -> do! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect | InitAws a -> do! DynamoInit.table Log.Logger a | InitSql a -> do! SqlInit.databaseOrSchema Log.Logger a - | Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a - | Query a -> do! CosmosQuery.run quiet (QueryArguments a) |> Async.AwaitTaskCorrect - | Top a -> do! CosmosTop.run quiet (TopArguments a) |> Async.AwaitTaskCorrect + | Dump a -> do! Dump.run ill (Log.Logger, verboseConsole, maybeSeq) a + | Query a -> do! CosmosQuery.run ill (QueryArguments a) |> Async.AwaitTaskCorrect + | Top a -> do! CosmosTop.run ill (TopArguments a) |> Async.AwaitTaskCorrect | Destroy a -> do! CosmosDestroy.run (DestroyArguments a) |> Async.AwaitTaskCorrect | Stats a -> do! CosmosStats.run (Log.Logger, verboseConsole, maybeSeq) a | LoadTest a -> let n = p.GetResult(LogFile, fun () -> p.ProgramName + ".log")