diff --git a/src/Equinox.CosmosStore/CosmosStoreLinq.fs b/src/Equinox.CosmosStore/CosmosStoreLinq.fs index 7b8b4bf25..f586ad16c 100644 --- a/src/Equinox.CosmosStore/CosmosStoreLinq.fs +++ b/src/Equinox.CosmosStore/CosmosStoreLinq.fs @@ -97,11 +97,14 @@ module Internal = interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt log.Information("EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms", "Index", items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) } + /// Runs a query that can by hydrated as 'T + let enum<'T> (log: ILogger) (container: Container) cat (queryDefinition: QueryDefinition): TaskSeq<'T> = + container.GetItemQueryIterator<'T>(queryDefinition) |> toAsyncEnum<'T> log container cat /// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc) - let enum<'T, 'P> (log: ILogger) (container: Container) cat (query: IQueryable<'T>): TaskSeq<'P> = + let enumAs<'T, 'P> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>): TaskSeq<'P> = let queryDefinition = query.ToQueryDefinition() - if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText) - container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> log container cat + if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText) + enum<'P> log container cat queryDefinition module AggregateOp = /// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs let [] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task { @@ -118,21 +121,21 @@ module Internal = op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds) return res } /// Runs query.CountAsync, with instrumentation equivalent to what query provides - let countAsync (log: ILogger) container cat (query: IQueryable<'T>) ct = - if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText) + let countAsync (log: ILogger) container cat logLevel (query: IQueryable<'T>) ct = + if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText) exec log container "count" cat query (_.CountAsync(ct)) id module Scalar = /// Generates a TOP 1 SQL query let top1 (query: IQueryable<'T>) = query.Take(1) /// Handles a query that's expected to yield 0 or 1 result item - let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> = + let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> = let queryDefinition = (top1 query).ToQueryDefinition() - if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText) + if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText) container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum log container cat |> TaskSeq.tryHead type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task) = - static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M) = - Projection<'T, 'M>(q, cat, c, Query.enum<'T, 'P> log c cat >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat) + static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M, logLevel) = + Projection<'T, 'M>(q, cat, c, Query.enumAs<'T, 'P> log c cat logLevel >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat logLevel) member _.Enum: TaskSeq<'M> = query |> enum member x.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum member _.CountAsync: CancellationToken -> Task = query |> count @@ -162,6 +165,14 @@ type SnAndSnap<'I>() = Expression.Bind(snapMember, snapExpression.Body.Replace(snapExpression.Parameters[0], param)) |]), [| param |]) +/// Represents a query projecting information values from an Index and/or Snapshots with a view to rendering the items and/or a count +type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) = + member _.Enum: TaskSeq<'M> = inner.Enum + member _.EnumPage(skip, take): TaskSeq<'M> = inner.EnumPage(skip, take) + member _.CountAsync(ct: CancellationToken): Task = inner.CountAsync ct + member _.Count(): Async = inner.CountAsync |> Async.call + [] member val Inner = inner + /// Helpers for Querying and Projecting results based on relevant aspects of Equinox.CosmosStore's storage schema module Index = @@ -188,8 +199,8 @@ module Index = container.GetItemLinqQueryable>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName) /// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned - let tryGetStreamNameAsync log cat container (query: IQueryable>) ct = - Internal.Scalar.tryHeadAsync log cat container (query.Select(fun x -> x.p)) ct + let tryGetStreamNameAsync log cat logLevel container (query: IQueryable>) ct = + Internal.Scalar.tryHeadAsync log cat logLevel container (query.Select(fun x -> x.p)) ct /// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable) let projectStreamNameAndSnapshot<'I> snapExpression: Expression, SnAndSnap<'I>>> = @@ -197,18 +208,14 @@ module Index = let pExpression item = Expression.PropertyOrField(item, nameof Unchecked.defaultof>.p) SnAndSnap.CreateItemQueryLambda(pExpression, snapExpression) -/// Represents a query projecting information values from an Index and/or Snapshots with a view to rendering the items and/or a count -type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) = - member _.Enum: TaskSeq<'M> = inner.Enum - member _.EnumPage(skip, take): TaskSeq<'M> = inner.EnumPage(skip, take) - member _.CountAsync(ct: CancellationToken): Task = inner.CountAsync ct - member _.Count(): Async = inner.CountAsync |> Async.call - [] member val Inner = inner + let createSnAndSnapshotQuery<'I, 'M> log container cat logLevel (hydrate: SnAndSnap -> 'M) (query: IQueryable>) = + Internal.Projection.Create(query, cat, container, log, hydrate, logLevel) |> Query, 'M> /// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot [] -type IndexContext<'I>(container, categoryName, caseName, log) = +type IndexContext<'I>(container, categoryName, caseName, log, []?queryLogLevel) = + let queryLogLevel = defaultArg queryLogLevel Serilog.Events.LogEventLevel.Debug member val Log = log member val Description = $"{categoryName}/{caseName}" with get, set member val Container = container @@ -228,8 +235,9 @@ type IndexContext<'I>(container, categoryName, caseName, log) = Index.byCategoryNameOnly<'I> container categoryName /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria - member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression, bool>>, ct) = - Index.tryGetStreamNameAsync x.Log container categoryName (x.ByCategory().Where criteria) ct + member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression, bool>>, ct, [] ?logLevel) = + let logLevel = defaultArg logLevel queryLogLevel + Index.tryGetStreamNameAsync x.Log container categoryName logLevel (x.ByCategory().Where criteria) ct /// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria member x.TryGetStreamNameWhere(criteria: Expressions.Expression, bool>>): Async = @@ -237,5 +245,8 @@ type IndexContext<'I>(container, categoryName, caseName, log) = /// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate` member x.QueryStreamNameAndSnapshot(query: IQueryable>, selectBody: Expression, 'I>>, - hydrate: SnAndSnap -> 'M) = - Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), categoryName, container, x.Log, hydrate) |> Query + hydrate: SnAndSnap -> 'M, + [] ?logLevel): Query, 'M> = + let logLevel = defaultArg logLevel queryLogLevel + query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody) + |> Index.createSnAndSnapshotQuery x.Log container categoryName logLevel hydrate