diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
index 7c11394328..15552a01c8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java
@@ -10,8 +10,11 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -26,6 +29,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.jumpmind.db.model.Table;
 import org.jumpmind.symmetric.common.ParameterConstants;
@@ -45,6 +49,9 @@
 import org.springframework.util.ClassUtils;
 import org.springframework.util.SystemPropertyUtils;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
 import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 
@@ -70,21 +77,14 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
     private String schemaPackage;
-    private String[] parseDatePatterns = new String[] {
-            "yyyy/MM/dd HH:mm:ss.SSSSSS",
-            "yyyy-MM-dd HH:mm:ss",
-            "ddMMMyyyy:HH:mm:ss.SSS Z",
-            "ddMMMyyyy:HH:mm:ss.SSS",
-            "yyyy-MM-dd HH:mm:ss.SSS",
-            "ddMMMyyyy:HH:mm:ss.SSSSSS",
-            "yyyy-MM-dd",
-            "yyyy-MM-dd'T'HH:mmZZZZ",
-            "yyyy-MM-dd'T'HH:mm:ssZZZZ",
-            "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ"
-    };
-
+    private String[] parseDatePatterns = new String[] { "yyyy/MM/dd HH:mm:ss.SSSSSS", "yyyy-MM-dd HH:mm:ss", "ddMMMyyyy:HH:mm:ss.SSS Z",
+            "ddMMMyyyy:HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss.SSS", "ddMMMyyyy:HH:mm:ss.SSSSSS", "yyyy-MM-dd", "yyyy-MM-dd'T'HH:mmZZZZ",
+            "yyyy-MM-dd'T'HH:mm:ssZZZZ", "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ" };
+
     private List schemaPackageClassNames = new ArrayList();
+    private Gson gson = new Gson();
+
     public final static String KAFKA_FORMAT_XML = "XML";
     public final static String KAFKA_FORMAT_JSON = "JSON";
     public final static String KAFKA_FORMAT_AVRO = "AVRO";
@@ -140,7 +140,7 @@ public KafkaWriterFilter(IParameterService parameterService) {
     }
 
     public boolean beforeWrite(DataContext context, Table table, CsvData data) {
-
+
         if (table.getNameLowerCase().startsWith("sym_")) {
             return true;
         } else {
@@ -150,6 +150,9 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             if (data.getDataEventType() == DataEventType.DELETE) {
                 rowData = data.getParsedData(CsvData.OLD_DATA);
             }
+            else if (data.getDataEventType() == DataEventType.CREATE) {
+                return false;
+            }
 
             StringBuffer kafkaText = new StringBuffer();
 
@@ -165,23 +168,18 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
             List kafkaDataList = kafkaDataMap.get(kafkaDataKey);
 
             if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
-                kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
-                        .append("\"data\": { ");
+                JsonObject jsonData = new JsonObject();
                 for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("\"").append(table.getColumnNames()[i]).append("\": "); - - if (rowData[i] != null) { - kafkaText.append("\""); - } - kafkaText.append(rowData[i]); - if (rowData[i] != null) { - kafkaText.append("\""); - } - if (i + 1 < table.getColumnNames().length) { - kafkaText.append(","); - } + jsonData.addProperty(table.getColumnNames()[i], rowData[i]); } - kafkaText.append(" } } }"); + JsonObject change = new JsonObject(); + change.addProperty("eventType", data.getDataEventType().toString()); + change.add("data", jsonData); + + JsonObject kafkaMessage = new JsonObject(); + kafkaMessage.add(table.getName(), change); + + kafkaText.append(gson.toJson(kafkaMessage)); } else if (outputFormat.equals(KAFKA_FORMAT_CSV)) { kafkaText.append("\nTABLE").append(",").append(table.getName()).append(",").append("EVENT").append(",") .append(data.getDataEventType()).append(","); @@ -215,25 +213,24 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { Class propertyTypeClass = PropertyUtils.getPropertyType(pojo, colName); if (CharSequence.class.equals(propertyTypeClass)) { PropertyUtils.setSimpleProperty(pojo, colName, rowData[i]); - } - else if (Long.class.equals(propertyTypeClass)) { + } else if (Long.class.equals(propertyTypeClass)) { Date date = null; try { date = DateUtils.parseDate(rowData[i], parseDatePatterns); - } - catch (Exception e) { + } catch (Exception e) { log.debug(rowData[i] + " was not a recognized date format so treating it as a long."); } BeanUtils.setProperty(pojo, colName, date != null ? date.getTime() : rowData[i]); - } - else { + } else { BeanUtils.setProperty(pojo, colName, rowData[i]); } } } - sendKafkaMessageByObject(pojo, kafkaDataKey); + Future kafkaResult = sendKafkaMessageByObject(pojo, kafkaDataKey); + kafkaResult.get(); //Wait for Kafka to send pending messages or throw an exception } else { - throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName); + throw new RuntimeException( + "Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName); } } catch (NoSuchMethodException e) { log.info("Unable to find setter on POJO based on table " + table.getName(), e); @@ -247,7 +244,13 @@ else if (Long.class.equals(propertyTypeClass)) { } catch (InstantiationException e) { log.info("Unable to instantiate a constructor on POJO based on table " + tableName, e); throw new RuntimeException(e); - } + } catch (InterruptedException e) { + log.info("Unable to write to Kafka, table " + tableName, e); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.info("Unable to write to Kafka, table " + tableName, e); + throw new RuntimeException(e); + } } else { GenericData.Record avroRecord = new GenericData.Record(schema); avroRecord.put("table", table.getName()); @@ -394,32 +397,42 @@ public void earlyCommit(DataContext context) { } public void batchComplete(DataContext context) { + LinkedList> pending = new LinkedList>(); + if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) { String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId(); - + KafkaProducer producer = new KafkaProducer(configs); try { if (confluentUrl == null && kafkaDataMap.size() > 0) { StringBuffer kafkaText = new StringBuffer(); - - + for (Map.Entry> entry : kafkaDataMap.entrySet()) { + for (String row : entry.getValue()) { if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) { - 
-                                sendKafkaMessage(producer, row, entry.getKey());
+                                Future<RecordMetadata> result = sendKafkaMessage(producer, row, entry.getKey());
+                                pending.add(result);
                             } else {
                                 kafkaText.append(row);
                             }
                         }
                         if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-                            sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                            Future<RecordMetadata> result = sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
+                            pending.add(result);
                        }
                     }
+
+                    for (Future<RecordMetadata> request : pending) {
+                        //Wait for Kafka to send pending messages or throw an exception
+                        request.get();
+                    }
                     kafkaDataMap = new HashMap<String, List<String>>();
-                }
+                }
             } catch (Exception e) {
                 log.warn("Unable to write batch to Kafka " + batchFileName, e);
                 e.printStackTrace();
+                throw new RuntimeException("Unable to write batch to Kafka " + batchFileName, e);
             } finally {
                 producer.close();
                 context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
@@ -435,15 +448,17 @@ public void batchCommitted(DataContext context) {
     }
 
     public void batchRolledback(DataContext context) {
     }
 
-    public void sendKafkaMessage(KafkaProducer producer, String kafkaText, String topic) {
-        producer.send(new ProducerRecord(topic, kafkaText));
+    public Future<RecordMetadata> sendKafkaMessage(KafkaProducer producer, String kafkaText, String topic) {
         log.debug("Data to be sent to Kafka-" + kafkaText);
+        return producer.send(new ProducerRecord(topic, kafkaText));
+    }
 
-    public void sendKafkaMessageByObject(Object bean, String topic) {
+    public Future<RecordMetadata> sendKafkaMessageByObject(Object bean, String topic) {
         KafkaProducer producer = new KafkaProducer(configs);
-        producer.send(new ProducerRecord(topic, bean));
+        Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, bean));
         producer.close();
+        return result;
     }
 
     public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
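
Note, not part of the patch: the two functional changes above are (1) building the JSON payload with Gson's JsonObject instead of hand-concatenated strings, so column values containing quotes or other special characters are escaped properly, and (2) keeping the Future<RecordMetadata> returned by KafkaProducer.send() and calling get() on it, so a failed send surfaces as an exception (now rethrown as a RuntimeException) instead of being dropped silently. The sketch below illustrates that pattern against a plain Kafka producer; the bootstrap server, topic name, and column values are placeholders, not values taken from SymmetricDS.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class KafkaSendSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Placeholder connection settings; KafkaWriterFilter builds its config map from IParameterService instead.
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Shape of the message after the change: {"<table>": {"eventType": "...", "data": {...}}}
        JsonObject data = new JsonObject();
        data.addProperty("id", "1");
        data.addProperty("name", "value with \"quotes\" escaped by Gson");

        JsonObject change = new JsonObject();
        change.addProperty("eventType", "INSERT");
        change.add("data", data);

        JsonObject message = new JsonObject();
        message.add("my_table", change);

        String payload = new Gson().toJson(message);

        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs)) {
            // send() is asynchronous; get() blocks until the broker acknowledges the record or
            // rethrows the send failure, which is what the patched filter relies on to fail the
            // batch rather than lose messages.
            Future<RecordMetadata> pending = producer.send(new ProducerRecord<String, String>("my_table", payload));
            RecordMetadata metadata = pending.get();
            System.out.println("wrote partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    }
}

Collecting the futures and waiting on them at the end of batchComplete(), as the patch does, keeps sends pipelined within a batch while still confirming every record before the batch is treated as written.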