Skip to content

Commit

Permalink
[improve] Upgrade opensearch sink to client 2.16 and tests to use ser…
Browse files Browse the repository at this point in the history
…ver 2.16.0 (#313)

* test with opensearch server 2.16.0

* temporary, disable tests to run opensearch test faster / not skip on other failures

* checkstyle

* trying to upgrade the client

* upgrade OS image in test base

* OS requires a password

* correct GC setting for LS 2.10

* updated jackson version to 2.16.0

(cherry picked from commit 1eb6441)

* Revert "temporary, disable tests to run opensearch test faster / not skip on other failures"

This reverts commit 68f7102.

* Revert "checkstyle"

This reverts commit fc1f179.

* fail tests at the end

* fix test

* temoprary disable flakes

* disable more

* Revert "disable more"

This reverts commit d6845e8.

* Revert "temoprary disable flakes"

This reverts commit 1f0e36c.

* Revert "fail tests at the end"

This reverts commit 8353d2b.

* Removed comments

---------

Co-authored-by: nikhil-ctds <ext-nikhil.erigila@datastax.com>
Co-authored-by: mukesh-ctds <151806568+mukesh-ctds@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent 599ff20 commit 1dfac82
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 19 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ flexible messaging model and an intuitive client API.</description>
<log4j2.version>2.18.0</log4j2.version>
<bouncycastle.version>1.69</bouncycastle.version>
<bouncycastlefips.version>1.0.2</bouncycastlefips.version>
<jackson.version>2.14.2</jackson.version>
<jackson.version>2.16.0</jackson.version>
<reflections.version>0.9.11</reflections.version>
<swagger.version>1.6.10</swagger.version>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
Expand Down Expand Up @@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API.</description>
<mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
<hdfs-offload-version3>3.3.5</hdfs-offload-version3>
<json-smart.version>2.4.10</json-smart.version>
<opensearch.version>1.2.4</opensearch.version>
<opensearch.version>2.16.0</opensearch.version>
<elasticsearch-java.version>8.5.2</elasticsearch-java.version>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.EnumResolver;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -58,7 +58,7 @@ public final class FieldParser {
private static final Map<String, Method> CONVERTERS = new HashMap<>();
private static final Map<Class<?>, Class<?>> WRAPPER_TYPES = new HashMap<>();

private static final AnnotationIntrospector ANNOTATION_INTROSPECTOR = new JacksonAnnotationIntrospector();
private static final DeserializationConfig DESERIALIZATION_CONFIG = new ObjectMapper().getDeserializationConfig();

static {
// Preload converters and wrapperTypes.
Expand Down Expand Up @@ -100,7 +100,7 @@ public static <T> T convert(Object from, Class<T> to) {

if (to.isEnum()) {
// Converting string to enum
EnumResolver r = EnumResolver.constructUsingToString((Class<Enum<?>>) to, ANNOTATION_INTROSPECTOR);
EnumResolver r = EnumResolver.constructUsingToString(DESERIALIZATION_CONFIG, to);
T value = (T) r.findEnum((String) from);
if (value == null) {
throw new RuntimeException("Invalid value '" + from + "' for enum " + to);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -229,7 +229,6 @@ public boolean indexDocument(String index, String documentId, String documentSou
if (!Strings.isNullOrEmpty(documentId)) {
indexRequest.id(documentId);
}
indexRequest.type(config.getTypeName());
indexRequest.source(documentSource, XContentType.JSON);

IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
Expand All @@ -245,7 +244,6 @@ public boolean indexDocument(String index, String documentId, String documentSou
public boolean deleteDocument(String index, String documentId) throws IOException {
DeleteRequest deleteRequest = Requests.deleteRequest(index);
deleteRequest.id(documentId);
deleteRequest.type(config.getTypeName());
DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
if (log.isDebugEnabled()) {
log.debug("delete result {}", deleteResponse.getResult());
Expand Down Expand Up @@ -301,7 +299,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
if (!Strings.isNullOrEmpty(request.getDocumentId())) {
indexRequest.id(request.getDocumentId());
}
indexRequest.type(config.getTypeName());
indexRequest.source(request.getDocumentSource(), XContentType.JSON);
if (log.isDebugEnabled()) {
log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(),
Expand All @@ -314,7 +311,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO
public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException {
DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
deleteRequest.id(request.getDocumentId());
deleteRequest.type(config.getTypeName());
if (log.isDebugEnabled()) {
log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class ElasticSearchTestBase {
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
.orElse("opensearchproject/opensearch:1.2.4");
.orElse("opensearchproject/opensearch:2.16.0");

protected final String elasticImageName;

Expand All @@ -59,6 +59,7 @@ protected ElasticsearchContainer createElasticsearchContainer() {
if (elasticImageName.equals(OPENSEARCH)) {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
elasticsearchContainer = new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testSslBasic() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
.setPassword("admin")
.setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setTruststorePath(sslResourceDir + "/truststore.jks")
Expand All @@ -102,7 +102,7 @@ public void testSslWithHostnameVerification() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
.setPassword("admin")
.setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setProtocols("TLSv1.2")
Expand All @@ -128,7 +128,7 @@ public void testSslWithClientAuth() throws IOException {
.setElasticSearchUrl("https://" + container.getHttpHostAddress())
.setIndexName(INDEX)
.setUsername("admin")
.setPassword("admin")
.setPassword("0pEn7earch!")
.setSsl(new ElasticSearchSslConfig()
.setEnabled(true)
.setHostnameVerification(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/functions_worker.log
directory=/pulsar
environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseZGC"
environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseG1GC"
command=/pulsar/bin/pulsar functions-worker
user=pulsar
5 changes: 5 additions & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@
<artifactId>aws-java-sdk-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class OpenSearchSinkTester extends ElasticSearchSinkTester {

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
.orElse("opensearchproject/opensearch:1.2.4");
.orElse("opensearchproject/opensearch:2.16.0");

private RestHighLevelClient elasticClient;

Expand All @@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!")
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
Expand Down

0 comments on commit 1dfac82

Please sign in to comment.