Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow-flight connector #23032

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Conversation

sabbasani
Copy link
Contributor

@sabbasani sabbasani commented Jun 19, 2024

Description

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add Arrow Flight connector :pr:`23032`
* Add documentation for the :doc:`/connector/base-arrow-flight`  :pr:`23032`

If release note is NOT required, use:

== NO RELEASE NOTE ==

 

@sabbasani sabbasani requested a review from a team as a code owner June 19, 2024 11:23
@sabbasani sabbasani requested a review from presto-oss June 19, 2024 11:23
@sabbasani sabbasani marked this pull request as draft June 19, 2024 11:42
@steveburnett
Copy link
Contributor

Consider adding documentation for the new connector.

Suggest revising the release note entry to follow the Release Note Guidelines:

== RELEASE NOTES ==

General Changes
* Add Arrow Flight connector :pr:`23032`

@tdcmeehan tdcmeehan self-assigned this Jul 15, 2024
Copy link
Contributor

@tdcmeehan tdcmeehan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there's a hardcoded presumption that the underlying datasource accepts a SQL query. Can you remove this from the PR? The service may not accept SQL.

public List<Field> getColumnsList(String schema, String table, ConnectorSession connectorSession)
{
try {
String dbSpecificSchemaName = getDBSpecificSchemaName(config, schema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid "DB" references as it might not be an underlying DB

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed "DB" references

Comment on lines 35 to 36
@JsonProperty("jdbcType") int jdbcType,
@JsonProperty("jdbcTypeName") String jdbcTypeName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't have references to JDBC in this connector.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed JDBC re references

@tdcmeehan tdcmeehan requested a review from BryanCutler July 16, 2024 14:55
@steveburnett
Copy link
Contributor

Suggest change to the release note entry as follows:

== RELEASE NOTES ==

General Changes
* Add Arrow Flight connector :pr:`23032`

The documentation for Arrow Flight Connector appears to be being added in #23212 , so it doesn't need to be mentioned in this release note.

Copy link
Contributor

@BryanCutler BryanCutler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a quick pass and had a few comments. I'll take another look at the rest a little later.

<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<grpc.version>1.53.0</grpc.version>
<dep.okhttp.version>4.10.0</dep.okhttp.version>
<arrow.version>11.0.0</arrow.version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fairly old version of Arrow, can you use a more recent one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BryanCutler I have updated to leastest arrow version

presto-base-arrow-flight/pom.xml Outdated Show resolved Hide resolved
public ArrowColumnHandle(
@JsonProperty("columnName") String columnName,
@JsonProperty("columnType") Type columnType,
@JsonProperty("jdbcTypeHandle") ArrowTypeHandle arrowTypeHandle)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean? "jdbcTypeHandle" -> "arrowTypeHandle"

return;
}
try {
RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would usually create a RootAllocator as a class member and they should be closed when not used anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BryanCutler I have made changes for closing allocator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not re-using the flight client. The root allocator will be closed when ArrowFlightClient is closed or auto closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this RootAllocator closed then? The FlightClient creates a child allocator that is closed along with the client, it will not close the root

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image image

ArrowFlightClient saves the root allocator. The root allocator is closed when ArrowFlightClient is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You wouldn't normally create a RootAllocator for each client, it could be reused as each client will internally make a child allocator. It's ok to clean that up later, since you are closing it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we determine when the root allocator should be closed? Should we keep track of open flight clients and close root allocator when there are no flight clients open for some duration of time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create 1 RootAllocator for the connector and then close it during Connector.shutdown(). The FlightClient creates it's own child allocator that is closed with the client, so the root won't be keeping any buffers directly.

}
}

logger.debug("location %s", location.getUri().toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to stay?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed log @BryanCutler

trustedCertificate.get().close();
}
shutdownTimer();
isClientClosed.set(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the calls to getClient() and close() need to be thread-safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BryanCutler I have addressed comments can please review changes

Copy link

linux-foundation-easycla bot commented Jul 17, 2024

CLA Signed

The committers listed above are authorized under a signed CLA.

Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the base-arrow-flight.rst documentation in #23212. If this documentation is to be included in this PR - which I think would be a good idea to do this - please address my comments in my most recent review in #23212 here.

@sabbasani sabbasani force-pushed the arrow-connector branch 4 times, most recently from a69b92e to 7bd4ec9 Compare July 24, 2024 13:38
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class ArrowQueryBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might better belong in a submodule that depends on this module which implements the Flight SQL spec. I don't think it belongs here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdcmeehan I have addressed comments can please review changes

@sabbasani sabbasani force-pushed the arrow-connector branch 8 times, most recently from 04cdc53 to 824175b Compare July 25, 2024 21:05
@sabbasani
Copy link
Contributor Author

sabbasani commented Jul 25, 2024

@tdcmeehan @BryanCutler @steveburnett
we have addressed all comments above.Can you please review

Copy link
Contributor

@steveburnett steveburnett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the docs! Minor suggestions, mostly formatting.

A local doc build returns the following warning:

/Users/steveburnett/Documents/GitHub/presto/presto-docs/src/main/sphinx/connector/base-arrow-flight.rst: WARNING: document isn't included in any toctree

To address this warning,

@steveburnett
Copy link
Contributor

Suggest update of the release note to include the PR number in both entries, and to link to the new doc from the release note.

== RELEASE NOTES ==

General Changes
* Add Arrow Flight connector :pr:`23032`
* Add documentation for the :doc:`/connector/base-arrow-flight`  :pr:`23032`

@elbinpallimalilibm elbinpallimalilibm force-pushed the arrow-connector branch 12 times, most recently from 39a50c8 to 4d247bc Compare December 14, 2024 10:14

public abstract class AbstractArrowFlightClientHandler
{
private static final Logger logger = Logger.get(AbstractArrowFlightClientHandler.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused, please remove

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

return getFlightInfo(flightDescriptor, connectorSession).getSchemaOptional();
}

public void closeRootallocator()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

closeRootAllocator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class TestingArrowFlightRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove all unused member variables in this class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


protected abstract CredentialCallOption[] getCallOptions(ConnectorSession connectorSession);

public ArrowFlightConfig getConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for this method. Instead, implementors of this module can wire the config object where needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

return config;
}

public ArrowFlightClient getClient(Optional<String> uri)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we create clients per call. Create two methods, one that has a URI and one that doesn't, instead of this method with the optional URI parameter. Call it createArrowFlightClient or something along those lines.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Comment on lines 40 to 41
@JsonProperty("ticket") byte[] ticket,
@JsonProperty("locationUrls") List<String> locationUrls)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just embed the FlightEndpoint itself? It is serializable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdcmeehan Tried making this Change in code and found that the FlightEndpoint is not serializable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is serializable, here is the protobuf definition of it: https://github.com/apache/arrow/blob/main/format/Flight.proto#L439

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

public abstract class AbstractArrowSplitManager
implements ConnectorSplitManager
{
private static final Logger logger = Logger.get(AbstractArrowSplitManager.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused logger

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

this.arrowBlockBuilder = requireNonNull(arrowBlockBuilder, "arrowPageBuilder is null");
}

protected abstract FlightDescriptor getFlightDescriptor(Optional<String> query, String schema, String table);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query parameter is always empty. Do we need it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the query parameter

Comment on lines 139 to 178

String schemaName = rootNode.get("interactionProperties").get("schema_name").asText(null);
String tableName = rootNode.get("interactionProperties").get("table_name").asText(null);
String selectStatement = rootNode.get("interactionProperties").get("select_statement").asText(null);

List<Field> fields = new ArrayList<>();
if (schemaName != null && tableName != null) {
String query = "SELECT * FROM INFORMATION_SCHEMA.COLUMNS " +
"WHERE TABLE_SCHEMA='" + schemaName.toUpperCase() + "' " +
"AND TABLE_NAME='" + tableName.toUpperCase() + "'";

try (ResultSet rs = connection.createStatement().executeQuery(query)) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
String dataType = rs.getString("TYPE_NAME");
String charMaxLength = rs.getString("CHARACTER_MAXIMUM_LENGTH");
int precision = rs.getInt("NUMERIC_PRECISION");
int scale = rs.getInt("NUMERIC_SCALE");

ArrowType arrowType = convertSqlTypeToArrowType(dataType, precision, scale);
Map<String, String> metaDataMap = new HashMap<>();
metaDataMap.put("columnNativeType", dataType);
if (charMaxLength != null) {
metaDataMap.put("columnLength", charMaxLength);
}
FieldType fieldType = new FieldType(true, arrowType, null, metaDataMap);
Field field = new Field(columnName, fieldType, null);
fields.add(field);
}
}
}
else if (selectStatement != null) {
selectStatement = selectStatement.toUpperCase();
logger.info("Executing SELECT query: " + selectStatement);
try (ResultSet rs = connection.createStatement().executeQuery(selectStatement)) {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();

for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnType = metaData.getColumnTypeName(i);
int precision = metaData.getPrecision(i);
int scale = metaData.getScale(i);

ArrowType arrowType = convertSqlTypeToArrowType(columnType, precision, scale);
Field field = new Field(columnName, FieldType.nullable(arrowType), null);
fields.add(field);
}
}
}
else {
throw new IllegalArgumentException("Either schema_name/table_name or select_statement must be provided.");
}

