Skip to content

Commit

Permalink
Merge branch 'main' into js-flatbuf
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Sep 10, 2024
2 parents 5d1d90b + a0b70a6 commit 90e76d7
Show file tree
Hide file tree
Showing 58 changed files with 2,382 additions and 1,204 deletions.
24 changes: 11 additions & 13 deletions csharp/ExcelAddIn/DeephavenExcelFunctions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.CodeAnalysis;
using Deephaven.ExcelAddIn.ExcelDna;
using Deephaven.ExcelAddIn.Factories;
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Operations;
using Deephaven.ExcelAddIn.Providers;
using Deephaven.ExcelAddIn.Viewmodels;
using Deephaven.ExcelAddIn.Views;
using ExcelDna.Integration;

namespace Deephaven.ExcelAddIn;
Expand All @@ -21,38 +17,38 @@ public static void ShowConnectionsDialog() {

[ExcelFunction(Description = "Snapshots a table", IsThreadSafe = true)]
public static object DEEPHAVEN_SNAPSHOT(string tableDescriptor, object filter, object wantHeaders) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var td, out var filt, out var wh, out var errorText)) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var tq, out var wh, out var errorText)) {
return errorText;
}

// These two are used by ExcelDNA to share results for identical invocations. The functionName is arbitary but unique.
const string functionName = "Deephaven.ExcelAddIn.DeephavenExcelFunctions.DEEPHAVEN_SNAPSHOT";
var parms = new[] { tableDescriptor, filter, wantHeaders };
ExcelObservableSource eos = () => new SnapshotOperation(td!, filt, wh, StateManager);
ExcelObservableSource eos = () => new SnapshotOperation(tq, wh, StateManager);
return ExcelAsyncUtil.Observe(functionName, parms, eos);
}

[ExcelFunction(Description = "Subscribes to a table", IsThreadSafe = true)]
public static object DEEPHAVEN_SUBSCRIBE(string tableDescriptor, object filter, object wantHeaders) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var td, out var filt, out var wh, out string errorText)) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var tq, out var wh, out string errorText)) {
return errorText;
}
// These two are used by ExcelDNA to share results for identical invocations. The functionName is arbitary but unique.
const string functionName = "Deephaven.ExcelAddIn.DeephavenExcelFunctions.DEEPHAVEN_SUBSCRIBE";
var parms = new[] { tableDescriptor, filter, wantHeaders };
ExcelObservableSource eos = () => new SubscribeOperation(td, filt, wh, StateManager);
ExcelObservableSource eos = () => new SubscribeOperation(tq, wh, StateManager);
return ExcelAsyncUtil.Observe(functionName, parms, eos);
}

