Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
authorjapps authored Jan 17, 2021
2 parents 351af71 + 1a81fdd commit 171628a
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 26 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
#Still docker images are pulled every time. We need to cahche it for best practice.
before_install:
- wget https://raw.githubusercontent.com/authorjapps/zerocode-docker-factory/master/compose/kafka-schema-registry.yml
- docker login -u $DOCKER_USERNAME -p $DOCKER_TOKEN
- docker-compose -f kafka-schema-registry.yml up -d

#Just compile and run tests, also print version at the beginning.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,22 @@ public static Consumer createConsumer(String bootStrapServers, String consumerPr
}
}

public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer) {
for (int run = 0; run < 10; run++) {
if (!consumer.assignment().isEmpty()) {
return new ConsumerRecords(new HashMap());
}
ConsumerRecords records = consumer.poll(Duration.of(500, ChronoUnit.MILLIS));
if (!records.isEmpty()) {
return records;
public static ConsumerRecords initialPollWaitingForConsumerGroupJoin(Consumer consumer, ConsumerLocalConfigs effectiveLocalConfigs) {

for (int run = 0; run < 50; run++) {
if (!consumer.assignment().isEmpty()) {
return new ConsumerRecords(new HashMap());
}
ConsumerRecords records = consumer.poll(Duration.of(getPollTime(effectiveLocalConfigs), ChronoUnit.MILLIS));
if (!records.isEmpty()) {
return records;
}
}
}

throw new RuntimeException("\n********* Kafka Consumer unable to join in time *********\n");
throw new RuntimeException("\n********* Kafka Consumer unable to join in time - try increasing consumer polling time setting *********\n");
}


public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) {
if (localConfigs != null) {
Boolean localCommitSync = localConfigs.getCommitSync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String receive(String kafkaServers, String topicName, String requestJsonW

LOGGER.info("initial polling to trigger ConsumerGroupJoin");

ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer);
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, effectiveLocal);

if(!records.isEmpty()) {
LOGGER.info("Received {} records on initial poll\n", records.count());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,22 +160,30 @@ public void should_read_json_with_headers_in_record() throws IOException {

@Test
public void test_firstPoll_exits_early_on_assignment() {

// given
consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, 1000L, "");
consumerLocal = null;
ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon);
Consumer consumer = Mockito.mock(Consumer.class);
HashSet<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("test.topic", 0));
Mockito.when(consumer.assignment()).thenReturn(partitions);

// when
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer);
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs);

// then
assertThat(records.isEmpty(), is(true));
}

@Test
public void test_firstPoll_exits_on_receiving_records() {

// given
consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, 5000L, "");
consumerLocal = new ConsumerLocalConfigs("RAW", "sTestLocalFile", true, false, false, 3, 50L, "1,0,test-topic");
ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon);
Consumer consumer = Mockito.mock(Consumer.class);
Mockito.when(consumer.assignment()).thenReturn(new HashSet<TopicPartition>());

Expand All @@ -185,7 +193,7 @@ public void test_firstPoll_exits_on_receiving_records() {
Mockito.when(consumerRecords.isEmpty()).thenReturn(false);

// when
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer);
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs);

// then
assertThat(records, equalTo(consumerRecords));
Expand All @@ -194,7 +202,12 @@ public void test_firstPoll_exits_on_receiving_records() {

@Test
public void test_firstPoll_throws_after_timeout() throws Exception {

// given
consumerCommon = new ConsumerCommonConfigs(true, false, "aTestFile", "JSON", true, 3, null, "");
consumerLocal = new ConsumerLocalConfigs("RAW", "sTestLocalFile", true, false, false, 3, 50L, "1,0,test-topic");
ConsumerLocalConfigs consumerEffectiveConfigs = deriveEffectiveConfigs(consumerLocal, consumerCommon);

Consumer consumer = Mockito.mock(Consumer.class);
Mockito.when(consumer.assignment()).thenReturn(new HashSet<TopicPartition>());

Expand All @@ -207,6 +220,6 @@ public void test_firstPoll_throws_after_timeout() throws Exception {
expectedException.expectMessage("Kafka Consumer unable to join in time");

// when
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer);
ConsumerRecords records = initialPollWaitingForConsumerGroupJoin(consumer, consumerEffectiveConfigs);
}
}
1 change: 1 addition & 0 deletions http-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
<version>2.19.1</version>
<configuration>
<includes>
<include>org.jsmart.zerocode.testhelp.tests.MockServerTest</include>
<include>org.jsmart.zerocode.zerocodejavaexec.pojo.OrderTest</include>
<include>org.jsmart.zerocode.testhelp.tests.HelloWorldCherryPickSuite</include>
<include>org.jsmart.zerocode.testhelp.tests.helloworldjavaexec.HelloWorldJavaApiAsProtocolTest</include>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.jsmart.zerocode.testhelp.tests;


import org.jsmart.zerocode.testhelp.localserver.RunMeFirstLocalMockRESTServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;


