-
Notifications
You must be signed in to change notification settings - Fork 0
/
Program.cs
87 lines (76 loc) · 2.85 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
using Microsoft.AspNetCore.SignalR.Client;
using Kusto.Data;
using Kusto.Data.Net.Client;
using Kusto.Data.Common;
using Kusto.Ingest;
using System.Data;
/// <summary>
/// Example of using WebSocket, Trimble Notification Service, Kusto SDK, and SignalR to stream messages into Kusto.
/// References:
/// https://developer.trimblemaps.com/restful-apis/trip-management/notifications-service/
/// https://github.com/Azure/azure-kusto-samples-dotnet
/// </summary>
/// <remarks>
/// This sample assumes:
/// 1. A valid Trimble API key with the TM add-on.
/// 2. Your Kusto cluster has streaming enabled. (Streaming is enabled by default for Fabric KQL Database and ADX Free-Personal Clusters)
/// 3. Your Kusto database has a table with the following schema:
/// .create table TrimbleNotificationRaw (message:dynamic)
/// </remarks>
// Required parameters: replace the placeholders with real values before running.
string apiKey = "your-api-key";
string clusterUri = "your-kql-clusterUri";
string database = "your-kql-database";
string table = "your-kql-table";

// Kusto client settings.
// Authentication uses an interactive AAD user prompt. For unattended runs, swap the call for:
//   .WithAadApplicationKeyAuthentication(applicationClientId: "<your-client-id>", applicationKey: "<your-secret>", authority: "your-tenant-id")
var clusterKcsb = new KustoConnectionStringBuilder(clusterUri)
    .WithAadUserPromptAuthentication();

// The ingest client is disposable; the using declaration releases it when the program exits
// instead of leaking it for the lifetime of the process.
using var ingestClient = KustoIngestFactory
    .CreateStreamingIngestClient(clusterKcsb);

// Every message is ingested into the target table using the text data format.
var ingestProps = new KustoIngestionProperties(database, table)
{
    Format = DataSourceFormat.txt
};
// SignalR connection to the Trimble notification service.
// The API key is URL-escaped so reserved characters (&, +, #, ...) cannot corrupt the query string.
// NOTE: the reconnect policy holds a single 5-second delay, i.e. one reconnect attempt after a drop.
var connection = new HubConnectionBuilder()
    .WithUrl($"https://notifications.trimblemaps.com/register?apikey={Uri.EscapeDataString(apiKey)}")
    .WithAutomaticReconnect(new[] { TimeSpan.FromSeconds(5) })
    .Build();

// Forward every "notificationMessage" payload to Kusto.
// The handler is async so the ingestion call is awaited rather than blocking the
// SignalR dispatch thread on .Result.
connection.On<dynamic>("notificationMessage", async (message) =>
{
    try
    {
        Console.WriteLine($"message: {message}");

        // Serialize the message into an in-memory stream. leaveOpen keeps the writer from
        // closing the stream, so each resource is released by exactly one using declaration.
        using var stream = new MemoryStream();
        using var writer = new StreamWriter(stream, leaveOpen: true);
        writer.Write(message);
        writer.Flush();

        // Rewind so the ingest client reads the payload from the beginning.
        stream.Position = 0;

        // Ingest the stream into Kusto.
        _ = await ingestClient.IngestFromStreamAsync(stream, ingestProps);
    }
    catch (Exception ex)
    {
        // Best effort: log and keep the subscription alive for subsequent messages.
        Console.WriteLine($"Exception: {ex}");
    }
});
// Keep retrying until the initial connection to the Trimble notification service succeeds.
bool connected = false;
while (!connected)
{
    try
    {
        // Awaited (not .Wait()) so the thread is not blocked and a failure surfaces as the
        // real exception instead of a wrapping AggregateException.
        await connection.StartAsync();
        Console.WriteLine("Connection started");
        connected = true;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);

        // Random back-off of 0, 2, 4, 6 or 8 seconds before the next attempt.
        await Task.Delay(Random.Shared.Next(0, 5) * 2000);
    }
}

// Keep the process alive until the user presses Enter, then close the connection cleanly.
Console.ReadLine();
await connection.StopAsync();