diff --git a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostResults.java b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostResults.java index 4052c2f..e5ca3bf 100644 --- a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostResults.java +++ b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostResults.java @@ -17,6 +17,28 @@ public class HTTPPostResults implements Serializable { private String header; private String status; private String jsonResultBody; + private int statusCode; + + /** + * + * @param header + * @param status + * @param jsonResultBody + * @param statusCode + */ + public HTTPPostResults(String header, String status, String jsonResultBody, int statusCode) { + super(); + this.header = header; + this.status = status; + this.jsonResultBody = jsonResultBody; + this.statusCode = statusCode; + } + public int getStatusCode() { + return statusCode; + } + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } public String getHeader() { return header; } @@ -93,6 +115,8 @@ public String toString() { builder.append(status); builder.append(", jsonResultBody="); builder.append(jsonResultBody); + builder.append(", statusCode="); + builder.append(statusCode); builder.append("]"); return builder.toString(); } diff --git a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostUtility.java b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostUtility.java index 8715f39..428e7ba 100644 --- a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostUtility.java +++ b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/HTTPPostUtility.java @@ -33,13 +33,21 @@ public class HTTPPostUtility { public static HTTPPostResults postImage(String urlName, String fieldName, String imageName, String imageType, InputStream stream) { - HTTPPostResults results = new HTTPPostResults(); - if ( urlName == null || fieldName == null || imageName == null || imageType == null || stream == null ) { - System.out.println("Nulls"); - return results; + return null; } + + HTTPPostResults results = new HTTPPostResults(); + try { + /** Do we want a timeout + // do we want to allow users to set this + // connectionTimeout + // connectionTimeout + // http://unirest.io/java.html + */ + Unirest.setTimeouts(90000, 180000); + HttpResponse resp = Unirest.post(urlName) .field(fieldName, stream, ContentType.parse(imageType), imageName).asJson(); @@ -51,16 +59,20 @@ public static HTTPPostResults postImage(String urlName, String fieldName, String } } - results.setHeader( resp.getHeaders().toString() ); - results.setStatus(resp.getStatusText()); + if ( resp.getHeaders() != null) { + results.setHeader( resp.getHeaders().toString() ); + } + if ( resp.getStatusText() != null ) { + results.setStatus(resp.getStatusText()); + } - System.out.println("results found"); + results.setStatusCode(resp.getStatus()); - try { - Unirest.shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } +// try { +// Unirest.shutdown(); +// } catch (IOException e) { +// e.printStackTrace(); +// } } catch (Throwable t) { t.printStackTrace(); } diff --git a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/PostImageProcessor.java b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/PostImageProcessor.java index 0eb90d3..609dadb 100644 --- a/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/PostImageProcessor.java +++ b/nifi-postimage-processors/src/main/java/com/dataflowdeveloper/processors/PostImageProcessor.java @@ -49,18 +49,20 @@ @CapabilityDescription("Post Image to HTTP") @SeeAlso({}) @ReadsAttributes({@ReadsAttribute(attribute="url, fieldname, imagename, imagetype", description="Need URL, Field Name, Image Name and Image Type.")}) -@WritesAttributes({@WritesAttribute(attribute="post.results, post.header, post.status", description="Output result of HTTP Post call.")}) +@WritesAttributes({@WritesAttribute(attribute="post.results, post.header, post.status, post.statuscode", description="Output result of HTTP Post call.")}) public class PostImageProcessor extends AbstractProcessor { /** output attribute name post.results will contain JSON **/ public static final String ATTRIBUTE_OUTPUT_NAME = "post.results"; - /** output attribute name post.results will contain JSON **/ + /** output attribute name post.header will contain JSON **/ public static final String ATTRIBUTE_OUTPUT_HEADER = "post.header"; - /** output attribute name post.results will contain JSON **/ + /** output attribute name post.status will contain JSON **/ public static final String ATTRIBUTE_OUTPUT_STATUS = "post.status"; - + + /** output attribute name post.statuscode will contain JSON **/ + public static final String ATTRIBUTE_OUTPUT_STATUS_CODE = "post.statuscode"; /** url http://127.0.0.1:9999/squeezenet/predict */ public static final PropertyDescriptor URL_NAME = new PropertyDescriptor.Builder().name("url") @@ -181,27 +183,37 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, new InputStreamCallback() { @Override public void process(InputStream input) throws IOException { + if ( input == null ) { + return; + } HTTPPostResults results = HTTPPostUtility.postImage(url, field, image, imgtype, input); - if (results != null) { + if (results != null && results.getJsonResultBody() != null) { try { - getLogger().debug(String.format("Found %sresults", new Object[] { results.getStatus() })); - - System.out.println("Status=" + results.getStatus()); attributes.put(ATTRIBUTE_OUTPUT_NAME, results.getJsonResultBody()); attributes.put(ATTRIBUTE_OUTPUT_HEADER, results.getHeader()); attributes.put(ATTRIBUTE_OUTPUT_STATUS, results.getStatus()); + attributes.put(ATTRIBUTE_OUTPUT_STATUS_CODE, String.valueOf(results.getStatusCode())); } catch (Exception e) { e.printStackTrace(); } } + else { + try { + System.out.println("====> url" + url + "," + field + ","+ image + "," + imgtype ); + attributes.put(ATTRIBUTE_OUTPUT_NAME, "Fail"); + attributes.put(ATTRIBUTE_OUTPUT_HEADER, "Fail"); + attributes.put(ATTRIBUTE_OUTPUT_STATUS, "FAIL"); + attributes.put(ATTRIBUTE_OUTPUT_STATUS_CODE, "500"); + } catch (Exception e) { + e.printStackTrace(); + } + } } }); if (attributes.size() == 0) { - System.out.println("Errors"); session.transfer(flowFile, REL_FAILURE); } else { - System.out.println("count:" + attributes.size()); flowFile = session.putAllAttributes(flowFile, attributes); session.transfer(flowFile, REL_SUCCESS); }