Merge pull request #2199 from telefonicaid/issue-2197-deadlock
mrutid authored Oct 18, 2022
2 parents 2b46412 + 066c846 commit a1b48bc
Showing 11 changed files with 231 additions and 67 deletions.
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
@@ -6,3 +6,5 @@
- [cygnus-ngsi] Upgrade Debian version from 11.2 to 11.3 in Dockerfile
- [cygnus-ngsi] Remove apache-flume and apache-maven tar.gz files from docker
- [cygnus-common] Remove grouping rules functionality (deprecated since version 1.6.0 in December 2016)
- [cygnus-common][SQLBackend] Ordering batch INSERT statements upon upsert to avoid deadlocks (#2197)
- [cygnus-ngsi] Ensure columns are in a given order in the column aggregator (#2197)
@@ -608,6 +608,9 @@ public void upsertTransaction (LinkedHashMap<String, ArrayList<JsonElement>> agg
schema,
attrNativeTypes);

// Ordering queries to avoid deadlocks. See issue #2197 for more detail
upsertQuerysList.sort(Comparator.comparing(buff -> buff.toString()));

for (StringBuffer query : upsertQuerysList) {
PreparedStatement upsertStatement;
currentUpsertQuery = query.toString();
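For context, a minimal sketch (not part of the patch; table and values are hypothetical) of why the sort above prevents deadlocks: if two workers flush batches that touch the same rows in different orders, each can hold a row lock the other needs. Sorting every batch with the same comparator makes all workers acquire locks in one global order, so no circular wait can form.

import java.util.ArrayList;
import java.util.Comparator;

public class OrderedUpsertSketch {
    public static void main(String[] args) {
        // One worker happens to queue its upserts in "reverse" order
        ArrayList<StringBuffer> batch = new ArrayList<>();
        batch.add(new StringBuffer("INSERT INTO t (entityid, v) VALUES ('b', 1)"));
        batch.add(new StringBuffer("INSERT INTO t (entityid, v) VALUES ('a', 2)"));
        // Same idiom as the patch: lexicographic order is arbitrary but global
        batch.sort(Comparator.comparing(StringBuffer::toString));
        batch.forEach(System.out::println); // every worker now hits row 'a' first
    }
}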
@@ -52,7 +52,9 @@ public class SQLQueryUtils {
* @param timestampKey the timestamp key
* @param timestampFormat the timestamp format
* @param sqlInstance the sql instance
* @param destination the destination
* @param dataBase the database
* @param schema the database schema
* @param attrNativeTypes
* @return the string buffer
*/
protected static ArrayList<StringBuffer> sqlUpsertQuery(LinkedHashMap<String, ArrayList<JsonElement>> aggregation,
@@ -127,24 +129,27 @@ protected static ArrayList<StringBuffer> postgreSqlUpsertQuery(LinkedHashMap<Str
StringBuffer values = new StringBuffer("(");
StringBuffer fields = new StringBuffer("(");
StringBuffer updateSet = new StringBuffer();
String valuesSeparator = "";
String fieldsSeparator = "";
String updateSetSeparator = "";
ArrayList<String> keys = new ArrayList<>(aggregation.keySet());
for (int j = 0 ; j < keys.size() ; j++) {
if (lastData.get(keys.get(j)).get(i) != null) {
JsonElement value = lastData.get(keys.get(j)).get(i);
if (j == 0) {
values.append(getStringValueFromJsonElement(value, "'", attrNativeTypes));
fields.append(keys.get(j));
if (!Arrays.asList(uniqueKey.split("\\s*,\\s*")).contains(keys.get(j))) {
updateSet.append(keys.get(j)).append("=").append(postgisTempReference).append(".").append(keys.get(j));
}
} else {
values.append(",").append(getStringValueFromJsonElement(value, "'", attrNativeTypes));
fields.append(",").append(keys.get(j));
if (!Arrays.asList(uniqueKey.split("\\s*,\\s*")).contains(keys.get(j))) {
updateSet.append(", ").append(keys.get(j)).append("=").append(postgisTempReference).append(".").append(keys.get(j));
}
}
// values
JsonElement value = lastData.get(keys.get(j)).get(i);
String valueToAppend = value == null ? "null" : getStringValueFromJsonElement(value, "'", attrNativeTypes);
values.append(valuesSeparator).append(valueToAppend);
valuesSeparator = ",";

// fields
fields.append(fieldsSeparator).append(keys.get(j));
fieldsSeparator = ",";

// updateSet
if (!Arrays.asList(uniqueKey.split("\\s*,\\s*")).contains(keys.get(j))) {
updateSet.append(updateSetSeparator).append(keys.get(j)).append("=").append(postgisTempReference).append(".").append(keys.get(j));
updateSetSeparator = ",";
}

}
query.append("INSERT INTO ").append(postgisDestination).append(" ").append(fields).append(") ").
append("VALUES ").append(values).append(") ");
@@ -280,7 +285,9 @@ protected static StringBuffer mySQLUpdateRecordQuery(String key,
* @param aggregation the aggregation
* @param tableName the table name
* @param sqlInstance the sql instance
* @param destination the destination
* @param database the database
* @param schema the database schema
* @param attrNativeTypes
* @return the string buffer
*/
protected static StringBuffer sqlInsertQuery(LinkedHashMap<String, ArrayList<JsonElement>> aggregation,
@@ -72,7 +72,7 @@ public abstract class NGSIGenericAggregator {
private String lastDataMode;
private long lastDataTimestamp;
private String lastDataTimestampKeyOnAggregation;
private String lastDataUniqueKeyOnAggragation;
private String lastDataUniqueKeyOnAggregation;

/**
* Gets aggregation.
@@ -272,18 +272,18 @@ public void setLastDataMode(String lastDataMode) {
public void setLastDataTimestampKeyOnAggregation(String lastDataTimestampKeyOnAggregation) { this.lastDataTimestampKeyOnAggregation = lastDataTimestampKeyOnAggregation; }

/**
* Gets last data key on aggragation.
* Gets last data key on aggregation.
*
* @return the last data key on aggragation
* @return the last data key on aggregation
*/
public String getLastDataUniqueKeyOnAggragation() { return lastDataUniqueKeyOnAggragation; }
public String getLastDataUniqueKeyOnAggregation() { return lastDataUniqueKeyOnAggregation; }

/**
* Sets last data key on aggragation.
* Sets last data key on aggregation.
*
* @param lastDataUniqueKeyOnAggragation the last data key on aggragation
* @param lastDataUniqueKeyOnAggregation the last data key on aggregation
*/
public void setLastDataUniqueKeyOnAggragation(String lastDataUniqueKeyOnAggragation) { this.lastDataUniqueKeyOnAggragation = lastDataUniqueKeyOnAggragation; }
public void setLastDataUniqueKeyOnAggregation(String lastDataUniqueKeyOnAggregation) { this.lastDataUniqueKeyOnAggregation = lastDataUniqueKeyOnAggregation; }

/**
* Gets hdfs folder. For HDFS sink.
@@ -41,19 +41,40 @@ public class NGSIGenericColumnAggregator extends NGSIGenericAggregator {

private boolean swapCoordinates;

private boolean isSpecialKey(String key) {
return (key.equalsIgnoreCase(NGSIConstants.ENTITY_ID) ||
key.equalsIgnoreCase(NGSIConstants.ENTITY_TYPE) ||
key.equalsIgnoreCase(NGSIConstants.FIWARE_SERVICE_PATH) ||
key.equalsIgnoreCase(NGSIConstants.RECV_TIME_TS+"C") ||
key.equalsIgnoreCase(NGSIConstants.RECV_TIME));
} // isSpecialKey

@Override
public void initialize(NGSIEvent event) {
// TBD: possible option for postgisSink
swapCoordinates = false;
// particular initialization
LinkedHashMap<String, ArrayList<JsonElement>> aggregation = getAggregation();
aggregation.put(NGSIConstants.RECV_TIME_TS+"C", new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.RECV_TIME, new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.FIWARE_SERVICE_PATH, new ArrayList<JsonElement>());
// First: fields that are part of the primary key of the table
// (needed in SQL sinks to avoid deadlocks; see issue #2197 for more detail),
// except for main fields (entityId, entityType, etc.), which are added in the second part
String uniqueKeys = getLastDataUniqueKey();
if (uniqueKeys != null) {
for (String key : getLastDataUniqueKey().split(",")) {
if (!isSpecialKey(key.trim())) {
aggregation.put(key.trim(), new ArrayList<JsonElement>());
}
}
}

// Second: main fields
aggregation.put(NGSIConstants.ENTITY_ID, new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.ENTITY_TYPE, new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.FIWARE_SERVICE_PATH, new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.RECV_TIME_TS+"C", new ArrayList<JsonElement>());
aggregation.put(NGSIConstants.RECV_TIME, new ArrayList<JsonElement>());

// iterate on all this context element attributes, if there are attributes
// Third: iterate on all this context element attributes, if there are attributes
ArrayList<NotifyContextRequest.ContextAttribute> contextAttributes = null;
if (isEnableNameMappings() && event.getMappedCE() != null && event.getMappedCE().getAttributes() != null && !event.getMappedCE().getAttributes().isEmpty()) {
contextAttributes = event.getMappedCE().getAttributes();
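For context, a standalone sketch (hypothetical column names) of the property initialize() relies on: LinkedHashMap iterates in insertion order, so inserting the unique-key columns first pins the column order of every generated INSERT.

import java.util.ArrayList;
import java.util.LinkedHashMap;

public class InsertionOrderSketch {
    public static void main(String[] args) {
        LinkedHashMap<String, ArrayList<String>> aggregation = new LinkedHashMap<>();
        aggregation.put("entityid", new ArrayList<>());    // unique-key column first
        aggregation.put("recvtime", new ArrayList<>());
        aggregation.put("temperature", new ArrayList<>()); // plain attribute later
        // Insertion order is preserved, so column order is deterministic:
        System.out.println(aggregation.keySet()); // [entityid, recvtime, temperature]
    }
}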
@@ -92,8 +113,10 @@ public void aggregate(NGSIEvent event) {
NotifyContextRequest.ContextElement contextElement = event.getContextElement();
NotifyContextRequest.ContextElement mappedContextElement = event.getMappedCE();
String entityId = contextElement.getId();
if (isEnableLastData() && (getLastDataUniqueKey().equalsIgnoreCase(NGSIConstants.ENTITY_ID))) {
setLastDataUniqueKeyOnAggragation(NGSIConstants.ENTITY_ID);
// FIXME: this is weird... getLastDataUniqueKey() could be a comma-separated string (e.g. "entityid,foo,bar")?
// In that case equalsIgnoreCase("entityid") would not match...
if (isEnableLastData() && getLastDataUniqueKey() != null && (getLastDataUniqueKey().equalsIgnoreCase(NGSIConstants.ENTITY_ID))) {
setLastDataUniqueKeyOnAggregation(NGSIConstants.ENTITY_ID);
currentEntityId = entityId;
}
String entityType = contextElement.getType();
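A standalone sketch of the membership test the FIXME above hints at, not what this patch implements: if the configured unique key can be a comma-separated list, each element should be compared individually (the key string below is hypothetical).

import java.util.Arrays;

public class UniqueKeyMatchSketch {
    public static void main(String[] args) {
        String lastDataUniqueKey = "entityId,foo,bar"; // hypothetical config value
        boolean matches = Arrays.stream(lastDataUniqueKey.split("\\s*,\\s*"))
                .anyMatch(k -> k.equalsIgnoreCase("entityId"));
        System.out.println(matches); // true, unlike a whole-string equalsIgnoreCase
    }
}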
@@ -127,8 +150,8 @@ public void aggregate(NGSIEvent event) {
setLastDataTimestampKeyOnAggregation(attrName);
currentTS = CommonUtils.getTimeInstantFromString(attrValue.getAsString());
}
if ((getLastDataUniqueKeyOnAggragation() == null) && (getLastDataUniqueKey().equalsIgnoreCase(attrName))) {
setLastDataUniqueKeyOnAggragation(attrName);
if ((getLastDataUniqueKeyOnAggregation() == null) && getLastDataUniqueKey() != null && (getLastDataUniqueKey().equalsIgnoreCase(attrName))) {
setLastDataUniqueKeyOnAggregation(attrName);
currentEntityId = attrName;
}
}
@@ -180,10 +203,10 @@ public void aggregate(NGSIEvent event) {
boolean updateLastData = false;
LinkedHashMap<String, ArrayList<JsonElement>> lastData = getLastData();
if (numPreviousValues > 0) {
if (lastData.containsKey(getLastDataUniqueKeyOnAggragation())) {
ArrayList<JsonElement> list = lastData.get(getLastDataUniqueKeyOnAggragation());
if (lastData.containsKey(getLastDataUniqueKeyOnAggregation())) {
ArrayList<JsonElement> list = lastData.get(getLastDataUniqueKeyOnAggregation());
for (int i = 0 ; i < list.size() ; i++) {
if (list.get(i).getAsString().equals(currentEntityId)) {
if (list.get(i) != null && list.get(i).getAsString().equals(currentEntityId)) {
long storedTS = CommonUtils.getTimeInstantFromString(
lastData.get(getLastDataTimestampKeyOnAggregation()).
get(i).getAsString());
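For context, a simplified sketch (plain Java types instead of JsonElement, made-up values) of the last-data scan above: find the stored row whose unique key matches the incoming entity, then keep whichever timestamp is newer.

import java.util.ArrayList;
import java.util.Arrays;

public class LastDataRefreshSketch {
    public static void main(String[] args) {
        ArrayList<String> ids = new ArrayList<>(Arrays.asList("e1", "e2"));
        ArrayList<Long> timestamps = new ArrayList<>(Arrays.asList(100L, 200L));
        String currentEntityId = "e2";
        long currentTS = 250L;
        for (int i = 0; i < ids.size(); i++) {
            // null-safe match on the unique key, as in the patched loop
            if (ids.get(i) != null && ids.get(i).equals(currentEntityId)
                    && currentTS > timestamps.get(i)) {
                timestamps.set(i, currentTS); // the newer event wins
            }
        }
        System.out.println(timestamps); // [100, 250]
    }
}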
@@ -1194,7 +1194,7 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun
}
}
System.out.println(aggregation);
String correctBatch = "{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someNumber\":2,\"somneBoolean\":true,\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someGeoJson\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\",\"someJson\":\"{\\\"String\\\": \\\"string\\\"}\",\"someString\":\"foo\",\"someString2\":\"\"},{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someName1\":\"-3.7167, 40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\"}";
String correctBatch = "{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"someNumber\":2,\"somneBoolean\":true,\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someGeoJson\":\"{\\\"type\\\": \\\"Point\\\",\\\"coordinates\\\": [-0.036177,39.986159]}\",\"someJson\":\"{\\\"String\\\": \\\"string\\\"}\",\"someString\":\"foo\",\"someString2\":\"\"},{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"someName1\":\"-3.7167, 40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\"}";
assertEquals(aggregation, correctBatch);
} catch (Exception e) {
fail();
@@ -1684,8 +1684,8 @@ public void testNativeTypeColumnBatchJson() throws CygnusBadConfiguration, Cygnu
aggregator.aggregate(event);
}
}
String correctBatch = "{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someNumber\":2,\"someNumber_md\":[],\"somneBoolean\":true,\"somneBoolean_md\":[],\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someDate_md\":[],\"someGeoJson\":\"{\"type\":\"Point\",\"coordinates\":[-0.036177,39.986159]}\",\"someGeoJson_md\":[],\"someJson\":\"{\"String\":\"string\"}\",\"someJson_md\":[],\"someString\":\"foo\",\"someString_md\":[],\"someString2\":\"\",\"someString2_md\":[]}\n" +
"{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\",\"someName1\":\"-3.7167,40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\",\"someName2_md\":[]}";
String correctBatch = "{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"someNumber\":2,\"someNumber_md\":[],\"somneBoolean\":true,\"somneBoolean_md\":[],\"someDate\":\"2016-09-21T01:23:00.00Z\",\"someDate_md\":[],\"someGeoJson\":\"{\"type\":\"Point\",\"coordinates\":[-0.036177,39.986159]}\",\"someGeoJson_md\":[],\"someJson\":\"{\"String\":\"string\"}\",\"someJson_md\":[],\"someString\":\"foo\",\"someString_md\":[],\"someString2\":\"\",\"someString2_md\":[]}\n" +
"{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"someName1\":\"-3.7167,40.3833\",\"someName1_md\":[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}],\"someName2\":\"someValue2\",\"someName2_md\":[]}";
if (ngsihdfsSink.jsonToPersist(aggregator.getAggregationToPersist()).replace(" ", "").equals(correctBatch.replace(" ", ""))) {
assertTrue(true);
System.out.println(getTestTraceHead("[NGSIHDFSSink.testNativeTypeColumnBatchJson]") + "- OK ");
@@ -1727,8 +1727,8 @@ public void testNativeTypeColumnBatchJsonNotMetadata() throws CygnusBadConfigura
aggregator.aggregate(event);
}
}
String correctBatch = "{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\", \"someNumber\":2, \"somneBoolean\":true, \"someDate\":\"2016-09-21T01:23:00.00Z\", \"someGeoJson\":\"{\"type\": \"Point\",\"coordinates\": [-0.036177,39.986159]}\", \"someJson\":\"{\"String\": \"string\"}\", \"someString\":\"foo\", \"someString2\":\"\"}\n" +
"{\"recvTime\":\"2016-04-20T07:19:55.801Z\",\"fiwareServicePath\":\"somePath\",\"entityId\":\"someId\",\"entityType\":\"someType\", \"someName1\":\"-3.7167, 40.3833\", \"someName2\":\"someValue2\"}";
String correctBatch = "{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\", \"someNumber\":2, \"somneBoolean\":true, \"someDate\":\"2016-09-21T01:23:00.00Z\", \"someGeoJson\":\"{\"type\": \"Point\",\"coordinates\": [-0.036177,39.986159]}\", \"someJson\":\"{\"String\": \"string\"}\", \"someString\":\"foo\", \"someString2\":\"\"}\n" +
"{\"entityId\":\"someId\",\"entityType\":\"someType\",\"fiwareServicePath\":\"somePath\",\"recvTime\":\"2016-04-20T07:19:55.801Z\", \"someName1\":\"-3.7167, 40.3833\", \"someName2\":\"someValue2\"}";
if (ngsihdfsSink.jsonToPersist(aggregator.getAggregationToPersist()).replace(" ", "").equals(correctBatch.replace(" ", ""))) {
assertTrue(true);
System.out.println(getTestTraceHead("[NGSIHDFSSink.testNativeTypeColumnBatchJsonNotMetadata]") + "- OK ");
@@ -1342,7 +1342,7 @@ public void testNativeTypeColumnBatch() throws CygnusBadConfiguration, CygnusRun
for (NGSIEvent event : events) {
aggregator.aggregate(event);
}
String correctBatch = "('2016-04-20 07:19:55.801','somePath','someId','someType',2,'[]',TRUE,'[]','2016-09-21T01:23:00.00Z','[]','{\"type\": \"Point\",\"coordinates\": [-0.036177,39.986159]}','[]','{\"String\": \"string\"}','[]','foo','[]','','[]',NULL,NULL,NULL,NULL),('2016-04-20 07:19:55.801','somePath','someId','someType',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'-3.7167, 40.3833','[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]','someValue''2','[]')";
String correctBatch = "('someId','someType','somePath','2016-04-20 07:19:55.801',2,'[]',TRUE,'[]','2016-09-21T01:23:00.00Z','[]','{\"type\": \"Point\",\"coordinates\": [-0.036177,39.986159]}','[]','{\"String\": \"string\"}','[]','foo','[]','','[]',NULL,NULL,NULL,NULL),('someId','someType','somePath','2016-04-20 07:19:55.801',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,'-3.7167, 40.3833','[{\"name\":\"location\",\"type\":\"string\",\"value\":\"WGS84\"}]','someValue''2','[]')";
String valuesForInsert = SQLQueryUtils.getValuesForInsert(aggregator.getAggregationToPersist(), aggregator.isAttrNativeTypes());
if (valuesForInsert.equals(correctBatch)) {
System.out.println(getTestTraceHead("[NGSIMySQKSink.testNativeTypesColumnBatch]")