Merge pull request #37 from treeverse/compile_with_java_8
johnnyaug authored Sep 12, 2023
2 parents 8b4f37b + 73e0056 commit 095e5b7
Showing 4 changed files with 59 additions and 53 deletions.
10 changes: 8 additions & 2 deletions pom.xml
@@ -14,8 +14,8 @@
   <description>A custom Iceberg catalog implementation for lakeFS</description>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <maven.compiler.source>11</maven.compiler.source>
-    <maven.compiler.target>11</maven.compiler.target>
+    <maven.compiler.source>8</maven.compiler.source>
+    <maven.compiler.target>8</maven.compiler.target>
     <iceberg.version>1.2.1</iceberg.version>
     <hadoop.version>3.3.5</hadoop.version>
   </properties>
@@ -84,6 +84,12 @@
       <version>3.3.2</version>
       <scope>test</scope>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 -->
+    <dependency>
+      <groupId>org.apache.httpcomponents.client5</groupId>
+      <artifactId>httpclient5</artifactId>
+      <version>5.2.1</version>
+    </dependency>

     <dependency>
       <groupId>org.apache.iceberg</groupId>
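
Note: the new httpclient5 dependency exists because java.net.http.HttpClient, which LakeFSReporter used before this change, is only available on Java 11+, while Apache HttpClient 5.x runs on Java 8. Below is a minimal, self-contained sketch of the fire-and-forget async POST pattern the reporter switches to; the class name, endpoint URL, JSON body, and the sleep are illustrative placeholders, not part of this commit.

import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.http.ContentType;

public class AsyncPostSketch {
    public static void main(String[] args) throws Exception {
        CloseableHttpAsyncClient client = HttpAsyncClients.createDefault();
        client.start(); // the async client must be started before it can execute requests

        SimpleHttpRequest request = SimpleRequestBuilder
                .post("http://localhost:8000/api/v1/statistics") // placeholder endpoint
                .setBody("{\"events\":[]}", ContentType.APPLICATION_JSON)
                .addHeader("Accept", "application/json")
                .build();

        // fire-and-forget: passing a null callback means the response (and any error) is ignored
        client.execute(request, null);

        Thread.sleep(1000); // demo only: give the request a moment to go out before closing
        client.close();
    }
}
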
66 changes: 32 additions & 34 deletions src/main/java/io/lakefs/iceberg/LakeFSReporter.java
@@ -1,64 +1,62 @@
 package io.lakefs.iceberg;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
 import java.net.URI;
-import java.net.http.HttpClient;
-import java.net.http.HttpRequest;
-import java.net.http.HttpResponse;
 import java.util.Base64;
-import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.http.ContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+
 public class LakeFSReporter {
     private URI lakeFSEndpoint;
     private String authHeader;
-    private HttpClient httpClient;
+    private CloseableHttpAsyncClient httpClient;
     private final String reportClient = "iceberg-catalog/" + getClass().getPackage().getImplementationVersion();
+    private final Logger logger = LoggerFactory.getLogger(LakeFSReporter.class);
-    private LakeFSReporter() {
-    }
 
+    private LakeFSReporter(){}
     public LakeFSReporter(Configuration hadoopConfig) {
         String lakeFSServerURL = hadoopConfig.get("fs.s3a.endpoint");
         this.lakeFSEndpoint = URI.create(StringUtils.stripEnd(lakeFSServerURL, "/") + "/api/v1/statistics");
         this.authHeader = generateBasicAuthHeader(hadoopConfig);
-        this.httpClient = HttpClient.newHttpClient();
+        this.httpClient = HttpAsyncClients.createDefault();
     }
 
     public void logOp(String op) {
         try {
-            Map<String, Object> reportMap = Map.of("class", "integration", "name", op, "count", 1);
-            HttpRequest request = generateRequest(reportMap);
-            httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
-        } catch (JsonProcessingException ignored) { }
+            Map<String, Object> reportMap = ImmutableMap.of("class", "integration", "name", op, "count", 1);
+            SimpleHttpRequest request = generateRequest(reportMap);
+            httpClient.execute(request, null);
+        } catch (Throwable ignored) {
+            logger.warn("Failed to report operation", ignored);
+        }
     }
 
-    private HttpRequest generateRequest(Map<String, Object> body) throws JsonProcessingException {
+    private SimpleHttpRequest generateRequest(Map<String, Object> body) throws JsonProcessingException {
         String requestBody = prepareRequestBody(body);
-        return HttpRequest
-                .newBuilder()
-                .uri(this.lakeFSEndpoint)
-                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
-                .header("Accept", "application/json")
-                .header("Content-Type", "application/json")
-                .header("Authorization", authHeader)
-                .header("X-Lakefs-Client", reportClient)
+        return SimpleRequestBuilder.post(this.lakeFSEndpoint).setBody(requestBody, ContentType.APPLICATION_JSON)
+                .addHeader("Accept", "application/json")
+                .addHeader("Authorization", authHeader)
+                .addHeader("X-Lakefs-Client", reportClient)
                 .build();
     }
 
     private String prepareRequestBody(Map<String, Object> requestMap) throws JsonProcessingException {
-        var statisticsRequest = new HashMap<String, Map<String, Object>[]>() {
-            {
-                put("events", new Map[]{requestMap});
-            }
-        };
-
-        var objectMapper = new ObjectMapper();
-        return objectMapper.writeValueAsString(statisticsRequest);
+        return new ObjectMapper().writeValueAsString(ImmutableMap.of("events", new Map[] { requestMap }));
     }
 
     private String generateBasicAuthHeader(Configuration hadoopConfig) {
         String key = hadoopConfig.get("fs.s3a.access.key");
         String secret = hadoopConfig.get("fs.s3a.secret.key");
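
Note: besides the HTTP client swap, this file replaces other post-Java-8 constructs: Map.of (Java 9+) becomes Guava's ImmutableMap.of, and var declarations (Java 10+) become explicit types. Guava is not added to pom.xml in this commit, so it is presumably already available transitively (e.g. via Iceberg/Hadoop). A small standalone sketch of how the statistics payload is assembled and serialized under Java 8; the "some_op" value and the class name are placeholders.

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

public class PayloadSketch {
    public static void main(String[] args) throws Exception {
        // Java 9's Map.of(...) replaced by Guava's ImmutableMap.of(...), which works on Java 8
        Map<String, Object> event = ImmutableMap.of("class", "integration", "name", "some_op", "count", 1);

        // the reporter wraps the single event in an "events" array before POSTing it to /api/v1/statistics
        String body = new ObjectMapper().writeValueAsString(ImmutableMap.of("events", new Map[]{event}));
        System.out.println(body); // {"events":[{"class":"integration","name":"some_op","count":1}]}
    }
}
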
32 changes: 17 additions & 15 deletions src/test/java/io/lakefs/iceberg/TestCatalogMigration.java
@@ -10,10 +10,11 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertTrue;
 
 public class TestCatalogMigration {
     public static SparkConf newSparkSharedConfig(HashMap<String,String> lakeFSConf, HashMap<String, String> srcConf) {
@@ -45,12 +46,12 @@ public static SparkConf newSparkSharedConfig(HashMap<String,String> lakeFSConf,
     @Test
     public void testMigrateHadoopToLakeFSCatalog() throws NoSuchTableException {
 
-        var catalog = "hadoop_prod";
-        var db = "db";
-        var table = "mytable";
-        var branch = "main";
-        var lakeFSCatalog = "lakefs";
-        var lakeFSRepo = "<LAKEFS_TABLE_NAME>";
+        String catalog = "hadoop_prod";
+        String db = "db";
+        String table = "mytable";
+        String branch = "main";
+        String lakeFSCatalog = "lakefs";
+        String lakeFSRepo = "<LAKEFS_TABLE_NAME>";
 
         // hadoop catalog on s3 iceberg config (source)
         HashMap<String, String> hadoopConf = new HashMap<>();
@@ -69,7 +70,7 @@ public void testMigrateHadoopToLakeFSCatalog() throws NoSuchTableException {
         // pre-lakeFS simulate hadoop catalog on s3 catalog iceberg table
 
         // create spark session for hadoop on s3 catalog
-        var conf = TestCatalogMigration.newSparkSharedConfig( null, hadoopConf);
+        SparkConf conf = TestCatalogMigration.newSparkSharedConfig(null, hadoopConf);
         SparkSession spark = SparkSession.builder().master("local").config(conf).getOrCreate();
 
         // create table in hadoop catalog
@@ -81,7 +82,7 @@
 
         // populate with data
         Row row = RowFactory.create(10);
-        Dataset<Row> df = spark.createDataFrame(List.of(row), schema).toDF("val");
+        Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema).toDF("val");
         df.writeTo(String.format("%s.%s_%s", catalog, db, table)).append();
 
         // show created data
@@ -92,8 +93,8 @@
 
         // simulate migration into lakeFS catalog for an iceberg table
 
-        var hiveFullTableName = String.format("%s.%s_%s", catalog, db, table);
-        var lakeFSFullTableName=String.format("%s.%s.%s.%s", lakeFSCatalog, branch, db, table);
+        String hiveFullTableName = String.format("%s.%s_%s", catalog, db, table);
+        String lakeFSFullTableName = String.format("%s.%s.%s.%s", lakeFSCatalog, branch, db, table);
 
         // create spark session that is configured to both source (hadoop catalog) and lakeFS catalog.
 
@@ -107,12 +108,13 @@
         sharedSpark.sql(String.format("CREATE TABLE IF NOT EXISTS %s USING iceberg AS SELECT * FROM %s", lakeFSFullTableName,hiveFullTableName));
 
         // show cloned table in lakeFS
-        var lakeFSDf = sharedSpark.sql(String.format("SELECT * FROM %s.%s.%s.%s", lakeFSCatalog, branch, db, table));
+        Dataset<Row> lakeFSDf;
+        lakeFSDf = sharedSpark.sql(String.format("SELECT * FROM %s.%s.%s.%s", lakeFSCatalog, branch, db, table));
         lakeFSDf.show();
 
         // assert source and target tables are equal
-        var diff = lakeFSDf.except(sharedSpark.sql(String.format("SELECT * FROM %s", hiveFullTableName)));
-        assertEquals(true, diff.isEmpty());
+        Dataset<Row> diff = lakeFSDf.except(sharedSpark.sql(String.format("SELECT * FROM %s", hiveFullTableName)));
+        assertTrue(diff.isEmpty());
     }
 
 }
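
Note: the test changes follow the same Java 8 pattern: List.of(row) (Java 9+) becomes Collections.singletonList(row), var becomes an explicit type, and assertEquals(true, ...) becomes the more direct assertTrue(...). A tiny standalone sketch of the single-element list idiom, outside of Spark:

import java.util.Collections;
import java.util.List;

public class SingletonListSketch {
    public static void main(String[] args) {
        // Java 9+: List.of(10); Java 8 equivalent for a single element:
        List<Integer> rows = Collections.singletonList(10);
        System.out.println(rows); // prints [10]
        // the returned list is immutable; use Arrays.asList(...) for several elements,
        // or new ArrayList<>(...) when the list must be mutable
    }
}
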
4 changes: 2 additions & 2 deletions src/test/java/io/lakefs/iceberg/TestLakeFSSpark.java
@@ -11,7 +11,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.junit.Test;
 
-import java.util.List;
+import java.util.Collections;
 
 public class TestLakeFSSpark {
     @Test
@@ -40,7 +40,7 @@ public void testLakeFSWithSpark() throws NoSuchTableException {
                 DataTypes.createStructField("val", DataTypes.IntegerType, false)
         });
         Row row = RowFactory.create(10);
-        Dataset<Row> df = spark.createDataFrame(List.of(row), schema).toDF("val");
+        Dataset<Row> df = spark.createDataFrame(Collections.singletonList(row), schema).toDF("val");
         df.writeTo(String.format("%s.%s.%s.%s", catalog, branch, db, table)).append();
         spark.sql(String.format("SELECT * FROM %s.%s.%s.%s", catalog, branch, db, table)).show();
     }
