diff --git a/.travis.yml b/.travis.yml index 343ee0bda..8af8b4534 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ services: #Still docker images are pulled every time. We need to cahche it for best practice. before_install: - wget https://raw.githubusercontent.com/authorjapps/zerocode-docker-factory/master/compose/kafka-schema-registry.yml + - docker login -u $DOCKER_USERNAME -p $DOCKER_TOKEN - docker-compose -f kafka-schema-registry.yml up -d #Just compile and run tests, also print version at the beginning. diff --git a/core/src/main/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.java b/core/src/main/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.java index 98546d860..357bc8425 100644 --- a/core/src/main/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.java +++ b/core/src/main/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelper.java @@ -79,20 +79,22 @@ public static Consumer createConsumer(String bootStrapServers, String consumerPr } } - public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer) { - for (int run = 0; run < 10; run++) { - if (!consumer.assignment().isEmpty()) { - return new ConsumerRecords(new HashMap()); - } - ConsumerRecords records = consumer.poll(Duration.of(500, ChronoUnit.MILLIS)); - if (!records.isEmpty()) { - return records; + public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer, ConsumerLocalConfigs effectiveLocalConfigs) { + + for (int run = 0; run < 50; run++) { + if (!consumer.assignment().isEmpty()) { + return new ConsumerRecords(new HashMap()); + } + ConsumerRecords records = consumer.poll(Duration.of(getPollTime(effectiveLocalConfigs), ChronoUnit.MILLIS)); + if (!records.isEmpty()) { + return records; + } } - } - throw new RuntimeException("\n********* Kafka Consumer unable to join in time *********\n"); + throw new RuntimeException("\n********* Kafka Consumer unable to join in time - try increasing consumer polling time setting *********\n"); } + public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) { if (localConfigs != null) { Boolean localCommitSync = localConfigs.getCommitSync(); diff --git a/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/KafkaReceiver.java b/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/KafkaReceiver.java index db64b5f08..9869189e2 100644 --- a/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/KafkaReceiver.java +++ b/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/KafkaReceiver.java @@ -63,7 +63,7 @@ public String receive(String kafkaServers, String topicName, String requestJsonW LOGGER.info("initial polling to trigger ConsumerGroupJoin"); - ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer); + ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, effectiveLocal); if(!records.isEmpty()) { LOGGER.info("Received {} records on initial poll\n", records.count()); diff --git a/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java b/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java index e7962bc90..fda81e60a 100644 --- a/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java +++ b/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java @@ -160,14 +160,18 @@ public void should_read_json_with_headers_in_record() throws IOException { @Test public void test_firstPoll_exits_early_on_assignment() { + // given + consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, 1000L, ""); + consumerLocal = null; + ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon); Consumer consumer = Mockito.mock(Consumer.class); HashSet partitions = new HashSet<>(); partitions.add(new TopicPartition("test.topic", 0)); Mockito.when(consumer.assignment()).thenReturn(partitions); // when - ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer); + ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs); // then assertThat(records.isEmpty(), is(true)); @@ -175,7 +179,11 @@ public void test_firstPoll_exits_early_on_assignment() { @Test public void test_firstPoll_exits_on_receiving_records() { + // given + consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, 5000L, ""); + consumerLocal = new ConsumerLocalConfigs("RAW", "sTestLocalFile", true, false, false, 3, 50L, "1,0,test-topic"); + ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon); Consumer consumer = Mockito.mock(Consumer.class); Mockito.when(consumer.assignment()).thenReturn(new HashSet()); @@ -185,7 +193,7 @@ public void test_firstPoll_exits_on_receiving_records() { Mockito.when(consumerRecords.isEmpty()).thenReturn(false); // when - ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer); + ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs); // then assertThat(records, equalTo(consumerRecords)); @@ -194,7 +202,12 @@ public void test_firstPoll_exits_on_receiving_records() { @Test public void test_firstPoll_throws_after_timeout() throws Exception { + // given + consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, null, ""); + consumerLocal = new ConsumerLocalConfigs("RAW", "sTestLocalFile", true, false, false, 3, 50L, "1,0,test-topic"); + ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon); + Consumer consumer = Mockito.mock(Consumer.class); Mockito.when(consumer.assignment()).thenReturn(new HashSet()); @@ -207,6 +220,6 @@ public void test_firstPoll_throws_after_timeout() throws Exception { expectedException.expectMessage("Kafka Consumer unable to join in time"); // when - ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer); + ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs); } } \ No newline at end of file diff --git a/http-testing/pom.xml b/http-testing/pom.xml index 8a530d253..ee08871db 100644 --- a/http-testing/pom.xml +++ b/http-testing/pom.xml @@ -61,6 +61,7 @@ 2.19.1 + org.jsmart.zerocode.testhelp.tests.MockServerTest org.jsmart.zerocode.zerocodejavaexec.pojo.OrderTest org.jsmart.zerocode.testhelp.tests.HelloWorldCherryPickSuite org.jsmart.zerocode.testhelp.tests.helloworldjavaexec.HelloWorldJavaApiAsProtocolTest diff --git a/http-testing/src/test/java/org/jsmart/zerocode/testhelp/tests/MockServerTest.java b/http-testing/src/test/java/org/jsmart/zerocode/testhelp/tests/MockServerTest.java new file mode 100644 index 000000000..83feb239a --- /dev/null +++ b/http-testing/src/test/java/org/jsmart/zerocode/testhelp/tests/MockServerTest.java @@ -0,0 +1,30 @@ +package org.jsmart.zerocode.testhelp.tests; + + +import org.jsmart.zerocode.testhelp.localserver.RunMeFirstLocalMockRESTServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class MockServerTest { + private RunMeFirstLocalMockRESTServer mockRESTServer; + + @Before + public void start(){ + mockRESTServer = new RunMeFirstLocalMockRESTServer(RunMeFirstLocalMockRESTServer.PORT); + mockRESTServer.start(); + } + + @After + public void stop(){ + mockRESTServer.stop(); + } + + @Test + public void testMockServerRunning(){ + Assert.assertTrue(mockRESTServer.isRunning()); + } + +} \ No newline at end of file diff --git a/http-testing/src/test/resources/localhost_stubs/localhost_REST_fake_end_points_stubs.json b/http-testing/src/test/resources/localhost_stubs/localhost_REST_fake_end_points_stubs.json index 73c533576..6113f5402 100644 --- a/http-testing/src/test/resources/localhost_stubs/localhost_REST_fake_end_points_stubs.json +++ b/http-testing/src/test/resources/localhost_stubs/localhost_REST_fake_end_points_stubs.json @@ -3,7 +3,7 @@ "apis": [ { "name": "Sample_POST_Employee_Create", - "method": "POST", + "operation": "POST", "url": "/api/v1/google-uk/employees", "ignoreBody": true, "response": { @@ -24,7 +24,7 @@ }, { "name": "sample POST with matching body", - "method": "POST", + "operation": "POST", "url": "/api/v1/employees", "ignoreBody": false, "body": { @@ -41,7 +41,7 @@ }, { "name": "sample GET for Emma Norton", - "method": "GET", + "operation": "GET", "url": "/api/v1/employees/39001", "response": { "status": 200, @@ -55,7 +55,7 @@ }, { "name": "Sample_Get_Employee_by_Id", - "method": "GET", + "operation": "GET", "url": "/api/v1/google-uk/employees/999", "response": { "status": 200, @@ -75,7 +75,7 @@ }, { "name": "Screening - sample POST with matching body", - "method": "POST", + "operation": "POST", "url": "/api/v1/employees/screening", "ignoreBody": false, "body": { @@ -94,7 +94,7 @@ }, { "name": "Screening - sample GET", - "method": "GET", + "operation": "GET", "url": "/api/v1/employees/screening/SCRUNIQUEID5003", "response": { "status": 200, @@ -110,7 +110,7 @@ }, { "name": "Sample_Get_Created_Employee_by_Id", - "method": "GET", + "operation": "GET", "url": "/api/v1/google-uk/employees/1000", "response": { "status": 200, @@ -130,7 +130,7 @@ }, { "name": "sample_get_api", - "method": "GET", + "operation": "GET", "url": "/api/v1/google-uk/employees/UK1001", "response": { "status": 200, @@ -150,7 +150,7 @@ }, { "name": "bare_string_get", - "method": "GET", + "operation": "GET", "url": "/api/v1/google-uk/employees/101", "response": { "status": 200, @@ -159,7 +159,7 @@ }, { "name": "Sample_Get_Full_Employee_by_Id", - "method": "GET", + "operation": "GET", "url": "/api/v1/employees/emp1001", "response": { "status": 200, @@ -185,7 +185,7 @@ }, { "name": "Sample_Get_Address_by_emp_id", - "method": "GET", + "operation": "GET", "url": "/api/v1/addresses/empoyee/emp1001", "response": { "status": 200, @@ -210,7 +210,7 @@ }, { "name": "Mock GET employee details including DOB", - "method": "GET", + "operation": "GET", "url": "/api/v1/google-uk/employees/UK-LON-1002", "response": { "status": 200, diff --git a/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumePollingTest.java b/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumePollingTest.java new file mode 100644 index 000000000..0f5395ad3 --- /dev/null +++ b/kafka-testing/src/test/java/org/jsmart/zerocode/integration/tests/kafka/consume/KafkaConsumePollingTest.java @@ -0,0 +1,25 @@ +package org.jsmart.zerocode.integration.tests.kafka.consume; + +import org.jsmart.zerocode.core.domain.Scenario; +import org.jsmart.zerocode.core.domain.TargetEnv; +import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; + +@TargetEnv("kafka_servers/kafka_test_server_polling.properties") +@RunWith(ZeroCodeUnitRunner.class) +public class KafkaConsumePollingTest { + + /** + * When no polling time is explicitly defined in properties + * file e.g consumer.pollingTime + * Then intial poll consumer join will default to program + * defined default of 500ms. + */ + @Test + @Scenario("kafka/consume/test_kafka_consume.json") + public void testKafkaConsume() throws Exception { + } + +} diff --git a/kafka-testing/src/test/resources/kafka_servers/kafka_test_server_polling.properties b/kafka-testing/src/test/resources/kafka_servers/kafka_test_server_polling.properties new file mode 100755 index 000000000..623a8c231 --- /dev/null +++ b/kafka-testing/src/test/resources/kafka_servers/kafka_test_server_polling.properties @@ -0,0 +1,36 @@ +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# kafka bootstrap servers comma separated +# e.g. localhost:9092,host2:9093 +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +kafka.bootstrap.servers=localhost:9092 + +kafka.producer.properties=kafka_servers/kafka_producer.properties +kafka.consumer.properties=kafka_servers/kafka_consumer.properties + +# -------------------------------------------------------------------- +# Optional local consumer properties common/central to all test cases. +# These can be overwritten by the tests locally. +# -------------------------------------------------------------------- +# If this property is set, then the consumer does a commitSync after reading the message(s) +# Make sure you don't set both commitSync and commitAsync to true +consumer.commitSync = true +# If this property is set, then the consumer does a commitAsync after reading the message(s) +# Make sure you don't set both commitSync and commitAsync to true +consumer.commitAsync = false +# All records those were read are dumped to this specified file path +# This path can be a relative path or an absolute path. If the file +# does not exist, it creates the file and dumps the records +consumer.fileDumpTo= target/temp/demo.txt +# If this property is set to true, all records are shown in the response. +# When dealing with large number of records, you might not be interested +# in the individual records, but interested in the recordCount +# i.e. total number of records consumed +consumer.showRecordsConsumed=false +# That means if any record(s) are read, then this counter is reset to 0(zero) and the consumer +# polls again. So if no records are fetched for a specific poll interval, then the consumer +# gives a retry retrying until this max number polls/reties reached. +consumer.maxNoOfRetryPollsOrTimeouts = 5 + + +# local producer properties +producer.key1=value1-testv ycvb