Skip to content

Commit

Permalink
added symbol reference for acquire and release
Browse files Browse the repository at this point in the history
  • Loading branch information
olsky committed Jun 3, 2016
1 parent 969f4a9 commit 355f236
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
31 changes: 23 additions & 8 deletions c/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,13 @@ typedef struct
struct mosquitto *mosq; /* mosquito structure */
bool is_in_use;
bool is_async; /* if true then use mosquitto_connect_async */
int refs;
atom_t symbol; /* <swi_mqtt>(%p) */
bool is_async_loop_started; /* set to true after mosquitto_loop_start call */
int keepalive;
int loop_max_packets;
PL_engine_t pl_engine; /* prolog engine for async processing */
PL_engine_t pl_engine; /* prolog engine for async processing */
bool use_callbacks;
predicate_t callback_on_connect2;
predicate_t callback_on_disconnect2;
predicate_t callback_on_log2;
Expand Down Expand Up @@ -210,13 +212,14 @@ close_list(term_t list)
static int
destroy_mqtt(swi_mqtt *m)
{
_LOG("--- (f-c) destroy_mqtt swi_mqtt: %p \n", m);
if (m && m->mosq) {
_LOG("--- (f-c) destroy_mqtt swi_mqtt: %p refs: %d\n", m, m->refs);
if (m && m->mosq && m->refs == 0) {
if (m->is_in_use == TRUE)
{
_LOG("--- (f-c) destroy_mqtt - connection is in use\n");
return FALSE;
} else {
_LOG("--- (f-c) destroy_mqtt - release mosq and blob\n");
mosquitto_destroy(m->mosq);
m->mosq = NULL;
free(m);
Expand Down Expand Up @@ -527,18 +530,20 @@ void on_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid)
static void
acquire_mqtt_symbol(atom_t symbol)
{
_LOG("--- (f-c) acquire_mqtt_symbol\n");
_LOG("--- (f-c) acquire_mqtt_symbol symbol: %d\n", (int) symbol);

swi_mqtt *m = PL_blob_data(symbol, NULL, NULL);
m->symbol = symbol;
m->refs++;
}

static int
release_mqtt_symbol(atom_t symbol)
{
_LOG("--- (f-c) release_mqtt_symbol\n");
_LOG("--- (f-c) release_mqtt_symbol symbol: %d\n", (int) symbol);

swi_mqtt *m = PL_blob_data(symbol, NULL, NULL);
m->refs--;
destroy_mqtt(m);
return TRUE;
}
Expand Down Expand Up @@ -580,7 +585,7 @@ static int
unify_swi_mqtt(term_t handle, swi_mqtt *m)
{
_LOG("--- (f-c) unify_swi_mqtt\n");

if ( PL_unify_blob(handle, m, sizeof(*m), &mqtt_blob) )
return TRUE;
if ( !PL_is_variable(handle) )
Expand Down Expand Up @@ -791,6 +796,7 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options)

if ( options )
{
_LOG("--- (f-c) c_mqtt_pub > parsing options...\n");
term_t tail = PL_copy_term_ref(options);
term_t head = PL_new_term_ref();

Expand All @@ -805,21 +811,24 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options)

_PL_get_arg(1, head, arg);

if ( name == ATOM_retain) { if (!PL_get_bool( arg, &retain)) { result = FALSE;goto CLEANUP;}
} else if ( name == ATOM_qos ) { if (!PL_get_integer(arg, &qos) ) { result = FALSE;goto CLEANUP;}
if ( name == ATOM_retain) { if (!PL_get_bool( arg, &retain)) { result = FALSE;_LOG("--- (f-c) c_mqtt_pub > get ATOM_retain failed\n");goto CLEANUP;}
} else if ( name == ATOM_qos ) { if (!PL_get_integer(arg, &qos) ) { result = FALSE;_LOG("--- (f-c) c_mqtt_pub > get ATOM_qos failed\n");goto CLEANUP;}
}

} else {
_LOG("--- (f-c) c_mqtt_pub > PL_get_name_arity failed\n");
result = FALSE; // pl_error("c_mqtt_pub", 4, NULL, ERR_TYPE, head, "option");
goto CLEANUP;
}
}
// unify with NIL --> end of list
if ( !PL_get_nil(tail) )
{
_LOG("--- (f-c) c_mqtt_pub > PL_get_nil failed\n");
result = FALSE; // pl_error("c_mqtt_pub", 4, NULL, ERR_TYPE, tail, "list");
goto CLEANUP;
}
_LOG("--- (f-c) c_mqtt_pub > parsing options done\n");
}

if (!PL_get_chars(payload, &mqtt_payload, CVT_WRITE | BUF_MALLOC)) {
Expand Down Expand Up @@ -932,6 +941,8 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options)
goto CLEANUP;
}

m->refs = 0;

// pass swi_mqtt as user object (will be available in all callbacks)
mosq = mosquitto_new(client_id, true, m);
if (mosq)
Expand Down Expand Up @@ -1011,6 +1022,10 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options)
}
}
_LOG("--- (f-c) c_mqtt_connect > bind done\n");
} else {
_LOG("--- (f-c) c_mqtt_connect > unable to create mosq client\n");
result = FALSE;
goto CLEANUP;
}

if ( unify_swi_mqtt(conn, m) )
Expand Down
Binary file modified c/mqtt.o
Binary file not shown.
Binary file modified lib/x86_64-linux/mqtt.so
Binary file not shown.
2 changes: 1 addition & 1 deletion prolog/mqtt.pl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@


mqtt_pub(Connection, Topic, Payload) :-
mqtt_pub(Connection, Topic, Payload, [retain(0), qos(0)]).
mqtt_pub(Connection, Topic, Payload, [retain(false), qos(0)]).

% publish to mqtt
mqtt_pub(Connection, Topic, Payload, Options) :-
Expand Down

0 comments on commit 355f236

Please sign in to comment.