-
Notifications
You must be signed in to change notification settings - Fork 129
/
Copy pathuROHTTPWebsocketServer.pas
333 lines (290 loc) · 11 KB
/
uROHTTPWebsocketServer.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
unit uROHTTPWebsocketServer;
interface
uses
Classes, IdServerIOHandlerWebsocket, IdIOHandlerWebsocket,
uROIndyHTTPServer, uROClientIntf, uROServer, uROHTTPDispatch,
IdContext, IdCustomHTTPServer, IdCustomTCPServer, uROHash, uROServerIntf,
IdServerWebsocketContext, IdServerSocketIOHandling,
IdServerWebsocketHandling;
type
TROTransportContext = class;
TROIndyHTTPWebsocketServer = class(TROIndyHTTPServer)
private
FOnCustomChannelExecute: TWebsocketChannelRequest;
FSocketIO: TIdServerSocketIOHandling_Ext;
function GetSocketIO: TIdServerSocketIOHandling;
protected
FROTransportContexts: TInterfaceList;
procedure InternalServerConnect(AThread: TIdContext); override;
procedure InternalServerDisConnect(AThread: TIdContext); virtual;
procedure InternalServerCommandGet(AThread: TIdThreadClass;
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo); override;
procedure ProcessRemObjectsRequest(const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
function GetDispatchersClass: TROMessageDispatchersClass; override;
public
procedure AfterConstruction; override;
destructor Destroy; override;
procedure Loaded; override;
property SocketIO: TIdServerSocketIOHandling read GetSocketIO;
property OnCustomChannelExecute: TWebsocketChannelRequest read FOnCustomChannelExecute write FOnCustomChannelExecute;
end;
TROHTTPDispatcher_Websocket = class(TROHTTPDispatcher)
public
function CanHandleMessage(const aTransport: IROTransport; aRequeststream : TStream): boolean; override;
end;
TROHTTPMessageDispatchers_WebSocket = class(TROHTTPMessageDispatchers)
protected
function GetDispatcherClass : TROMessageDispatcherClass; override;
end;
TROTransportContext = class(TInterfacedObject,
IROTransport, IROTCPTransport,
IROActiveEventServer)
private
FROServer: TROIndyHTTPServer;
FIdContext: TIdServerWSContext;
FEventCount: Integer;
FClientId: TGUID;
private
class var FGlobalEventCount: Integer;
protected
{IROTransport}
function GetTransportObject: TObject;
{IROTCPTransport}
function GetClientAddress : string;
{IROActiveEventServer}
procedure EventsRegistered(aSender : TObject; aClient: TGUID);
procedure DispatchEvent(anEventDataItem : TROEventData; aSessionReference : TGUID; aSender: TObject); // asender is TROEventRepository
public
//constructor Create(aROServer: TROIndyHTTPServer; aIOHandler: TIdIOHandlerWebsocket);
constructor Create(aROServer: TROIndyHTTPServer; aIdContext: TIdServerWSContext);
property Context: TIdServerWSContext read FIdContext;
property ClientId: TGUID read FClientId write FClientId;
end;
procedure Register;
implementation
uses
SysUtils, IdCoderMIME, Windows, uROEventRepository, uROSessions, uROClient,
uROClasses, StrUtils, uROIdServerWebsocketHandling;
procedure Register;
begin
RegisterComponents('RBK', [TROIndyHTTPWebsocketServer]);
end;
procedure TROIndyHTTPWebsocketServer.AfterConstruction;
begin
inherited;
FSocketIO := TIdServerSocketIOHandling_Ext.Create;
FROTransportContexts := TInterfaceList.Create;
IndyServer.ContextClass := TROIdServerWSContext;
if Self.IndyServer.IOHandler = nil then
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
IndyServer.OnDisconnect := InternalServerDisConnect;
end;
destructor TROIndyHTTPWebsocketServer.Destroy;
begin
inherited;
FSocketIO.Free;
FROTransportContexts.Free;
end;
function TROIndyHTTPWebsocketServer.GetDispatchersClass: TROMessageDispatchersClass;
begin
Result := TROHTTPMessageDispatchers_Websocket;
end;
function TROIndyHTTPWebsocketServer.GetSocketIO: TIdServerSocketIOHandling;
begin
Result := FSocketIO;
end;
procedure TROIndyHTTPWebsocketServer.InternalServerCommandGet(AThread: TIdThreadClass;
ARequestInfo: TIdHTTPRequestInfo; AResponseInfo: TIdHTTPResponseInfo);
begin
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
(AThread as TIdServerWSContext).SocketIO := Self.FSocketIO;
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
if not TROIdServerWebsocketHandling.ProcessServerCommandGet(AThread as TIdServerWSContext, ARequestInfo, AResponseInfo) then
inherited InternalServerCommandGet(AThread, ARequestInfo, AResponseInfo)
end;
procedure TROIndyHTTPWebsocketServer.InternalServerConnect(AThread: TIdContext);
begin
inherited;
(AThread as TIdServerWSContext).OnCustomChannelExecute := Self.OnCustomChannelExecute;
(AThread as TROIdServerWSContext).OnRemObjectsRequest := Self.ProcessRemObjectsRequest;
end;
procedure TROIndyHTTPWebsocketServer.InternalServerDisConnect(
AThread: TIdContext);
var
transport: TROTransportContext;
begin
transport := AThread.Data as TROTransportContext;
if transport <> nil then
FROTransportContexts.Remove(transport);
//transport._Release;
AThread.Data := nil;
end;
procedure TROIndyHTTPWebsocketServer.Loaded;
begin
//do before inherited in case of designtime connection
if Self.IndyServer.IOHandler = nil then
IndyServer.IOHandler := TIdServerIOHandlerWebsocket.Create(Self);
inherited;
end;
procedure TROIndyHTTPWebsocketServer.ProcessRemObjectsRequest(
const AThread: TIdContext; const strmRequest: TMemoryStream; const strmResponse: TMemoryStream);
var
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
msg: TROMessageDispatcher;
iMsgNr: Integer;
imsg: IROMessage;
transport: TROTransportContext;
begin
if strmRequest.Size < Length(C_ROWSNR) + SizeOf(iMsgNr) then Exit;
//read messagenr from the end
strmRequest.Position := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
strmRequest.Read(cWSNR[0], Length(C_ROWSNR));
if (cWSNR <> C_ROWSNR) then Exit;
strmRequest.Read(iMsgNr, SizeOf(iMsgNr));
strmRequest.Position := 0;
//trunc extra data
strmRequest.Size := strmRequest.Size - Length(C_ROWSNR) - SizeOf(iMsgNr);
transport := AThread.Data as TROTransportContext;
//no RO transport object already made?
if transport = nil then
begin
//create IROTransport object
transport := TROTransportContext.Create(Self, AThread as TIdServerWSContext);
//(transport as IROTransport)._AddRef;
FROTransportContexts.Add(transport);
//attach RO transport to indy context
AThread.Data := transport;
//todo: enveloppes
//read client GUID the first time (needed to be able to send RO events)
msg := Self.Dispatchers.FindDispatcher(transport, strmRequest);
if msg = nil then
raise EROException.Create('No suiteable message dispatcher found!');
imsg := (msg.MessageIntf as IROMessageCloneable).Clone;
imsg.InitializeRead(transport);
imsg.ReadFromStream(strmRequest);
transport.ClientId := imsg.ClientID;
imsg := nil;
Assert(not IsEqualGUID(transport.ClientID, EmptyGUID));
end;
//EXECUTE FUNCTION
Self.DispatchMessage(transport, strmRequest, strmResponse);
//write number at end
strmResponse.Position := strmResponse.Size;
strmResponse.Write(C_ROWSNR, Length(C_ROWSNR));
strmResponse.Write(iMsgNr, SizeOf(iMsgNr));
strmResponse.Position := 0;
end;
{ TROTransport }
constructor TROTransportContext.Create(aROServer: TROIndyHTTPServer;
aIdContext: TIdServerWSContext);
begin
FROServer := aROServer;
FIdContext := aIdContext;
end;
procedure TROTransportContext.EventsRegistered(aSender: TObject; aClient: TGUID);
begin
//
end;
procedure TROTransportContext.DispatchEvent(anEventDataItem: TROEventData;
aSessionReference: TGUID; aSender: TObject);
var
i: Integer;
LContext: TIdContext;
transport: TROTransportContext;
l: TList;
ws: TIdIOHandlerWebsocket;
cWSNR: array[0..High(C_ROWSNR)] of AnsiChar;
begin
l := FROServer.IndyServer.Contexts.LockList;
try
if l.Count <= 0 then Exit;
anEventDataItem.Data.Position := anEventDataItem.Data.Size - Length(C_ROWSNR) - SizeOf(FEventCount);
anEventDataItem.Data.Read(cWSNR[0], Length(cWSNR));
//event number not written already?
if cWSNR <> C_ROWSNR then
begin
//new event nr
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
//overflow? then start again from 0
if FEventCount > 0 then
begin
InterlockedExchange(FGlobalEventCount, 0);
FEventCount := -1 * InterlockedIncrement(FGlobalEventCount); //negative = event, positive is normal RO message
end;
Assert(FEventCount < 0);
//write nr at end of message
anEventDataItem.Data.Position := anEventDataItem.Data.Size;
anEventDataItem.Data.Write(C_ROWSNR, Length(C_ROWSNR));
anEventDataItem.Data.Write(FEventCount, SizeOf(FEventCount));
anEventDataItem.Data.Position := 0;
end;
//search specific client
for i := 0 to l.Count - 1 do
begin
LContext := TIdContext(l.Items[i]);
transport := LContext.Data as TROTransportContext;
if transport = nil then Continue;
if not IsEqualGUID(transport.ClientId, aSessionReference) then Continue;
//direct write event data
ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
if not ws.IsWebsocket then Exit;
ws.Lock;
try
try ws.Write(anEventDataItem.Data, wdtBinary) except {continue with other connections} end;
finally
ws.Unlock;
end;
end;
finally
anEventDataItem.RemoveRef;
FROServer.IndyServer.Contexts.UnlockList;
end;
end;
function TROTransportContext.GetClientAddress: string;
begin
Result := FIdContext.Binding.PeerIP;
end;
function TROTransportContext.GetTransportObject: TObject;
begin
Result := FROServer;
end;
{ TROHTTPMessageDispatchers_WebSocket }
function TROHTTPMessageDispatchers_WebSocket.GetDispatcherClass: TROMessageDispatcherClass;
begin
result := TROHTTPDispatcher_Websocket;
end;
{ TROHTTPDispatcher_Websocket }
function TROHTTPDispatcher_Websocket.CanHandleMessage(
const aTransport: IROTransport; aRequeststream: TStream): boolean;
var
tcp: IROTCPTransport;
buf: array [0..5] of AnsiChar;
begin
if aRequeststream = nil then result := FALSE else // for preventing warning in FPC
result := FALSE;
if not Enabled or
not Supports(aTransport, IROTCPTransport, tcp)
then
Exit;
if (tcp as TROTransportContext).FIdContext.IOHandler.IsWebsocket then
begin
//we can handle all kind of messages, independent on the path, so check which kind of message we have
Result := Self.Message.IsValidMessage((aRequeststream as TMemoryStream).Memory, aRequeststream.Size);
//goes wrong with enveloppes!
//TROMessage.Envelopes_ProcessIncoming
if not Result and
(aRequeststream.Size > 6) then
begin
aRequeststream.Read(buf,6);
Result := (buf[0] = EnvelopeSignature[0]) and
(buf[1] = EnvelopeSignature[1]) and
(buf[2] = EnvelopeSignature[2]) and
(buf[3] = EnvelopeSignature[3]) and
(buf[4] = EnvelopeSignature[4]);
aRequeststream.Position := 0;
end;
end
else
Result := inherited CanHandleMessage(aTransport, aRequeststream);
end;
end.