Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create an exportable version of plan loading #91

Merged
merged 39 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
0ea3c42
feat: add support for all subquery types
EpsilonPrime Aug 25, 2023
6758f3f
Ran clang tidy.
EpsilonPrime Jan 30, 2024
af4fb1e
Tidy fixes.
EpsilonPrime Jan 30, 2024
bfe2ff6
Fix for some errors occuring too often.
EpsilonPrime Jan 30, 2024
44188b6
Fixed problem with falling through to the wrong case.
EpsilonPrime Jan 30, 2024
20acf0b
Created initial external library
EpsilonPrime Feb 1, 2024
77fcd30
Now the library is in a usable state.
EpsilonPrime Feb 2, 2024
df1b42c
Switch to returning a structure instead of modifying passed in argume…
EpsilonPrime Feb 2, 2024
58bed47
Make the planloader library installable.
EpsilonPrime Feb 5, 2024
73bec1f
Remove extraneous Makefile.
EpsilonPrime Feb 5, 2024
74bf673
Added a planloader test.
EpsilonPrime Feb 6, 2024
80b2c3e
Ran clang tidy.
EpsilonPrime Feb 6, 2024
347fbe2
Remove accidentally added java library still in progress.
EpsilonPrime Feb 6, 2024
e4485db
Fixed SetComparison to require only a single relation.
EpsilonPrime Feb 6, 2024
a72a827
Handled the rest of the review notes.
EpsilonPrime Feb 7, 2024
d0c187d
Clean version
EpsilonPrime Feb 8, 2024
320053c
Cleaned merge.
EpsilonPrime Feb 8, 2024
125a8cb
Updated based on review.
EpsilonPrime Feb 8, 2024
db12b3d
Use int32 instead of uint32 for the buffer length for increased porta…
EpsilonPrime Feb 8, 2024
12b54fe
More tidyness.
EpsilonPrime Feb 9, 2024
3d1e40e
Try making everything position independent.
EpsilonPrime Feb 9, 2024
e49c11a
Be more explicit about what needs position independent code.
EpsilonPrime Feb 9, 2024
00de8bc
Try another way to force PIC.
EpsilonPrime Feb 9, 2024
e754a1c
Handle an impossible error to make a warning go away.
EpsilonPrime Feb 9, 2024
1b191ea
Overly force -fPIC everywhere for now.
EpsilonPrime Feb 9, 2024
f5ce243
Fix new/free mismatch.
EpsilonPrime Feb 9, 2024
755c4ef
Fix delete.
EpsilonPrime Feb 9, 2024
c28f6f0
Apparently delete doesn't need to worry about nullptr so removing thi…
EpsilonPrime Feb 9, 2024
dac8ea9
Removed one unnecessary fPIC setting.
EpsilonPrime Feb 9, 2024
5c3d5f2
Removed some more unnecessary fPIC settings.
EpsilonPrime Feb 9, 2024
dd6b5c3
and one more
EpsilonPrime Feb 9, 2024
8fc79fb
One last cleanup.
EpsilonPrime Feb 9, 2024
7f8a7da
Update export/planloader/planloader.h
EpsilonPrime Feb 9, 2024
fbe9231
Various review changes.
EpsilonPrime Feb 9, 2024
e487313
Added error test case.
EpsilonPrime Feb 10, 2024
8ea2556
fix tidy warning
EpsilonPrime Feb 10, 2024
0d0246d
Added error test case.
EpsilonPrime Feb 10, 2024
06cd5c9
Switch save_plan to also use int32_t.
EpsilonPrime Feb 10, 2024
9be359b
Fix path used for output files.
EpsilonPrime Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,4 @@ if(${SUBSTRAIT_CPP_BUILD_TESTING})
endif()

add_subdirectory(src/substrait)
add_subdirectory(export)
3 changes: 3 additions & 0 deletions export/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-License-Identifier: Apache-2.0

add_subdirectory(planloader)
24 changes: 24 additions & 0 deletions export/planloader/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0

if(NOT BUILD_SUBDIR_NAME EQUAL "release")
message(
SEND_ERROR,
"The planloader library does not work in Debug mode due to its dependencies."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not? Does this mean you can't build the planloader in debug mode (annoying but ok)? Or does this mean that someone linking to the planloader cannot build in debug (probably an issue)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is all of the sanitization stuff that we're including in debug mode. An unfettered debug mode would be fine.

)
endif()