Schema schema = new Schema(fields);
FlightEndpoint endpoint = new FlightEndpoint(new Ticket(flightDescriptor.getCommand()));
return new FlightInfo(schema, flightDescriptor, Collections.singletonList(endpoint), -1, -1);
}
catch (Exception e) {
logger.error(e);
throw new RuntimeException("Failed to retrieve FlightInfo", e);
}
}

@Override
public Runnable acceptPut(CallContext callContext, FlightStream flightStream, StreamListener<PutResult> streamListener)
{
throw new UnsupportedOperationException("This operation is not supported");
}

@Override
public void doAction(CallContext callContext, Action action, StreamListener<Result> streamListener)
{
try {
String jsonRequest = new String(action.getBody(), StandardCharsets.UTF_8);
JsonNode rootNode = objectMapper.readTree(jsonRequest);
String schemaName = rootNode.get("interactionProperties").get("schema_name").asText(null);

String query;
if (schemaName == null) {
query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA";
}
else {
schemaName = schemaName.toUpperCase();
query = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='" + schemaName + "'";
}
ResultSet rs = connection.createStatement().executeQuery(query);
List<String> names = new ArrayList<>();
while (rs.next()) {
names.add(rs.getString(1));
}

String jsonResponse = objectMapper.writeValueAsString(names);
streamListener.onNext(new Result(jsonResponse.getBytes(StandardCharsets.UTF_8)));
streamListener.onCompleted();
}
catch (Exception e) {
streamListener.onError(e);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like these are just copied from TestingArrowServer, but don't really apply to the test case where they're used. Can you make it simpler please, seems like this can just be hardcoded since it's not returning TPCH data.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting hardcoded values.

Comment on lines 251 to 291
switch (sqlType.toUpperCase()) {
case "VARCHAR":
case "CHAR":
case "CHARACTER VARYING":
case "CHARACTER":
case "CLOB":
return new ArrowType.Utf8();
case "INTEGER":
case "INT":
return new ArrowType.Int(32, true);
case "BIGINT":
return new ArrowType.Int(64, true);
case "SMALLINT":
return new ArrowType.Int(16, true);
case "TINYINT":
return new ArrowType.Int(8, true);
case "DOUBLE":
case "DOUBLE PRECISION":
case "FLOAT":
return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
case "REAL":
return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
case "BOOLEAN":
return new ArrowType.Bool();
case "DATE":
return new ArrowType.Date(DateUnit.DAY);
case "TIMESTAMP":
return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
case "TIME":
return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
case "DECIMAL":
case "NUMERIC":
return new ArrowType.Decimal(precision, scale);
case "BINARY":
case "VARBINARY":
return new ArrowType.Binary();
case "NULL":
return new ArrowType.Null();
default:
throw new IllegalArgumentException("Unsupported SQL type: " + sqlType);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment--do you need this here at all? If not, please remove.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed


public Optional<Schema> getSchema(FlightDescriptor flightDescriptor, ConnectorSession connectorSession)
{
return getFlightInfo(flightDescriptor, connectorSession).getSchemaOptional();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is problematic because this could involve the costly allocation of resources (like executing a query), only to discard the results and return the schema. I think we should use the GetSchema RPC on the flight client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use getSchema RPC

Statement stmt = conn.createStatement();

// Create schema
stmt.execute("CREATE SCHEMA IF NOT EXISTS testdb");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this tpch instead.

Suggested change
stmt.execute("CREATE SCHEMA IF NOT EXISTS testdb");
stmt.execute("CREATE SCHEMA IF NOT EXISTS tpch");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use tpch


private Type getPrestoTypeFromArrowField(Field field)
{
return arrowBlockBuilder.getPrestoTypeFromArrowField(field);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdcmeehan We are injecting an instance of ArrowBlockBuilder to re-use the arrow to presto field mappings.

@elbinpallimalilibm elbinpallimalilibm force-pushed the arrow-connector branch 6 times, most recently from a9a9a50 to 57de3f1 Compare December 19, 2024 03:21
protected final String name;
protected final Module module;

public ArrowPlugin(String name, Module module)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work--plugins are loaded with ServiceLoaders, which require a no-arg constructor.

Copy link
Contributor

@elbinpallimalilibm elbinpallimalilibm Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementing module will have to define the plugin like below.

public class MyArrowPlugin
        extends ArrowPlugin
{
    public MyArrowPlugin()
    {
        super("my-arrow-flight", new MyArrowModule());
    }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TestingArrowPlugin to reference implementation in test to make this clear.

Co-authored-by: sai bhaskar reddy <sai.bhaskar.reddy.sabbasani1@ibm.com>
Co-authored-by: SthuthiGhosh9400 <Sthuthi.Ghosh@ibm.com>
Co-authored-by: lithinwxd <Lithin.Purushothaman@ibm.com>
Co-authored-by: Steve Burnett <burnett@pobox.com>
Co-authored-by: elbinpallimalilibm <elbin.pallimalil@ibm.com>
Co-authored-by: Timothy Meehan <tim@timdmeehan.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
from:IBM PR from IBM
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants