Skip to content

Commit

Permalink
Merge pull request #1910 from atsign-foundation/gkc/fix-unawaited-wri…
Browse files Browse the repository at this point in the history
…tes-bug

fix: ensure all connection writes are awaited
  • Loading branch information
gkc authored Apr 23, 2024
2 parents ad4ea2c + bf99873 commit 35f18f4
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 63 deletions.
3 changes: 3 additions & 0 deletions packages/at_secondary_server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.0.43
- fix: ensure all connection writes are awaited

## 3.0.42
- feat: allow filtering of requests in EnrollVerbHandler using enrollment
approval status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection
requestTimestampQueue = Queue();

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
metaData,
'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
logger
.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}
Expand Down Expand Up @@ -233,7 +235,8 @@ class InboundConnectionImpl<T extends Socket> extends BaseSocketConnection

try {
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
metaData,
'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ class OutboundConnectionImpl<T extends Socket>
..isCreated = true;

logger.info(logger.getAtConnectionLogMessage(
metaData, 'New connection ('
metaData,
'New connection ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));

socket.done.onError((error, stackTrace) {
logger.info('socket.done.onError called with $error. Calling this.close()');
logger
.info('socket.done.onError called with $error. Calling this.close()');
this.close();
});
}
Expand Down Expand Up @@ -59,7 +61,8 @@ class OutboundConnectionImpl<T extends Socket>
try {
var socket = underlying;
logger.info(logger.getAtConnectionLogMessage(
metaData, 'destroying socket ('
metaData,
'destroying socket ('
'this side: ${underlying.address}:${underlying.port}'
' remote side: ${underlying.remoteAddress}:${underlying.remotePort}'
')'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class GlobalExceptionHandler {
} else {
errorDescription = exception.toString();
}
_writeToSocket(atConnection, prompt, errorCode, errorDescription);
await _writeToSocket(atConnection, prompt, errorCode, errorDescription);
}
}
}
Expand All @@ -150,11 +150,11 @@ class GlobalExceptionHandler {
return error_description[errorCode];
}

void _writeToSocket(AtConnection atConnection, String prompt,
String? errorCode, String errorDescription) {
Future<void> _writeToSocket(AtConnection atConnection, String prompt,
String? errorCode, String errorDescription) async {
if (atConnection.metaData.clientVersion ==
AtConnectionMetaData.clientVersionNotAvailable) {
atConnection.write('error:$errorCode-$errorDescription\n$prompt');
await atConnection.write('error:$errorCode-$errorDescription\n$prompt');
return;
}
// The JSON encoding of error message is supported by the client versions greater than 3.0.37
Expand All @@ -166,10 +166,10 @@ class GlobalExceptionHandler {
'errorCode': errorCode,
'errorDescription': errorDescription
};
atConnection.write('error:${jsonEncode(errorJsonMap)}\n$prompt');
await atConnection.write('error:${jsonEncode(errorJsonMap)}\n$prompt');
return;
}
// Defaults to return the error message in string format if all the conditions fails
atConnection.write('error:$errorCode-$errorDescription\n$prompt');
await atConnection.write('error:$errorCode-$errorDescription\n$prompt');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class StatsNotificationService {
}

/// Writes the lastCommitID to all Monitor connections
void writeStatsToMonitor({String? latestCommitID, String? operationType}) {
Future<void> writeStatsToMonitor(
{String? latestCommitID, String? operationType}) async {
try {
latestCommitID ??= atCommitLog!.lastCommittedSequenceNumber().toString();
// Gets the list of active connections.
Expand All @@ -156,8 +157,8 @@ class StatsNotificationService {
..messageType = MessageType.key.toString()
..isTextMessageEncrypted = false;
// Convert notification object to JSON and write to connection
connection
.write('notification: ${jsonEncode(notification.toJson())}\n');
await connection.write('notification:'
' ${jsonEncode(notification.toJson())}\n');
}
}
if (numOfMonitorConn == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
/// @param - ServerSocket
void _listen(var serverSocket) {
logger.info('serverSocket _listen : ${serverSocket.runtimeType}');
serverSocket.listen(((clientSocket) {
serverSocket.listen(((clientSocket) async {
var sessionID = '_${Uuid().v4()}';
InboundConnection? connection;
try {
Expand All @@ -453,9 +453,9 @@ class AtSecondaryServerImpl implements AtSecondaryServer {
connection = inBoundConnectionManager
.createSocketConnection(clientSocket, sessionId: sessionID);
connection.acceptRequests(_executeVerbCallBack, _streamCallBack);
connection.write('@');
await connection.write('@');
} on InboundConnectionLimitException catch (e) {
GlobalExceptionHandler.getInstance()
await GlobalExceptionHandler.getInstance()
.handle(e, atConnection: connection, clientSocket: clientSocket);
}
}), onError: (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ abstract class ChangeVerbHandler extends AbstractVerbHandler {
if (_responseInternal != null &&
_responseInternal!.isError == false &&
_responseInternal!.data != null) {
statsNotificationService.writeStatsToMonitor(
await statsNotificationService.writeStatsToMonitor(
latestCommitID: _responseInternal!.data,
operationType: getVerb().name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class MonitorVerbHandler extends AbstractVerbHandler {
// If enrollmentId is null, then connection is authenticated via PKAM
if ((atConnection.metaData as InboundConnectionMetadata).enrollmentId ==
null) {
_sendLegacyNotification(notification);
await _sendLegacyNotification(notification);
} else {
// If enrollmentId is populated, then connection is authenticated via APKAM
await _sendNotificationByEnrollmentNamespaceAccess(notification);
Expand All @@ -101,7 +101,7 @@ class MonitorVerbHandler extends AbstractVerbHandler {
/// - Writes all the notifications on the connection.
/// - Optionally, if regex is supplied, write only the notifications that
/// matches the pattern.
void _sendLegacyNotification(Notification notification) {
Future<void> _sendLegacyNotification(Notification notification) async {
var fromAtSign = notification.fromAtSign;
if (fromAtSign != null) {
fromAtSign = fromAtSign.replaceAll('@', '');
Expand All @@ -110,8 +110,8 @@ class MonitorVerbHandler extends AbstractVerbHandler {
// If the user does not provide regex, defaults to ".*" to match all notifications.
if (notification.notification!.contains(RegExp(regex)) ||
(fromAtSign != null && fromAtSign.contains(RegExp(regex)))) {
atConnection
.write('notification: ${jsonEncode(notification.toJson())}\n');
await atConnection.write('notification:'
' ${jsonEncode(notification.toJson())}\n');
}
} on FormatException {
logger.severe('Invalid regular expression : $regex');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class StreamVerbHandler extends AbstractVerbHandler {
logger.severe('sender connection is null for stream id:$streamId');
throw UnAuthenticatedException('Invalid stream id');
}
await StreamManager.senderSocketMap[streamId]!
.write('stream:done $streamId\n');
await StreamManager.senderSocketMap[streamId]!.write('stream:done'
' $streamId\n');
_cleanUp(streamId);
break;
case 'init':
Expand Down
2 changes: 1 addition & 1 deletion packages/at_secondary_server/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: at_secondary
description: Implementation of secondary server.
version: 3.0.42
version: 3.0.43
repository: https://github.com/atsign-foundation/at_server
homepage: https://www.example.com
publish_to: none
Expand Down
3 changes: 2 additions & 1 deletion packages/at_secondary_server/test/delete_verb_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ void main() {

test('verify deletion of cached signing private key', () async {
inboundConnection.metadata.isAuthenticated = true;
var command = 'delete:cached:@alice:${AtConstants.atSigningPrivateKey}@alice';
var command =
'delete:cached:@alice:${AtConstants.atSigningPrivateKey}@alice';
Response response =
await handler.processInternal(command, inboundConnection);
expect(int.parse(response.data!).runtimeType, int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:at_secondary/src/connection/inbound/inbound_connection_impl.dart
import 'package:at_secondary/src/connection/inbound/inbound_connection_pool.dart';
import 'package:at_secondary/src/server/at_secondary_impl.dart';
import 'package:at_secondary/src/server/server_context.dart';
import 'package:at_server_spec/at_server_spec.dart';
import 'package:mocktail/mocktail.dart';
import 'package:test/test.dart';
import 'package:at_utils/at_utils.dart';
Expand Down Expand Up @@ -82,7 +83,7 @@ void main() async {
expect(poolInstance.getCurrentSize(), 1);
});

test('test connection pool - clear idle connection', () {
test('test connection pool - clear idle connection', () async {
var poolInstance = InboundConnectionPool.getInstance();
poolInstance.init(10);
var connection1 = MockInboundConnectionImpl(mockSocket, 'aaa');
Expand All @@ -95,7 +96,7 @@ void main() async {
milliseconds:
(serverContext.unauthenticatedInboundIdleTimeMillis * 0.9)
.floor()));
connection2.write('test data');
await connection2.write('test data');
expect(poolInstance.getCurrentSize(), 3);
sleep(Duration(
milliseconds:
Expand All @@ -112,12 +113,13 @@ void main() async {
});

/// Verify that, at lowWaterMark, allowable idle time is still as configured by inboundIdleTimeMillis
test('test connection pool - at lowWaterMark - clear idle connection', () {
test('test connection pool - at lowWaterMark - clear idle connection',
() async {
int maxPoolSize = 10;

var poolInstance = InboundConnectionPool.getInstance();
poolInstance.init(maxPoolSize);
var connections = [];
List<AtConnection> connections = [];

int lowWaterMark =
(maxPoolSize * serverContext.inboundConnectionLowWaterMarkRatio)
Expand All @@ -134,7 +136,7 @@ void main() async {
(serverContext.unauthenticatedInboundIdleTimeMillis * 0.9)
.floor()));

connections[1].write('test data');
await connections[1].write('test data');
expect(poolInstance.getCurrentSize(), lowWaterMark);
sleep(Duration(
milliseconds:
Expand All @@ -157,7 +159,8 @@ void main() async {
/// - Wait until we pass the currently allowable idle time for 'authenticated'
/// - Verify that the number of connections in the pool is now 3, since only
/// the 3 that we wrote to earlier are still not 'idle'
test('test connection pool - 90% capacity - clear idle connection', () {
test('test connection pool - 90% capacity - clear idle connection',
() async {
int maxPoolSize = 100; // Please don't change this

var poolInstance = InboundConnectionPool.getInstance();
Expand Down Expand Up @@ -207,10 +210,11 @@ void main() async {
int numUnAuthToWriteTo = 10;
// Let's write to a few authenticated connections, and some more unauthenticated connections
for (int i = 0; i < numAuthToWriteTo; i++) {
connections[i * 2].write('test data'); // evens are authenticated
await connections[i * 2].write('test data'); // evens are authenticated
}
for (int i = 0; i < numUnAuthToWriteTo; i++) {
connections[i * 2 + 1].write('test data'); // odds are not authenticated
await connections[i * 2 + 1]
.write('test data'); // odds are not authenticated
}

expect(poolInstance.getCurrentSize(), desiredPoolSize);
Expand Down
8 changes: 5 additions & 3 deletions packages/at_secondary_server/test/test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ class MockOutboundClientManager extends Mock implements OutboundClientManager {}
class MockNotificationManager extends Mock implements NotificationManager {}

class MockStatsNotificationService extends Mock
implements StatsNotificationService {}
implements StatsNotificationService {
@override
Future<void> writeStatsToMonitor(
{String? latestCommitID, String? operationType}) async {}
}

class MockAtCacheManager extends Mock implements AtCacheManager {}

Expand Down Expand Up @@ -234,8 +238,6 @@ verbTestsSetUp() async {
.thenAnswer((invocation) async => 'some-notification-id');

statsNotificationService = MockStatsNotificationService();
when(() => statsNotificationService.writeStatsToMonitor())
.thenAnswer((invocation) {});
}

Future<void> verbTestsTearDown() async {
Expand Down
67 changes: 44 additions & 23 deletions tests/at_end2end_test/test/e2e_test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -110,36 +110,53 @@ class SimpleOutboundSocketHandler {
// queue.clear();
}

final int newLineCodeUnit = 10;
final int atCharCodeUnit = 64;

/// Handles responses from the remote secondary, adds to [_queue] for processing in [read] method
/// Throws a [BufferOverFlowException] if buffer is unable to hold incoming data
Future<void> _messageHandler(data) async {
String result;
if (!_buffer.isOverFlow(data)) {
// skip @ prompt. byte code for @ is 64
if (data.length == 1 && data.first == 64) {
return;
}
//ignore prompt(@ or @<atSign>@) after '\n'. byte code for \n is 10
if (data.last == 64 && data.contains(10)) {
data = data.sublist(0, data.lastIndexOf(10) + 1);
_buffer.append(data);
} else if (data.length > 1 && data.first == 64 && data.last == 64) {
// pol responses do not end with '\n'. Add \n for buffer completion
_buffer.append(data);
_buffer.addByte(10);
// check buffer overflow
_checkBufferOverFlow(data);

// Loop from last index to until the end of data.
// If a new line character is found, then it is end
// of server response. process the data.
// Else add the byte to buffer.
for (int element = 0; element < data.length; element++) {
// If it's a '\n' then complete data has been received. process it.
if (data[element] == newLineCodeUnit) {
String result = utf8.decode(_buffer.getData().toList());
result = _stripPrompt(result);
_buffer.clear();
_queue.add(result);
} else {
_buffer.append(data);
_buffer.addByte(data[element]);
}
} else {
_buffer.clear();
throw BufferOverFlowException('Buffer overflow on outbound connection');
}
if (_buffer.isEnd()) {
result = utf8.decode(_buffer.getData());
result = result.trim();
}

_checkBufferOverFlow(data) {
if (_buffer.isOverFlow(data)) {
int bufferLength = (_buffer.length() + data.length) as int;
_buffer.clear();
_queue.add(result);
throw BufferOverFlowException(
'data length exceeded the buffer limit. Data length : $bufferLength and Buffer capacity ${_buffer.capacity}');
}
}

String _stripPrompt(String result) {
var colonIndex = result.indexOf(':');
if (colonIndex == -1) {
return result;
}
var responsePrefix = result.substring(0, colonIndex);
var response = result.substring(colonIndex);
if (responsePrefix.contains('@')) {
responsePrefix =
responsePrefix.substring(responsePrefix.lastIndexOf('@') + 1);
}
return '$responsePrefix$response';
}

/// A message which is returned from [read] if throwTimeoutException is set to false
Expand All @@ -151,10 +168,14 @@ class SimpleOutboundSocketHandler {
// Wait this many milliseconds between checks on the queue
var loopDelay=250;

bool first = true;
// Check every loopDelay milliseconds until we get a response or timeoutMillis have passed.
var loopCount = (timeoutMillis / loopDelay).round();
for (var i = 0; i < loopCount; i++) {
await Future.delayed(Duration(milliseconds: loopDelay));
if (!first) {
await Future.delayed(Duration(milliseconds: loopDelay));
}
first = false;
var queueLength = _queue.length;
if (queueLength > 0) {
result = _queue.removeFirst();
Expand Down
Loading

0 comments on commit 35f18f4

Please sign in to comment.