add_library(planloader SHARED planloader.cpp)
add_compile_options(-FPIC)
set_target_properties(planloader PROPERTIES SUBVERSION 1)
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved

add_dependencies(planloader substrait_io)
target_link_libraries(planloader substrait_io)

install(
TARGETS planloader
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
PRIVATE_HEADER DESTINATION ${CMAKE_INSTALL_INCDIR})

if(${SUBSTRAIT_CPP_BUILD_TESTING})
add_subdirectory(tests)
endif()
62 changes: 62 additions & 0 deletions export/planloader/planloader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* SPDX-License-Identifier: Apache-2.0 */

#include "planloader.h"

#include <substrait/common/Io.h>

extern "C" {

// Load a Substrait plan (in any format) from disk.
// Stores the Substrait plan in planBuffer in serialized form.
// Returns a SerializedPlan structure containing either the serialized plan or
// an error message. errorMessage is nullptr upon success.
SerializedPlan* load_substrait_plan(const char* filename) {
auto newPlan = new SerializedPlan();
newPlan->buffer = nullptr;
newPlan->size = 0;
newPlan->errorMessage = nullptr;

auto planOrError = io::substrait::loadPlan(filename);
if (!planOrError.ok()) {
auto errMsg = planOrError.status().message();
newPlan->errorMessage = new char[errMsg.length()+1];
strncpy(newPlan->errorMessage, errMsg.data(), errMsg.length()+1);
return newPlan;
}
::substrait::proto::Plan plan = *planOrError;
std::string text = plan.SerializeAsString();
newPlan->buffer = new char[text.length()+1];
memcpy(newPlan->buffer, text.data(), text.length()+1);
newPlan->size = text.length();
return newPlan;
}

void free_substrait_plan(SerializedPlan* plan) {
if (plan->buffer) {
free(plan->buffer);
}
if (plan->errorMessage) {
free(plan->errorMessage);
}
free(plan);
}

// Write a serialized Substrait plan to disk in the specified format.
// On error returns a non-empty error message.
// On success a nullptr is returned.
const char* save_substrait_plan(
const char* planData,
uint32_t planDataLength,
const char* filename,
io::substrait::PlanFileFormat format) {
::substrait::proto::Plan plan;
std::string data(planData, planDataLength);
plan.ParseFromString(data);
auto result = io::substrait::savePlan(plan, filename, format);
if (!result.ok()) {
return result.message().data();
}
return nullptr;
}

} // extern "C"
35 changes: 35 additions & 0 deletions export/planloader/planloader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* SPDX-License-Identifier: Apache-2.0 */

#include <substrait/common/Io.h>

extern "C" {

using SerializedPlan = struct {
char *buffer;
uint32_t size;
char *errorMessage;
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved
};

// Since this is actually C code, stick to C style names for exporting.
// NOLINTBEGIN(readability-identifier-naming)
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved

// Load a Substrait plan (in any format) from disk.
// Stores the Substrait plan in planBuffer in serialized form.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Stores the Substrait plan in planBuffer in serialized form.
// Stores the Substrait plan in planBuffer in protobuf serialized form.

// Returns a SerializedPlan structure containing either the serialized plan or
// an error message.
SerializedPlan* load_substrait_plan(const char* filename);
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved

void free_substrait_plan(SerializedPlan* plan);
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved

// Write a serialized Substrait plan to disk in the specified format.
// On error returns a non-empty error message.
// On success an empty string is returned.
const char* save_substrait_plan(
const char* planData,
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved
uint32_t planDataLength,
const char* filename,
io::substrait::PlanFileFormat format);

// NOLINTEND(readability-identifier-naming)

} // extern "C"
35 changes: 35 additions & 0 deletions export/planloader/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# SPDX-License-Identifier: Apache-2.0

cmake_path(GET CMAKE_CURRENT_BINARY_DIR PARENT_PATH
CMAKE_CURRENT_BINARY_PARENT_DIR)
cmake_path(GET CMAKE_CURRENT_BINARY_PARENT_DIR PARENT_PATH
CMAKE_CURRENT_BINARY_TOPLEVEL_DIR)

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY
"${CMAKE_CURRENT_BINARY_TOPLEVEL_DIR}/${BUILD_SUBDIR_NAME}")

add_test_case(
planloader_test
SOURCES
PlanLoaderTest.cpp
EXTRA_LINK_LIBS
planloader
gtest
gtest_main)

