Skip to content

Commit

Permalink
Update geoip config and use extensions (opensearch-project#3975)
Browse files Browse the repository at this point in the history
* Update geoip config and use extensions

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>

---------

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
  • Loading branch information
asifsmohammed authored Jan 22, 2024
1 parent ea80d0f commit b7970a0
Show file tree
Hide file tree
Showing 37 changed files with 298 additions and 881 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public void setUp() throws JsonProcessingException {
}

public GeoIPProcessorService createObjectUnderTest() {
return new GeoIPProcessorService(geoIPProcessorConfig, tempPath);
// TODO: pass in geoIpServiceConfig object
return new GeoIPProcessorService(null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpConfigSupplier;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
Expand All @@ -41,29 +40,25 @@ public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Even
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final String tempPath;
private final List<String> tagsOnSourceNotFoundFailure;
private GeoIPProcessorService geoIPProcessorService;
private static final String TEMP_PATH_FOLDER = "GeoIP";

/**
* GeoIPProcessor constructor for initialization of required attributes
* @param pluginSetting pluginSetting
* @param geoCodingProcessorConfig geoCodingProcessorConfig
* @param geoIPProcessorConfig geoIPProcessorConfig
* @param geoIpConfigSupplier geoIpConfigSupplier
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
final GeoIPProcessorConfig geoCodingProcessorConfig,
final GeoIPProcessorConfig geoIPProcessorConfig,
final GeoIpConfigSupplier geoIpConfigSupplier) {
super(pluginSetting);
this.geoIPProcessorConfig = geoCodingProcessorConfig;
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;
geoIPProcessorService = new GeoIPProcessorService(geoCodingProcessorConfig,tempPath);
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
this.tagsOnSourceNotFoundFailure = geoIPProcessorConfig.getTagsOnFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
// TODO: use this service and clean up MaxMind service config from pipeline.yaml
//geoIPProcessorService = geoIpConfigSupplier.getGeoIPProcessorService();
}

/**
Expand All @@ -78,17 +73,17 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

for (final Record<Event> eventRecord : records) {
Event event = eventRecord.getData();
for (KeysConfig key : geoIPProcessorConfig.getKeysConfig()) {
String source = key.getKeyConfig().getSource();
List<String> attributes = key.getKeyConfig().getAttributes();
for (EntryConfig entry : geoIPProcessorConfig.getEntries()) {
String source = entry.getSource();
List<String> attributes = entry.getFields();
String ipAddress = event.get(source, String.class);

//Lookup from DB
if (ipAddress != null && (!(ipAddress.isEmpty()))) {
try {
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(key.getKeyConfig().getTarget(), geoData);
eventRecord.getData().put(entry.getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
}
} catch (IOException | EnrichFailedException ex) {
Expand All @@ -107,16 +102,16 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

@Override
public void prepareForShutdown() {
LOG.info("GeoIP plugin prepare For Shutdown");
}

@Override
public boolean isReadyForShutdown() {
return false;
return true;
}

@Override
public void shutdown() {
//TODO: delete mmdb files
LOG.info("GeoIP plugin Shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.configuration.ServiceTypeOptions;
import org.opensearch.dataprepper.plugins.processor.extension.AwsAuthenticationOptionsConfig;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.plugins.processor.configuration.EntryConfig;

import java.util.List;

Expand All @@ -19,51 +18,30 @@
*/
public class GeoIPProcessorConfig {

@JsonProperty("aws")
@JsonProperty("entries")
@NotNull
@Size(min = 1)
@Valid
private AwsAuthenticationOptionsConfig awsAuthenticationOptionsConfig;
private List<EntryConfig> entries;

@JsonProperty("keys")
@NotNull
private List<KeysConfig> keysConfig;

@JsonProperty("tags_on_source_not_found")
private List<String> tagsOnSourceNotFoundFailure;
@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

@JsonProperty("service_type")
@NotNull
private ServiceTypeOptions serviceType;

/**
* Aws Authentication configuration Options
* @return AwsAuthenticationOptions
*/
public AwsAuthenticationOptionsConfig getAwsAuthenticationOptions() {
return awsAuthenticationOptionsConfig;
}

/**
* Lists of Source, target and attributes
* @return List of KeysConfig
* Get List of entries
* @return List of EntryConfig
*/
public List<KeysConfig> getKeysConfig() {
return keysConfig;
public List<EntryConfig> getEntries() {
return entries;
}

/**
* Get the List of failure tags
* @return List of failure tags
*/
public List<String> getTagsOnSourceNotFoundFailure() {
return tagsOnSourceNotFoundFailure;
public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

/**
* Service type Options
* @return ServiceTypeOptions
*/
public ServiceTypeOptions getServiceType() {
return serviceType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.dataprepper.plugins.processor;

import org.opensearch.dataprepper.plugins.processor.configuration.DatabasePathURLConfig;
import org.opensearch.dataprepper.plugins.processor.databasedownload.DBSourceOptions;
import org.opensearch.dataprepper.plugins.processor.databasedownload.LicenseTypeOptions;
import org.opensearch.dataprepper.plugins.processor.databasedownload.S3DBService;
Expand All @@ -16,6 +15,8 @@
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoData;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoIP2Data;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.GetGeoLite2Data;
import org.opensearch.dataprepper.plugins.processor.extension.GeoIpServiceConfig;
import org.opensearch.dataprepper.plugins.processor.extension.MaxMindConfig;
import org.opensearch.dataprepper.plugins.processor.utils.DbSourceIdentification;
import org.opensearch.dataprepper.plugins.processor.utils.LicenseTypeCheck;
import org.slf4j.Logger;
Expand All @@ -40,32 +41,35 @@ public class GeoIPProcessorService {
private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessorService.class);
public static final String DATABASE_1 = "first_database_path";
public static final String DATABASE_2 = "second_database_path";
private static final String TEMP_PATH_FOLDER = "GeoIP";
private GeoIPProcessorConfig geoIPProcessorConfig;
private LicenseTypeOptions licenseType;
private GetGeoData geoData;
private List<DatabasePathURLConfig> databasePath;
private List<String> databasePaths;
private final String tempPath;
private final ScheduledExecutorService scheduledExecutorService;
private final DBSourceOptions dbSourceOptions;
private final MaxMindConfig maxMindConfig;
public static volatile boolean downloadReady;
private boolean toggle;
private String flipDatabase;


/**
* GeoIPProcessorService constructor for initialization of required attributes
* @param geoIPProcessorConfig geoIPProcessorConfig
* @param tempPath tempPath
*
* @param geoIpServiceConfig geoIpServiceConfig
*/
public GeoIPProcessorService(GeoIPProcessorConfig geoIPProcessorConfig, String tempPath) {
public GeoIPProcessorService(final GeoIpServiceConfig geoIpServiceConfig) {
this.toggle = false;
this.geoIPProcessorConfig = geoIPProcessorConfig;
this.tempPath = tempPath;
this.databasePath = geoIPProcessorConfig.getServiceType().getMaxMindService().getDatabasePath();
this.maxMindConfig = geoIpServiceConfig.getMaxMindConfig();
this.databasePaths = maxMindConfig.getDatabasePaths();
flipDatabase = DATABASE_1;

dbSourceOptions = DbSourceIdentification.getDatabasePathType(databasePath);
final Duration checkInterval = Objects.requireNonNull(geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheRefreshSchedule());
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;

dbSourceOptions = DbSourceIdentification.getDatabasePathType(databasePaths);
final Duration checkInterval = Objects.requireNonNull(maxMindConfig.getDatabaseRefreshInterval());
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService
.scheduleAtFixedRate(this::downloadThroughURLandS3, 0L, checkInterval.toSeconds(), TimeUnit.SECONDS);
Expand All @@ -82,10 +86,10 @@ public GeoIPProcessorService(GeoIPProcessorConfig geoIPProcessorConfig, String t
String finalPath = tempPath + File.separator;
licenseType = LicenseTypeCheck.isGeoLite2OrEnterpriseLicense(finalPath.concat(flipDatabase));
if (licenseType.equals(LicenseTypeOptions.FREE)) {
geoData = new GetGeoLite2Data(finalPath.concat(flipDatabase), geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheSize(), geoIPProcessorConfig);
geoData = new GetGeoLite2Data(finalPath.concat(flipDatabase), maxMindConfig.getCacheSize());
}
else if (licenseType.equals(LicenseTypeOptions.ENTERPRISE)) {
geoData = new GetGeoIP2Data(finalPath.concat(flipDatabase), geoIPProcessorConfig.getServiceType().getMaxMindService().getCacheSize(), geoIPProcessorConfig);
geoData = new GetGeoIP2Data(finalPath.concat(flipDatabase), maxMindConfig.getCacheSize());
}
}
downloadReady = false;
Expand All @@ -107,17 +111,17 @@ public synchronized void downloadThroughURLandS3() {
switch (dbSourceOptions) {
case URL:
dbSource = new HttpDBDownloadService(flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
case S3:
dbSource = new S3DBService(geoIPProcessorConfig, flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource = new S3DBService(maxMindConfig.getAwsAuthenticationOptionsConfig(), flipDatabase);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
case PATH:
dbSource = new LocalDBDownloadService(tempPath, flipDatabase);
dbSource.initiateDownload(databasePath);
dbSource.initiateDownload(databasePaths);
downloadReady = true;
break;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* PDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;

import java.util.List;

public class EntryConfig {
@JsonProperty("source")
@NotEmpty
private String source;

@JsonProperty("target")
private String target;

@JsonProperty("fields")
private List<String> fields;

public String getSource() {
return source;
}

public String getTarget() {
return target;
}

public List<String> getFields() {
return fields;
}
}

This file was deleted.

Loading

0 comments on commit b7970a0

Please sign in to comment.