private static bool TryInterpretCommonArgs(string tableDescriptor, object filter, object wantHeaders,
[NotNullWhen(true)]out TableTriple? tableDescriptorResult, out string filterResult, out bool wantHeadersResult, out string errorText) {
filterResult = "";
[NotNullWhen(true)]out TableQuad? tableQuadResult, out bool wantHeadersResult, out string errorText) {
tableQuadResult = null;
wantHeadersResult = false;
if (!TableTriple.TryParse(tableDescriptor, out tableDescriptorResult, out errorText)) {
if (!TableTriple.TryParse(tableDescriptor, out var tt, out errorText)) {
return false;
}

if (!ExcelDnaHelpers.TryInterpretAs(filter, "", out filterResult)) {
if (!ExcelDnaHelpers.TryInterpretAs(filter, "", out var condition)) {
errorText = "Can't interpret FILTER argument";
return false;
}
Expand All @@ -62,6 +58,8 @@ private static bool TryInterpretCommonArgs(string tableDescriptor, object filter
errorText = "Can't interpret WANT_HEADERS argument";
return false;
}

tableQuadResult = new TableQuad(tt.EndpointId, tt.PersistentQueryId, tt.TableName, condition);
return true;
}
}
3 changes: 3 additions & 0 deletions csharp/ExcelAddIn/ExcelAddIn.csproj.user
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@
<Compile Update="views\CredentialsDialog.cs">
<SubType>Form</SubType>
</Compile>
<Compile Update="views\DeephavenMessageBox.cs">
<SubType>Form</SubType>
</Compile>
</ItemGroup>
</Project>
199 changes: 147 additions & 52 deletions csharp/ExcelAddIn/StateManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Deephaven.DeephavenClient.ExcelAddIn.Util;
using System.Diagnostics;
using System.Net;
using Deephaven.DeephavenClient;
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Providers;
Expand All @@ -8,76 +9,170 @@ namespace Deephaven.ExcelAddIn;

public class StateManager {
public readonly WorkerThread WorkerThread = WorkerThread.Create();
private readonly SessionProviders _sessionProviders;
private readonly Dictionary<EndpointId, CredentialsProvider> _credentialsProviders = new();
private readonly Dictionary<EndpointId, SessionProvider> _sessionProviders = new();
private readonly Dictionary<PersistentQueryKey, PersistentQueryProvider> _persistentQueryProviders = new();
private readonly Dictionary<TableQuad, ITableProvider> _tableProviders = new();
private readonly ObserverContainer<AddOrRemove<EndpointId>> _credentialsPopulationObservers = new();
private readonly ObserverContainer<EndpointId?> _defaultEndpointSelectionObservers = new();

public StateManager() {
_sessionProviders = new SessionProviders(WorkerThread);
private EndpointId? _defaultEndpointId = null;

public IDisposable SubscribeToCredentialsPopulation(IObserver<AddOrRemove<EndpointId>> observer) {
WorkerThread.EnqueueOrRun(() => {
_credentialsPopulationObservers.Add(observer, out _);
// Give this observer the current set of endpoint ids.
var keys = _credentialsProviders.Keys.ToArray();
foreach (var endpointId in keys) {
observer.OnNext(AddOrRemove<EndpointId>.OfAdd(endpointId));
}
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => _credentialsPopulationObservers.Remove(observer, out _));
}

public IDisposable SubscribeToDefaultEndpointSelection(IObserver<EndpointId?> observer) {
WorkerThread.EnqueueOrRun(() => {
_defaultEndpointSelectionObservers.Add(observer, out _);
observer.OnNext(_defaultEndpointId);
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => _defaultEndpointSelectionObservers.Remove(observer, out _));
}

public IDisposable SubscribeToSessions(IObserver<AddOrRemove<EndpointId>> observer) {
return _sessionProviders.Subscribe(observer);
/// <summary>
/// The major difference between the credentials providers and the other providers
/// is that the credential providers don't remove themselves from the map
/// upon the last dispose of the subscriber. That is, they hang around until we
/// manually remove them.
/// </summary>
public IDisposable SubscribeToCredentials(EndpointId endpointId,
IObserver<StatusOr<CredentialsBase>> observer) {
IDisposable? disposer = null;
LookupOrCreateCredentialsProvider(endpointId,
cp => disposer = cp.Subscribe(observer));

return WorkerThread.EnqueueOrRunWhenDisposed(() =>
Utility.Exchange(ref disposer, null)?.Dispose());
}

public IDisposable SubscribeToSession(EndpointId endpointId, IObserver<StatusOr<SessionBase>> observer) {
return _sessionProviders.SubscribeToSession(endpointId, observer);
public void SetCredentials(CredentialsBase credentials) {
LookupOrCreateCredentialsProvider(credentials.Id,
cp => cp.SetCredentials(credentials));
}

public IDisposable SubscribeToCredentials(EndpointId endpointId, IObserver<StatusOr<CredentialsBase>> observer) {
return _sessionProviders.SubscribeToCredentials(endpointId, observer);
public void Reconnect(EndpointId id) {
// Quick-and-dirty trick for reconnect is to re-send the credentials to the observers.
LookupOrCreateCredentialsProvider(id, cp => cp.Resend());
}

public IDisposable SubscribeToDefaultSession(IObserver<StatusOr<SessionBase>> observer) {
return _sessionProviders.SubscribeToDefaultSession(observer);
public void TryDeleteCredentials(EndpointId id, Action onSuccess, Action<string> onFailure) {
if (WorkerThread.EnqueueOrNop(() => TryDeleteCredentials(id, onSuccess, onFailure))) {
return;
}

if (!_credentialsProviders.TryGetValue(id, out var cp)) {
onFailure($"{id} unknown");
return;
}

if (cp.ObserverCountUnsafe != 0) {
onFailure($"{id} is still active");
return;
}

if (id.Equals(_defaultEndpointId)) {
SetDefaultEndpointId(null);
}

_credentialsProviders.Remove(id);
_credentialsPopulationObservers.OnNext(AddOrRemove<EndpointId>.OfRemove(id));
onSuccess();
}

public IDisposable SubscribeToDefaultCredentials(IObserver<StatusOr<CredentialsBase>> observer) {
return _sessionProviders.SubscribeToDefaultCredentials(observer);
private void LookupOrCreateCredentialsProvider(EndpointId endpointId,
Action<CredentialsProvider> action) {
if (WorkerThread.EnqueueOrNop(() => LookupOrCreateCredentialsProvider(endpointId, action))) {
return;
}
if (!_credentialsProviders.TryGetValue(endpointId, out var cp)) {
cp = new CredentialsProvider(this);
_credentialsProviders.Add(endpointId, cp);
cp.Init();
_credentialsPopulationObservers.OnNext(AddOrRemove<EndpointId>.OfAdd(endpointId));
}

action(cp);
}

public IDisposable SubscribeToTableTriple(TableTriple descriptor, string filter,
IObserver<StatusOr<TableHandle>> observer) {
// There is a chain with multiple elements:
//
// 1. Make a TableHandleProvider
// 2. Make a ClientProvider
// 3. Subscribe the ClientProvider to either the session provider named by the endpoint id
// or to the default session provider
// 4. Subscribe the TableHandleProvider to the ClientProvider
// 4. Subscribe our observer to the TableHandleProvider
// 5. Return a dispose action that disposes all the needfuls.

var thp = new TableHandleProvider(WorkerThread, descriptor, filter);
var cp = new ClientProvider(WorkerThread, descriptor);

var disposer1 = descriptor.EndpointId == null ?
SubscribeToDefaultSession(cp) :
SubscribeToSession(descriptor.EndpointId, cp);
var disposer2 = cp.Subscribe(thp);
var disposer3 = thp.Subscribe(observer);

// The disposer for this needs to dispose both "inner" disposers.
return ActionAsDisposable.Create(() => {
// TODO(kosak): probably don't need to be on the worker thread here
WorkerThread.Invoke(() => {
var temp1 = Utility.Exchange(ref disposer1, null);
var temp2 = Utility.Exchange(ref disposer2, null);
var temp3 = Utility.Exchange(ref disposer3, null);
temp3?.Dispose();
temp2?.Dispose();
temp1?.Dispose();
});
public IDisposable SubscribeToSession(EndpointId endpointId,
IObserver<StatusOr<SessionBase>> observer) {
IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
if (!_sessionProviders.TryGetValue(endpointId, out var sp)) {
sp = new SessionProvider(this, endpointId, () => _sessionProviders.Remove(endpointId));
_sessionProviders.Add(endpointId, sp);
sp.Init();
}
disposer = sp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(() =>
Utility.Exchange(ref disposer, null)?.Dispose());
}

public void SetCredentials(CredentialsBase credentials) {
_sessionProviders.SetCredentials(credentials);
public IDisposable SubscribeToPersistentQuery(EndpointId endpointId, PersistentQueryId? pqId,
IObserver<StatusOr<Client>> observer) {

IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
var key = new PersistentQueryKey(endpointId, pqId);
if (!_persistentQueryProviders.TryGetValue(key, out var pqp)) {
pqp = new PersistentQueryProvider(this, endpointId, pqId,
() => _persistentQueryProviders.Remove(key));
_persistentQueryProviders.Add(key, pqp);
pqp.Init();
}
disposer = pqp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => Utility.Exchange(ref disposer, null)?.Dispose());
}

public void SetDefaultCredentials(CredentialsBase credentials) {
_sessionProviders.SetDefaultCredentials(credentials);
public IDisposable SubscribeToTable(TableQuad key, IObserver<StatusOr<TableHandle>> observer) {
IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
if (!_tableProviders.TryGetValue(key, out var tp)) {
Action onDispose = () => _tableProviders.Remove(key);
if (key.EndpointId == null) {
tp = new DefaultEndpointTableProvider(this, key.PersistentQueryId, key.TableName, key.Condition,
onDispose);
} else if (key.Condition.Length != 0) {
tp = new FilteredTableProvider(this, key.EndpointId, key.PersistentQueryId, key.TableName,
key.Condition, onDispose);
} else {
tp = new TableProvider(this, key.EndpointId, key.PersistentQueryId, key.TableName, onDispose);
}
_tableProviders.Add(key, tp);
tp.Init();
}
disposer = tp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => Utility.Exchange(ref disposer, null)?.Dispose());
}

public void SetDefaultEndpointId(EndpointId? defaultEndpointId) {
if (WorkerThread.EnqueueOrNop(() => SetDefaultEndpointId(defaultEndpointId))) {
return;
}

public void Reconnect(EndpointId id) {
_sessionProviders.Reconnect(id);
_defaultEndpointId = defaultEndpointId;
_defaultEndpointSelectionObservers.OnNext(_defaultEndpointId);
}
}
Loading

0 comments on commit 90e76d7

Please sign in to comment.