diff --git a/c/mqtt.c b/c/mqtt.c index 1c4ca20..20a56af 100644 --- a/c/mqtt.c +++ b/c/mqtt.c @@ -131,11 +131,13 @@ typedef struct struct mosquitto *mosq; /* mosquito structure */ bool is_in_use; bool is_async; /* if true then use mosquitto_connect_async */ + int refs; atom_t symbol; /* (%p) */ bool is_async_loop_started; /* set to true after mosquitto_loop_start call */ int keepalive; int loop_max_packets; - PL_engine_t pl_engine; /* prolog engine for async processing */ + PL_engine_t pl_engine; /* prolog engine for async processing */ + bool use_callbacks; predicate_t callback_on_connect2; predicate_t callback_on_disconnect2; predicate_t callback_on_log2; @@ -210,13 +212,14 @@ close_list(term_t list) static int destroy_mqtt(swi_mqtt *m) { - _LOG("--- (f-c) destroy_mqtt swi_mqtt: %p \n", m); - if (m && m->mosq) { + _LOG("--- (f-c) destroy_mqtt swi_mqtt: %p refs: %d\n", m, m->refs); + if (m && m->mosq && m->refs == 0) { if (m->is_in_use == TRUE) { _LOG("--- (f-c) destroy_mqtt - connection is in use\n"); return FALSE; } else { + _LOG("--- (f-c) destroy_mqtt - release mosq and blob\n"); mosquitto_destroy(m->mosq); m->mosq = NULL; free(m); @@ -527,18 +530,20 @@ void on_unsubscribe_callback(struct mosquitto *mosq, void *obj, int mid) static void acquire_mqtt_symbol(atom_t symbol) { - _LOG("--- (f-c) acquire_mqtt_symbol\n"); + _LOG("--- (f-c) acquire_mqtt_symbol symbol: %d\n", (int) symbol); swi_mqtt *m = PL_blob_data(symbol, NULL, NULL); m->symbol = symbol; + m->refs++; } static int release_mqtt_symbol(atom_t symbol) { - _LOG("--- (f-c) release_mqtt_symbol\n"); + _LOG("--- (f-c) release_mqtt_symbol symbol: %d\n", (int) symbol); swi_mqtt *m = PL_blob_data(symbol, NULL, NULL); + m->refs--; destroy_mqtt(m); return TRUE; } @@ -580,7 +585,7 @@ static int unify_swi_mqtt(term_t handle, swi_mqtt *m) { _LOG("--- (f-c) unify_swi_mqtt\n"); - + if ( PL_unify_blob(handle, m, sizeof(*m), &mqtt_blob) ) return TRUE; if ( !PL_is_variable(handle) ) @@ -791,6 +796,7 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options) if ( options ) { + _LOG("--- (f-c) c_mqtt_pub > parsing options...\n"); term_t tail = PL_copy_term_ref(options); term_t head = PL_new_term_ref(); @@ -805,11 +811,12 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options) _PL_get_arg(1, head, arg); - if ( name == ATOM_retain) { if (!PL_get_bool( arg, &retain)) { result = FALSE;goto CLEANUP;} - } else if ( name == ATOM_qos ) { if (!PL_get_integer(arg, &qos) ) { result = FALSE;goto CLEANUP;} + if ( name == ATOM_retain) { if (!PL_get_bool( arg, &retain)) { result = FALSE;_LOG("--- (f-c) c_mqtt_pub > get ATOM_retain failed\n");goto CLEANUP;} + } else if ( name == ATOM_qos ) { if (!PL_get_integer(arg, &qos) ) { result = FALSE;_LOG("--- (f-c) c_mqtt_pub > get ATOM_qos failed\n");goto CLEANUP;} } } else { + _LOG("--- (f-c) c_mqtt_pub > PL_get_name_arity failed\n"); result = FALSE; // pl_error("c_mqtt_pub", 4, NULL, ERR_TYPE, head, "option"); goto CLEANUP; } @@ -817,9 +824,11 @@ c_mqtt_pub(term_t conn, term_t topic, term_t payload, term_t options) // unify with NIL --> end of list if ( !PL_get_nil(tail) ) { + _LOG("--- (f-c) c_mqtt_pub > PL_get_nil failed\n"); result = FALSE; // pl_error("c_mqtt_pub", 4, NULL, ERR_TYPE, tail, "list"); goto CLEANUP; } + _LOG("--- (f-c) c_mqtt_pub > parsing options done\n"); } if (!PL_get_chars(payload, &mqtt_payload, CVT_WRITE | BUF_MALLOC)) { @@ -932,6 +941,8 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) goto CLEANUP; } + m->refs = 0; + // pass swi_mqtt as user object (will be available in all callbacks) mosq = mosquitto_new(client_id, true, m); if (mosq) @@ -1011,6 +1022,10 @@ c_mqtt_connect(term_t conn, term_t host, term_t port, term_t options) } } _LOG("--- (f-c) c_mqtt_connect > bind done\n"); + } else { + _LOG("--- (f-c) c_mqtt_connect > unable to create mosq client\n"); + result = FALSE; + goto CLEANUP; } if ( unify_swi_mqtt(conn, m) ) diff --git a/c/mqtt.o b/c/mqtt.o index 9b3c9bf..aaeeae3 100644 Binary files a/c/mqtt.o and b/c/mqtt.o differ diff --git a/lib/x86_64-linux/mqtt.so b/lib/x86_64-linux/mqtt.so index 649785e..4208302 100644 Binary files a/lib/x86_64-linux/mqtt.so and b/lib/x86_64-linux/mqtt.so differ diff --git a/prolog/mqtt.pl b/prolog/mqtt.pl index 3c6a9c8..fbc07ac 100644 --- a/prolog/mqtt.pl +++ b/prolog/mqtt.pl @@ -99,7 +99,7 @@ mqtt_pub(Connection, Topic, Payload) :- - mqtt_pub(Connection, Topic, Payload, [retain(0), qos(0)]). + mqtt_pub(Connection, Topic, Payload, [retain(false), qos(0)]). % publish to mqtt mqtt_pub(Connection, Topic, Payload, Options) :-