Skip to content

Commit

Permalink
Add debug command for event admin remote provider based on MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzhenbao committed Oct 10, 2024
1 parent 39e8ee5 commit 8d490bc
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ if (EVENT_ADMIN_REMOTE_PROVIDER_MQTT)
set(EARPM_DEPS
Celix::event_admin_api
Celix::event_admin_spi
Celix::shell_api
Celix::c_rsa_spi
Celix::rsa_common
Celix::log_helper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,30 @@ TEST_F(CelixEarpmActErrorInjectionTestSuite, FailedToAddEndpointListenerServiceT
});
}

TEST_F(CelixEarpmActErrorInjectionTestSuite, FailedToCreateCommandServiceProperiesTest) {
TestEarpmActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_properties_create((void*)&celix_bundleActivator_start, 1, nullptr, 2);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(ENOMEM, status);
});
}

TEST_F(CelixEarpmActErrorInjectionTestSuite, FailedToSetCommandNameTest) {
TestEarpmActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_properties_set((void*)&celix_bundleActivator_start, 1, ENOMEM, 2);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(ENOMEM, status);
});
}

TEST_F(CelixEarpmActErrorInjectionTestSuite, FailedToAddCommandServiceTest) {
TestEarpmActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_dmComponent_addInterface((void*)&celix_bundleActivator_start, 1, ENOMEM, 3);
auto status = celix_bundleActivator_start(act, ctx);
ASSERT_EQ(ENOMEM, status);
});
}

