Skip to content

Commit

Permalink
Fix linting
Browse files Browse the repository at this point in the history
  • Loading branch information
EnricoMi committed Nov 15, 2024
1 parent cb99357 commit e66878f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 65 deletions.
57 changes: 29 additions & 28 deletions cpp/src/arrow/acero/assert_order_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ void CheckAssert(const std::shared_ptr<ChunkedArray>& array,
}
}

void CheckAssert(const std::shared_ptr<ChunkedArray>& array,
const Ordering& ordering,
void CheckAssert(const std::shared_ptr<ChunkedArray>& array, const Ordering& ordering,
const Status& expected_status = Status::OK()) {
CheckAssert(array, table_schema, ordering, expected_status);
}
Expand Down Expand Up @@ -111,7 +110,8 @@ TEST(AssertOrderNode, SingleColumnAsc) {
// TODO: add tests with NULL and NaN values

auto asc_sort_options =
Ordering({{compute::SortKey{"id", compute::SortOrder::Ascending}}}, compute::NullPlacement::AtStart);
Ordering({{compute::SortKey{"id", compute::SortOrder::Ascending}}},
compute::NullPlacement::AtStart);

CheckAssert(monotonic_ids, asc_sort_options);
CheckAssert(repeating_ids, asc_sort_options);
Expand All @@ -129,31 +129,32 @@ TEST(AssertOrderNode, SingleColumnAsc) {
}

TEST(AssertOrderNode, SingleColumnDesc) {
const auto monotonic_ids =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[6, 5, 4]", "[3, 2, 1]"});
const auto repeating_ids =
ChunkedArrayFromJSON(int64(), {"[9, 7, 7]", "[5, 5, 5]", "[3, 3, 1]"});
const auto identical_ids =
ChunkedArrayFromJSON(int64(), {"[3, 3, 3]", "[3, 3, 3]", "[3, 3, 3]"});
const auto some_negative_ids =
ChunkedArrayFromJSON(int64(), {"[5, 4, 3]", "[2, 0, -2]", "[-3, -5, -6]"});
const auto all_negative_ids =
ChunkedArrayFromJSON(int64(), {"[-1, -2, -3]", "[-4, -5, -6]", "[-7, -8, -9]"});

const auto all_empty_batches = ChunkedArrayFromJSON(int64(), {"[]", "[]", "[]"});
const auto many_empty_batches = ChunkedArrayFromJSON(
int64(),
{"[]", "[]", "[9, 8, 7]", "[]", "[]", "[6, 5, 4]", "[]", "[3, 2, 1]", "[]", "[]"});

const auto unordered_batch =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[6, 4, 5]", "[3, 2, 1]"});
const auto unordered_batches =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[3, 2, 1]", "[6, 5, 4]"});

// TODO: add tests with NULL and NaN values

auto desc_sort_options =
Ordering({{compute::SortKey{"id", compute::SortOrder::Descending}}}, compute::NullPlacement::AtStart);
const auto monotonic_ids =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[6, 5, 4]", "[3, 2, 1]"});
const auto repeating_ids =
ChunkedArrayFromJSON(int64(), {"[9, 7, 7]", "[5, 5, 5]", "[3, 3, 1]"});
const auto identical_ids =
ChunkedArrayFromJSON(int64(), {"[3, 3, 3]", "[3, 3, 3]", "[3, 3, 3]"});
const auto some_negative_ids =
ChunkedArrayFromJSON(int64(), {"[5, 4, 3]", "[2, 0, -2]", "[-3, -5, -6]"});
const auto all_negative_ids =
ChunkedArrayFromJSON(int64(), {"[-1, -2, -3]", "[-4, -5, -6]", "[-7, -8, -9]"});

const auto all_empty_batches = ChunkedArrayFromJSON(int64(), {"[]", "[]", "[]"});
const auto many_empty_batches = ChunkedArrayFromJSON(
int64(),
{"[]", "[]", "[9, 8, 7]", "[]", "[]", "[6, 5, 4]", "[]", "[3, 2, 1]", "[]", "[]"});

const auto unordered_batch =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[6, 4, 5]", "[3, 2, 1]"});
const auto unordered_batches =
ChunkedArrayFromJSON(int64(), {"[9, 8, 7]", "[3, 2, 1]", "[6, 5, 4]"});

// TODO: add tests with NULL and NaN values

auto desc_sort_options =
Ordering({{compute::SortKey{"id", compute::SortOrder::Descending}}},
compute::NullPlacement::AtStart);

CheckAssert(monotonic_ids, desc_sort_options);
CheckAssert(repeating_ids, desc_sort_options);
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/acero/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,7 @@ class ARROW_ACERO_EXPORT AssertOrderNodeOptions : public ExecNodeOptions {
public:
static constexpr std::string_view kName = "assert_order";
/// \brief create an instance from ordering
explicit AssertOrderNodeOptions(Ordering ordering)
: ordering(std::move(ordering)) {}
explicit AssertOrderNodeOptions(Ordering ordering) : ordering(std::move(ordering)) {}

/// \brief expected null placement and sort columns and directions
Ordering ordering;
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/compute/ordering.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class ARROW_EXPORT Ordering : public util::EqualityComparable<Ordering> {
public:
Ordering(std::vector<SortKey> sort_keys,
NullPlacement null_placement = NullPlacement::AtStart)
: sort_keys_(std::move(sort_keys)), null_placement_(null_placement), is_implicit_(false) {}
: sort_keys_(std::move(sort_keys)),
null_placement_(null_placement),
is_implicit_(false) {}
/// true if data ordered by other is also ordered by this
///
/// For example, if data is ordered by [a, b, c] then it is also ordered
Expand Down
16 changes: 9 additions & 7 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,9 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,

// the source can at most establish implicit ordering,
// the assert_order node will establish explicit ordering
auto source_ordering = require_sequenced_output || ordering.is_ordered() ? Ordering::Implicit() : Ordering::Unordered();
auto source_ordering = require_sequenced_output || ordering.is_ordered()
? Ordering::Implicit()
: Ordering::Unordered();

auto fields = scan_options->dataset_schema->fields();
if (scan_options->add_augmented_fields) {
Expand All @@ -1078,16 +1080,16 @@ Result<acero::ExecNode*> MakeScanNode(acero::ExecPlan* plan,
}
}

auto out = acero::MakeExecNode(
"source", plan, {},
acero::SourceNodeOptions{schema(std::move(fields)), std::move(gen), source_ordering});
auto out =
acero::MakeExecNode("source", plan, {},
acero::SourceNodeOptions{schema(std::move(fields)),
std::move(gen), source_ordering});

// assert and establish explicit ordering
if (ordering.is_explicit()) {
ARROW_ASSIGN_OR_RAISE(auto source, out);
out = acero::MakeExecNode(
"assert_order", plan, {source},
acero::AssertOrderNodeOptions{ordering});
out = acero::MakeExecNode("assert_order", plan, {source},
acero::AssertOrderNodeOptions{ordering});
}

return out;
Expand Down
12 changes: 5 additions & 7 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -579,13 +579,11 @@ class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
require_sequenced_output(require_sequenced_output),
ordering(std::move(ordering)) {}

explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
Ordering ordering = Ordering::Unordered())
: ScanNodeOptions(std::move(dataset),
std::move(scan_options),
false,
std::move(ordering)) { }
explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options,
Ordering ordering = Ordering::Unordered())
: ScanNodeOptions(std::move(dataset), std::move(scan_options), false,
std::move(ordering)) {}

std::shared_ptr<Dataset> dataset;
std::shared_ptr<ScanOptions> scan_options;
Expand Down
45 changes: 25 additions & 20 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,10 +876,10 @@ TEST(TestNewScanner, MissingColumn) {
AssertArraysEqual(*expected_nulls, *batches[0]->column(1));
}

void WriteIpcData(const std::string& path,
const std::shared_ptr<fs::FileSystem> file_system,
const std::shared_ptr<Table> input,
const ipc::IpcWriteOptions& options = ipc::IpcWriteOptions::Defaults()) {
void WriteIpcData(
const std::string& path, const std::shared_ptr<fs::FileSystem> file_system,
const std::shared_ptr<Table> input,
const ipc::IpcWriteOptions& options = ipc::IpcWriteOptions::Defaults()) {
EXPECT_OK_AND_ASSIGN(auto out_stream, file_system->OpenOutputStream(path));
ASSERT_OK_AND_ASSIGN(
auto file_writer,
Expand Down Expand Up @@ -2906,18 +2906,20 @@ Ordering CreateOrdering(const std::pair<std::string, compute::SortOrder>& sort_k
keys.emplace_back(FieldRef(sort_key.first), sort_key.second);
return {keys};
}
void AssertPlanHasAssertOrderNode(acero::Declaration declarations, bool expect_assert_node) {

Ordering CreateOrdering(const std::vector<std::pair<std::string, compute::SortOrder>>& sort_keys) {
Ordering CreateOrdering(
const std::vector<std::pair<std::string, compute::SortOrder>>& sort_keys) {
auto keys = std::vector<compute::SortKey>();
for (const auto& sort_key : sort_keys) {
keys.emplace_back(FieldRef(sort_key.first), sort_key.second);
}
return {keys};
}

void AssertPlanHasAssertOrderNode(acero::Declaration declarations,
bool expect_assert_node) {
ASSERT_OK_AND_ASSIGN(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(acero::QueryOptions(), ExecContext()));
ExecPlan::Make(acero::QueryOptions(), ExecContext()));
ASSERT_OK(declarations.AddToPlan(exec_plan.get()));
if (expect_assert_node) {
// we expect the scan node expanded into two nodes
Expand All @@ -2936,19 +2938,21 @@ TEST(ScanNode, AssertOrder) {
arrow::dataset::internal::Initialize();
ASSERT_OK_AND_ASSIGN(auto plan, acero::ExecPlan::Make());

auto dummy_schema = schema(
{field("id", int32()), field("rev", int32()), field("value", int32())});
auto dummy_schema =
schema({field("id", int32()), field("rev", int32()), field("value", int32())});

// creating a synthetic dataset using generators
static constexpr int kRowsPerBatch = 4;
static constexpr int kNumBatches = 32;

std::shared_ptr<Table> table = gen::Gen({
{"id", gen::Step(/*start=*/-kRowsPerBatch*kNumBatches/2, /*step=*/1, /*signed_int=*/true)},
{"rev", gen::Step(/*start=*/kRowsPerBatch*kNumBatches/2, /*step=*/-1, /*signed_int=*/true)},
{"value", gen::Random(int32())}
})->FailOnError()
->Table(kRowsPerBatch, kNumBatches);
std::shared_ptr<Table> table =
gen::Gen({{"id", gen::Step(/*start=*/-kRowsPerBatch * kNumBatches / 2, /*step=*/1,
/*signed_int=*/true)},
{"rev", gen::Step(/*start=*/kRowsPerBatch * kNumBatches / 2, /*step=*/-1,
/*signed_int=*/true)},
{"value", gen::Random(int32())}})
->FailOnError()
->Table(kRowsPerBatch, kNumBatches);

auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
auto filesystem = std::make_shared<fs::LocalFileSystem>();
Expand Down Expand Up @@ -3000,9 +3004,10 @@ TEST(ScanNode, AssertOrder) {
compute::Ordering unordered = CreateOrdering(unordered_key);

// test existing orderings pass
for (const Ordering& ordering : {asc, desc, asc_desc, asc_desc_rand, desc_asc, desc_asc_rand}) {
declarations = acero::Declaration::Sequence(
{acero::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options, ordering}})});
for (const Ordering& ordering :
{asc, desc, asc_desc, asc_desc_rand, desc_asc, desc_asc_rand}) {
declarations = acero::Declaration::Sequence({acero::Declaration(
{"scan", dataset::ScanNodeOptions{dataset, scan_options, ordering}})});
ASSERT_OK_AND_ASSIGN(auto actual, acero::DeclarationToTable(declarations));
// Scan node always emits augmented fields so we drop those
ASSERT_OK_AND_ASSIGN(auto actualMinusAugmented, actual->SelectColumns({0, 1, 2}));
Expand All @@ -3012,8 +3017,8 @@ TEST(ScanNode, AssertOrder) {

// test non-existing orderings fail
for (const Ordering& non_ordering : {not_asc, not_asc, unordered}) {
declarations = acero::Declaration::Sequence(
{acero::Declaration({"scan", dataset::ScanNodeOptions{dataset, scan_options, false, non_ordering}})});
declarations = acero::Declaration::Sequence({acero::Declaration(
{"scan", dataset::ScanNodeOptions{dataset, scan_options, false, non_ordering}})});
ASSERT_NOT_OK(acero::DeclarationToTable(declarations));
AssertPlanHasAssertOrderNode(declarations, true);
}
Expand Down

0 comments on commit e66878f

Please sign in to comment.