set(TEXTPLAN_SOURCE_DIR "${CMAKE_SOURCE_DIR}/src/substrait/textplan")

add_custom_command(
TARGET planloader_test
POST_BUILD
COMMAND ${CMAKE_COMMAND} -E echo "Copying unit test data.."
COMMAND ${CMAKE_COMMAND} -E make_directory
"${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/tests/data"
COMMAND
${CMAKE_COMMAND} -E copy
"${TEXTPLAN_SOURCE_DIR}/converter/data/q6_first_stage.json"
"${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/tests/data/q6_first_stage.json")

message(
STATUS "test data will be here: ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/tests/data"
)
32 changes: 32 additions & 0 deletions export/planloader/tests/PlanLoaderTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* SPDX-License-Identifier: Apache-2.0 */

#include <gtest/gtest.h>
#include <functional>

#include "../planloader.h"
#include "substrait/proto/plan.pb.h"

namespace io::substrait::textplan {
namespace {

TEST(PlanLoaderTest, LoadAndSave) {
auto serializedPlan = load_substrait_plan("data/q6_first_stage.json");
ASSERT_EQ(serializedPlan->errorMessage, nullptr);

::substrait::proto::Plan plan;
bool parseStatus =
plan.ParseFromArray(serializedPlan->buffer, serializedPlan->size);
ASSERT_TRUE(parseStatus) << "Failed to parse the plan.";

const char* saveStatus = save_substrait_plan(
serializedPlan->buffer,
serializedPlan->size,
"outfile.splan",
PlanFileFormat::kText);
ASSERT_EQ(saveStatus, nullptr);

free_substrait_plan(serializedPlan);
}
EpsilonPrime marked this conversation as resolved.
Show resolved Hide resolved

} // namespace
} // namespace io::substrait::textplan
4 changes: 4 additions & 0 deletions src/substrait/textplan/ParseResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class ParseResult {
return errors;
}

void addErrors(const std::vector<std::string>& errors) {
syntaxErrors_.insert(syntaxErrors_.end(), errors.begin(), errors.end());
}

// Add the capability for ::testing::PrintToString to print ParseResult.
friend std::ostream& operator<<(std::ostream& os, const ParseResult& result);