TEST_F(CelixEarpmActErrorInjectionTestSuite, FailedToGetDependencyManagerTest) {
TestEarpmActivator([](void *act, celix_bundle_context_t *ctx) {
celix_ei_expect_celix_bundleContext_getDependencyManager((void*)&celix_bundleActivator_start, 1, nullptr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,17 @@ TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToSubscribeCtrlMessageOfEven
ASSERT_EQ(nullptr, earpm);
}

TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToDupDebugCommandTest) {
auto earpm = celix_earpm_create(ctx.get());
ASSERT_NE(earpm, nullptr);

celix_ei_expect_celix_utils_strdup((void*)&celix_earpm_executeCommand, 0, nullptr);
auto res = celix_earpm_executeCommand(earpm, "celix::earpm", stdout, stderr);
EXPECT_FALSE(res);

celix_earpm_destroy(earpm);
}

TEST_F(CelixEarpmImplErrorInjectionTestSuite, FailedToDupEventHandlerFilterInfoTest) {
TestAddEventHandlerServiceErrorInjection(
[]() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,4 +1001,31 @@ TEST_F(CelixEarpmImplTestSuite, SendEventAndThenRemoteHandlerRemovedByUpdateMess
});
}

TEST_F(CelixEarpmImplTestSuite, ExecuteCommandTest) {
TestRemoteProvider([](celix_event_admin_remote_provider_mqtt_t* earpm) {
AddRemoteHandlerInfoToRemoteProviderAndCheck(earpm, R"({"handler":{"handlerId":123,"topics":["subscribedEvent"]}})");

celix_event_handler_service_t eventHandlerService;
eventHandlerService.handle = nullptr;
eventHandlerService.handleEvent = [](void*, const char*, const celix_properties_t*) { return CELIX_SUCCESS; };
celix_autoptr(celix_properties_t) props = celix_properties_create();
celix_properties_setLong(props, CELIX_FRAMEWORK_SERVICE_ID, 234);
celix_properties_set(props, CELIX_EVENT_TOPIC, "topic");
celix_properties_set(props, CELIX_EVENT_FILTER, "(a=b)");
auto status = celix_earpm_addEventHandlerService(earpm, &eventHandlerService, props);
EXPECT_EQ(status, CELIX_SUCCESS);

auto res = celix_earpm_executeCommand(earpm, "celix::earpm", stdout, stderr);
EXPECT_TRUE(res);
});
}

TEST_F(CelixEarpmImplTestSuite, ExecuteCommandFailedTest) {
auto earpm = celix_earpm_create(ctx.get());
ASSERT_NE(earpm, nullptr);

auto res = celix_earpm_executeCommand(earpm, "celix::earpm unexpectedSubCmd", stdout, stderr);
EXPECT_FALSE(res);

celix_earpm_destroy(earpm);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <errno.h>

#include "celix_errno.h"
#include "celix_shell_command.h"
#include "celix_bundle_activator.h"
#include "celix_dm_component.h"
#include "celix_dm_service_dependency.h"
Expand All @@ -39,6 +40,7 @@ typedef struct celix_event_admin_remote_provider_mqtt_activator {
celix_event_admin_remote_provider_mqtt_t* providerMqtt;
celix_event_remote_provider_service_t providerSvc;
endpoint_listener_t endpointListener;
celix_shell_command_t cmdSvc;
} celix_event_admin_remote_provider_mqtt_activator_t;

static celix_status_t celix_eventAdminRemoteProviderMqttActivator_start(celix_event_admin_remote_provider_mqtt_activator_t *act, celix_bundle_context_t *ctx) {
Expand Down Expand Up @@ -162,6 +164,24 @@ static celix_status_t celix_eventAdminRemoteProviderMqttActivator_start(celix_ev
return status;
}

act->cmdSvc.handle = act->providerMqtt;
act->cmdSvc.executeCommand = celix_earpm_executeCommand;
celix_autoptr(celix_properties_t) cmdProps = celix_properties_create();
if (cmdProps == NULL) {
return ENOMEM;
}
status = celix_properties_set(cmdProps, CELIX_SHELL_COMMAND_NAME, "celix::earpm");
if (status != CELIX_SUCCESS) {
return status;
}
(void)celix_properties_set(cmdProps, CELIX_SHELL_COMMAND_USAGE, "celix::earpm");
(void)celix_properties_set(cmdProps, CELIX_SHELL_COMMAND_DESCRIPTION, "Show the status of the Event Admin Remote Provider Mqtt.");
status = celix_dmComponent_addInterface(earpmCmp, CELIX_SHELL_COMMAND_SERVICE_NAME, CELIX_SHELL_COMMAND_SERVICE_VERSION,
&act->cmdSvc, celix_steal_ptr(cmdProps));
if (status != CELIX_SUCCESS) {
return status;
}

celix_dependency_manager_t* mng = celix_bundleContext_getDependencyManager(ctx);
if (mng == NULL) {
return ENOMEM;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,16 @@ void celix_earpmClient_destroy(celix_earpm_client_t* client) {
return;
}

void celix_earpmClient_info(celix_earpm_client_t* client, FILE* outStream) {
celixThreadMutex_lock(&client->mutex);
size_t queueSize = client->freeMsgPool.cap;
size_t usedSize = client->freeMsgPool.usedSize;
celixThreadMutex_unlock(&client->mutex);

fprintf(outStream, "\nMessage Queue Info:\n");
fprintf(outStream, "\tTotal:%zu, Used:%zu\n", queueSize, usedSize);
}

static celix_status_t celix_earpmClient_configMosq(mosquitto *mosq, celix_log_helper_t* logHelper, const char* sessionEndMsgTopic, const char* sessionEndMsgSenderUUID, const char* sessionEndMsgVersion) {
assert(mosq != NULL);
int rc = mosquitto_int_option(mosq, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
extern "C" {
#endif
#include <time.h>
#include <stdio.h>

#include "celix_errno.h"
#include "celix_cleanup.h"
Expand Down Expand Up @@ -93,7 +94,7 @@ celix_status_t celix_earpmClient_publishAsync(celix_earpm_client_t* client, cons

celix_status_t celix_earpmClient_publishSync(celix_earpm_client_t* client, const celix_earpm_client_request_info_t* requestInfo);


void celix_earpmClient_info(celix_earpm_client_t* client, FILE* outStream);


#ifdef __cplusplus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,50 @@ static void celix_earpm_connectedCallback(void* handle) {
return;
}

static void celix_earpm_infoCmd(celix_event_admin_remote_provider_mqtt_t* earpm, FILE* outStream) {
fprintf(outStream, "Event Admin Remote Provider Based On MQTT Info:\n");
{
fprintf(outStream, "\nLocal Subscriptions:\n");
celix_auto(celix_mutex_lock_guard_t) mutexGuard = celixMutexLockGuard_init(&earpm->mutex);
CELIX_STRING_HASH_MAP_ITERATE(earpm->eventSubscriptions, iter) {
celix_earpm_event_subscription_t* subscription = iter.value.ptrValue;
fprintf(outStream, "\t%s -> QOS:%d, SubCnt:%d\n", iter.key, subscription->curQos, celix_arrayList_size(subscription->handlerServiceIdList));
}
fprintf(outStream, "\nRemote Framework Info:\n");
CELIX_STRING_HASH_MAP_ITERATE(earpm->remoteFrameworks, iter) {
celix_earpm_remote_framework_info_t* fwInfo = iter.value.ptrValue;
fprintf(outStream, "\t%s -> HandlerCnt:%zu, NoAckCnt:%d, PendingAckCnt:%zu\n", iter.key,
celix_longHashMap_size(fwInfo->handlerInfoMap), fwInfo->continuousNoAckCount, celix_longHashMap_size(fwInfo->eventAckSeqNrMap));
}
}

celix_earpmClient_info(earpm->mqttClient, outStream);
}

bool celix_earpm_executeCommand(void* handle, const char* commandLine, FILE* outStream, FILE* errorStream) {
assert(handle != NULL);
assert(commandLine != NULL);
assert(outStream != NULL);
assert(errorStream != NULL);
celix_event_admin_remote_provider_mqtt_t* earpm = (celix_event_admin_remote_provider_mqtt_t*)handle;
celix_autofree char* cmd = celix_utils_strdup(commandLine);
if (cmd == NULL) {
fprintf(errorStream, "Failed to process command line %s.\n", commandLine);
return false;
}
const char* subCmd = NULL;
char* savePtr = NULL;
strtok_r(cmd, " ", &savePtr);
subCmd = strtok_r(NULL, " ", &savePtr);
if (subCmd == NULL) {
celix_earpm_infoCmd(earpm, outStream);
} else {
fprintf(errorStream, "Unexpected sub command %s\n", subCmd);
return false;
}
return true;
}

size_t celix_earpm_currentRemoteFrameworkCount(celix_event_admin_remote_provider_mqtt_t* earpm) {
celix_auto(celix_mutex_lock_guard_t) mutexGuard = celixMutexLockGuard_init(&earpm->mutex);
size_t cnt = celix_stringHashMap_size(earpm->remoteFrameworks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
extern "C" {
#endif

#include <stdbool.h>

#include "celix_errno.h"
#include "celix_compiler.h"
#include "celix_bundle_context.h"
Expand All @@ -47,6 +49,8 @@ celix_status_t celix_earpm_setEventAdminSvc(void* handle, void* eventAdminSvc);
celix_status_t celix_earpm_postEvent(void* handle , const char* topic, const celix_properties_t* eventProps);
celix_status_t celix_earpm_sendEvent(void* handle , const char* topic, const celix_properties_t* eventProps);

bool celix_earpm_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errorStream);

size_t celix_earpm_currentRemoteFrameworkCount(celix_event_admin_remote_provider_mqtt_t* earpm);

#ifdef __cplusplus
Expand Down
1 change: 1 addition & 0 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def configure(self):
if options["build_event_admin_remote_provider_mqtt"]:
options["build_event_admin"] = True
options["build_remote_service_admin"] = True
options["build_shell_api"] = True

if options["build_cxx_rsa_integration"]:
options["build_cxx_remote_service_admin"] = True
Expand Down

0 comments on commit 8d490bc

Please sign in to comment.