Skip to content

Commit

Permalink
improve struct schema-change behavior for add sub-columns
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan committed Jan 16, 2025
1 parent e35e019 commit 9d451d9
Show file tree
Hide file tree
Showing 8 changed files with 533 additions and 50 deletions.
49 changes: 33 additions & 16 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Status ColumnReader::create_struct(const ColumnReaderOptions& opts, const Column
std::unique_ptr<ColumnReader> struct_reader(
new ColumnReader(opts, meta, num_rows, file_reader));
struct_reader->_sub_readers.reserve(meta.children_columns_size());
// Struct columns can now gain child columns through schema change.
for (size_t i = 0; i < meta.children_columns_size(); i++) {
std::unique_ptr<ColumnReader> sub_reader;
RETURN_IF_ERROR(ColumnReader::create(opts, meta.children_columns(i),
Expand Down Expand Up @@ -714,7 +715,7 @@ Status ColumnReader::seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterat
return Status::OK();
}

Status ColumnReader::new_iterator(ColumnIterator** iterator) {
Status ColumnReader::new_iterator(ColumnIterator** iterator, const TabletColumn* tablet_column) {
if (is_empty()) {
*iterator = new EmptyFileColumnIterator();
return Status::OK();
Expand All @@ -729,13 +730,13 @@ Status ColumnReader::new_iterator(ColumnIterator** iterator) {
return new_agg_state_iterator(iterator);
}
case FieldType::OLAP_FIELD_TYPE_STRUCT: {
return new_struct_iterator(iterator);
return new_struct_iterator(iterator, tablet_column);
}
case FieldType::OLAP_FIELD_TYPE_ARRAY: {
return new_array_iterator(iterator);
return new_array_iterator(iterator, tablet_column);
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
return new_map_iterator(iterator);
return new_map_iterator(iterator, tablet_column);
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
*iterator = new VariantRootColumnIterator(new FileColumnIterator(this));
Expand All @@ -753,55 +754,71 @@ Status ColumnReader::new_agg_state_iterator(ColumnIterator** iterator) {
return Status::OK();
}

// Builds an iterator over this ARRAY column from its sub-readers:
//   [0] items, [1] offsets and, only when the column is nullable, [2] the null map.
//
// `tablet_column` is the schema definition matching this reader, or nullptr when the
// caller has none. Only the item reader is given a schema (sub-column 0); the offset and
// null-map readers always get nullptr, matching new_map_iterator / new_struct_iterator.
//
// On success `*iterator` holds a newly allocated ArrayFileColumnIterator that the caller
// must delete. On failure the first error is returned and every iterator created so far
// is destroyed.
Status ColumnReader::new_array_iterator(ColumnIterator** iterator,
                                        const TabletColumn* tablet_column) {
    // Callers may pass a column that has no sub-columns; never index past the end of it.
    const TabletColumn* item_column = nullptr;
    if (tablet_column != nullptr && tablet_column->get_sub_columns().size() > 0) {
        item_column = &tablet_column->get_sub_column(0);
    }

    ColumnIterator* raw_iterator = nullptr;
    RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&raw_iterator, item_column));
    std::unique_ptr<ColumnIterator> item_iterator(raw_iterator);

    raw_iterator = nullptr;
    RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&raw_iterator, nullptr));
    // NOTE(review): assumes OffsetFileColumnIterator takes ownership of the wrapped
    // iterator, as the original code never deleted it either -- confirm.
    std::unique_ptr<OffsetFileColumnIterator> offset_iterator(
            new OffsetFileColumnIterator(reinterpret_cast<FileColumnIterator*>(raw_iterator)));

    std::unique_ptr<ColumnIterator> null_iterator;
    if (is_nullable()) {
        raw_iterator = nullptr;
        RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&raw_iterator, nullptr));
        null_iterator.reset(raw_iterator);
    }

    // Everything was created successfully: hand the raw pointers to the array iterator.
    *iterator = new ArrayFileColumnIterator(this, offset_iterator.release(),
                                            item_iterator.release(), null_iterator.release());
    return Status::OK();
}

// Builds an iterator over this MAP column from its sub-readers:
//   [0] keys, [1] values, [2] offsets and, only when the column is nullable, [3] the null map.
//
// `tablet_column` is the schema definition matching this reader, or nullptr when the
// caller has none. Its sub-columns 0 and 1 are forwarded to the key and value readers
// when they exist; the offset and null-map readers always get nullptr.
//
// On success `*iterator` holds a newly allocated MapFileColumnIterator that the caller
// must delete. On failure the first error is returned and every iterator created so far
// is destroyed.
Status ColumnReader::new_map_iterator(ColumnIterator** iterator,
                                      const TabletColumn* tablet_column) {
    // Callers may pass a column with fewer than two sub-columns; never index past its end.
    const size_t schema_children =
            tablet_column != nullptr ? tablet_column->get_sub_columns().size() : 0;
    const TabletColumn* key_column =
            schema_children > 0 ? &tablet_column->get_sub_column(0) : nullptr;
    const TabletColumn* val_column =
            schema_children > 1 ? &tablet_column->get_sub_column(1) : nullptr;

    ColumnIterator* raw_iterator = nullptr;
    RETURN_IF_ERROR(_sub_readers[0]->new_iterator(&raw_iterator, key_column));
    std::unique_ptr<ColumnIterator> key_iterator(raw_iterator);

    raw_iterator = nullptr;
    RETURN_IF_ERROR(_sub_readers[1]->new_iterator(&raw_iterator, val_column));
    std::unique_ptr<ColumnIterator> val_iterator(raw_iterator);

    raw_iterator = nullptr;
    RETURN_IF_ERROR(_sub_readers[2]->new_iterator(&raw_iterator, nullptr));
    // NOTE(review): assumes OffsetFileColumnIterator takes ownership of the wrapped
    // iterator, as the original code never deleted it either -- confirm.
    std::unique_ptr<OffsetFileColumnIterator> offsets_iterator(
            new OffsetFileColumnIterator(reinterpret_cast<FileColumnIterator*>(raw_iterator)));

    std::unique_ptr<ColumnIterator> null_iterator;
    if (is_nullable()) {
        raw_iterator = nullptr;
        RETURN_IF_ERROR(_sub_readers[3]->new_iterator(&raw_iterator, nullptr));
        null_iterator.reset(raw_iterator);
    }

    // Everything was created successfully: hand the raw pointers to the map iterator.
    *iterator = new MapFileColumnIterator(this, null_iterator.release(),
                                          offsets_iterator.release(), key_iterator.release(),
                                          val_iterator.release());
    return Status::OK();
}

// Builds an iterator over this STRUCT column.
//
// The sub-readers hold one reader per field stored in the segment, followed by the null
// map reader when the column is nullable. `tablet_column` is the schema definition
// matching this reader, or nullptr when the caller has none. When it lists more fields
// than the segment stores, the extra trailing fields get default-value iterators from
// Segment::new_default_iterator.
//
// On success `*iterator` holds a newly allocated StructFileColumnIterator that the caller
// must delete. On failure the first error is returned and every iterator created so far
// is destroyed.
Status ColumnReader::new_struct_iterator(ColumnIterator** iterator,
                                         const TabletColumn* tablet_column) {
    // For a nullable struct the last sub-reader is the null map, not a field.
    const size_t child_size = is_nullable() ? _sub_readers.size() - 1 : _sub_readers.size();
    const size_t tablet_column_size =
            tablet_column != nullptr ? tablet_column->get_sub_columns().size() : 0;

    // Owning handles, so an early return below frees what has already been built.
    std::vector<std::unique_ptr<ColumnIterator>> owned_iterators;
    owned_iterators.reserve(child_size > tablet_column_size ? child_size : tablet_column_size);

    for (size_t i = 0; i < child_size; i++) {
        // The schema may list fewer fields than the segment stores; never index past its end.
        // (i < tablet_column_size implies tablet_column is non-null.)
        const TabletColumn* sub_column =
                i < tablet_column_size ? &tablet_column->get_sub_column(i) : nullptr;
        ColumnIterator* raw_iterator = nullptr;
        RETURN_IF_ERROR(_sub_readers[i]->new_iterator(&raw_iterator, sub_column));
        owned_iterators.emplace_back(raw_iterator);
    }

    // Fields present in the schema but not in this segment: serve them with
    // default-value iterators.
    for (size_t i = child_size; i < tablet_column_size; i++) {
        std::unique_ptr<ColumnIterator> default_iterator;
        RETURN_IF_ERROR(Segment::new_default_iterator(tablet_column->get_sub_column(i),
                                                      &default_iterator));
        owned_iterators.push_back(std::move(default_iterator));
    }

    std::unique_ptr<ColumnIterator> null_iterator;
    if (is_nullable()) {
        ColumnIterator* raw_iterator = nullptr;
        RETURN_IF_ERROR(_sub_readers[child_size]->new_iterator(&raw_iterator, nullptr));
        null_iterator.reset(raw_iterator);
    }

    // Everything was created successfully: hand the raw pointers to the struct iterator.
    std::vector<ColumnIterator*> sub_column_iterators;
    sub_column_iterators.reserve(owned_iterators.size());
    for (auto& owned : owned_iterators) {
        sub_column_iterators.push_back(owned.release());
    }
    *iterator = new StructFileColumnIterator(this, null_iterator.release(), sub_column_iterators);
    return Status::OK();
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
virtual ~ColumnReader();

// create a new column iterator. Client should delete returned iterator
Status new_iterator(ColumnIterator** iterator);
Status new_array_iterator(ColumnIterator** iterator);
Status new_struct_iterator(ColumnIterator** iterator);
Status new_map_iterator(ColumnIterator** iterator);
// Schema-aware variants. `tablet_column` is the schema column matching this reader;
// callers that have none pass nullptr. new_iterator forwards it to the struct / array /
// map variant according to the column's field type, and those variants forward the
// corresponding sub-column to their child readers. The struct variant additionally adds
// default-value iterators for sub-columns that `tablet_column` lists beyond the ones this
// reader has.
Status new_iterator(ColumnIterator** iterator, const TabletColumn* tablet_column);
Status new_array_iterator(ColumnIterator** iterator, const TabletColumn* tablet_column);
Status new_struct_iterator(ColumnIterator** iterator, const TabletColumn* tablet_column);
Status new_map_iterator(ColumnIterator** iterator, const TabletColumn* tablet_column);
// Creates the iterator used for agg-state columns; takes no schema column.
Status new_agg_state_iterator(ColumnIterator** iterator);
// Client should delete returned iterator
Status new_bitmap_index_iterator(BitmapIndexIterator** iterator);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/hierarchical_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status HierarchicalDataReader::create(std::unique_ptr<ColumnIterator>* reader,
// like {"a" : "b" : {"e" : 1.1}} in jsonb format
if (read_type == ReadType::MERGE_SPARSE) {
ColumnIterator* it;
RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
RETURN_IF_ERROR(root->data.reader->new_iterator(&it, nullptr));
stream_iter->set_root(std::make_unique<SubstreamIterator>(
root->data.file_column_type->create_column(), std::unique_ptr<ColumnIterator>(it),
root->data.file_column_type));
Expand Down Expand Up @@ -132,7 +132,7 @@ Status HierarchicalDataReader::add_stream(const SubcolumnColumnReaders::Node* no
}
CHECK(node);
ColumnIterator* it;
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
RETURN_IF_ERROR(node->data.reader->new_iterator(&it, nullptr));
std::unique_ptr<ColumnIterator> it_ptr;
it_ptr.reset(it);
SubstreamIterator reader(node->data.file_column_type->create_column(), std::move(it_ptr),
Expand Down
34 changes: 8 additions & 26 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -702,32 +702,12 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
return Status::OK();
}

// Creates a DefaultValueColumnIterator for `tablet_column` and stores it in `*iter`.
//
// The iterator is built from the column's default value, nullability, type info,
// precision and frac, then initialised with default-constructed ColumnIteratorOptions.
// Returns InternalError when the column has no default value and is not nullable;
// otherwise returns the status of the iterator's init().
static Status new_default_iterator(const TabletColumn& tablet_column,
                                   std::unique_ptr<ColumnIterator>* iter) {
    // A column that is absent from the segment needs either a default or NULL to fall back on.
    if (!(tablet_column.has_default_value() || tablet_column.is_nullable())) {
        return Status::InternalError(
                "invalid nonexistent column without default value. column_uid={}, column_name={}, "
                "column_type={}",
                tablet_column.unique_id(), tablet_column.name(), tablet_column.type());
    }

    auto column_type_info = get_type_info(&tablet_column);
    auto default_iter = std::make_unique<DefaultValueColumnIterator>(
            tablet_column.has_default_value(), tablet_column.default_value(),
            tablet_column.is_nullable(), std::move(column_type_info), tablet_column.precision(),
            tablet_column.frac());

    ColumnIteratorOptions default_opts;
    RETURN_IF_ERROR(default_iter->init(default_opts));

    *iter = std::move(default_iter);
    return Status::OK();
}

Status Segment::_new_iterator_with_variant_root(const TabletColumn& tablet_column,
std::unique_ptr<ColumnIterator>* iter,
const SubcolumnColumnReaders::Node* root,
vectorized::DataTypePtr target_type_hint) {
ColumnIterator* it;
RETURN_IF_ERROR(root->data.reader->new_iterator(&it));
RETURN_IF_ERROR(root->data.reader->new_iterator(&it, &tablet_column));
auto* stream_iter = new ExtractReader(
tablet_column,
std::make_unique<SubstreamIterator>(root->data.file_column_type->create_column(),
Expand Down Expand Up @@ -794,7 +774,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
assert(leaf);
std::unique_ptr<ColumnIterator> sibling_iter;
ColumnIterator* sibling_iter_ptr;
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr));
RETURN_IF_ERROR(leaf->data.reader->new_iterator(&sibling_iter_ptr, &tablet_column));
sibling_iter.reset(sibling_iter_ptr);
*iter = std::make_unique<DefaultNestedColumnIterator>(std::move(sibling_iter),
leaf->data.file_column_type);
Expand Down Expand Up @@ -825,7 +805,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
return Status::OK();
}
ColumnIterator* it;
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
RETURN_IF_ERROR(node->data.reader->new_iterator(&it, &tablet_column));
iter->reset(it);
return Status::OK();
}
Expand All @@ -836,7 +816,7 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
// Direct read extracted columns
const auto* node = _sub_column_tree[unique_id].find_leaf(relative_path);
ColumnIterator* it;
RETURN_IF_ERROR(node->data.reader->new_iterator(&it));
RETURN_IF_ERROR(node->data.reader->new_iterator(&it, &tablet_column));
iter->reset(it);
} else {
// Node contains column with children columns or has corresponding sparse columns
Expand Down Expand Up @@ -890,7 +870,8 @@ Status Segment::new_column_iterator(const TabletColumn& tablet_column,
}
// init iterator by unique id
ColumnIterator* it;
RETURN_IF_ERROR(_column_readers.at(tablet_column.unique_id())->new_iterator(&it));
RETURN_IF_ERROR(
_column_readers.at(tablet_column.unique_id())->new_iterator(&it, &tablet_column));
iter->reset(it);

if (config::enable_column_type_check && !tablet_column.is_agg_state_type() &&
Expand All @@ -909,7 +890,8 @@ Status Segment::new_column_iterator(int32_t unique_id, const StorageReadOptions*
std::unique_ptr<ColumnIterator>* iter) {
RETURN_IF_ERROR(_create_column_readers_once(opt->stats));
ColumnIterator* it;
RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it));
TabletColumn tablet_column = _tablet_schema->column_by_uid(unique_id);
RETURN_IF_ERROR(_column_readers.at(unique_id)->new_iterator(&it, &tablet_column));
iter->reset(it);
return Status::OK();
}
Expand Down
22 changes: 22 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,28 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
// change finished, client should disable all cached Segment for old TabletSchema.
class Segment : public std::enable_shared_from_this<Segment>, public MetadataAdder<Segment> {
public:
// Creates a DefaultValueColumnIterator for `tablet_column` and stores it in `*iter`.
//
// The iterator is built from the column's default value, nullability, type info,
// precision and frac, then initialised with default-constructed ColumnIteratorOptions.
// Returns InternalError when the column has no default value and is not nullable;
// otherwise returns the status of the iterator's init().
static Status new_default_iterator(const TabletColumn& tablet_column,
                                   std::unique_ptr<ColumnIterator>* iter) {
    // Without a default value or NULL there is nothing to return for the column.
    const bool can_be_filled = tablet_column.has_default_value() || tablet_column.is_nullable();
    if (!can_be_filled) {
        return Status::InternalError(
                "invalid nonexistent column without default value. column_uid={}, "
                "column_name={}, "
                "column_type={}",
                tablet_column.unique_id(), tablet_column.name(), tablet_column.type());
    }

    auto column_type_info = get_type_info(&tablet_column);
    auto filler = std::make_unique<DefaultValueColumnIterator>(
            tablet_column.has_default_value(), tablet_column.default_value(),
            tablet_column.is_nullable(), std::move(column_type_info), tablet_column.precision(),
            tablet_column.frac());

    ColumnIteratorOptions filler_opts;
    RETURN_IF_ERROR(filler->init(filler_opts));

    *iter = std::move(filler);
    return Status::OK();
}

static Status open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id,
uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
const io::FileReaderOptions& reader_options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,14 @@ public static void checkSupportSchemaChangeForComplexType(Type checkType, Type o
if (checkType.isStructType() && other.isStructType()) {
StructType thisStructType = (StructType) checkType;
StructType otherStructType = (StructType) other;
if (thisStructType.getFields().size() != otherStructType.getFields().size()) {
throw new DdlException("Cannot change struct type with different field size");
if (thisStructType.getFields().size() > otherStructType.getFields().size()) {
throw new DdlException("Cannot change " + checkType.toSql() + " to " + other.toSql());
}
for (int i = 0; i < thisStructType.getFields().size(); i++) {
// do not support struct same position field name change
if (!thisStructType.getFields().get(i).getName().equals(otherStructType.getFields().get(i).getName())) {
throw new DdlException("Cannot change " + checkType.toSql() + " to " + other.toSql());
}
checkSupportSchemaChangeForComplexType(thisStructType.getFields().get(i).getType(),
otherStructType.getFields().get(i).getType(), true);
}
Expand Down
52 changes: 52 additions & 0 deletions regression-test/data/schema_change_p0/test_modify_struct.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sc_before --
0 {"col":"commiter"}
1 {"col":"amory"}

-- !sc_after --
0 {"col":"commiter"}
1 {"col":"amory"}
2 {"col":"amoryIsBetter"}

-- !sc_after1 --
0 {"col":"commiter", "col1":null}
1 {"col":"amory", "col1":null}
2 {"col":"amoryIsBetter", "col1":null}
3 {"col":"amoryIsBetterCommiter", "col1":1}

-- !sc_before --
1 1 {"col":"xxx"}
2 2 {"col":"yyy"}
3 3 {"col":"zzz"}

-- !sc_after --
1 1 {"col":"xxx", "col1":null}
2 2 {"col":"yyy", "col1":null}
3 3 {"col":"zzz", "col1":null}
3 4 {"col":"amoryIsBetterCommiter", "col1":1}
4 4 {"col":"amoryIsBetterCommiterAgain", "col1":2}

-- !sc_before --
1 1 {"col":"xxx"}
2 2 {"col":"yyy"}
3 3 {"col":"zzz"}

-- !sc_after --
1 1 {"col":"xxx", "col1":null}
2 2 {"col":"yyy", "col1":null}
3 3 {"col":"zzz", "col1":null}
3 4 {"col":"amoryIsBetterCommiter", "col1":1}
4 4 {"col":"amoryIsBetterCommiterAgain", "col1":2}

-- !sc_before --
1 1 {"col":"xxx"}
2 2 {"col":"yyy"}
3 3 {"col":"zzz"}

-- !sc_after --
1 1 {"col":"xxx", "col1":null}
2 2 {"col":"yyy", "col1":null}
3 3 {"col":"zzz", "col1":null}
3 4 {"col":"amoryIsBetterCommiter", "col1":1}
4 4 {"col":"amoryIsBetterCommiterAgain", "col1":2}

Loading

0 comments on commit 9d451d9

Please sign in to comment.