Skip to content

Commit

Permalink
feat(csharp/ExcelAddIn): ExcelAddIn demo v7: Better GUI, better
Browse files Browse the repository at this point in the history
threading, better sharing
  • Loading branch information
kosak committed Sep 7, 2024
1 parent 2856dba commit 2350500
Show file tree
Hide file tree
Showing 35 changed files with 1,369 additions and 932 deletions.
24 changes: 11 additions & 13 deletions csharp/ExcelAddIn/DeephavenExcelFunctions.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.CodeAnalysis;
using Deephaven.ExcelAddIn.ExcelDna;
using Deephaven.ExcelAddIn.Factories;
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Operations;
using Deephaven.ExcelAddIn.Providers;
using Deephaven.ExcelAddIn.Viewmodels;
using Deephaven.ExcelAddIn.Views;
using ExcelDna.Integration;

namespace Deephaven.ExcelAddIn;
Expand All @@ -21,38 +17,38 @@ public static void ShowConnectionsDialog() {

[ExcelFunction(Description = "Snapshots a table", IsThreadSafe = true)]
public static object DEEPHAVEN_SNAPSHOT(string tableDescriptor, object filter, object wantHeaders) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var td, out var filt, out var wh, out var errorText)) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var tq, out var wh, out var errorText)) {
return errorText;
}

// These two are used by ExcelDNA to share results for identical invocations. The functionName is arbitary but unique.
const string functionName = "Deephaven.ExcelAddIn.DeephavenExcelFunctions.DEEPHAVEN_SNAPSHOT";
var parms = new[] { tableDescriptor, filter, wantHeaders };
ExcelObservableSource eos = () => new SnapshotOperation(td!, filt, wh, StateManager);
ExcelObservableSource eos = () => new SnapshotOperation(tq, wh, StateManager);
return ExcelAsyncUtil.Observe(functionName, parms, eos);
}

[ExcelFunction(Description = "Subscribes to a table", IsThreadSafe = true)]
public static object DEEPHAVEN_SUBSCRIBE(string tableDescriptor, object filter, object wantHeaders) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var td, out var filt, out var wh, out string errorText)) {
if (!TryInterpretCommonArgs(tableDescriptor, filter, wantHeaders, out var tq, out var wh, out string errorText)) {
return errorText;
}
// These two are used by ExcelDNA to share results for identical invocations. The functionName is arbitary but unique.
const string functionName = "Deephaven.ExcelAddIn.DeephavenExcelFunctions.DEEPHAVEN_SUBSCRIBE";
var parms = new[] { tableDescriptor, filter, wantHeaders };
ExcelObservableSource eos = () => new SubscribeOperation(td, filt, wh, StateManager);
ExcelObservableSource eos = () => new SubscribeOperation(tq, wh, StateManager);
return ExcelAsyncUtil.Observe(functionName, parms, eos);
}