public class MockServerTest {
private RunMeFirstLocalMockRESTServer mockRESTServer;

@Before
public void start(){
mockRESTServer = new RunMeFirstLocalMockRESTServer(RunMeFirstLocalMockRESTServer.PORT);
mockRESTServer.start();
}

@After
public void stop(){
mockRESTServer.stop();
}

@Test
public void testMockServerRunning(){
Assert.assertTrue(mockRESTServer.isRunning());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"apis": [
{
"name": "Sample_POST_Employee_Create",
"method": "POST",
"operation": "POST",
"url": "/api/v1/google-uk/employees",
"ignoreBody": true,
"response": {
Expand All @@ -24,7 +24,7 @@
},
{
"name": "sample POST with matching body",
"method": "POST",
"operation": "POST",
"url": "/api/v1/employees",
"ignoreBody": false,
"body": {
Expand All @@ -41,7 +41,7 @@
},
{
"name": "sample GET for Emma Norton",
"method": "GET",
"operation": "GET",
"url": "/api/v1/employees/39001",
"response": {
"status": 200,
Expand All @@ -55,7 +55,7 @@
},
{
"name": "Sample_Get_Employee_by_Id",
"method": "GET",
"operation": "GET",
"url": "/api/v1/google-uk/employees/999",
"response": {
"status": 200,
Expand All @@ -75,7 +75,7 @@
},
{
"name": "Screening - sample POST with matching body",
"method": "POST",
"operation": "POST",
"url": "/api/v1/employees/screening",
"ignoreBody": false,
"body": {
Expand All @@ -94,7 +94,7 @@
},
{
"name": "Screening - sample GET",
"method": "GET",
"operation": "GET",
"url": "/api/v1/employees/screening/SCRUNIQUEID5003",
"response": {
"status": 200,
Expand All @@ -110,7 +110,7 @@
},
{
"name": "Sample_Get_Created_Employee_by_Id",
"method": "GET",
"operation": "GET",
"url": "/api/v1/google-uk/employees/1000",
"response": {
"status": 200,
Expand All @@ -130,7 +130,7 @@
},
{
"name": "sample_get_api",
"method": "GET",
"operation": "GET",
"url": "/api/v1/google-uk/employees/UK1001",
"response": {
"status": 200,
Expand All @@ -150,7 +150,7 @@
},
{
"name": "bare_string_get",
"method": "GET",
"operation": "GET",
"url": "/api/v1/google-uk/employees/101",
"response": {
"status": 200,
Expand All @@ -159,7 +159,7 @@
},
{
"name": "Sample_Get_Full_Employee_by_Id",
"method": "GET",
"operation": "GET",
"url": "/api/v1/employees/emp1001",
"response": {
"status": 200,
Expand All @@ -185,7 +185,7 @@
},
{
"name": "Sample_Get_Address_by_emp_id",
"method": "GET",
"operation": "GET",
"url": "/api/v1/addresses/empoyee/emp1001",
"response": {
"status": 200,
Expand All @@ -210,7 +210,7 @@
},
{
"name": "Mock GET employee details including DOB",
"method": "GET",
"operation": "GET",
"url": "/api/v1/google-uk/employees/UK-LON-1002",
"response": {
"status": 200,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.jsmart.zerocode.integration.tests.kafka.consume;

import org.jsmart.zerocode.core.domain.Scenario;
import org.jsmart.zerocode.core.domain.TargetEnv;
import org.jsmart.zerocode.core.runner.ZeroCodeUnitRunner;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;

@TargetEnv("kafka_servers/kafka_test_server_polling.properties")
@RunWith(ZeroCodeUnitRunner.class)
public class KafkaConsumePollingTest {

/**
* When no polling time is explicitly defined in properties
* file e.g consumer.pollingTime
* Then intial poll consumer join will default to program
* defined default of 500ms.
*/
@Test
@Scenario("kafka/consume/test_kafka_consume.json")
public void testKafkaConsume() throws Exception {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
# kafka bootstrap servers comma separated
# e.g. localhost:9092,host2:9093
# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
kafka.bootstrap.servers=localhost:9092

kafka.producer.properties=kafka_servers/kafka_producer.properties
kafka.consumer.properties=kafka_servers/kafka_consumer.properties

# --------------------------------------------------------------------
# Optional local consumer properties common/central to all test cases.
# These can be overwritten by the tests locally.
# --------------------------------------------------------------------
# If this property is set, then the consumer does a commitSync after reading the message(s)
# Make sure you don't set both commitSync and commitAsync to true
consumer.commitSync = true
# If this property is set, then the consumer does a commitAsync after reading the message(s)
# Make sure you don't set both commitSync and commitAsync to true
consumer.commitAsync = false
# All records those were read are dumped to this specified file path
# This path can be a relative path or an absolute path. If the file
# does not exist, it creates the file and dumps the records
consumer.fileDumpTo= target/temp/demo.txt
# If this property is set to true, all records are shown in the response.
# When dealing with large number of records, you might not be interested
# in the individual records, but interested in the recordCount
# i.e. total number of records consumed
consumer.showRecordsConsumed=false
# That means if any record(s) are read, then this counter is reset to 0(zero) and the consumer
# polls again. So if no records are fetched for a specific poll interval, then the consumer
# gives a retry retrying until this max number polls/reties reached.
consumer.maxNoOfRetryPollsOrTimeouts = 5


# local producer properties
producer.key1=value1-testv ycvb

0 comments on commit 171628a

Please sign in to comment.