Expand Down
6 changes: 6 additions & 0 deletions src/substrait/textplan/StructuredSymbolData.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct RelationData {
const SymbolInfo* continuingPipeline{nullptr};
// The next nodes in the pipelines that this node starts.
std::vector<const SymbolInfo*> newPipelines;
// Expressions in this relation consume subqueries with these symbols.
std::vector<const SymbolInfo*> subQueryPipelines;

// The information corresponding to the relation without any references to
// other relations or inputs.
Expand All @@ -46,6 +48,10 @@ struct RelationData {
// references to the symbol would use the alias.)
std::map<size_t, std::string> generatedFieldReferenceAlternativeExpression;

// Temporary storage for global aliases for expressions. Used during the
// construction of a relation.
std::map<size_t, std::string> generatedFieldReferenceAliases;

// If populated, supersedes the combination of fieldReferences and
// generatedFieldReferences for the field symbols exposed by this relation.
std::vector<const SymbolInfo*> outputFieldReferences;
Expand Down
4 changes: 4 additions & 0 deletions src/substrait/textplan/SubstraitErrorListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class SubstraitErrorListener {
addError(-1, -1, msg);
};

void addErrorInstances(const std::vector<ErrorInstance>& errors) {
errors_.insert(errors_.end(), errors.begin(), errors.end());
}

const std::vector<ErrorInstance>& getErrors() {
return errors_;
};
Expand Down
42 changes: 39 additions & 3 deletions src/substrait/textplan/SymbolTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ const SymbolInfo SymbolInfo::kUnknown = {
std::nullopt};

bool operator==(const SymbolInfo& left, const SymbolInfo& right) {
return (left.name == right.name) && (left.location == right.location) &&
return (left.name == right.name) &&
(left.sourceLocation == right.sourceLocation) &&
(left.type == right.type);
}

Expand Down Expand Up @@ -118,13 +119,31 @@ size_t SymbolTable::findSymbolIndex(const SymbolInfo& symbol) {
return symbols_.size();
}

void SymbolTable::updateLocation(
void SymbolTable::addPermanentLocation(
const SymbolInfo& symbol,
const Location& location) {
auto index = findSymbolIndex(symbol);
symbols_[index]->permanentLocation = location;
symbolsByLocation_.insert(std::make_pair(location, index));
}

void SymbolTable::setParentQueryLocation(
const io::substrait::textplan::SymbolInfo& symbol,
const io::substrait::textplan::Location& location) {
auto index = findSymbolIndex(symbol);
symbols_[index]->parentQueryLocation = location;

int highestIndex = -1;
for (const auto& sym : symbols_) {
if (sym->parentQueryLocation == location) {
if (sym->parentQueryIndex > highestIndex) {
highestIndex = sym->parentQueryIndex;
}
}
}
symbols_[index]->parentQueryIndex = highestIndex + 1;
}

void SymbolTable::addAlias(const std::string& alias, const SymbolInfo* symbol) {
auto index = findSymbolIndex(*symbol);
symbols_[index]->alias = alias;
Expand Down Expand Up @@ -169,6 +188,19 @@ const SymbolInfo* SymbolTable::lookupSymbolByLocationAndTypes(
return nullptr;
}

const SymbolInfo* SymbolTable::lookupSymbolByParentQueryAndType(
const Location& location,
int index,
SymbolType type) const {
for (const auto& symbol : symbols_) {
if (symbol->parentQueryLocation == location &&
symbol->parentQueryIndex == index && symbol->type == type) {
return symbol.get();
}
}
return nullptr;
}

const SymbolInfo& SymbolTable::nthSymbolByType(uint32_t n, SymbolType type)
const {
int count = 0;
Expand Down Expand Up @@ -200,7 +232,11 @@ std::string SymbolTable::toDebugString() const {
}
auto relationData = ANY_CAST(std::shared_ptr<RelationData>, symbol->blob);
result << std::left << std::setw(4) << relationCount++;
result << std::left << std::setw(20) << symbol->name << std::endl;
result << std::left << std::setw(20) << symbol->name;
if (!relationData->subQueryPipelines.empty()) {
result << " SQC=" << relationData->subQueryPipelines.size();
}
result << std::endl;

int32_t fieldNum = 0;
for (const auto& field : relationData->fieldReferences) {
Expand Down
28 changes: 21 additions & 7 deletions src/substrait/textplan/SymbolTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ enum class SymbolType {
};

enum class RelationType {
// Logical plans
// Logical
kUnknown = 0,
kRead = 1,
kProject = 2,
Expand All @@ -45,11 +45,11 @@ enum class RelationType {
kFilter = 8,
kSet = 9,

// Physical plans
// Physical
kHashJoin = 31,
kMergeJoin = 32,

// Write relations, currently unreachable in Plan protos.
// Write
kExchange = 50,
kDdl = 51,
kWrite = 52,
Expand Down Expand Up @@ -79,7 +79,10 @@ struct SymbolInfo {
std::string name;
std::string alias{}; // If present, use this instead of name.
const SymbolInfo* schema{nullptr}; // The related schema symbol if present.
Location location;
Location sourceLocation;
Location permanentLocation{Location::kUnknownLocation};
Location parentQueryLocation{Location::kUnknownLocation};
int parentQueryIndex{-1};
SymbolType type;
std::any subtype;
std::any blob;
Expand All @@ -91,7 +94,7 @@ struct SymbolInfo {
std::any newSubtype,
std::any newBlob)
: name(std::move(newName)),
location(newLocation),
sourceLocation(newLocation),
type(newType),
subtype(std::move(newSubtype)),
blob(std::move(newBlob)){};
Expand Down Expand Up @@ -145,8 +148,14 @@ class SymbolTable {
const std::any& subtype,
const std::any& blob);

// Changes the location for a specified existing symbol.
void updateLocation(const SymbolInfo& symbol, const Location& location);
// Changes the permanent location (the version stored in the symbol table)
// for a specified existing symbol.
void addPermanentLocation(const SymbolInfo& symbol, const Location& location);

// Sets the location of the parent query.
void setParentQueryLocation(
const SymbolInfo& symbol,
const Location& location);

// Adds an alias to the given symbol.
void addAlias(const std::string& alias, const SymbolInfo* symbol);
Expand All @@ -165,6 +174,11 @@ class SymbolTable {
const Location& location,
std::unordered_set<SymbolType> types) const;

[[nodiscard]] const SymbolInfo* lookupSymbolByParentQueryAndType(
const Location& location,
int index,
SymbolType type) const;

[[nodiscard]] const SymbolInfo& nthSymbolByType(uint32_t n, SymbolType type)
const;

Expand Down
Loading
Loading