Skip to content

Commit

Permalink
[rest] Optimize partition methods to let rest return table not partit…
Browse files Browse the repository at this point in the history
…ioned (#4979)
  • Loading branch information
JingsongLi authored Jan 22, 2025
1 parent f3e0a0d commit 8821b04
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.exceptions.UnsupportedOperationException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
Expand Down Expand Up @@ -61,7 +61,7 @@ public void accept(ErrorResponse error) {
case 500:
throw new ServiceFailureException("Server error: %s", message);
case 501:
throw new UnsupportedOperationException(message);
throw new NotImplementedException(message);
case 503:
throw new ServiceUnavailableException("Service unavailable: %s", message);
default:
Expand Down
125 changes: 55 additions & 70 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterPartitionsRequest;
Expand Down Expand Up @@ -84,7 +85,6 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
Expand Down Expand Up @@ -392,7 +392,7 @@ public void alterTable(
throw new ColumnAlreadyExistException(identifier, e.resourceName());
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (org.apache.paimon.rest.exceptions.UnsupportedOperationException e) {
} catch (NotImplementedException e) {
throw new UnsupportedOperationException(e.getMessage());
} catch (ServiceFailureException e) {
throw new IllegalStateException(e.getMessage());
Expand Down Expand Up @@ -422,38 +422,35 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
@Override
public void createPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
CreatePartitionsRequest request = new CreatePartitionsRequest(partitions);
client.post(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
CreatePartitionsRequest request = new CreatePartitionsRequest(partitions);
client.post(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public void dropPartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
DropPartitionsRequest request = new DropPartitionsRequest(partitions);
client.post(
resourcePaths.dropPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
} else {
FileStoreTable fileStoreTable = (FileStoreTable) table;
try {
DropPartitionsRequest request = new DropPartitionsRequest(partitions);
client.post(
resourcePaths.dropPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
FileStoreTable fileStoreTable = (FileStoreTable) getTable(identifier);
try (FileStoreCommit commit =
fileStoreTable
.store()
Expand All @@ -468,65 +465,58 @@ public void dropPartitions(Identifier identifier, List<Map<String, String>> part
@Override
public void alterPartitions(Identifier identifier, List<Partition> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
AlterPartitionsRequest request = new AlterPartitionsRequest(partitions);
client.post(
resourcePaths.alterPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
AlterPartitionsRequest request = new AlterPartitionsRequest(partitions);
client.post(
resourcePaths.alterPartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public void markDonePartitions(Identifier identifier, List<Map<String, String>> partitions)
throws TableNotExistException {
Table table = getTable(identifier);
if (isMetaStorePartitionedTable(table)) {
try {
MarkDonePartitionsRequest request = new MarkDonePartitionsRequest(partitions);
client.post(
resourcePaths.markDonePartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
}
try {
MarkDonePartitionsRequest request = new MarkDonePartitionsRequest(partitions);
client.post(
resourcePaths.markDonePartitions(
identifier.getDatabaseName(), identifier.getTableName()),
request,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
}
}

@Override
public List<Partition> listPartitions(Identifier identifier) throws TableNotExistException {
Table table = getTable(identifier);
if (!isMetaStorePartitionedTable(table)) {
return listPartitionsFromFileSystem(table);
}

ListPartitionsResponse response;
try {
response =
ListPartitionsResponse response =
client.get(
resourcePaths.partitions(
identifier.getDatabaseName(), identifier.getTableName()),
ListPartitionsResponse.class,
headers());
if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}
return response.getPartitions();
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (NotImplementedException e) {
// not a metastore partitioned table
return listPartitionsFromFileSystem(getTable(identifier));
}

if (response == null || response.getPartitions() == null) {
return Collections.emptyList();
}

return response.getPartitions();
}

@Override
Expand Down Expand Up @@ -626,11 +616,6 @@ public void close() throws Exception {
}
}

private boolean isMetaStorePartitionedTable(Table table) {
Options options = Options.fromMap(table.options());
return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE));
}

private Map<String, String> headers() {
return catalogAuth.getHeaders();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package org.apache.paimon.rest.exceptions;

/** Exception thrown on HTTP 501 - UnsupportedOperationException. */
public class UnsupportedOperationException extends RESTException {
/** Exception thrown on HTTP 501 - NotImplementedException. */
public class NotImplementedException extends RESTException {

public UnsupportedOperationException(String message, Object... args) {
public NotImplementedException(String message, Object... args) {
super(String.format(message, args));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void testHandleErrorResponse() {
ServiceFailureException.class,
() -> defaultErrorHandler.accept(generateErrorResponse(500)));
assertThrows(
org.apache.paimon.rest.exceptions.UnsupportedOperationException.class,
NotImplementedException.class,
() -> defaultErrorHandler.accept(generateErrorResponse(501)));
assertThrows(
RESTException.class, () -> defaultErrorHandler.accept(generateErrorResponse(502)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.rest;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
Expand Down Expand Up @@ -68,6 +69,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
Expand Down Expand Up @@ -178,6 +180,11 @@ public MockResponse dispatch(RecordedRequest request) {
if (isDropPartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
DropPartitionsRequest dropPartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -188,6 +195,11 @@ public MockResponse dispatch(RecordedRequest request) {
} else if (isAlterPartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
AlterPartitionsRequest alterPartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -198,6 +210,11 @@ public MockResponse dispatch(RecordedRequest request) {
} else if (isMarkDonePartitions) {
String tableName = resources[2];
Identifier identifier = Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
checkTablePartitioned(catalog, identifier);
if (error.isPresent()) {
return error.get();
}
MarkDonePartitionsRequest markDonePartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
Expand All @@ -207,6 +224,12 @@ public MockResponse dispatch(RecordedRequest request) {
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
String tableName = resources[2];
Optional<MockResponse> error =
checkTablePartitioned(
catalog, Identifier.create(databaseName, tableName));
if (error.isPresent()) {
return error.get();
}
return partitionsApiHandler(catalog, request, databaseName, tableName);
} else if (isTableToken) {
GetTableTokenResponse getTableTokenResponse =
Expand Down Expand Up @@ -326,6 +349,24 @@ public MockResponse dispatch(RecordedRequest request) {
};
}

private static Optional<MockResponse> checkTablePartitioned(
Catalog catalog, Identifier identifier) {
Table table;
try {
table = catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
return Optional.of(
mockResponse(
new ErrorResponse(ErrorResponseResourceType.TABLE, null, "", 404),
404));
}
boolean partitioned = CoreOptions.fromMap(table.options()).partitionedTableInMetastore();
if (!partitioned) {
return Optional.of(mockResponse(new ErrorResponse(null, null, "", 501), 501));
}
return Optional.empty();
}

private static MockResponse commitTableApiHandler(
Catalog catalog, RecordedRequest request, String databaseName, String tableName)
throws Exception {
Expand Down

0 comments on commit 8821b04

Please sign in to comment.