Skip to content

Commit

Permalink
implement rest endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhaliqi committed Aug 22, 2024
1 parent 797019c commit fadf190
Show file tree
Hide file tree
Showing 16 changed files with 836 additions and 7 deletions.
20 changes: 15 additions & 5 deletions presto-function-namespace-managers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand All @@ -174,11 +189,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;

import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@BindingAnnotation
public @interface RestClient
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.google.inject.Binder;
import com.google.inject.Module;

import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder;

public class RestComminicationModule
implements Module
{
@Override
public void configure(Binder binder)
{
httpClientBinder(binder).bindHttpClient("rest", RestClient.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutionModule;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.google.inject.Binder;
import com.google.inject.Scopes;

public class RestSqlFunctionExecutionModule
extends SqlFunctionExecutionModule
{
@Override
protected void setup(Binder binder)
{
binder.bind(SqlFunctionExecutor.class).to(RestSqlFunctionExecutor.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.HttpStatus;
import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.Response;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.SliceInput;

import javax.inject.Inject;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture;
import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.airlift.http.client.Request.Builder.preparePost;
import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator;
import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_SERVER_FAILURE;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.function.FunctionImplementationType.REST;
import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage;
import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.net.HttpHeaders.ACCEPT;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

public class RestSqlFunctionExecutor
implements SqlFunctionExecutor
{
private BlockEncodingSerde blockEncodingSerde;
private PagesSerde pageSerde;
private HttpClient httpClient;
private final NodeManager nodeManager;
private final RestURIServerConfig restURIServerConfig;

@Inject
public RestSqlFunctionExecutor(NodeManager nodeManager, RestURIServerConfig restURIServerConfig, @RestClient HttpClient httpClient)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.restURIServerConfig = requireNonNull(restURIServerConfig, "restURIServerConfig is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
}

@Override
public FunctionImplementationType getImplementationType()
{
return REST;
}

@Override
public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde)
{
checkState(this.blockEncodingSerde == null, "blockEncodingSerde already set");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.pageSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty());
}

@Override
public CompletableFuture<SqlFunctionResult> executeFunction(
String source,
RemoteScalarFunctionImplementation functionImplementation,
Page input,
List<Integer> channels,
List<Type> argumentTypes,
Type returnType)
{
SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle();
SqlFunctionId functionId = functionHandle.getFunctionId();
DynamicSliceOutput sliceOutput = new DynamicSliceOutput((int) input.getRetainedSizeInBytes());
writeSerializedPage(sliceOutput, pageSerde.serialize(input));
try {
Request request = preparePost()
.setUri(getNativeWorkerUri(functionId, returnType))
.setBodyGenerator(createStaticBodyGenerator(sliceOutput.slice().byteArray()))
.setHeader(CONTENT_TYPE, PLAIN_TEXT_UTF_8.toString())
.setHeader(ACCEPT, PLAIN_TEXT_UTF_8.toString())
.build();
HttpClient.HttpResponseFuture<SqlFunctionResult> future = httpClient.executeAsync(request, createSqlFunctionResultResponseHandler());
Futures.addCallback(future, new FutureCallback<SqlFunctionResult>() {
@Override
public void onSuccess(SqlFunctionResult result)
{
result.getResult();
}

@Override
public void onFailure(Throwable t)
{
throw new PrestoException(FUNCTION_SERVER_FAILURE, "Failed with message " + t.getMessage());
}
}, directExecutor());
return toCompletableFuture(future);
}
catch (Exception e) {
throw new IllegalStateException("Failed to get function definitions for REST server/ Native worker, " + e.getMessage());
}
}

private URI getNativeWorkerUri(SqlFunctionId functionId, Type returnType)
{
List<String> functionArgumentTypes = functionId.getArgumentTypes().stream().map(TypeSignature::toString).collect(toImmutableList());
if (restURIServerConfig.getRestUri() == null) {
throw new PrestoException(NOT_FOUND, "Failed to find native node !");
}

HttpUriBuilder uri = uriBuilderFrom(URI.create(restURIServerConfig.getRestUri()))
.appendPath("/v1/functions/" + functionId.getFunctionName().getObjectName())
.addParameter("returnType", returnType.toString())
.addParameter("numArgs", "" + functionArgumentTypes.size());
for (int i = 1; i <= functionArgumentTypes.size(); i++) {
uri.addParameter("argType" + i, functionArgumentTypes.get(i - 1));
}
return uri.build();
}

ResponseHandler createSqlFunctionResultResponseHandler()
{
return new ResponseHandler<SqlFunctionResult, RuntimeException>() {
@Override
public SqlFunctionResult handleException(Request request, Exception exception)
{
return null;
}

@Override
public SqlFunctionResult handle(Request request, Response response)
{
if (response.getStatusCode() != 200 || response.getStatusCode() != HttpStatus.OK.code()) {
throw new PrestoException(FUNCTION_SERVER_FAILURE, "Failed to get response for rest function call from function server. Response code: " + response.getStatusCode());
}
try {
SliceInput input = new InputStreamSliceInput(response.getInputStream());
SerializedPage serializedPage = readSerializedPage(input);
if (pageSerde.deserialize(serializedPage).getChannelCount() == 1) {
SqlFunctionResult output = new SqlFunctionResult(pageSerde.deserialize(serializedPage).getBlock(0), 1);
return output;
}
else {
throw new IllegalStateException("More than one Block in the Page");
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.airlift.configuration.Config;

public class RestURIServerConfig
{
private String restUri;

public String getRestUri()
{
return restUri;
}

@Config("rest-uri")
public RestURIServerConfig setRestUri(String restUri)
{
this.restUri = restUri;
return this;
}
}
Loading

0 comments on commit fadf190

Please sign in to comment.