private static bool TryInterpretCommonArgs(string tableDescriptor, object filter, object wantHeaders,
[NotNullWhen(true)]out TableTriple? tableDescriptorResult, out string filterResult, out bool wantHeadersResult, out string errorText) {
filterResult = "";
[NotNullWhen(true)]out TableQuad? tableQuadResult, out bool wantHeadersResult, out string errorText) {
tableQuadResult = null;
wantHeadersResult = false;
if (!TableTriple.TryParse(tableDescriptor, out tableDescriptorResult, out errorText)) {
if (!TableTriple.TryParse(tableDescriptor, out var tt, out errorText)) {
return false;
}

if (!ExcelDnaHelpers.TryInterpretAs(filter, "", out filterResult)) {
if (!ExcelDnaHelpers.TryInterpretAs(filter, "", out var condition)) {
errorText = "Can't interpret FILTER argument";
return false;
}
Expand All @@ -62,6 +58,8 @@ private static bool TryInterpretCommonArgs(string tableDescriptor, object filter
errorText = "Can't interpret WANT_HEADERS argument";
return false;
}

tableQuadResult = new TableQuad(tt.EndpointId, tt.PersistentQueryId, tt.TableName, condition);
return true;
}
}
3 changes: 3 additions & 0 deletions csharp/ExcelAddIn/ExcelAddIn.csproj.user
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@
<Compile Update="views\CredentialsDialog.cs">
<SubType>Form</SubType>
</Compile>
<Compile Update="views\DeephavenMessageBox.cs">
<SubType>Form</SubType>
</Compile>
</ItemGroup>
</Project>
203 changes: 147 additions & 56 deletions csharp/ExcelAddIn/StateManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Deephaven.DeephavenClient.ExcelAddIn.Util;
using System.Diagnostics;
using System.Net;
using Deephaven.DeephavenClient;
using Deephaven.ExcelAddIn.Models;
using Deephaven.ExcelAddIn.Providers;
Expand All @@ -8,80 +9,170 @@ namespace Deephaven.ExcelAddIn;

public class StateManager {
public readonly WorkerThread WorkerThread = WorkerThread.Create();
private readonly SessionProviders _sessionProviders;
private readonly Dictionary<EndpointId, CredentialsProvider> _credentialsProviders = new();
private readonly Dictionary<EndpointId, SessionProvider> _sessionProviders = new();
private readonly Dictionary<PersistentQueryKey, PersistentQueryProvider> _persistentQueryProviders = new();
private readonly Dictionary<TableQuad, ITableProvider> _tableProviders = new();
private readonly ObserverContainer<AddOrRemove<EndpointId>> _credentialsPopulationObservers = new();
private readonly ObserverContainer<EndpointId?> _defaultEndpointSelectionObservers = new();

private EndpointId? _defaultEndpointId = null;

public IDisposable SubscribeToCredentialsPopulation(IObserver<AddOrRemove<EndpointId>> observer) {
WorkerThread.EnqueueOrRun(() => {
_credentialsPopulationObservers.Add(observer, out _);
// Give this observer the current set of endpoint ids.
var keys = _credentialsProviders.Keys.ToArray();
foreach (var endpointId in keys) {
observer.OnNext(AddOrRemove<EndpointId>.OfAdd(endpointId));
}
});

public StateManager() {
_sessionProviders = new SessionProviders(WorkerThread);
return WorkerThread.EnqueueOrRunWhenDisposed(
() => _credentialsPopulationObservers.Remove(observer, out _));
}

public IDisposable SubscribeToSessions(IObserver<AddOrRemove<EndpointId>> observer) {
return _sessionProviders.Subscribe(observer);
}
public IDisposable SubscribeToDefaultEndpointSelection(IObserver<EndpointId?> observer) {
WorkerThread.EnqueueOrRun(() => {
_defaultEndpointSelectionObservers.Add(observer, out _);
observer.OnNext(_defaultEndpointId);
});

public IDisposable SubscribeToSession(EndpointId endpointId, IObserver<StatusOr<SessionBase>> observer) {
return _sessionProviders.SubscribeToSession(endpointId, observer);
return WorkerThread.EnqueueOrRunWhenDisposed(
() => _defaultEndpointSelectionObservers.Remove(observer, out _));
}

public IDisposable SubscribeToCredentials(EndpointId endpointId, IObserver<StatusOr<CredentialsBase>> observer) {
return _sessionProviders.SubscribeToCredentials(endpointId, observer);
/// <summary>
/// The major difference between the credentials providers and the other providers
/// is that the credential providers don't remove themselves from the map
/// upon the last dispose of the subscriber. That is, they hang around until we
/// manually remove them.
/// </summary>
public IDisposable SubscribeToCredentials(EndpointId endpointId,
IObserver<StatusOr<CredentialsBase>> observer) {
IDisposable? disposer = null;
LookupOrCreateCredentialsProvider(endpointId,
cp => disposer = cp.Subscribe(observer));

return WorkerThread.EnqueueOrRunWhenDisposed(() =>
Utility.Exchange(ref disposer, null)?.Dispose());
}

public IDisposable SubscribeToDefaultSession(IObserver<StatusOr<SessionBase>> observer) {
return _sessionProviders.SubscribeToDefaultSession(observer);
public void SetCredentials(CredentialsBase credentials) {
LookupOrCreateCredentialsProvider(credentials.Id,
cp => cp.SetCredentials(credentials));
}

public IDisposable SubscribeToDefaultCredentials(IObserver<StatusOr<CredentialsBase>> observer) {
return _sessionProviders.SubscribeToDefaultCredentials(observer);
public void Reconnect(EndpointId id) {
// Quick-and-dirty trick for reconnect is to re-send the credentials to the observers.
LookupOrCreateCredentialsProvider(id, cp => cp.Resend());
}

public IDisposable SubscribeToTableTriple(TableTriple descriptor, string filter,
IObserver<StatusOr<TableHandle>> observer) {
// There is a chain with multiple elements:
//
// 1. Make a TableHandleProvider
// 2. Make a ClientProvider
// 3. Subscribe the ClientProvider to either the session provider named by the endpoint id
// or to the default session provider
// 4. Subscribe the TableHandleProvider to the ClientProvider
// 4. Subscribe our observer to the TableHandleProvider
// 5. Return a dispose action that disposes all the needfuls.

var thp = new TableHandleProvider(WorkerThread, descriptor, filter);
var cp = new ClientProvider(WorkerThread, descriptor);

var disposer1 = descriptor.EndpointId == null ?
SubscribeToDefaultSession(cp) :
SubscribeToSession(descriptor.EndpointId, cp);
var disposer2 = cp.Subscribe(thp);
var disposer3 = thp.Subscribe(observer);

// The disposer for this needs to dispose both "inner" disposers.
return ActionAsDisposable.Create(() => {
// TODO(kosak): probably don't need to be on the worker thread here
WorkerThread.Invoke(() => {
var temp1 = Utility.Exchange(ref disposer1, null);
var temp2 = Utility.Exchange(ref disposer2, null);
var temp3 = Utility.Exchange(ref disposer3, null);
temp3?.Dispose();
temp2?.Dispose();
temp1?.Dispose();
});
});
public void TryDeleteCredentials(EndpointId id, Action onSuccess, Action<string> onFailure) {
if (WorkerThread.EnqueueOrNop(() => TryDeleteCredentials(id, onSuccess, onFailure))) {
return;
}

if (!_credentialsProviders.TryGetValue(id, out var cp)) {
onFailure($"{id} unknown");
return;
}

if (cp.ObserverCountUnsafe != 0) {
onFailure($"{id} is still active");
return;
}

if (id.Equals(_defaultEndpointId)) {
SetDefaultEndpointId(null);
}

_credentialsProviders.Remove(id);
_credentialsPopulationObservers.OnNext(AddOrRemove<EndpointId>.OfRemove(id));
onSuccess();
}

public void SetCredentials(CredentialsBase credentials) {
_sessionProviders.SetCredentials(credentials);
private void LookupOrCreateCredentialsProvider(EndpointId endpointId,
Action<CredentialsProvider> action) {
if (WorkerThread.EnqueueOrNop(() => LookupOrCreateCredentialsProvider(endpointId, action))) {
return;
}
if (!_credentialsProviders.TryGetValue(endpointId, out var cp)) {
cp = new CredentialsProvider(this);
_credentialsProviders.Add(endpointId, cp);
cp.Init();
_credentialsPopulationObservers.OnNext(AddOrRemove<EndpointId>.OfAdd(endpointId));
}

action(cp);
}

public void SetDefaultCredentials(CredentialsBase credentials) {
_sessionProviders.SetDefaultCredentials(credentials);
public IDisposable SubscribeToSession(EndpointId endpointId,
IObserver<StatusOr<SessionBase>> observer) {
IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
if (!_sessionProviders.TryGetValue(endpointId, out var sp)) {
sp = new SessionProvider(this, endpointId, () => _sessionProviders.Remove(endpointId));
_sessionProviders.Add(endpointId, sp);
sp.Init();
}
disposer = sp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(() =>
Utility.Exchange(ref disposer, null)?.Dispose());
}

public void Reconnect(EndpointId id) {
_sessionProviders.Reconnect(id);
public IDisposable SubscribeToPersistentQuery(EndpointId endpointId, PersistentQueryId? pqId,
IObserver<StatusOr<Client>> observer) {

IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
var key = new PersistentQueryKey(endpointId, pqId);
if (!_persistentQueryProviders.TryGetValue(key, out var pqp)) {
pqp = new PersistentQueryProvider(this, endpointId, pqId,
() => _persistentQueryProviders.Remove(key));
_persistentQueryProviders.Add(key, pqp);
pqp.Init();
}
disposer = pqp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => Utility.Exchange(ref disposer, null)?.Dispose());
}

public void SwitchOnEmpty(EndpointId id, Action onEmpty, Action onNotEmpty) {
_sessionProviders.SwitchOnEmpty(id, onEmpty, onNotEmpty);
public IDisposable SubscribeToTable(TableQuad key, IObserver<StatusOr<TableHandle>> observer) {
IDisposable? disposer = null;
WorkerThread.EnqueueOrRun(() => {
if (!_tableProviders.TryGetValue(key, out var tp)) {
Action onDispose = () => _tableProviders.Remove(key);
if (key.EndpointId == null) {
tp = new DefaultEndpointTableProvider(this, key.PersistentQueryId, key.TableName, key.Condition,
onDispose);
} else if (key.Condition.Length != 0) {
tp = new FilteredTableProvider(this, key.EndpointId, key.PersistentQueryId, key.TableName,
key.Condition, onDispose);
} else {
tp = new TableProvider(this, key.EndpointId, key.PersistentQueryId, key.TableName, onDispose);
}
_tableProviders.Add(key, tp);
tp.Init();
}
disposer = tp.Subscribe(observer);
});

return WorkerThread.EnqueueOrRunWhenDisposed(
() => Utility.Exchange(ref disposer, null)?.Dispose());
}

public void SetDefaultEndpointId(EndpointId? defaultEndpointId) {
if (WorkerThread.EnqueueOrNop(() => SetDefaultEndpointId(defaultEndpointId))) {
return;
}

_defaultEndpointId = defaultEndpointId;
_defaultEndpointSelectionObservers.OnNext(_defaultEndpointId);
}
}
77 changes: 10 additions & 67 deletions csharp/ExcelAddIn/factories/ConnectionManagerDialogFactory.cs
Original file line number Diff line number Diff line change
@@ -1,74 +1,17 @@
using System.Collections.Concurrent;
using Deephaven.ExcelAddIn.Managers;
using Deephaven.ExcelAddIn.Viewmodels;
using Deephaven.ExcelAddIn.ViewModels;
using Deephaven.ExcelAddIn.Managers;
using Deephaven.ExcelAddIn.Util;
using Deephaven.ExcelAddIn.Views;

namespace Deephaven.ExcelAddIn.Factories;

internal static class ConnectionManagerDialogFactory {
public static void CreateAndShow(StateManager sm) {
var rowToManager = new ConcurrentDictionary<ConnectionManagerDialogRow, ConnectionManagerDialogRowManager>();

// The "new" button creates a "New/Edit Credentials" dialog
void OnNewButtonClicked() {
var cvm = CredentialsDialogViewModel.OfEmpty();
var dialog = CredentialsDialogFactory.Create(sm, cvm);
dialog.Show();
}

void OnDeleteButtonClicked(ConnectionManagerDialogRow[] rows) {
foreach (var row in rows) {
if (!rowToManager.TryGetValue(row, out var manager)) {
continue;
}
manager.DoDelete();
}
}

void OnReconnectButtonClicked(ConnectionManagerDialogRow[] rows) {
foreach (var row in rows) {
if (!rowToManager.TryGetValue(row, out var manager)) {
continue;
}
manager.DoReconnect();
}
}

void OnMakeDefaultButtonClicked(ConnectionManagerDialogRow[] rows) {
// Make the last selected row the default
if (rows.Length == 0) {
return;
}

var row = rows[^1];
if (!rowToManager.TryGetValue(row, out var manager)) {
return;
}

manager.DoSetAsDefault();
}

void OnEditButtonClicked(ConnectionManagerDialogRow[] rows) {
foreach (var row in rows) {
if (!rowToManager.TryGetValue(row, out var manager)) {
continue;
}
manager.DoEdit();
}
}

var cmDialog = new ConnectionManagerDialog(OnNewButtonClicked, OnDeleteButtonClicked,
OnReconnectButtonClicked, OnMakeDefaultButtonClicked, OnEditButtonClicked);
cmDialog.Show();
var dm = new ConnectionManagerDialogManager(cmDialog, rowToManager, sm);
var disposer = sm.SubscribeToSessions(dm);

cmDialog.Closed += (_, _) => {
disposer.Dispose();
dm.Dispose();
};
public static void CreateAndShow(StateManager stateManager) {
Utility.RunInBackground(() => {
var cmDialog = new ConnectionManagerDialog();
var dm = ConnectionManagerDialogManager.Create(stateManager, cmDialog);
cmDialog.Closed += (_, _) => dm.Dispose();
// Blocks forever (in this private thread)
cmDialog.ShowDialog();
});
}
}


Loading

0 comments on commit 2350500

Please sign in to comment.