Skip to content

Commit

Permalink
Gateways
Browse files Browse the repository at this point in the history
  • Loading branch information
artemiusgreat committed Nov 10, 2024
1 parent 396213a commit eb847f8
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 100 deletions.
3 changes: 2 additions & 1 deletion Core/Enums/InstrumentEnum.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum InstrumentEnum : byte
Options = 4,
Futures = 5,
Contracts = 6,
Currencies = 7
Currencies = 7,
FuturesOptions = 8
}
}
4 changes: 2 additions & 2 deletions Derivative/Derivative.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
<ItemGroup>
<PackageReference Include="alglib.net" Version="3.19.0" />
<PackageReference Include="CalcEngine.Core" Version="1.0.0" />
<PackageReference Include="Canvas.Views.Web" Version="4.5.3" />
<PackageReference Include="Canvas.Views.Web" Version="4.5.5" />
<PackageReference Include="MathNet.Numerics" Version="5.0.0" />
<PackageReference Include="Meta.Numerics" Version="4.1.4" />
<PackageReference Include="MudBlazor" Version="7.14.0" />
<PackageReference Include="MudBlazor" Version="7.15.0" />
<PackageReference Include="Estimator" Version="1.0.6" />
</ItemGroup>

Expand Down
95 changes: 47 additions & 48 deletions Gateway/Alpaca/Libs/Adapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -469,16 +469,13 @@ protected virtual async Task<ClientWebSocket> GetConnection(ClientWebSocket ws,
{
while (ws.State is WebSocketState.Open)
{
var data = new ArraySegment<byte>(new byte[byte.MaxValue]);
var data = new byte[short.MaxValue];

await ws.ReceiveAsync(data, cancellation.Token).ContinueWith(async o =>
{
var response = await o;
var content = $"[{Encoding.ASCII.GetString(data.Array).Trim(['[', ']'])}]";
var message = JsonNode
.Parse(content)
?.AsArray()
?.FirstOrDefault();
var content = $"{Encoding.Default.GetString(data).Trim(['\0'])}";
var message = JsonNode.Parse(content);

switch ($"{message?["T"]}".ToUpper())
{
Expand All @@ -498,31 +495,32 @@ await ws.ReceiveAsync(data, cancellation.Token).ContinueWith(async o =>
/// <param name="content"></param>
protected virtual void OnPoint(string content)
{
var streamPoint = JsonSerializer
.Deserialize<QuoteMessage[]>(content, _sender.Options)
.FirstOrDefault();

var instrument = Account.Instruments.Get(streamPoint.Symbol) ?? new InstrumentModel();
var point = new PointModel
{
Ask = streamPoint.AskPrice,
Bid = streamPoint.BidPrice,
AskSize = streamPoint.AskSize ?? 0,
BidSize = streamPoint.BidSize ?? 0,
Last = streamPoint.BidPrice ?? streamPoint.AskPrice,
Time = streamPoint.TimestampUtc ?? DateTime.Now,
TimeFrame = instrument.TimeFrame,
Instrument = instrument
};

instrument.Name = streamPoint.Symbol;
instrument.Points.Add(point);
instrument.PointGroups.Add(point, instrument.TimeFrame);
var streamPoints = JsonSerializer.Deserialize<QuoteMessage[]>(content, _sender.Options);

PointStream(new MessageModel<PointModel>
foreach (var streamPoint in streamPoints)
{
Next = instrument.PointGroups.Last()
});
var instrument = Account.Instruments.Get(streamPoint.Symbol) ?? new InstrumentModel();
var point = new PointModel
{
Ask = streamPoint.AskPrice,
Bid = streamPoint.BidPrice,
AskSize = streamPoint.AskSize ?? 0,
BidSize = streamPoint.BidSize ?? 0,
Last = streamPoint.BidPrice ?? streamPoint.AskPrice,
Time = streamPoint.TimestampUtc ?? DateTime.Now,
TimeFrame = instrument.TimeFrame,
Instrument = instrument
};

instrument.Name = streamPoint.Symbol;
instrument.Points.Add(point);
instrument.PointGroups.Add(point, instrument.TimeFrame);

PointStream(new MessageModel<PointModel>
{
Next = instrument.PointGroups.Last()
});
}
}

/// <summary>
Expand All @@ -531,30 +529,31 @@ protected virtual void OnPoint(string content)
/// <param name="content"></param>
protected virtual void OnTrade(string content)
{
var orderMessage = JsonSerializer
.Deserialize<TradeMessage[]>(content, _sender.Options)
.FirstOrDefault();
var streamOrders = JsonSerializer.Deserialize<TradeMessage[]>(content, _sender.Options);

var action = new TransactionModel
foreach (var streamOrder in streamOrders)
{
Id = orderMessage.TradeId,
Time = orderMessage.TimestampUtc,
Price = orderMessage.Price,
CurrentVolume = orderMessage.Size,
Instrument = new InstrumentModel { Name = orderMessage.Symbol }
};
var action = new TransactionModel
{
Id = streamOrder.TradeId,
Time = streamOrder.TimestampUtc,
Price = streamOrder.Price,
CurrentVolume = streamOrder.Size,
Instrument = new InstrumentModel { Name = streamOrder.Symbol }
};

var order = new OrderModel
{
Side = InternalMap.GetOrderSide(orderMessage.TakerSide)
};
var order = new OrderModel
{
Side = InternalMap.GetOrderSide(streamOrder.TakerSide)
};

var message = new MessageModel<OrderModel>
{
Next = order
};
var message = new MessageModel<OrderModel>
{
Next = order
};

OrderStream(message);
OrderStream(message);
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Gateway/Alpaca/Libs/Alpaca.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

<ItemGroup>
<PackageReference Include="Alpaca.Markets" Version="7.1.2" />
<PackageReference Include="Distribution.Stream" Version="1.2.2" />
<PackageReference Include="Distribution.Stream" Version="1.2.4" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 3 additions & 3 deletions Gateway/Coinbase/Libs/Coinbase.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Distribution.Stream" Version="1.2.2" />
<PackageReference Include="jose-jwt" Version="5.0.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.1.2" />
<PackageReference Include="Distribution.Stream" Version="1.2.4" />
<PackageReference Include="jose-jwt" Version="5.1.0" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="8.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Gateway/InteractiveBrokers/Libs/InteractiveBrokers.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Distribution.Stream" Version="1.2.2" />
<PackageReference Include="Distribution.Stream" Version="1.2.4" />
</ItemGroup>

<ItemGroup>
Expand Down
68 changes: 50 additions & 18 deletions Gateway/Schwab/Libs/Adapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
using System.Linq;
using System.Net.Http;
using System.Net.WebSockets;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Linq;
using Terminal.Core.Domains;
using Terminal.Core.Enums;
using Terminal.Core.Extensions;
Expand Down Expand Up @@ -148,7 +150,7 @@ public override async Task<ResponseModel<StatusEnum>> Subscribe(InstrumentModel
Parameters = new SrteamParamsMessage
{
Keys = instrument.Name,
Fields = string.Join(",", Enumerable.Range(0, 55))
Fields = string.Join(",", Enumerable.Range(0, 10))
}
});

Expand Down Expand Up @@ -531,34 +533,64 @@ protected virtual async Task<ClientWebSocket> GetConnection(ClientWebSocket ws,
{
while (ws.State is WebSocketState.Open)
{
var data = new ArraySegment<byte>(new byte[short.MaxValue]);
var data = new byte[short.MaxValue];

await ws.ReceiveAsync(data, cancellation.Token).ContinueWith(async o =>
{
var response = await o;
var content = $"[{Encoding.ASCII.GetString(data)}]";

Console.WriteLine("\n");
Console.WriteLine(content);
Console.WriteLine("\n");

//var message = JsonNode
// .Parse(content)
// ?.AsArray()
// ?.FirstOrDefault();

//switch ($"{message?["T"]}".ToUpper())
//{
// case "Q": OnPoint(content); break;
// case "T": OnTrade(content); break;
//}
var content = $"{Encoding.Default.GetString(data).Trim(['\0', '[', ']'])}";
var message = JsonNode.Parse(content);

if (message["data"] is not null)
{
var streamPoints = message["data"]
.AsArray()
.Select(o => o.Deserialize<StreamDataMessage>());

OnPoint(streamPoints.Where(o => InternalMap.GetInstrumentType(o.Service) is not null));
}
});
}
});

return ws;
}

/// <summary>
/// Process quote from the stream
/// </summary>
/// <param name="streamPoints"></param>
protected virtual void OnPoint(IEnumerable<StreamDataMessage> streamPoints)
{
foreach (var streamPoint in streamPoints)
{
var map = InternalMap.GetStreamMap(streamPoint.Service);

foreach (var data in streamPoint.Content)
{
var point = new PointModel();
var instrumentName = $"{data.Get("key")}";
var instrument = Account.Instruments.Get(instrumentName) ?? new InstrumentModel();

point.Instrument = instrument;
point.Bid = double.TryParse($"{data.Get(map.Get("Bid Price"))}", out var x1) ? x1 : null;
point.Ask = double.TryParse($"{data.Get(map.Get("Ask Price"))}", out var x2) ? x2 : null;
point.BidSize = double.TryParse($"{data.Get(map.Get("Bid Size"))}", out var x3) ? x3 : null;
point.AskSize = double.TryParse($"{data.Get(map.Get("Ask Size"))}", out var x4) ? x4 : null;
point.Last = double.TryParse($"{data.Get(map.Get("Last Price"))}", out var x5) ? x5 : (point.Bid ?? point.Ask);

instrument.Name = instrumentName;
instrument.Points.Add(point);
instrument.PointGroups.Add(point, instrument.TimeFrame);

PointStream(new MessageModel<PointModel>
{
Next = instrument.PointGroups.Last()
});
}
}
}

/// <summary>
/// Send data to the API
/// </summary>
Expand Down
16 changes: 2 additions & 14 deletions Gateway/Schwab/Libs/Maps/ExternalMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,15 @@ namespace Schwab.Mappers
{
public class ExternalMap
{
public static IDictionary<string, string> GetStreamingKeys(InstrumentModel instrument)
{
switch (instrument.Type)
{
case InstrumentEnum.Shares: return StreamEquityMap.Map;
case InstrumentEnum.Futures: return StreamFutureMap.Map;
case InstrumentEnum.Currencies: return StreamCurrencyMap.Map;
case InstrumentEnum.Options: return instrument.Name.StartsWith('/') ? StreamFutureOptionMap.Map : StreamOptionMap.Map;
}

return null;
}

public static string GetStreamingService(InstrumentModel instrument)
{
switch (instrument.Type)
{
case InstrumentEnum.Shares: return "LEVELONE_EQUITIES";
case InstrumentEnum.Futures: return "LEVELONE_FUTURES";
case InstrumentEnum.Currencies: return "LEVELONE_FOREX";
case InstrumentEnum.Options: return instrument.Name.StartsWith('/') ? "LEVELONE_FUTURES_OPTIONS" : "LEVELONE_OPTIONS";
case InstrumentEnum.Options: return "LEVELONE_OPTIONS";
case InstrumentEnum.FuturesOptions: return "LEVELONE_FUTURES_OPTIONS";
}

return null;
Expand Down
57 changes: 57 additions & 0 deletions Gateway/Schwab/Libs/Maps/InternalMap.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Schwab.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using Terminal.Core.Domains;
using Terminal.Core.Enums;
Expand All @@ -9,6 +10,62 @@ namespace Schwab.Mappers
{
public class InternalMap
{
/// <summary>
/// Map fields in the stream
/// </summary>
/// <param name="instrument"></param>
/// <returns></returns>
public static IDictionary<string, string> GetStreamMap(InstrumentModel instrument)
{
switch (instrument.Type)
{
case InstrumentEnum.Shares: return StreamEquityMap.Map;
case InstrumentEnum.Futures: return StreamFutureMap.Map;
case InstrumentEnum.Currencies: return StreamCurrencyMap.Map;
case InstrumentEnum.Options: return StreamOptionMap.Map;
case InstrumentEnum.FuturesOptions: return StreamFutureOptionMap.Map;
}

return null;
}

/// <summary>
/// Map fields in the stream
/// </summary>
/// <param name="assetType"></param>
/// <returns></returns>
public static IDictionary<string, string> GetStreamMap(string assetType)
{
switch (assetType)
{
case "LEVELONE_EQUITIES": return StreamEquityMap.Map;
case "LEVELONE_FUTURES": return StreamFutureMap.Map;
case "LEVELONE_FOREX": return StreamCurrencyMap.Map;
case "LEVELONE_OPTIONS": return StreamOptionMap.Map;
case "LEVELONE_FUTURES_OPTIONS": return StreamFutureOptionMap.Map;
}

return null;
}

/// <summary>
/// External instrument type to local
/// </summary>
/// <param name="assetType"></param>
/// <returns></returns>
public static InstrumentEnum? GetInstrumentType(string assetType)
{
switch (assetType)
{
case "LEVELONE_EQUITIES": return InstrumentEnum.Shares;
case "LEVELONE_FUTURES": return InstrumentEnum.Futures;
case "LEVELONE_FOREX": return InstrumentEnum.Currencies;
case "LEVELONE_OPTIONS": return InstrumentEnum.Options;
case "LEVELONE_FUTURES_OPTIONS": return InstrumentEnum.FuturesOptions;
}

return null;
}
/// <summary>
/// Get order book
/// </summary>
Expand Down
Loading

0 comments on commit eb847f8

Please sign in to comment.