Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add round robin option for client and allow client to switch urls after disconnecting. #241

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions demo/multiurls_switch/multiurls_switch.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
#include "nng/nng.h"
#include "nng/supplemental/util/platform.h"

// For example. There are 3 urls.
// When round robin is true.
// The order of connecting will be 0 (server unavailable) -> 1 (connected and then disconnected) -> 2 -> 0.
// When round robin is false.
// The order of connecting will be 0 (server unavailable) -> 1 (connected and then disconnected) -> 0.
#define ROUND_ROBIN true

void
print_helper()
{
Expand All @@ -49,6 +56,9 @@ intHandler(int dummy)
exit(0);
}

static nng_cv *switch_cv;
static nng_mtx *switch_mtx;

static void
disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
{
Expand All @@ -59,6 +69,11 @@ disconnect_cb(nng_pipe p, nng_pipe_ev ev, void *arg)
// nng_pipe_get_ptr(p, NNG_OPT_MQTT_DISCONNECT_PROPERTY, &prop);
// nng_socket_get?
printf("%s: disconnected!\n", __FUNCTION__);

// Wake to reconnect
nng_mtx_lock(switch_mtx);
nng_cv_wake(switch_cv);
nng_mtx_unlock(switch_mtx);
(void) ev;
(void) arg;
}
Expand Down Expand Up @@ -101,29 +116,37 @@ client_connect(const char **urls, int len)

int cnt = -1;

nng_socket sock;
if ((rv = nng_mqtt_client_open(&sock)) != 0) {
fatal("nng_socket", rv);
}
nng_mqtt_set_connect_cb(sock, connect_cb, (void *)&sock);
nng_mqtt_set_disconnect_cb(sock, disconnect_cb, connmsg);

while (1) {
nng_dialer dialer;
nng_socket sock;
if ((rv = nng_mqtt_client_open(&sock)) != 0) {
fatal("nng_socket", rv);
}
nng_mqtt_set_connect_cb(sock, connect_cb, (void *)&sock);
nng_mqtt_set_disconnect_cb(sock, disconnect_cb, connmsg);

cnt = (cnt + 1) % len;
const char *url = urls[cnt];

nng_dialer dialer;
if ((rv = nng_dialer_create(&dialer, sock, url)) != 0) {
fatal("nng_dialer_create", rv);
}
nng_dialer_set_ptr(dialer, NNG_OPT_MQTT_CONNMSG, connmsg);

printf("Connecting to server %s ...\n", url);
printf("Connecting to server [%d]%s ...\n", cnt, url);
if ((rv = nng_dialer_start(dialer, NNG_FLAG_ALLOC)) != 0) {
printf("Failed to connect to %s rv%d\n", url, rv);
} else {
break;
continue;
}
// connected
// Wait for disconnect
nng_mtx_lock(switch_mtx);
nng_cv_wait(switch_cv);
nng_mtx_unlock(switch_mtx);
// close socket
nng_close(sock);
if (ROUND_ROBIN == false) {
cnt = -1; // Always from the first url
}
}

Expand All @@ -133,6 +156,7 @@ client_connect(const char **urls, int len)
int
main()
{
int rv;
print_helper();
const char *urls[] = {
"mqtt-tcp://example.io:1883",
Expand All @@ -141,6 +165,10 @@ main()
};
int len = sizeof(urls) / sizeof(char *);

if ((0 != (rv = nng_mtx_alloc(&switch_mtx))) ||
(0 != (rv = nng_cv_alloc(&switch_cv, switch_mtx)))) {
fatal("Failed to init switch mtx or cv", rv);
}
client_connect(urls, len);

signal(SIGINT, intHandler);
Expand Down
Loading