Skip to content

Commit

Permalink
Merge pull request #36 from hhxsv5/master
Browse files Browse the repository at this point in the history
Support subscribing the multiple channels
  • Loading branch information
hhxsv5 authored Mar 25, 2019
2 parents 42d606d + 7e62600 commit 4038659
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 36 deletions.
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,17 @@ $auth = null;
$api = new WebSocketFeed($auth);

$query = ['connectId' => uniqid('', true)];
$channel = [
'topic' => '/market/ticker:KCS-BTC',
//'response' => true,
$channels = [
['topic' => '/market/ticker:KCS-BTC'], // Subscribe multiple channels
['topic' => '/market/ticker:ETH-BTC'],
];

$api->subscribePublicChannel($query, $channel, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
$api->subscribePublicChannels($query, $channels, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
var_dump($message);

// Unsubscribe the channel
// $ws->send(json_encode($api->createUnsubscribeMessage('/market/ticker:ETH-BTC')));

// Stop loop
// $loop->stop();
}, function ($code, $reason) {
Expand Down Expand Up @@ -178,7 +182,9 @@ go(function () {
| KuCoin\SDK\PrivateApi\WebSocketFeed::getPublicServer() | NO | https://docs.kucoin.com/#apply-connect-token |
| KuCoin\SDK\PrivateApi\WebSocketFeed::getPrivateServer() | YES | https://docs.kucoin.com/#apply-connect-token |
| KuCoin\SDK\PrivateApi\WebSocketFeed::subscribePublicChannel() | NO | https://docs.kucoin.com/#public-channels |
| KuCoin\SDK\PrivateApi\WebSocketFeed::subscribePublicChannels() | NO | https://docs.kucoin.com/#public-channels |
| KuCoin\SDK\PrivateApi\WebSocketFeed::subscribePrivateChannel() | YES | https://docs.kucoin.com/#private-channels |
| KuCoin\SDK\PrivateApi\WebSocketFeed::subscribePrivateChannels() | YES | https://docs.kucoin.com/#private-channels |

</details>

Expand Down
12 changes: 8 additions & 4 deletions examples/WebSocketFeed.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
$api = new WebSocketFeed($auth);

$query = ['connectId' => uniqid('', true)];
$channel = [
'topic' => '/market/ticker:KCS-BTC',
//'response' => true,
$channels = [
['topic' => '/market/ticker:KCS-BTC'], // Subscribe multiple channels
['topic' => '/market/ticker:ETH-BTC'],
];

$api->subscribePublicChannel($query, $channel, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
$api->subscribePublicChannels($query, $channels, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
var_dump($message);

// Unsubscribe the channel
// $ws->send(json_encode($api->createUnsubscribeMessage('/market/ticker:ETH-BTC')));

// Stop loop
// $loop->stop();
}, function ($code, $reason) {
Expand Down
106 changes: 84 additions & 22 deletions src/PrivateApi/WebSocketFeed.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,16 @@ public function getPrivateServer(array $params = [])
}

/**
* Subscribe channel by url
* Subscribe multiple channels by url
* @param array $server
* @param array $channel
* @param array $channels
* @param callable $onMessage
* @param callable|null $onClose
* @param array $options
* @throws \Exception|\Throwable
*/
public function subscribeChannel(array $server, array $channel, callable $onMessage, callable $onClose = null, array $options = [])
public function subscribeChannels(array $server, array $channels, callable $onMessage, callable $onClose = null, array $options = [])
{
$channel['type'] = 'subscribe';
if (!isset($options['tls']['verify_peer'])) {
$options['tls']['verify_peer'] = !static::isSkipVerifyTls();
}
Expand All @@ -111,7 +110,7 @@ public function subscribeChannel(array $server, array $channel, callable $onMess
* @var \Exception|\Throwable $exception
*/
$exception = null;
$connector($server['connectUrl'])->then(function (WebSocket $ws) use ($server, $channel, $onMessage, $onClose, $loop) {
$connector($server['connectUrl'])->then(function (WebSocket $ws) use ($server, $channels, $onMessage, $onClose, $loop) {
// Add timer to send ping message
$pingTimer = $loop->addPeriodicTimer($server['pingInterval'] / 1000 - 1, function () use ($ws) {
try {
Expand All @@ -122,7 +121,7 @@ public function subscribeChannel(array $server, array $channel, callable $onMess
// Ignore this exception
}
});
$ws->on('message', function (MessageInterface $msg) use ($ws, $channel, $onMessage, $loop, $pingTimer) {
$ws->on('message', function (MessageInterface $msg) use ($server, $ws, $channels, $onMessage, $loop, $pingTimer) {
$msgStr = $msg->__toString();
$msgArray = json_decode($msgStr, true);
if (!isset($msgArray['type'])) {
Expand All @@ -131,8 +130,10 @@ public function subscribeChannel(array $server, array $channel, callable $onMess
switch ($msgArray['type']) {
case 'welcome':
// Do subscribe
if (!isset($msgArray['id']) || $msgArray['id'] === $channel['id']) {
$ws->send(json_encode($channel));
if (!isset($msgArray['id']) || $msgArray['id'] === $server['connectId']) {
foreach ($channels as $channel) {
$ws->send(json_encode($channel));
}
}
break;
case 'ack':
Expand Down Expand Up @@ -167,41 +168,89 @@ public function subscribeChannel(array $server, array $channel, callable $onMess
}

/**
* Subscribe public channel
* Subscribe multiple public channels
* @param array $query The query of websocket url
* @param array $channel
* @param array $channels
* @param callable $onMessage
* @param callable|null $onClose
* @param array $options
* @throws \Exception|\Throwable
*/
public function subscribePublicChannel(array $query, array $channel, callable $onMessage, callable $onClose = null, array $options = [])
public function subscribePublicChannels(array $query, array $channels, callable $onMessage, callable $onClose = null, array $options = [])
{
if (!isset($query['connectId']) || !isset($channel['id'])) {
$channel['id'] = $query['connectId'] = uniqid('', true);
if (!isset($channels[0])) {
$channels = [$channels];
}
array_walk($channels, function (&$channel) {
if (!isset($channel['id'])) {
$channel['id'] = uniqid('', true);
}
$channel['type'] = 'subscribe';
$channel['privateChannel'] = false;
});
if (!isset($query['connectId'])) {
$query['connectId'] = uniqid('', true);
}
$channel['privateChannel'] = false;
$server = $this->getPublicServer($query);
$this->subscribeChannel($server, $channel, $onMessage, $onClose, $options);
$server['connectId'] = $query['connectId'];
$this->subscribeChannels($server, $channels, $onMessage, $onClose, $options);
}

/**
* Subscribe private channel
* Subscribe multiple private channels
* @param array $query The query of websocket url
* @param array $channel
* @param array $channels
* @param callable $onMessage
* @param callable|null $onClose
* @param array $options
* @throws \Exception|\Throwable
*/
public function subscribePrivateChannel(array $query, array $channel, callable $onMessage, callable $onClose = null, array $options = [])
public function subscribePrivateChannels(array $query, array $channels, callable $onMessage, callable $onClose = null, array $options = [])
{
if (!isset($query['connectId']) || !isset($channel['id'])) {
$channel['id'] = $query['connectId'] = uniqid('', true);
if (!isset($channels[0])) {
$channels = [$channels];
}
array_walk($channels, function (&$channel) {
if (!isset($channel['id'])) {
$channel['id'] = uniqid('', true);
}
$channel['type'] = 'subscribe';
$channel['privateChannel'] = true;
});
if (!isset($query['connectId'])) {
$query['connectId'] = uniqid('', true);
}
$channel['privateChannel'] = true;
$server = $this->getPrivateServer($query);
$this->subscribeChannel($server, $channel, $onMessage, $onClose, $options);
$server['connectId'] = $query['connectId'];
$this->subscribeChannels($server, $channels, $onMessage, $onClose, $options);
}

/**
* Subscribe one public channel
* @param array $query The query of websocket url
* @param array $channel
* @param callable $onMessage
* @param callable|null $onClose
* @param array $options
* @throws \Exception|\Throwable
*/
public function subscribePublicChannel(array $query, array $channel, callable $onMessage, callable $onClose = null, array $options = [])
{
$this->subscribePublicChannels($query, [$channel], $onMessage, $onClose, $options);
}

/**
* Subscribe one private channel
* @param array $query The query of websocket url
* @param array $channel
* @param callable $onMessage
* @param callable|null $onClose
* @param array $options
* @throws \Exception|\Throwable
*/
public function subscribePrivateChannel(array $query, array $channel, callable $onMessage, callable $onClose = null, array $options = [])
{
$this->subscribePrivateChannels($query, [$channel], $onMessage, $onClose, $options);
}

/**
Expand All @@ -213,4 +262,17 @@ public function createPingMessage($id = null)
{
return ['id' => $id ?: uniqid('', true), 'type' => 'ping'];
}

/**
* Create message for unsubscribe
* @param string $topic
* @param bool $privateChannel
* @param bool $response
* @param string $id
* @return array
*/
public function createUnsubscribeMessage($topic, $privateChannel = false, $response = true, $id = null)
{
return ['id' => $id ?: uniqid('', true), 'type' => 'unsubscribe', 'topic' => $topic, 'privateChannel' => $privateChannel, 'response' => $response];
}
}
73 changes: 67 additions & 6 deletions tests/WebSocketFeedTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ public function testGetPrivateBullet(WebSocketFeed $api)
*/
public function testSubscribePublicChannel(WebSocketFeed $api)
{
$query = [
'connectId' => uniqid('', true),
];
$query = ['connectId' => uniqid('', true),];
$channel = [
'topic' => '/market/ticker:KCS-BTC',
//'response' => true,
Expand Down Expand Up @@ -102,11 +100,42 @@ public function testSubscribePublicChannel(WebSocketFeed $api)
* @param WebSocketFeed $api
* @throws \Exception|\Throwable
*/
public function testSubscribePrivateChannel(WebSocketFeed $api)
public function testSubscribePublicChannels(WebSocketFeed $api)
{
$query = [
'connectId' => uniqid('', true),
$query = ['connectId' => uniqid('', true),];
$channels = [
['topic' => '/market/ticker:KCS-BTC',/*'response' => true,*/],
['topic' => '/market/ticker:ETH-BTC',/*'response' => true,*/],
];

$options = [
// 'tls' => [
// 'verify_peer' => false,
// ],
];
$api->subscribePublicChannels($query, $channels, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
$this->assertInternalType('array', $message);
$this->assertArrayHasKey('type', $message);
$this->assertEquals('message', $message['type']);

// Dynamic output
fputs(STDIN, print_r($message, true));

// Stop for phpunit
$loop->stop();
}, function ($code, $reason) {
echo "OnClose: {$code} {$reason}\n";
}, $options);
}

/**
* @dataProvider apiProvider
* @param WebSocketFeed $api
* @throws \Exception|\Throwable
*/
public function testSubscribePrivateChannel(WebSocketFeed $api)
{
$query = ['connectId' => uniqid('', true),];
$channel = [
'topic' => '/market/match:KCS-BTC',
//'response' => true,
Expand All @@ -130,4 +159,36 @@ public function testSubscribePrivateChannel(WebSocketFeed $api)
echo "OnClose: {$code} {$reason}\n";
}, $options);
}

/**
* @dataProvider apiProvider
* @param WebSocketFeed $api
* @throws \Exception|\Throwable
*/
public function testSubscribePrivateChannels(WebSocketFeed $api)
{
$query = ['connectId' => uniqid('', true),];
$channels = [
['topic' => '/market/match:KCS-BTC',/*'response' => true,*/],
['topic' => '/market/match:ETH-BTC',/*'response' => true,*/],
];

$options = [
// 'tls' => [
// 'verify_peer' => false,
// ],
];
$api->subscribePrivateChannels($query, $channels, function (array $message, WebSocket $ws, LoopInterface $loop) use ($api) {
$this->assertInternalType('array', $message);
$this->assertArrayHasKey('type', $message);
$this->assertEquals('message', $message['type']);
// Dynamic output
fputs(STDIN, print_r($message, true));

// Stop for phpunit
$loop->stop();
}, function ($code, $reason) {
echo "OnClose: {$code} {$reason}\n";
}, $options);
}
}

0 comments on commit 4038659

Please sign in to comment.