-
Notifications
You must be signed in to change notification settings - Fork 0
/
ConsumeSignalRMessages.cs
57 lines (56 loc) · 2.46 KB
/
ConsumeSignalRMessages.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Kusto;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Configuration;
namespace SignalRToKusto
{
public class ConsumeSignalRMessages
{
IConfiguration _configuration;
public ConsumeSignalRMessages(IConfiguration configuration)
{
_configuration = configuration;
}
HubConnection connection = null;
[FunctionName("TrimbleNotificationToKusto24")]
public void TrimbleNotificationToKusto(
[TimerTrigger("* */5 * * * *", RunOnStartup = true)] TimerInfo timer, ILogger logger,
[Kusto(Database: "%KustoDB%", TableName = "%KustoTable%", DataFormat = "txt", Connection = "KustoConnectionString")] IAsyncCollector<string> notifications)
{
if (connection == null)
{
string connectionString = _configuration.GetValue<string>("SignalRConnection");
// Note: This will print the API Key as well!
logger.LogTrace($"Initialized hub connection at : {DateTime.Now} for {connectionString}");
connection = new HubConnectionBuilder()
.WithUrl(connectionString)
.WithAutomaticReconnect(new[] { TimeSpan.FromSeconds(5) })
.Build();
// Make this configurable too!
string methodName = _configuration.GetValue<string>("MethodName");
connection.On<dynamic>(methodName, async (message) =>
{
try
{
logger.LogTrace($"Received message: {message}");
notifications.AddAsync(message.ToString());
await notifications.FlushAsync();
}
catch (Exception ex)
{
await connection.StopAsync();
logger.LogError($"Closing connection as there is an error ingesting to Kusto {ex}");
}
});
}
logger.LogTrace($"Timer trigger function executed at: {DateTime.UtcNow}");
if (connection.State != HubConnectionState.Connected)
{
logger.LogInformation($"Connection state is {connection.State}, starting hub connection at : {DateTime.UtcNow}");
connection.StartAsync();
}
}
}
}