Skip to content

Commit

Permalink
implement rest endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jkhaliqi committed Jul 23, 2024
1 parent 3f778e9 commit aee25a0
Show file tree
Hide file tree
Showing 13 changed files with 530 additions and 9 deletions.
21 changes: 15 additions & 6 deletions presto-function-namespace-managers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>http-client</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.presto</groupId>
Expand All @@ -174,12 +189,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutionModule;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.google.inject.Binder;
import com.google.inject.Scopes;

public class RestSqlFunctionExecutionModule
extends SqlFunctionExecutionModule
{
@Override
protected void setup(Binder binder)
{
binder.bind(SqlFunctionExecutor.class).to(RestSqlFunctionExecutor.class).in(Scopes.SINGLETON);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.airlift.http.client.HttpUriBuilder;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RestURIManager;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation;
import com.facebook.presto.spi.function.SqlFunctionExecutor;
import com.facebook.presto.spi.function.SqlFunctionHandle;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.page.PagesSerde;
import com.facebook.presto.spi.page.SerializedPage;
import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import javax.annotation.PostConstruct;
import javax.inject.Inject;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.function.FunctionImplementationType.REST;
import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage;
import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class RestSqlFunctionExecutor
implements SqlFunctionExecutor
{
private BlockEncodingSerde blockEncodingSerde;
private PagesSerde pageSerde;
private OkHttpClient okHttpClient;
private NodeManager nodeManager;
private final RestURIManager restURIManager;

@Inject
public RestSqlFunctionExecutor(NodeManager nodeManager, RestURIManager restURIManager)
{
this.nodeManager = nodeManager;
this.restURIManager = restURIManager;
}

@PostConstruct
public void init()
{
this.okHttpClient = new OkHttpClient();
}

@Override
public FunctionImplementationType getImplementationType()
{
return REST;
}

@Override
public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde)
{
checkState(this.blockEncodingSerde == null, "blockEncodingSerde already set");
requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.blockEncodingSerde = blockEncodingSerde;
this.pageSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty());
}

@Override
public CompletableFuture<SqlFunctionResult> executeFunction(
String source,
RemoteScalarFunctionImplementation functionImplementation,
Page input,
List<Integer> channels,
List<Type> argumentTypes,
Type returnType)
{
SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle();
SqlFunctionId functionId = functionHandle.getFunctionId();
DynamicSliceOutput sliceOutput = new DynamicSliceOutput((int) input.getRetainedSizeInBytes());
writeSerializedPage(sliceOutput, pageSerde.serialize(input));
RequestBody body = RequestBody.create(MediaType.parse("text/plain; charset=utf-8"), sliceOutput.slice().byteArray());
Request request = new Request.Builder()
.url(getNativeWorkerUri(nodeManager, functionId, returnType).toString())
.post(body)
.build();
CallbackFuture future = new CallbackFuture();
okHttpClient.newCall(request).enqueue(future);
return future;
}

private URI getNativeWorkerUri(NodeManager nodeManager, SqlFunctionId functionId, Type returnType)
{
List<String> functionArgumentTypes = functionId.getArgumentTypes().stream().map(TypeSignature::toString).collect(toImmutableList());
if (restURIManager.getRestUriManager() == null) {
throw new PrestoException(NOT_FOUND, "failed to find native node !");
}
HttpUriBuilder uri = uriBuilderFrom(URI.create("http://" + restURIManager.getRestUriManager()))
.appendPath("/v1/function/" + functionId.getFunctionName().getObjectName())
.addParameter("returnType", returnType.toString())
.addParameter("numArgs", "" + functionArgumentTypes.size());
for (int i = 1; i <= functionArgumentTypes.size(); i++) {
uri.addParameter("argType" + i, functionArgumentTypes.get(i - 1));
}
return uri.build();
}

private class CallbackFuture
extends CompletableFuture<SqlFunctionResult>
implements Callback
{
@Override
public void onFailure(Call call, IOException e)
{
super.completeExceptionally(e);
}
@Override
public void onResponse(Call call, Response response)
{
try {
if (response.code() != 200) {
super.completeExceptionally(new IllegalStateException("Failed to get response for rest function call. Response code: " + response.code()));
}
Slice slice = Slices.wrappedBuffer(response.body().bytes());
SerializedPage serializedPage = readSerializedPage(new BasicSliceInput(slice));
SqlFunctionResult output = new SqlFunctionResult(pageSerde.deserialize(serializedPage).getBlock(0), response.receivedResponseAtMillis());
super.complete(output);
}
catch (IOException | NullPointerException e) {
super.completeExceptionally(new IllegalStateException("Failed to get response for rest function call, " + e.getMessage()));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.rest;

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutionModule;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;

import java.util.Map;

import static com.facebook.presto.spi.function.FunctionImplementationType.REST;
import static com.google.inject.Scopes.SINGLETON;
import static java.util.Objects.requireNonNull;

public class SimpleAddressRestSqlFunctionExecutorsModule
extends AbstractConfigurationAwareModule
{
private final SqlFunctionExecutionModule sqlFunctionExecutorModule;

public SimpleAddressRestSqlFunctionExecutorsModule()
{
this(new RestSqlFunctionExecutionModule());
}

public SimpleAddressRestSqlFunctionExecutorsModule(SqlFunctionExecutionModule sqlFunctionExecutorModule)
{
this.sqlFunctionExecutorModule = requireNonNull(sqlFunctionExecutorModule, "sqlFunctionExecutorModule is null");
}

@Override
protected void setup(Binder binder)
{
Map<Language, FunctionImplementationType> languageImplementationTypeMap = ImmutableMap.of(new Language("CPP"), REST);
Map<String, FunctionImplementationType> supportedLanguages = ImmutableMap.of("CPP", REST);
// for SqlFunctionExecutor
sqlFunctionExecutorModule.setSupportedLanguages(supportedLanguages);
install(sqlFunctionExecutorModule);
// for SqlFunctionExecutors
binder.bind(SqlFunctionExecutors.class).in(SINGLETON);
binder.bind(new TypeLiteral<Map<Language, FunctionImplementationType>>() {}).toInstance(languageImplementationTypeMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
openapi: 3.0.0
info:
title: Presto FunctionNameSpace Rest API
description: API for retrieving functions in Presto.
version: "1"
servers:
- url: http://localhost:8080
description: Presto endpoint when running locally
# http://127.0.0.1:7777/v1/function/array_constructor?returnType=array(unknown)&numArgs=0
paths:
/v1/function/{functionName}:
post:
summary: Retrieve value from function
parameters:
- in: path
name: functionName
schema:
$ref: '#/components/schemas/QualifiedObjectName/properties/objectName'
required: true
description: The return type of the function of the user to get
- in: query
name: returnType
schema:
$ref: '#/components/schemas/TypeSignature'
required: true
description: The return type of the function
- in: query
name: numArgs
schema:
$ref: '#/components/schemas/SqlFunctionId/properties/size'
required: true
description: The numbers of arguments passed in
- in: query
name: argType
schema:
type: object
# $ref: '#/components/schemas/TypeSignature/parameters'
additionalProperties:
type: string
pattern: '^[A-Za-z][A-Za-z0-9]*$'
example:
argType1: string
argType2: double
description: The type for each argument passed in
requestBody:
required: true
content:
text/plain; charset=utf-8:
schema:
$ref: '#/components/schemas/SerializedPageBinary'
type: object
properties:
username:
type: string
format: binary
responses:
'200':
description: function value output
content:
text/plain; charset=utf-8:
schema:
$ref: '#/components/schemas/SerializedPageBinary'
components:
schemas:
QualifiedObjectName:
type: object
properties:
catalogName:
type: string
description: The name of the catalog
schemaName:
type: string
description: The name of the Schema
objectName:
type: string
description: The name of the function
# Serialized in function parseTypeSignature from TypeSignature.java
TypeSignature:
type: string
SqlFunctionId:
type: object
properties:
functionName:
$ref: '#/components/schemas/QualifiedObjectName'
argumentTypes:
type: array
items:
$ref: '#/components/schemas/TypeSignature'
size:
type: integer
SqlFunctionResult:
type: object
properties:
result:
type: object
description: the Block type result
cpuTimeMs:
type: number
description: time it took on the cpu
# Match this format https://prestodb.io/docs/current/develop/serialized-page.html
SerializedPageBinary:
type: string
format: binary
Page:
type: string
format: binary
Loading

0 comments on commit aee25a0

Please sign in to comment.