diff --git a/distributor/src/distributor/filters.bal b/distributor/src/distributor/filters.bal index 1acf397..49b1c25 100644 --- a/distributor/src/distributor/filters.bal +++ b/distributor/src/distributor/filters.bal @@ -2,7 +2,6 @@ import ballerina/auth; import ballerina/encoding; import ballerina/http; import ballerina/log; -import ballerina/websub; const WWW_AUTHENTICATE_HEADER = "WWW-Authenticate"; @@ -70,7 +69,6 @@ public type SubscriptionFilter object { log:printWarn("error decoding topic, using the original form: " + topic + ". Error: " + decodedTopic.toString()); } - map callbackMap = resultCallbackMap; match topic { JSON_TOPIC => { @@ -95,8 +93,6 @@ public type SubscriptionFilter object { log:printWarn("error decoding callback, using the original form: " + callback + ". Error: " + decodedCallback.toString()); } - websub:Hub hubVar = hub; - string headerValue = request.getHeader(http:AUTH_HEADER); if !(headerValue.startsWith(auth:AUTH_SCHEME_BASIC)) { @@ -110,22 +106,9 @@ public type SubscriptionFilter object { if (result is [string, string]) { [string, string][username, _] = result; - if callbackMap.hasKey(username) { - string existingCallback = callbackMap.get(username); - log:printInfo("Removing existing subscription callback: " + existingCallback + ", for username: " + - username + ", and topic: " + topic); - error? remResult = hubVar.removeSubscription(topic, existingCallback); - if (remResult is error) { - log:printError("error removing existing subscription for username: " + username, remResult); - } - log:printInfo("Adding a new subscription callback: " + callback + ", for username: " + - username + ", and topic: " + topic); - updateUserCallback(username, topic, callback); - } else { - log:printInfo("Adding a subscription callback: " + callback + ", for username: " + username + + log:printInfo("Adding a subscription callback: " + callback + ", for username: " + username + ", and topic: " + topic); - saveUserCallback(username, topic, callback); - } + saveUserCallback(username, topic, callback); callbackMap[username] = <@untainted> callback; } else { log:printError("Error extracting credentials", result); diff --git a/distributor/src/distributor/save.bal b/distributor/src/distributor/save.bal index bab68f0..bb0c263 100644 --- a/distributor/src/distributor/save.bal +++ b/distributor/src/distributor/save.bal @@ -34,7 +34,7 @@ const string CREATE_CALLBACKS_TABLE = "CREATE TABLE IF NOT EXISTS callbacks (" + " callback VARCHAR(200) NOT NULL," + " PRIMARY KEY (username, topic))"; const INSERT_CALLBACK = "INSERT INTO callbacks (username, topic, callback) VALUES (?, ?, ?)"; -const UPDATE_CALLBACK = "UPDATE callbacks SET callback = ? WHERE username = ? AND topic = ?"; +const REMOVE_CALLBACK = "DELETE from callbacks WHERE username = ? AND topic = ? AND callback = ?"; const SELECT_CALLBACKS = "SELECT * FROM callbacks"; const DROP_CALLBACKS_TABLE = "DROP TABLE callbacks"; @@ -247,9 +247,9 @@ function saveUserCallback(string username, string topic, string callback) { } } -# Update a subscription username-calback combination. -function updateUserCallback(string username, string topic, string callback) { - var r = dbClient->update(UPDATE_CALLBACK, callback, username, topic); +# Remove a subscription username-calback combination. +function removeUserCallback(string username, string topic, string callback) { + var r = dbClient->update(REMOVE_CALLBACK, username, topic, callback); if r is error { log:printError("Unable to update username-callback in database: ", r); } diff --git a/distributor/src/distributor/website.bal b/distributor/src/distributor/website.bal index 7e3e7af..cbb667c 100644 --- a/distributor/src/distributor/website.bal +++ b/distributor/src/distributor/website.bal @@ -1,8 +1,11 @@ +import ballerina/auth; import ballerina/http; +import ballerina/log; import ballerina/mime; import ballerina/time; import ballerina/xmlutils; import ballerina/file; +import ballerina/websub; const LEVEL_PD = "POLLING-DIVISION"; const LEVEL_ED = "ELECTORAL-DISTRICT"; @@ -132,6 +135,8 @@ service mediaWebsite on mediaListener { # web/info.html and it'll get shown at subscriber startup # + return - error if problem resource function info(http:Caller caller, http:Request request) returns error? { + removeSubscriptionsForUsername(request); + http:Response hr = new; hr.setFileAsPayload("web/info.txt", "text/plain"); check caller->ok(hr); @@ -274,3 +279,45 @@ function generateResultsTable(string 'type) returns string { tab = tab + ""; return tab; } + +function removeSubscriptionsForUsername(http:Request request) { + if (!request.hasHeader(http:AUTH_HEADER)) { + return; + } + + websub:Hub hubVar = hub; + string headerValue = request.getHeader(http:AUTH_HEADER); + + if !(headerValue.startsWith(auth:AUTH_SCHEME_BASIC)) { + return; + } + + string credential = headerValue.substring(5, headerValue.length()).trim(); + + var result = auth:extractUsernameAndPassword(credential); + + if (result is [string, string]) { + [string, string][username, _] = result; + + removeSubscription(username, resultCallbackMap, JSON_TOPIC, hubVar); + removeSubscription(username, imageCallbackMap, IMAGE_PDF_TOPIC, hubVar); + removeSubscription(username, awaitResultsCallbackMap, AWAIT_RESULTS_TOPIC, hubVar); + } else { + log:printError("Error extracting credentials to remove subscription", result); + } +} + +function removeSubscription(string username, map callbackMap, string topic, websub:Hub hub) { + if !callbackMap.hasKey(username) { + return; + } + + string existingCallback = callbackMap.get(username); + log:printInfo("Removing existing subscription callback: " + existingCallback + ", for username: " + + username + ", and topic: " + topic); + error? remResult = hub.removeSubscription(topic, existingCallback); + if (remResult is error) { + log:printError("error removing existing subscription for username: " + username, remResult); + } + removeUserCallback(username, topic, existingCallback); +}