diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index f0741b5465f3..0fbcb95fc126 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -850,6 +850,48 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
+ BuilderBinRISCV64:
+ needs: [DockerHubPush]
+ runs-on: [self-hosted, builder]
+ steps:
+ - name: Set envs
+ run: |
+ cat >> "$GITHUB_ENV" << 'EOF'
+ TEMP_PATH=${{runner.temp}}/build_check
+ IMAGES_PATH=${{runner.temp}}/images_path
+ REPO_COPY=${{runner.temp}}/build_check/ClickHouse
+ CACHES_PATH=${{runner.temp}}/../ccaches
+ BUILD_NAME=binary_riscv64
+ EOF
+ - name: Download changed images
+ uses: actions/download-artifact@v3
+ with:
+ name: changed_images
+ path: ${{ env.IMAGES_PATH }}
+ - name: Check out repository code
+ uses: ClickHouse/checkout@v1
+ with:
+ clear-repository: true
+ submodules: true
+ fetch-depth: 0 # otherwise we will have no info about contributors
+ - name: Build
+ run: |
+ sudo rm -fr "$TEMP_PATH"
+ mkdir -p "$TEMP_PATH"
+ cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
+ cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
+ - name: Upload build URLs to artifacts
+ if: ${{ success() || failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: ${{ env.BUILD_URLS }}
+ path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
+ - name: Cleanup
+ if: always()
+ run: |
+ docker ps --quiet | xargs --no-run-if-empty docker kill ||:
+ docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
+ sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@@ -932,6 +974,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
+ - BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index afc08f3e6376..f898e764915a 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -911,6 +911,47 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
+ BuilderBinRISCV64:
+ needs: [DockerHubPush, FastTest, StyleCheck]
+ runs-on: [self-hosted, builder]
+ steps:
+ - name: Set envs
+ run: |
+ cat >> "$GITHUB_ENV" << 'EOF'
+ TEMP_PATH=${{runner.temp}}/build_check
+ IMAGES_PATH=${{runner.temp}}/images_path
+ REPO_COPY=${{runner.temp}}/build_check/ClickHouse
+ CACHES_PATH=${{runner.temp}}/../ccaches
+ BUILD_NAME=binary_riscv64
+ EOF
+ - name: Download changed images
+ uses: actions/download-artifact@v3
+ with:
+ name: changed_images
+ path: ${{ env.IMAGES_PATH }}
+ - name: Check out repository code
+ uses: ClickHouse/checkout@v1
+ with:
+ clear-repository: true
+ submodules: true
+ - name: Build
+ run: |
+ sudo rm -fr "$TEMP_PATH"
+ mkdir -p "$TEMP_PATH"
+ cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
+ cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
+ - name: Upload build URLs to artifacts
+ if: ${{ success() || failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: ${{ env.BUILD_URLS }}
+ path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
+ - name: Cleanup
+ if: always()
+ run: |
+ docker ps --quiet | xargs --no-run-if-empty docker kill ||:
+ docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
+ sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@@ -992,6 +1033,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
+ - BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy
diff --git a/cmake/target.cmake b/cmake/target.cmake
index 5ef45576fb79..0791da87bf0a 100644
--- a/cmake/target.cmake
+++ b/cmake/target.cmake
@@ -33,6 +33,19 @@ if (CMAKE_CROSSCOMPILING)
elseif (ARCH_PPC64LE)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
+ elseif (ARCH_RISCV64)
+ # RISC-V support is preliminary
+ set (GLIBC_COMPATIBILITY OFF CACHE INTERNAL "")
+ set (ENABLE_LDAP OFF CACHE INTERNAL "")
+ set (OPENSSL_NO_ASM ON CACHE INTERNAL "")
+ set (ENABLE_JEMALLOC ON CACHE INTERNAL "")
+ set (ENABLE_PARQUET OFF CACHE INTERNAL "")
+ set (USE_UNWIND OFF CACHE INTERNAL "")
+ set (ENABLE_GRPC OFF CACHE INTERNAL "")
+ set (ENABLE_HDFS OFF CACHE INTERNAL "")
+ set (ENABLE_MYSQL OFF CACHE INTERNAL "")
+ # It might be ok, but we need to update 'sysroot'
+ set (ENABLE_RUST OFF CACHE INTERNAL "")
elseif (ARCH_S390X)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
diff --git a/docker/packager/packager b/docker/packager/packager
index 1b3df858cd2e..e12bd55dde38 100755
--- a/docker/packager/packager
+++ b/docker/packager/packager
@@ -138,6 +138,7 @@ def parse_env_variables(
ARM_V80COMPAT_SUFFIX = "-aarch64-v80compat"
FREEBSD_SUFFIX = "-freebsd"
PPC_SUFFIX = "-ppc64le"
+ RISCV_SUFFIX = "-riscv64"
AMD64_COMPAT_SUFFIX = "-amd64-compat"
result = []
@@ -150,6 +151,7 @@ def parse_env_variables(
is_cross_arm = compiler.endswith(ARM_SUFFIX)
is_cross_arm_v80compat = compiler.endswith(ARM_V80COMPAT_SUFFIX)
is_cross_ppc = compiler.endswith(PPC_SUFFIX)
+ is_cross_riscv = compiler.endswith(RISCV_SUFFIX)
is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
is_amd64_compat = compiler.endswith(AMD64_COMPAT_SUFFIX)
@@ -206,6 +208,11 @@ def parse_env_variables(
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-ppc64le.cmake"
)
+ elif is_cross_riscv:
+ cc = compiler[: -len(RISCV_SUFFIX)]
+ cmake_flags.append(
+ "-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-riscv64.cmake"
+ )
elif is_amd64_compat:
cc = compiler[: -len(AMD64_COMPAT_SUFFIX)]
result.append("DEB_ARCH=amd64")
@@ -370,6 +377,7 @@ def parse_args() -> argparse.Namespace:
"clang-16-aarch64",
"clang-16-aarch64-v80compat",
"clang-16-ppc64le",
+ "clang-16-riscv64",
"clang-16-amd64-compat",
"clang-16-freebsd",
),
diff --git a/docker/test/upgrade/run.sh b/docker/test/upgrade/run.sh
index 82a88272df9b..b80613093428 100644
--- a/docker/test/upgrade/run.sh
+++ b/docker/test/upgrade/run.sh
@@ -67,6 +67,13 @@ start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
+# Start server from previous release
+# Let's enable S3 storage by default
+export USE_S3_STORAGE_FOR_MERGE_TREE=1
+# Previous version may not be ready for fault injections
+export ZOOKEEPER_FAULT_INJECTION=0
+configure
+
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
    | sed "s|<force_sync>false</force_sync>|<force_sync>true</force_sync>|" \
@@ -81,13 +88,6 @@ mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/cli
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
-# Start server from previous release
-# Let's enable S3 storage by default
-export USE_S3_STORAGE_FOR_MERGE_TREE=1
-# Previous version may not be ready for fault injections
-export ZOOKEEPER_FAULT_INJECTION=0
-configure
-
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml
diff --git a/docker/test/util/Dockerfile b/docker/test/util/Dockerfile
index 85e888f1df7d..b255a2cc23dc 100644
--- a/docker/test/util/Dockerfile
+++ b/docker/test/util/Dockerfile
@@ -94,7 +94,10 @@ RUN mkdir /tmp/ccache \
&& rm -rf /tmp/ccache
ARG TARGETARCH
-ARG SCCACHE_VERSION=v0.4.1
+ARG SCCACHE_VERSION=v0.5.4
+ENV SCCACHE_IGNORE_SERVER_IO_ERROR=1
+# sccache requires a value for the region. So by default we use the default region (us-east-1)
+ENV SCCACHE_REGION=us-east-1
RUN arch=${TARGETARCH:-amd64} \
&& case $arch in \
amd64) rarch=x86_64 ;; \
diff --git a/docs/_includes/install/universal.sh b/docs/_includes/install/universal.sh
index 1699be138c8a..5d4571aed9ea 100755
--- a/docs/_includes/install/universal.sh
+++ b/docs/_includes/install/universal.sh
@@ -33,6 +33,9 @@ then
elif [ "${ARCH}" = "powerpc64le" -o "${ARCH}" = "ppc64le" ]
then
DIR="powerpc64le"
+ elif [ "${ARCH}" = "riscv64" ]
+ then
+ DIR="riscv64"
fi
elif [ "${OS}" = "FreeBSD" ]
then
diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index cff13302cdc2..5f6cf98646bd 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -3201,6 +3201,40 @@ ENGINE = Log
└──────────────────────────────────────────────────────────────────────────┘
```
+## default_temporary_table_engine {#default_temporary_table_engine}
+
+Same as [default_table_engine](#default_table_engine) but for temporary tables.
+
+Default value: `Memory`.
+
+In this example, any new temporary table that does not specify an `Engine` will use the `Log` table engine:
+
+Query:
+
+```sql
+SET default_temporary_table_engine = 'Log';
+
+CREATE TEMPORARY TABLE my_table (
+ x UInt32,
+ y UInt32
+);
+
+SHOW CREATE TEMPORARY TABLE my_table;
+```
+
+Result:
+
+```response
+┌─statement────────────────────────────────────────────────────────────────┐
+│ CREATE TEMPORARY TABLE default.my_table
+(
+ `x` UInt32,
+ `y` UInt32
+)
+ENGINE = Log
+└──────────────────────────────────────────────────────────────────────────┘
+```
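+
+An engine specified explicitly in `CREATE TEMPORARY TABLE` still takes precedence over this setting (it only applies when no `ENGINE` clause is given). A brief illustrative example, using an arbitrary table name:
+
+```sql
+SET default_temporary_table_engine = 'Log';
+
+-- ENGINE is given explicitly, so the Log default above is not applied
+CREATE TEMPORARY TABLE my_explicit_table (x UInt32) ENGINE = Memory;
+```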
+
## data_type_default_nullable {#data_type_default_nullable}
Allows data types without explicit modifiers [NULL or NOT NULL](../../sql-reference/statements/create/table.md/#null-modifiers) in column definition to be [Nullable](../../sql-reference/data-types/nullable.md/#data_type-nullable).
diff --git a/docs/en/operations/system-tables/asynchronous_metric_log.md b/docs/en/operations/system-tables/asynchronous_metric_log.md
index 4290799b6bcf..efe57a202d85 100644
--- a/docs/en/operations/system-tables/asynchronous_metric_log.md
+++ b/docs/en/operations/system-tables/asynchronous_metric_log.md
@@ -9,7 +9,6 @@ Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
-- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds resolution.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.
@@ -20,18 +19,18 @@ SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
-┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
-└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
+┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
+└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**See Also**
diff --git a/docs/ru/operations/system-tables/asynchronous_metric_log.md b/docs/ru/operations/system-tables/asynchronous_metric_log.md
index 886fbb6cab0a..5145889c95f6 100644
--- a/docs/ru/operations/system-tables/asynchronous_metric_log.md
+++ b/docs/ru/operations/system-tables/asynchronous_metric_log.md
@@ -8,7 +8,6 @@ slug: /ru/operations/system-tables/asynchronous_metric_log
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата события.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — время события.
-- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — время события в микросекундах.
- `name` ([String](../../sql-reference/data-types/string.md)) — название метрики.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — значение метрики.
diff --git a/docs/zh/operations/system-tables/asynchronous_metric_log.md b/docs/zh/operations/system-tables/asynchronous_metric_log.md
index 419ad2a7ed60..9fa399f1aed2 100644
--- a/docs/zh/operations/system-tables/asynchronous_metric_log.md
+++ b/docs/zh/operations/system-tables/asynchronous_metric_log.md
@@ -8,7 +8,6 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
列:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — 事件日期。
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — 事件时间。
-- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — 事件时间(微秒)。
- `name` ([String](../../sql-reference/data-types/string.md)) — 指标名。
- `value` ([Float64](../../sql-reference/data-types/float.md)) — 指标值。
@@ -17,18 +16,18 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
-┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
-│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
-└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
+┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
+│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
+└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**另请参阅**
diff --git a/programs/disks/CommandCopy.cpp b/programs/disks/CommandCopy.cpp
index 1cfce7fc0224..5228b582d25d 100644
--- a/programs/disks/CommandCopy.cpp
+++ b/programs/disks/CommandCopy.cpp
@@ -59,7 +59,7 @@ class CommandCopy final : public ICommand
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
- disk_from->copy(relative_path_from, disk_to, relative_path_to);
+ disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to);
}
};
}
diff --git a/programs/keeper-converter/KeeperConverter.cpp b/programs/keeper-converter/KeeperConverter.cpp
index a049e6bc2b34..20448aafa2f1 100644
--- a/programs/keeper-converter/KeeperConverter.cpp
+++ b/programs/keeper-converter/KeeperConverter.cpp
@@ -42,7 +42,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
{
auto keeper_context = std::make_shared<DB::KeeperContext>(true);
keeper_context->setDigestEnabled(true);
-    keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0));
+    keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));
DB::KeeperStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index d2d8a0d07fb7..755b7f17d988 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -1581,6 +1581,15 @@ try
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
+    /// Build loggers before tables startup so that log messages emitted during
+    /// table attach are available in system.text_log
+ {
+ String level_str = config().getString("text_log.level", "");
+ int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
+ setTextLog(global_context->getTextLog(), level);
+
+ buildLoggers(config(), logger());
+ }
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
@@ -1707,14 +1716,6 @@ try
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
- {
- String level_str = config().getString("text_log.level", "");
- int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
- setTextLog(global_context->getTextLog(), level);
- }
-
- buildLoggers(config(), logger());
-
main_config_reloader->start();
access_control.startPeriodicReloading();
diff --git a/src/Backups/tests/gtest_backup_entries.cpp b/src/Backups/tests/gtest_backup_entries.cpp
index ca603d20787a..75972b35ba46 100644
--- a/src/Backups/tests/gtest_backup_entries.cpp
+++ b/src/Backups/tests/gtest_backup_entries.cpp
@@ -24,7 +24,7 @@ class BackupEntriesTest : public ::testing::Test
/// Make local disk.
temp_dir = std::make_unique<Poco::TemporaryFile>();
temp_dir->createDirectories();
-        local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/", 0);
+        local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/");
/// Make encrypted disk.
auto settings = std::make_unique<DiskEncryptedSettings>();
@@ -38,7 +38,7 @@ class BackupEntriesTest : public ::testing::Test
settings->current_key = key;
settings->current_key_fingerprint = fingerprint;
-        encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
+        encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings));
}
void TearDown() override
diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index 105a7c0548f8..f61cfae566db 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -93,8 +93,8 @@
M(ThreadPoolFSReaderThreadsActive, "Number of threads in the thread pool for local_filesystem_read_method=threadpool running a task.") \
M(BackupsIOThreads, "Number of threads in the BackupsIO thread pool.") \
M(BackupsIOThreadsActive, "Number of threads in the BackupsIO thread pool running a task.") \
- M(DiskObjectStorageAsyncThreads, "Number of threads in the async thread pool for DiskObjectStorage.") \
- M(DiskObjectStorageAsyncThreadsActive, "Number of threads in the async thread pool for DiskObjectStorage running a task.") \
+ M(DiskObjectStorageAsyncThreads, "Obsolete metric, shows nothing.") \
+ M(DiskObjectStorageAsyncThreadsActive, "Obsolete metric, shows nothing.") \
M(StorageHiveThreads, "Number of threads in the StorageHive thread pool.") \
M(StorageHiveThreadsActive, "Number of threads in the StorageHive thread pool running a task.") \
M(TablesLoaderThreads, "Number of threads in the tables loader thread pool.") \
@@ -141,6 +141,8 @@
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
+ M(IDiskCopierThreads, "Number of threads for copying data between disks of different types.") \
+ M(IDiskCopierThreadsActive, "Number of threads for copying data between disks of different types running a task.") \
M(SystemReplicasThreads, "Number of threads in the system.replicas thread pool.") \
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \
diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp
index ee268be45f63..054a60cb91db 100644
--- a/src/Common/Exception.cpp
+++ b/src/Common/Exception.cpp
@@ -418,6 +418,18 @@ PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, b
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
+
+// #ifdef ABORT_ON_LOGICAL_ERROR
+// try
+// {
+// throw;
+// }
+// catch (const std::logic_error &)
+// {
+// abortOnFailedAssertion(stream.str());
+// }
+// catch (...) {}
+// #endif
}
catch (...)
{
diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp
index 3c3c0500540a..408344ee67fd 100644
--- a/src/Coordination/KeeperContext.cpp
+++ b/src/Coordination/KeeperContext.cpp
@@ -220,7 +220,7 @@ KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::Ab
if (!fs::exists(path))
fs::create_directories(path);
-        return std::make_shared<DiskLocal>("LocalLogDisk", path, 0);
+        return std::make_shared<DiskLocal>("LocalLogDisk", path);
};
/// the most specialized path
@@ -246,7 +246,7 @@ KeeperContext::Storage KeeperContext::getSnapshotsPathFromConfig(const Poco::Uti
if (!fs::exists(path))
fs::create_directories(path);
-        return std::make_shared<DiskLocal>("LocalSnapshotDisk", path, 0);
+        return std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
};
/// the most specialized path
@@ -272,7 +272,7 @@ KeeperContext::Storage KeeperContext::getStatePathFromConfig(const Poco::Util::A
if (!fs::exists(path))
fs::create_directories(path);
-        return std::make_shared<DiskLocal>("LocalStateFileDisk", path, 0);
+        return std::make_shared<DiskLocal>("LocalStateFileDisk", path);
};
if (config.has("keeper_server.state_storage_disk"))
diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp
index 0f60c960b8b0..6df149bbfbef 100644
--- a/src/Coordination/tests/gtest_coordination.cpp
+++ b/src/Coordination/tests/gtest_coordination.cpp
@@ -71,16 +71,16 @@ class CoordinationTest : public ::testing::TestWithParam
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>(true);
Poco::Logger * log{&Poco::Logger::get("CoordinationTest")};
-    void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path, 0)); }
+    void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path)); }
void setSnapshotDirectory(const std::string & path)
{
-        keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path, 0));
+        keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path));
}
void setStateFileDirectory(const std::string & path)
{
-        keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path, 0));
+        keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path));
}
};
@@ -1503,9 +1503,9 @@ void testLogAndStateMachine(
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
-    keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots", 0));
+    keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots"));
ChangelogDirTest logs("./logs");
-    keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs", 0));
+    keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs"));
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index b7d12a518c8b..59373df3ecec 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -517,6 +517,7 @@ class IColumn;
M(Seconds, wait_for_window_view_fire_signal_timeout, 10, "Timeout for waiting for window view fire signal in event time processing", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
+ M(DefaultTableEngine, default_temporary_table_engine, DefaultTableEngine::Memory, "Default table engine used when ENGINE is not set in CREATE TEMPORARY statement.",0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
diff --git a/src/Disks/DiskEncrypted.cpp b/src/Disks/DiskEncrypted.cpp
index 6b515b100c9e..677dd73cc002 100644
--- a/src/Disks/DiskEncrypted.cpp
+++ b/src/Disks/DiskEncrypted.cpp
@@ -285,19 +285,32 @@ class DiskEncryptedReservation : public IReservation
};
DiskEncrypted::DiskEncrypted(
- const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_)
- : DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), use_fake_transaction_)
+ const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_)
+ : DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), config_, config_prefix_)
{
}
-DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_)
+DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
+ const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_)
+ : IDisk(name_, config_, config_prefix_)
+ , delegate(settings_->wrapped_disk)
+ , encrypted_name(name_)
+ , disk_path(settings_->disk_path)
+ , disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
+ , current_settings(std::move(settings_))
+ , use_fake_transaction(config_.getBool(config_prefix_ + ".use_fake_transaction", true))
+{
+ delegate->createDirectories(disk_path);
+}
+
+DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: IDisk(name_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
- , use_fake_transaction(use_fake_transaction_)
+ , use_fake_transaction(true)
{
delegate->createDirectories(disk_path);
}
@@ -310,32 +323,6 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
return std::make_unique<DiskEncryptedReservation>(std::static_pointer_cast<DiskEncrypted>(shared_from_this()), std::move(reservation));
}
-void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
-{
- /// Check if we can copy the file without deciphering.
- if (isSameDiskType(*this, *to_disk))
- {
- /// Disk type is the same, check if the key is the same too.
-        if (auto * to_disk_enc = typeid_cast<DiskEncrypted *>(to_disk.get()))
- {
- auto from_settings = current_settings.get();
- auto to_settings = to_disk_enc->current_settings.get();
- if (from_settings->all_keys == to_settings->all_keys)
- {
- /// Keys are the same so we can simply copy the encrypted file.
- auto wrapped_from_path = wrappedPath(from_path);
- auto to_delegate = to_disk_enc->delegate;
- auto wrapped_to_path = to_disk_enc->wrappedPath(to_path);
- delegate->copy(wrapped_from_path, to_delegate, wrapped_to_path);
- return;
- }
- }
- }
-
- /// Copy the file through buffers with deciphering.
- copyThroughBuffers(from_path, to_disk, to_path);
-}
-
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
@@ -359,11 +346,8 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
}
}
- if (!to_disk->exists(to_dir))
- to_disk->createDirectories(to_dir);
-
/// Copy the file through buffers with deciphering.
- copyThroughBuffers(from_dir, to_disk, to_dir);
+ IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
@@ -443,7 +427,7 @@ std::unordered_map DiskEncrypted::getSerializedMetadata(const st
void DiskEncrypted::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
- ContextPtr /*context*/,
+ ContextPtr context,
const String & config_prefix,
const DisksMap & disk_map)
{
@@ -455,6 +439,7 @@ void DiskEncrypted::applyNewSettings(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Сhanging disk path on the fly is not supported. Disk {}", name);
current_settings.set(std::move(new_settings));
+ IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
@@ -467,7 +452,7 @@ void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
const DisksMap & map) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
- DiskPtr disk = std::make_shared(name, config, config_prefix, map, config.getBool(config_prefix + ".use_fake_transaction", true));
+ DiskPtr disk = std::make_shared(name, config, config_prefix, map);
disk->startup(context, skip_access_check);
return disk;
};
diff --git a/src/Disks/DiskEncrypted.h b/src/Disks/DiskEncrypted.h
index 69d051a95371..9963770bd1c7 100644
--- a/src/Disks/DiskEncrypted.h
+++ b/src/Disks/DiskEncrypted.h
@@ -21,8 +21,10 @@ class WriteBufferFromFileBase;
class DiskEncrypted : public IDisk
{
public:
- DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_);
-    DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_);
+    DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
+    DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
+        const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_);
+    DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return encrypted_name; }
const String & getPath() const override { return disk_absolute_path; }
@@ -110,8 +112,6 @@ class DiskEncrypted : public IDisk
delegate->listFiles(wrapped_path, file_names);
}
-    void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
-
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp
index c76ea2891013..9a61c176cf65 100644
--- a/src/Disks/DiskLocal.cpp
+++ b/src/Disks/DiskLocal.cpp
@@ -417,29 +417,12 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another);
}
-void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
-{
- if (isSameDiskType(*this, *to_disk))
- {
- fs::path to = fs::path(to_disk->getPath()) / to_path;
- fs::path from = fs::path(disk_path) / from_path;
- if (from_path.ends_with('/'))
- from = from.parent_path();
- if (fs::is_directory(from))
- to /= from.filename();
-
- fs::copy(from, to, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
- }
- else
- copyThroughBuffers(from_path, to_disk, to_path, /* copy_root_dir */ true); /// Base implementation.
-}
-
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
if (isSameDiskType(*this, *to_disk))
- fs::copy(from_dir, to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
+ fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else
- copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false); /// Base implementation.
+ IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
@@ -448,7 +431,7 @@ SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
}
-void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &)
+void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & disk_map)
{
String new_disk_path;
UInt64 new_keep_free_space_bytes;
@@ -460,10 +443,13 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
if (keep_free_space_bytes != new_keep_free_space_bytes)
keep_free_space_bytes = new_keep_free_space_bytes;
+
+ IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
-DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
- : IDisk(name_)
+DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
+ const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+ : IDisk(name_, config, config_prefix)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
@@ -472,13 +458,24 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
}
DiskLocal::DiskLocal(
- const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms)
- : DiskLocal(name_, path_, keep_free_space_bytes_)
+ const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context,
+ const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+ : DiskLocal(name_, path_, keep_free_space_bytes_, config, config_prefix)
{
+ auto local_disk_check_period_ms = config.getUInt("local_disk_check_period_ms", 0);
if (local_disk_check_period_ms > 0)
disk_checker = std::make_unique(this, context, local_disk_check_period_ms);
}
+DiskLocal::DiskLocal(const String & name_, const String & path_)
+ : IDisk(name_)
+ , disk_path(path_)
+ , keep_free_space_bytes(0)
+ , logger(&Poco::Logger::get("DiskLocal"))
+ , data_source_description(getLocalDataSourceDescription(disk_path))
+{
+}
+
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
@@ -720,7 +717,7 @@ void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
std::shared_ptr<IDisk> disk
-        = std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
+        = std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config, config_prefix);
disk->startup(context, skip_access_check);
return disk;
};
diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h
index 3d340ae40b72..b30732b67fdb 100644
--- a/src/Disks/DiskLocal.h
+++ b/src/Disks/DiskLocal.h
@@ -19,13 +19,17 @@ class DiskLocal : public IDisk
friend class DiskLocalCheckThread;
friend class DiskLocalReservation;
- DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_);
+ DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
+ const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
DiskLocal(
const String & name_,
const String & path_,
UInt64 keep_free_space_bytes_,
ContextPtr context,
- UInt64 local_disk_check_period_ms);
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix);
+
+ DiskLocal(const String & name_, const String & path_);
const String & getPath() const override { return disk_path; }
@@ -63,8 +67,6 @@ class DiskLocal : public IDisk
void replaceFile(const String & from_path, const String & to_path) override;
-    void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
-
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;
diff --git a/src/Disks/DiskSelector.cpp b/src/Disks/DiskSelector.cpp
index 9894e4251a2b..e51f79867b59 100644
--- a/src/Disks/DiskSelector.cpp
+++ b/src/Disks/DiskSelector.cpp
@@ -53,7 +53,7 @@ void DiskSelector::initialize(const Poco::Util::AbstractConfiguration & config,
disks.emplace(
default_disk_name,
std::make_shared<DiskLocal>(
- default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0)));
+ default_disk_name, context->getPath(), 0, context, config, config_prefix));
}
is_initialized = true;
diff --git a/src/Disks/Executor.h b/src/Disks/Executor.h
deleted file mode 100644
index 7330bcdd559d..000000000000
--- a/src/Disks/Executor.h
+++ /dev/null
@@ -1,42 +0,0 @@
-#pragma once
-
-#include <functional>
-#include <future>
-
-namespace DB
-{
-
-/// Interface to run task asynchronously with possibility to wait for execution.
-class Executor
-{
-public:
- virtual ~Executor() = default;
-    virtual std::future<void> execute(std::function<void()> task) = 0;
-};
-
-/// Executes task synchronously in case when disk doesn't support async operations.
-class SyncExecutor : public Executor
-{
-public:
- SyncExecutor() = default;
-    std::future<void> execute(std::function<void()> task) override
- {
-        auto promise = std::make_shared<std::promise<void>>();
- try
- {
- task();
- promise->set_value();
- }
- catch (...)
- {
- try
- {
- promise->set_exception(std::current_exception());
- }
- catch (...) { }
- }
- return promise->get_future();
- }
-};
-
-}
diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp
index bca867fec76c..544ba014fde8 100644
--- a/src/Disks/IDisk.cpp
+++ b/src/Disks/IDisk.cpp
@@ -1,5 +1,4 @@
#include "IDisk.h"
-#include "Disks/Executor.h"
#include
#include
#include
@@ -80,18 +79,33 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const
using ResultsCollector = std::vector<std::future<void>>;
-void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
+void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
{
if (from_disk.isFile(from_path))
{
- auto result = exec.execute(
- [&from_disk, from_path, &to_disk, to_path, &settings]()
+        auto promise = std::make_shared<std::promise<void>>();
+ auto future = promise->get_future();
+
+ pool.scheduleOrThrowOnError(
+ [&from_disk, from_path, &to_disk, to_path, &settings, promise, thread_group = CurrentThread::getGroup()]()
{
- setThreadName("DiskCopier");
- from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
+ try
+ {
+ SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
+
+ if (thread_group)
+ CurrentThread::attachToGroup(thread_group);
+
+ from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
+ promise->set_value();
+ }
+ catch (...)
+ {
+ promise->set_exception(std::current_exception());
+ }
});
- results.push_back(std::move(result));
+ results.push_back(std::move(future));
}
else
{
@@ -104,13 +118,12 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
- asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true, settings);
+ asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, settings);
}
}
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
{
- auto & exec = to_disk->getExecutor();
ResultsCollector results;
WriteSettings settings;
@@ -118,17 +131,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
for (auto & result : results)
-        result.wait();
-    for (auto & result : results)
-        result.get();
-}
-
-void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
-{
-    copyThroughBuffers(from_path, to_disk, to_path, true);
+        result.get(); /// May rethrow an exception
}
@@ -137,7 +145,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);
- copyThroughBuffers(from_dir, to_disk, to_dir, false);
+ copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false);
}
void IDisk::truncateFile(const String &, size_t)
@@ -233,4 +241,9 @@ catch (Exception & e)
throw;
}
+void IDisk::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr /*context*/, const String & config_prefix, const DisksMap & /*map*/)
+{
+ copying_thread_pool.setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
+}
+
}
diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h
index 5d75f3b70e5a..ccef3db2dac1 100644
--- a/src/Disks/IDisk.h
+++ b/src/Disks/IDisk.h
@@ -6,7 +6,6 @@
#include
#include
#include
-#include <Disks/Executor.h>
#include
#include
#include
@@ -35,6 +34,12 @@ namespace Poco
}
}
+namespace CurrentMetrics
+{
+ extern const Metric IDiskCopierThreads;
+ extern const Metric IDiskCopierThreadsActive;
+}
+
namespace DB
{
@@ -110,9 +115,15 @@ class IDisk : public Space
{
public:
/// Default constructor.
-    explicit IDisk(const String & name_, std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
+ IDisk(const String & name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: name(name_)
- , executor(executor_)
+ , copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, config.getUInt(config_prefix + ".thread_pool_size", 16))
+ {
+ }
+
+ explicit IDisk(const String & name_)
+ : name(name_)
+ , copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, 16)
{
}
@@ -181,9 +192,6 @@ class IDisk : public Space
/// If a file with `to_path` path already exists, it will be replaced.
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
- /// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`.
-    virtual void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path);
-
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
@@ -379,7 +387,7 @@ class IDisk : public Space
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime.
- virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
+ virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map);
/// Quite leaky abstraction. Some disks can use additional disk to store
/// some parts of metadata. In general case we have only one disk itself and
@@ -459,9 +467,6 @@ class IDisk : public Space
const String name;
- /// Returns executor to perform asynchronous operations.
- virtual Executor & getExecutor() { return *executor; }
-
/// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation.
@@ -470,7 +475,7 @@ class IDisk : public Space
virtual void checkAccessImpl(const String & path);
private:
-    std::shared_ptr<Executor> executor;
+ ThreadPool copying_thread_pool;
bool is_custom_disk = false;
/// Check access to the disk.
diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp
index 562b2b2fec08..a09befe84a84 100644
--- a/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp
+++ b/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp
@@ -31,9 +31,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
- uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
- bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
-
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, "");
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
@@ -42,8 +39,8 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
- send_metadata,
- copy_thread_pool_size
+ config,
+ config_prefix
);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp
index e5bbd2ca0c6c..8553b4791493 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp
@@ -18,12 +18,6 @@
#include
#include
-namespace CurrentMetrics
-{
- extern const Metric DiskObjectStorageAsyncThreads;
- extern const Metric DiskObjectStorageAsyncThreadsActive;
-}
-
namespace DB
{
@@ -37,55 +31,6 @@ namespace ErrorCodes
extern const int DIRECTORY_DOESNT_EXIST;
}
-namespace
-{
-
-/// Runs tasks asynchronously using thread pool.
-class AsyncThreadPoolExecutor : public Executor
-{
-public:
- AsyncThreadPoolExecutor(const String & name_, int thread_pool_size)
- : name(name_)
- , pool(CurrentMetrics::DiskObjectStorageAsyncThreads, CurrentMetrics::DiskObjectStorageAsyncThreadsActive, thread_pool_size)
- {}
-
-    std::future<void> execute(std::function<void()> task) override
- {
-        auto promise = std::make_shared<std::promise<void>>();
- pool.scheduleOrThrowOnError(
- [promise, task]()
- {
- try
- {
- task();
- promise->set_value();
- }
- catch (...)
- {
- tryLogCurrentException("Failed to run async task");
-
- try
- {
- promise->set_exception(std::current_exception());
- }
- catch (...) {}
- }
- });
-
- return promise->get_future();
- }
-
- void setMaxThreads(size_t threads)
- {
- pool.setMaxThreads(threads);
- }
-
-private:
- String name;
- ThreadPool pool;
-};
-
-}
DiskTransactionPtr DiskObjectStorage::createTransaction()
{
@@ -105,27 +50,20 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
-std::shared_ptr<Executor> DiskObjectStorage::getAsyncExecutor(const std::string & log_name, size_t size)
-{
-    static auto reader = std::make_shared<AsyncThreadPoolExecutor>(log_name, size);
- return reader;
-}
-
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_storage_root_path_,
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
- bool send_metadata_,
- uint64_t thread_pool_size_)
- : IDisk(name_, getAsyncExecutor(log_name, thread_pool_size_))
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix)
+ : IDisk(name_, config, config_prefix)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
- , send_metadata(send_metadata_)
- , threadpool_size(thread_pool_size_)
+ , send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
@@ -234,19 +172,23 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
-
-void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path)
+void DiskObjectStorage::copyFile( /// NOLINT
+ const String & from_file_path,
+ IDisk & to_disk,
+ const String & to_file_path,
+ const WriteSettings & settings)
{
- /// It's the same object storage disk
- if (this == to_disk.get())
+ if (this == &to_disk)
{
+ /// It may use s3-server-side copy
auto transaction = createObjectStorageTransaction();
- transaction->copyFile(from_path, to_path);
+ transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}
else
{
- IDisk::copy(from_path, to_disk, to_path);
+ /// Copy through buffers
+ IDisk::copyFile(from_file_path, to_disk, to_file_path, settings);
}
}
@@ -519,14 +461,15 @@ bool DiskObjectStorage::isWriteOnce() const
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
+ const auto config_prefix = "storage_configuration.disks." + name;
return std::make_shared<DiskObjectStorage>(
getName(),
object_storage_root_path,
getName(),
metadata_storage,
object_storage,
- send_metadata,
- threadpool_size);
+ Context::getGlobalContextInstance()->getConfigRef(),
+ config_prefix);
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
@@ -582,13 +525,12 @@ void DiskObjectStorage::writeFileUsingBlobWritingFunction(const String & path, W
}
void DiskObjectStorage::applyNewSettings(
- const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
+ const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String & /*config_prefix*/, const DisksMap & disk_map)
{
+ /// FIXME we cannot use config_prefix that was passed through arguments because the disk may be wrapped with cache and we need another name
const auto config_prefix = "storage_configuration.disks." + name;
object_storage->applyNewSettings(config, config_prefix, context_);
-
-    if (AsyncThreadPoolExecutor * exec = dynamic_cast<AsyncThreadPoolExecutor *>(&getExecutor()))
- exec->setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
+ IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}
void DiskObjectStorage::restoreMetadataIfNeeded(
diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h
index c9820956a4d3..d22d13104137 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorage.h
+++ b/src/Disks/ObjectStorages/DiskObjectStorage.h
@@ -33,8 +33,8 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
- bool send_metadata_,
- uint64_t thread_pool_size_);
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix);
/// Create fake transaction
DiskTransactionPtr createTransaction() override;
@@ -152,7 +152,11 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
Strings getBlobPath(const String & path) const override;
void writeFileUsingBlobWritingFunction(const String & path, WriteMode mode, WriteBlobFunction && write_blob_function) override;
-    void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
+ void copyFile( /// NOLINT
+ const String & from_file_path,
+ IDisk & to_disk,
+ const String & to_file_path,
+ const WriteSettings & settings = {}) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
@@ -198,8 +202,6 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
NameSet getCacheLayersNames() const override;
#endif
-    static std::shared_ptr<Executor> getAsyncExecutor(const std::string & log_name, size_t size);
-
bool supportsStat() const override { return metadata_storage->supportsStat(); }
struct stat stat(const String & path) const override;
@@ -225,7 +227,6 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
std::optional<UInt64> tryReserve(UInt64 bytes);
const bool send_metadata;
- size_t threadpool_size;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageCommon.cpp b/src/Disks/ObjectStorages/DiskObjectStorageCommon.cpp
index 5ac6128c3c05..cc9e4b0b712a 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageCommon.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorageCommon.cpp
@@ -25,7 +25,7 @@ std::pair prepareForLocalMetadata(
/// where the metadata files are stored locally
auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context);
fs::create_directories(metadata_path);
-    auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
+    auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
return std::make_pair(metadata_path, metadata_disk);
}
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp
index 74d1698bf016..bbcdd40d85f0 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.cpp
@@ -8,6 +8,14 @@
#include
#include
#include
+#include
+
+
+namespace CurrentMetrics
+{
+ extern const Metric LocalThread;
+ extern const Metric LocalThreadActive;
+}
namespace DB
{
@@ -101,7 +109,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
updateObjectMetadata(object.remote_path, metadata);
}
}
-void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
+void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, ThreadPool & pool)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
@@ -120,29 +128,26 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
- auto result = disk->getExecutor().execute([this, path]
+ pool.scheduleOrThrowOnError([this, path]
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
-
- results.push_back(std::move(result));
}
else
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
- if (!disk->isDirectory(it->path()))
+ {
+ if (disk->isDirectory(it->path()))
{
- auto source_path = it->path();
- auto result = disk->getExecutor().execute([this, source_path]
- {
- migrateFileToRestorableSchema(source_path);
- });
-
- results.push_back(std::move(result));
+ migrateToRestorableSchemaRecursive(it->path(), pool);
}
else
- migrateToRestorableSchemaRecursive(it->path(), results);
+ {
+ auto source_path = it->path();
+ pool.scheduleOrThrowOnError([this, source_path] { migrateFileToRestorableSchema(source_path); });
+ }
+ }
}
}
@@ -153,16 +158,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchema()
{
LOG_INFO(disk->log, "Start migration to restorable schema for disk {}", disk->name);
- Futures results;
+ ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
for (const auto & root : data_roots)
if (disk->exists(root))
- migrateToRestorableSchemaRecursive(root + '/', results);
+ migrateToRestorableSchemaRecursive(root + '/', pool);
- for (auto & result : results)
- result.wait();
- for (auto & result : results)
- result.get();
+ pool.wait();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
@@ -355,8 +357,8 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
{
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
- std::vector<std::future<void>> results;
- auto restore_files = [this, &source_object_storage, &restore_information, &results](const RelativePathsWithMetadata & objects)
+ ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
+ auto restore_files = [this, &source_object_storage, &restore_information, &pool](const RelativePathsWithMetadata & objects)
{
std::vector<String> keys_names;
for (const auto & object : objects)
@@ -378,12 +380,10 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
if (!keys_names.empty())
{
- auto result = disk->getExecutor().execute([this, &source_object_storage, &restore_information, keys_names]()
+ pool.scheduleOrThrowOnError([this, &source_object_storage, &restore_information, keys_names]()
{
processRestoreFiles(source_object_storage, restore_information.source_path, keys_names);
});
-
- results.push_back(std::move(result));
}
return true;
@@ -394,10 +394,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
restore_files(children);
- for (auto & result : results)
- result.wait();
- for (auto & result : results)
- result.get();
+ pool.wait();
LOG_INFO(disk->log, "Files are restored for disk {}", disk->name);
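
The hunks above drop the `Futures` bookkeeping: instead of collecting one `std::future<void>` per scheduled task and draining the vector with a `wait()` loop followed by a `get()` loop, tasks now go onto a single ThreadPool via `scheduleOrThrowOnError()`, and one `pool.wait()` joins them all and surfaces the first failure. A minimal standalone illustration of the removed future-based pattern, using only the standard library rather than ClickHouse's ThreadPool:

// Standalone illustration of the per-task future bookkeeping that this diff removes.
// Each task gets its own future; wait() joins it, and an exception (if any) only
// surfaces on get(), which is why the old code needed two separate loops.
#include <future>
#include <iostream>
#include <stdexcept>
#include <vector>

int main()
{
    std::vector<std::future<void>> results;
    for (int i = 0; i < 4; ++i)
        results.push_back(std::async(std::launch::async, [i]
        {
            if (i == 2)
                throw std::runtime_error("task failed"); // stored in the future
        }));

    for (auto & result : results)
        result.wait(); // join every task, as the removed code did first

    try
    {
        for (auto & result : results)
            result.get(); // rethrows the stored exception, one get() per future
    }
    catch (const std::exception & e)
    {
        std::cout << "caught: " << e.what() << '\n';
    }
}
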
diff --git a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h
index cb8d9b8a5af7..e7de4afcaf31 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h
+++ b/src/Disks/ObjectStorages/DiskObjectStorageRemoteMetadataRestoreHelper.h
@@ -75,7 +75,7 @@ class DiskObjectStorageRemoteMetadataRestoreHelper
void saveSchemaVersion(const int & version) const;
void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const;
void migrateFileToRestorableSchema(const String & path) const;
- void migrateToRestorableSchemaRecursive(const String & path, Futures & results);
+ void migrateToRestorableSchemaRecursive(const String & path, ThreadPool & pool);
void readRestoreInformation(RestoreInformation & restore_information);
void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information);
diff --git a/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp b/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp
index 693b966caf2a..e72e7028c4b4 100644
--- a/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp
+++ b/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp
@@ -44,7 +44,6 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
auto [_, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared(metadata_disk, uri);
- uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared(
@@ -53,8 +52,8 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
"DiskHDFS",
std::move(metadata_storage),
std::move(hdfs_storage),
- /* send_metadata = */ false,
- copy_thread_pool_size);
+ config,
+ config_prefix);
disk->startup(context, skip_access_check);
return disk;
diff --git a/src/Disks/ObjectStorages/Local/registerLocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/registerLocalObjectStorage.cpp
index 251fc77d1f8b..eb9039fed44d 100644
--- a/src/Disks/ObjectStorages/Local/registerLocalObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/Local/registerLocalObjectStorage.cpp
@@ -34,7 +34,7 @@ void registerDiskLocalObjectStorage(DiskFactory & factory, bool global_skip_acce
metadata_storage = std::make_shared(metadata_disk, path);
auto disk = std::make_shared(
- name, path, "Local", metadata_storage, local_storage, false, /* threadpool_size */16);
+ name, path, "Local", metadata_storage, local_storage, config, config_prefix);
disk->startup(context, global_skip_access_check);
return disk;
};
diff --git a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp
index f3a57069a30f..fb125ae8517d 100644
--- a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp
+++ b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp
@@ -150,17 +150,14 @@ void registerDiskS3(DiskFactory & factory, bool global_skip_access_check)
}
}
- bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
- uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
-
DiskObjectStoragePtr s3disk = std::make_shared(
name,
uri.key,
type == "s3" ? "DiskS3" : "DiskS3Plain",
std::move(metadata_storage),
std::move(s3_storage),
- send_metadata,
- copy_thread_pool_size);
+ config,
+ config_prefix);
s3disk->startup(context, skip_access_check);
diff --git a/src/Disks/ObjectStorages/Web/registerDiskWebServer.cpp b/src/Disks/ObjectStorages/Web/registerDiskWebServer.cpp
index 8a54de81815a..bc6c17863eff 100644
--- a/src/Disks/ObjectStorages/Web/registerDiskWebServer.cpp
+++ b/src/Disks/ObjectStorages/Web/registerDiskWebServer.cpp
@@ -52,8 +52,8 @@ void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
"DiskWebServer",
metadata_storage,
object_storage,
- /* send_metadata */false,
- /* threadpool_size */16);
+ config,
+ config_prefix);
disk->startup(context, skip_access_check);
return disk;
};
diff --git a/src/Disks/loadLocalDiskConfig.cpp b/src/Disks/loadLocalDiskConfig.cpp
index 0e5eca17ca74..0a9cdae1ae35 100644
--- a/src/Disks/loadLocalDiskConfig.cpp
+++ b/src/Disks/loadLocalDiskConfig.cpp
@@ -56,7 +56,7 @@ void loadDiskLocalConfig(const String & name,
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
- keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0).getTotalSpace() * ratio);
+ keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
}
}
diff --git a/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp b/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp
index 16acd109c276..d65808f5b6ba 100644
--- a/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp
+++ b/src/Disks/tests/gtest_cascade_and_memory_write_buffer.cpp
@@ -33,7 +33,7 @@ class TestCascadeWriteBufferWithDisk : public testing::Test
void SetUp() override
{
fs::create_directories(tmp_root);
- disk = std::make_shared<DB::DiskLocal>("local_disk", tmp_root, 0);
+ disk = std::make_shared<DB::DiskLocal>("local_disk", tmp_root);
}
void TearDown() override
diff --git a/src/Disks/tests/gtest_disk.cpp b/src/Disks/tests/gtest_disk.cpp
index 1f33f5363995..d57ca7bd81b0 100644
--- a/src/Disks/tests/gtest_disk.cpp
+++ b/src/Disks/tests/gtest_disk.cpp
@@ -10,7 +10,7 @@ namespace fs = std::filesystem;
DB::DiskPtr createDisk()
{
fs::create_directory("tmp/");
- return std::make_shared<DB::DiskLocal>("local_disk", "tmp/", 0);
+ return std::make_shared<DB::DiskLocal>("local_disk", "tmp/");
}
void destroyDisk(DB::DiskPtr & disk)
diff --git a/src/Disks/tests/gtest_disk_encrypted.cpp b/src/Disks/tests/gtest_disk_encrypted.cpp
index ee9e284d409f..b61b6140b0cf 100644
--- a/src/Disks/tests/gtest_disk_encrypted.cpp
+++ b/src/Disks/tests/gtest_disk_encrypted.cpp
@@ -23,7 +23,7 @@ class DiskEncryptedTest : public ::testing::Test
/// Make local disk.
temp_dir = std::make_unique<Poco::TemporaryFile>();
temp_dir->createDirectories();
- local_disk = std::make_shared<DiskLocal>("local_disk", getDirectory(), 0);
+ local_disk = std::make_shared<DiskLocal>("local_disk", getDirectory());
}
void TearDown() override
@@ -42,7 +42,7 @@ class DiskEncryptedTest : public ::testing::Test
settings->current_key = key;
settings->current_key_fingerprint = fingerprint;
settings->disk_path = path;
- encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
+ encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings));
}
String getFileNames()
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 7482450d529f..442f1b913f1e 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -875,9 +875,9 @@ catch (...)
"It is ok to skip this exception as cleaning old temporary files is not necessary", path));
}
-static VolumePtr createLocalSingleDiskVolume(const std::string & path)
+static VolumePtr createLocalSingleDiskVolume(const std::string & path, const Poco::Util::AbstractConfiguration & config_)
{
- auto disk = std::make_shared<DiskLocal>("_tmp_default", path, 0);
+ auto disk = std::make_shared<DiskLocal>("_tmp_default", path, 0, config_, "storage_configuration.disks._tmp_default");
VolumePtr volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
return volume;
}
@@ -893,7 +893,7 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
- VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
+ VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path, getConfigRef());
for (const auto & disk : volume->getDisks())
{
@@ -966,7 +966,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
LOG_DEBUG(shared->log, "Using file cache ({}) for temporary files", file_cache->getBasePath());
shared->tmp_path = file_cache->getBasePath();
- VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
+ VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path, getConfigRef());
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
}
diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp
index 4218a8ea4e12..66dc1aa7bde8 100644
--- a/src/Interpreters/GraceHashJoin.cpp
+++ b/src/Interpreters/GraceHashJoin.cpp
@@ -288,10 +288,7 @@ void GraceHashJoin::initBuckets()
size_t initial_num_buckets = roundUpToPowerOfTwoOrZero(std::clamp<size_t>(settings.grace_hash_join_initial_buckets, 1, settings.grace_hash_join_max_buckets));
- for (size_t i = 0; i < initial_num_buckets; ++i)
- {
- addBucket(buckets);
- }
+ addBuckets(initial_num_buckets);
if (buckets.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No buckets created");
@@ -356,52 +353,66 @@ bool GraceHashJoin::hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const
return hasMemoryOverflow(total_rows, total_bytes);
}
-GraceHashJoin::Buckets GraceHashJoin::rehashBuckets(size_t to_size)
+GraceHashJoin::Buckets GraceHashJoin::rehashBuckets()
{
std::unique_lock lock(rehash_mutex);
- size_t current_size = buckets.size();
- if (to_size <= current_size)
- return buckets;
+ if (!isPowerOf2(buckets.size())) [[unlikely]]
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of buckets should be power of 2 but it's {}", buckets.size());
- chassert(isPowerOf2(to_size));
+ const size_t to_size = buckets.size() * 2;
+ size_t current_size = buckets.size();
if (to_size > max_num_buckets)
{
- throw Exception(ErrorCodes::LIMIT_EXCEEDED,
+ throw Exception(
+ ErrorCodes::LIMIT_EXCEEDED,
"Too many grace hash join buckets ({} > {}), "
"consider increasing grace_hash_join_max_buckets or max_rows_in_join/max_bytes_in_join",
- to_size, max_num_buckets);
+ to_size,
+ max_num_buckets);
}
LOG_TRACE(log, "Rehashing from {} to {}", current_size, to_size);
- buckets.reserve(to_size);
- for (size_t i = current_size; i < to_size; ++i)
- addBucket(buckets);
+ addBuckets(to_size - current_size);
return buckets;
}
-void GraceHashJoin::addBucket(Buckets & destination)
+void GraceHashJoin::addBuckets(const size_t bucket_count)
{
- // There could be exceptions from createStream, In ci tests
- // there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability.
- // It may terminate this thread and leave a broken hash_join, and another thread cores when it tries to
- // use the broken hash_join. So we print an exception message here to help debug.
- try
- {
- auto & left_file = tmp_data->createStream(left_sample_block);
- auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
+ // An exception can be thrown in a number of cases:
+ // - during creation of temporary files for buckets
+ // - in CI tests, there is a certain probability of failure in allocating memory, see memory_tracker_fault_probability
+ // Therefore, new buckets are added only after all of them have been created successfully;
+ // otherwise we can end up with an unexpected number of buckets.
+
+ const size_t current_size = buckets.size();
+ Buckets tmp_buckets;
+ tmp_buckets.reserve(bucket_count);
+ for (size_t i = 0; i < bucket_count; ++i)
+ try
+ {
+ auto & left_file = tmp_data->createStream(left_sample_block);
+ auto & right_file = tmp_data->createStream(prepareRightBlock(right_sample_block));
- BucketPtr new_bucket = std::make_shared<FileBucket>(destination.size(), left_file, right_file, log);
- destination.emplace_back(std::move(new_bucket));
- }
- catch (...)
- {
- LOG_ERROR(&Poco::Logger::get("GraceHashJoin"), "Can't create bucket. current buckets size: {}", destination.size());
- throw;
- }
+ BucketPtr new_bucket = std::make_shared<FileBucket>(current_size + i, left_file, right_file, log);
+ tmp_buckets.emplace_back(std::move(new_bucket));
+ }
+ catch (...)
+ {
+ LOG_ERROR(
+ &Poco::Logger::get("GraceHashJoin"),
+ "Can't create bucket {} due to error: {}",
+ current_size + i,
+ getCurrentExceptionMessage(false));
+ throw;
+ }
+
+ buckets.reserve(buckets.size() + bucket_count);
+ for (auto & bucket : tmp_buckets)
+ buckets.emplace_back(std::move(bucket));
}
void GraceHashJoin::checkTypesOfKeys(const Block & block) const
@@ -638,11 +649,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (current_block.rows() > 0)
{
std::lock_guard lock(hash_join_mutex);
- auto current_buckets = getCurrentBuckets();
- if (!isPowerOf2(current_buckets.size())) [[unlikely]]
- {
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Broken buckets. its size({}) is not power of 2", current_buckets.size());
- }
if (!hash_join)
hash_join = makeInMemoryJoin();
@@ -654,7 +660,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};
// Must use the latest buckets snapshot in case that it has been rehashed by other threads.
- buckets_snapshot = rehashBuckets(current_buckets.size() * 2);
+ buckets_snapshot = rehashBuckets();
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;
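
The `addBucket` to `addBuckets` rewrite above is about exception safety: new buckets are first created into a temporary vector and only appended to `buckets` once the whole batch has succeeded, so a failure while creating temporary files (or an injected allocation failure in CI) can no longer leave a partially grown, non-power-of-two bucket list. A standalone sketch of that pattern with hypothetical types (not the real bucket or temporary-file machinery):

// Sketch of the strong-exception-safety idea behind addBuckets(): build the whole
// batch in tmp_buckets, then move it into the main container only if nothing threw.
#include <iostream>
#include <memory>
#include <stdexcept>
#include <vector>

struct Bucket { size_t index; }; // hypothetical stand-in for GraceHashJoin's bucket

void addBuckets(std::vector<std::shared_ptr<Bucket>> & buckets, size_t bucket_count, bool fail_midway)
{
    const size_t current_size = buckets.size();
    std::vector<std::shared_ptr<Bucket>> tmp_buckets;
    tmp_buckets.reserve(bucket_count);
    for (size_t i = 0; i < bucket_count; ++i)
    {
        if (fail_midway && i == bucket_count / 2)
            throw std::runtime_error("bucket creation failed"); // e.g. temporary file error
        tmp_buckets.push_back(std::make_shared<Bucket>(Bucket{current_size + i}));
    }

    buckets.reserve(buckets.size() + bucket_count);
    for (auto & bucket : tmp_buckets)
        buckets.push_back(std::move(bucket)); // commit the batch only after all creations succeeded
}

int main()
{
    std::vector<std::shared_ptr<Bucket>> buckets;
    addBuckets(buckets, 4, false);
    try { addBuckets(buckets, 4, true); }
    catch (const std::exception & e) { std::cout << "caught: " << e.what() << '\n'; }
    std::cout << "buckets: " << buckets.size() << '\n'; // still 4, no partial batch was added
}
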
diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h
index b8d83f4cad0b..78ba70bc764c 100644
--- a/src/Interpreters/GraceHashJoin.h
+++ b/src/Interpreters/GraceHashJoin.h
@@ -101,15 +101,16 @@ class GraceHashJoin final : public IJoin
bool hasMemoryOverflow(const InMemoryJoinPtr & hash_join_) const;
bool hasMemoryOverflow(const BlocksList & blocks) const;
- /// Create new bucket at the end of @destination.
- void addBucket(Buckets & destination);
+ /// Add bucket_count new buckets
+ /// Throws if a bucket creation fails
+ void addBuckets(size_t bucket_count);
/// Increase number of buckets to match desired_size.
/// Called when HashJoin in-memory table for one bucket exceeds the limits.
///
/// NB: after @rehashBuckets there may be rows that are written to the buckets that they do not belong to.
/// It is fine; these rows will be written to the corresponding buckets during the third stage.
- Buckets rehashBuckets(size_t to_size);
+ Buckets rehashBuckets();
/// Perform some bookkeeping after all calls to @joinBlock.
void startReadingDelayedBlocks();
diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp
index d0bb3dd389f1..55d2449f7395 100644
--- a/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/src/Interpreters/InterpreterCreateQuery.cpp
@@ -881,48 +881,26 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
}
}
-String InterpreterCreateQuery::getTableEngineName(DefaultTableEngine default_table_engine)
+namespace
{
- switch (default_table_engine)
+ void checkTemporaryTableEngineName(const String& name)
{
- case DefaultTableEngine::Log:
- return "Log";
-
- case DefaultTableEngine::StripeLog:
- return "StripeLog";
-
- case DefaultTableEngine::MergeTree:
- return "MergeTree";
-
- case DefaultTableEngine::ReplacingMergeTree:
- return "ReplacingMergeTree";
-
- case DefaultTableEngine::ReplicatedMergeTree:
- return "ReplicatedMergeTree";
-
- case DefaultTableEngine::ReplicatedReplacingMergeTree:
- return "ReplicatedReplacingMergeTree";
+ if (name.starts_with("Replicated") || name == "KeeperMap")
+ throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with Replicated or KeeperMap table engines");
+ }
- case DefaultTableEngine::Memory:
- return "Memory";
+ void setDefaultTableEngine(ASTStorage &storage, DefaultTableEngine engine)
+ {
+ if (engine == DefaultTableEngine::None)
+ throw Exception(ErrorCodes::ENGINE_REQUIRED, "Table engine is not specified in CREATE query");
- default:
- throw Exception(ErrorCodes::LOGICAL_ERROR, "default_table_engine is set to unknown value");
+ auto engine_ast = std::make_shared<ASTFunction>();
+ engine_ast->name = SettingFieldDefaultTableEngine(engine).toString();
+ engine_ast->no_empty_args = true;
+ storage.set(storage.engine, engine_ast);
}
}
-void InterpreterCreateQuery::setDefaultTableEngine(ASTStorage & storage, ContextPtr local_context)
-{
- if (local_context->getSettingsRef().default_table_engine.value == DefaultTableEngine::None)
- throw Exception(ErrorCodes::ENGINE_REQUIRED, "Table engine is not specified in CREATE query");
-
- auto engine_ast = std::make_shared<ASTFunction>();
- auto default_table_engine = local_context->getSettingsRef().default_table_engine.value;
- engine_ast->name = getTableEngineName(default_table_engine);
- engine_ast->no_empty_args = true;
- storage.set(storage.engine, engine_ast);
-}
-
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
@@ -936,32 +914,23 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
if (create.temporary)
{
- /// It's possible if some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not.
- /// It makes sense when default_table_engine setting is used, but not for temporary tables.
- /// For temporary tables we ignore this setting to allow CREATE TEMPORARY TABLE query without specifying ENGINE
+ /// Some part of storage definition is specified, but ENGINE is not: just set the one from default_temporary_table_engine setting.
if (!create.cluster.empty())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with ON CLUSTER clause");
- if (create.storage)
- {
- if (create.storage->engine)
- {
- if (create.storage->engine->name.starts_with("Replicated") || create.storage->engine->name == "KeeperMap")
- throw Exception(ErrorCodes::INCORRECT_QUERY, "Temporary tables cannot be created with Replicated or KeeperMap table engines");
- }
- else
- throw Exception(ErrorCodes::INCORRECT_QUERY, "Invalid storage definition for temporary table");
- }
- else
+ if (!create.storage)
{
- auto engine_ast = std::make_shared<ASTFunction>();
- engine_ast->name = "Memory";
- engine_ast->no_empty_args = true;
auto storage_ast = std::make_shared<ASTStorage>();
- storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
}
+
+ if (!create.storage->engine)
+ {
+ setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_temporary_table_engine.value);
+ }
+
+ checkTemporaryTableEngineName(create.storage->engine->name);
return;
}
@@ -969,7 +938,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
if (!create.storage->engine)
- setDefaultTableEngine(*create.storage, getContext());
+ setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
return;
}
@@ -1008,7 +977,7 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
}
create.set(create.storage, std::make_shared<ASTStorage>());
- setDefaultTableEngine(*create.storage, getContext());
+ setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
static void generateUUIDForTable(ASTCreateQuery & create)
diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h
index a5fa65760914..67339dea928b 100644
--- a/src/Interpreters/InterpreterCreateQuery.h
+++ b/src/Interpreters/InterpreterCreateQuery.h
@@ -90,8 +90,6 @@ class InterpreterCreateQuery : public IInterpreter, WithMutableContext
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties getTablePropertiesAndNormalizeCreateQuery(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
- static String getTableEngineName(DefaultTableEngine default_table_engine);
- static void setDefaultTableEngine(ASTStorage & storage, ContextPtr local_context);
void setEngine(ASTCreateQuery & create) const;
AccessRightsElements getRequiredAccess() const;
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
index b73e2cca314f..e1921f45eda0 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
@@ -455,22 +455,34 @@ MutableDataPartStoragePtr DataPartStorageOnDiskBase::freeze(
MutableDataPartStoragePtr DataPartStorageOnDiskBase::clonePart(
const std::string & to,
const std::string & dir_path,
- const DiskPtr & disk,
+ const DiskPtr & dst_disk,
Poco::Logger * log) const
{
String path_to_clone = fs::path(to) / dir_path / "";
+ auto src_disk = volume->getDisk();
- if (disk->exists(path_to_clone))
+ if (dst_disk->exists(path_to_clone))
{
- LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone));
- disk->removeRecursive(path_to_clone);
+ throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
+ "Cannot clone part {} from '{}' to '{}': path '{}' already exists",
+ dir_path, getRelativePath(), path_to_clone, fullPath(dst_disk, path_to_clone));
}
- disk->createDirectories(to);
- volume->getDisk()->copy(getRelativePath(), disk, to);
- volume->getDisk()->removeFileIfExists(fs::path(path_to_clone) / "delete-on-destroy.txt");
+ try
+ {
+ dst_disk->createDirectories(to);
+ src_disk->copyDirectoryContent(getRelativePath(), dst_disk, path_to_clone);
+ }
+ catch (...)
+ {
+ /// It's safe to remove it recursively (even with zero-copy-replication)
+ /// because we've just done a full copy through copyDirectoryContent
+ LOG_WARNING(log, "Removing directory {} after failed attempt to move a data part", path_to_clone);
+ dst_disk->removeRecursive(path_to_clone);
+ throw;
+ }
- auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
+ auto single_disk_volume = std::make_shared<SingleDiskVolume>(dst_disk->getName(), dst_disk, 0);
return create(single_disk_volume, to, dir_path, /*initialize=*/ true);
}
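
With this change clonePart() fails fast if the destination already exists (previously it removed the stale copy and cloned again) and, if the copy itself throws part-way, removes whatever was already written before rethrowing. A standalone, hedged sketch of that contract using std::filesystem (hypothetical helper name, not the ClickHouse API):

// Sketch of the clone contract: refuse an existing destination, and roll back a
// partial copy on failure so a failed move never leaves a half-written directory.
#include <filesystem>
#include <iostream>
#include <stdexcept>

namespace fs = std::filesystem;

void cloneDirectory(const fs::path & from, const fs::path & to)
{
    if (fs::exists(to))
        throw std::runtime_error("destination already exists: " + to.string());
    try
    {
        fs::create_directories(to);
        fs::copy(from, to, fs::copy_options::recursive); // may throw part-way through
    }
    catch (...)
    {
        fs::remove_all(to); // remove the partial copy before rethrowing
        throw;
    }
}

int main()
{
    fs::remove_all("src_part");
    fs::remove_all("dst_part");
    fs::create_directories("src_part");

    cloneDirectory("src_part", "dst_part");         // first clone succeeds
    try { cloneDirectory("src_part", "dst_part"); } // second one is rejected
    catch (const std::exception & e) { std::cout << "caught: " << e.what() << '\n'; }
}
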
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
index 5f7dcc3fd320..648bc908f59e 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.h
@@ -68,7 +68,7 @@ class DataPartStorageOnDiskBase : public IDataPartStorage
MutableDataPartStoragePtr clonePart(
const std::string & to,
const std::string & dir_path,
- const DiskPtr & disk,
+ const DiskPtr & dst_disk,
Poco::Logger * log) const override;
void rename(
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index b95918648697..bd3f91abeeb8 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -502,8 +502,10 @@ void IMergeTreeDataPart::removeIfNeeded()
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);
- const auto part_parent_directory = directoryPath(part_directory);
- bool is_moving_part = part_parent_directory.ends_with("moving/");
+ fs::path part_directory_path = getDataPartStorage().getRelativePath();
+ if (part_directory_path.filename().empty())
+ part_directory_path = part_directory_path.parent_path();
+ bool is_moving_part = part_directory_path.parent_path().filename() == "moving";
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 8c379af193dd..3ad47d3253b1 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -1030,7 +1030,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree
- virtual MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
+ virtual MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp
index a8f34ba4ceca..44987a4f0824 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -233,9 +233,15 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me
disk->createDirectories(path_to_clone);
- cloned_part_storage = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
+ auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
- if (!cloned_part_storage)
+ if (zero_copy_part)
+ {
+ /// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
+ zero_copy_part->is_temp = false; /// Do not remove it in dtor
+ cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
+ }
+ else
{
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
cloned_part_storage = part->getDataPartStorage().clonePart(path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, log);
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index dac9e6923a57..7861dc6917c0 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1987,7 +1987,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
}
-MutableDataPartStoragePtr StorageReplicatedMergeTree::executeFetchShared(
+MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::executeFetchShared(
const String & source_replica,
const String & new_part_name,
const DiskPtr & disk,
@@ -4476,7 +4476,7 @@ bool StorageReplicatedMergeTree::fetchPart(
}
-MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
+MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path,
@@ -4582,7 +4582,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part {} from {}:{}", part_name, zookeeper_name, source_replica_path);
- return part->getDataPartStoragePtr();
+ return part;
}
void StorageReplicatedMergeTree::startup()
@@ -8901,7 +8901,7 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID(
}
-MutableDataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
+MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::tryToFetchIfShared(
const IMergeTreeDataPart & part,
const DiskPtr & disk,
const String & path)
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index bdd3f0da5bff..f3c8d0173339 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -244,7 +244,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
/// Fetch part only when it stored on shared storage like S3
- MutableDataPartStoragePtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
+ MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
@@ -286,7 +286,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
MergeTreeDataFormatVersion data_format_version);
/// Fetch part only if some replica has it on shared storage like S3
- MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
+ MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
@@ -717,7 +717,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
* Used for replace local part on the same s3-shared part in hybrid storage.
* Returns false if part is already fetching right now.
*/
- MutableDataPartStoragePtr fetchExistsPart(
+ MutableDataPartPtr fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
diff --git a/tests/ci/ci_config.py b/tests/ci/ci_config.py
index c680b5810fc3..ea7d112c73ee 100644
--- a/tests/ci/ci_config.py
+++ b/tests/ci/ci_config.py
@@ -173,6 +173,16 @@
"with_coverage": False,
"comment": "SSE2-only build",
},
+ "binary_riscv64": {
+ "compiler": "clang-16-riscv64",
+ "build_type": "",
+ "sanitizer": "",
+ "package_type": "binary",
+ "static_binary_name": "riscv64",
+ "tidy": "disable",
+ "with_coverage": False,
+ "comment": "",
+ },
},
"builds_report_config": {
"ClickHouse build check": [
@@ -194,6 +204,7 @@
"binary_freebsd",
"binary_darwin_aarch64",
"binary_ppc64le",
+ "binary_riscv64",
"binary_amd64_compat",
],
},
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 21398790be38..13f4c6a83c92 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -36,6 +36,7 @@
from confluent_kafka.avro.cached_schema_registry_client import (
CachedSchemaRegistryClient,
)
+ from .hdfs_api import HDFSApi # imports requests_kerberos
except Exception as e:
logging.warning(f"Cannot import some modules, some tests may not work: {e}")
@@ -51,7 +52,6 @@
import docker
from .client import Client
-from .hdfs_api import HDFSApi
from .config_cluster import *
diff --git a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml
index 4480327c4b5c..235b9a7b7a12 100644
--- a/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml
+++ b/tests/integration/test_merge_tree_s3_failover/configs/config.d/storage_conf.xml
@@ -72,4 +72,6 @@
+
+ true
diff --git a/tests/integration/test_merge_tree_s3_failover/test.py b/tests/integration/test_merge_tree_s3_failover/test.py
index 05aeeff2ec1d..90dda631924e 100644
--- a/tests/integration/test_merge_tree_s3_failover/test.py
+++ b/tests/integration/test_merge_tree_s3_failover/test.py
@@ -183,7 +183,8 @@ def test_move_failover(cluster):
) ENGINE=MergeTree()
ORDER BY id
TTL dt + INTERVAL 4 SECOND TO VOLUME 'external'
- SETTINGS storage_policy='s3_cold'
+ SETTINGS storage_policy='s3_cold', temporary_directories_lifetime=1,
+ merge_tree_clear_old_temporary_directories_interval_seconds=1
"""
)
diff --git a/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml b/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml
index 5ffeb0c0d014..e179c848be1a 100644
--- a/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml
+++ b/tests/integration/test_s3_zero_copy_ttl/configs/s3.xml
@@ -33,4 +33,6 @@
true
+
+ true
diff --git a/tests/integration/test_s3_zero_copy_ttl/test.py b/tests/integration/test_s3_zero_copy_ttl/test.py
index 7dcf3734653d..04bff4a44fbf 100644
--- a/tests/integration/test_s3_zero_copy_ttl/test.py
+++ b/tests/integration/test_s3_zero_copy_ttl/test.py
@@ -35,7 +35,7 @@ def test_ttl_move_and_s3(started_cluster):
ORDER BY id
PARTITION BY id
TTL date TO DISK 's3_disk'
- SETTINGS storage_policy='s3_and_default'
+ SETTINGS storage_policy='s3_and_default', temporary_directories_lifetime=1
""".format(
i
)
diff --git a/tests/queries/0_stateless/00941_system_columns_race_condition.sh b/tests/queries/0_stateless/00941_system_columns_race_condition.sh
index 69dfb30cd2c5..4f2cd6ee91ba 100755
--- a/tests/queries/0_stateless/00941_system_columns_race_condition.sh
+++ b/tests/queries/0_stateless/00941_system_columns_race_condition.sh
@@ -14,35 +14,43 @@ $CLICKHOUSE_CLIENT -q "CREATE TABLE alter_table (a UInt8, b Int16, c Float32, d
function thread1()
{
- # NOTE: database = $CLICKHOUSE_DATABASE is unwanted
- while true; do $CLICKHOUSE_CLIENT --query "SELECT name FROM system.columns UNION ALL SELECT name FROM system.columns FORMAT Null"; done
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
+ # NOTE: database = $CLICKHOUSE_DATABASE is unwanted
+ $CLICKHOUSE_CLIENT --query "SELECT name FROM system.columns UNION ALL SELECT name FROM system.columns FORMAT Null";
+ done
}
function thread2()
{
- while true; do $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;"; done
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
+ $CLICKHOUSE_CLIENT -n --query "ALTER TABLE alter_table ADD COLUMN h String; ALTER TABLE alter_table MODIFY COLUMN h UInt64; ALTER TABLE alter_table DROP COLUMN h;";
+ done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread1 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
-timeout 15 bash -c thread2 2> /dev/null &
+TIMEOUT=15
+
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread1 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
+thread2 $TIMEOUT 2> /dev/null &
wait
diff --git a/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh b/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
index aee69e64b1b8..57409d782ae1 100755
--- a/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
+++ b/tests/queries/0_stateless/01320_create_sync_race_condition_zookeeper.sh
@@ -12,22 +12,27 @@ $CLICKHOUSE_CLIENT --allow_deprecated_database_ordinary=1 --query "CREATE DATABA
function thread1()
{
- while true; do
- $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x;
- DROP TABLE test_01320.r;" 2>&1 | grep -F "Code:" | grep -v "UNKNOWN_DATABASE"
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
+ $CLICKHOUSE_CLIENT -n --query "CREATE TABLE test_01320.r (x UInt64) ENGINE = ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/table', 'r') ORDER BY x; DROP TABLE test_01320.r;"
done
}
function thread2()
{
- while true; do $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA test_01320.r" 2>/dev/null; done
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
+ $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA test_01320.r" 2>/dev/null;
+ done
}
export -f thread1
export -f thread2
-timeout 10 bash -c thread1 &
-timeout 10 bash -c thread2 &
+TIMEOUT=10
+
+thread1 $TIMEOUT &
+thread2 $TIMEOUT &
wait
diff --git a/tests/queries/0_stateless/01632_tinylog_read_write.sh b/tests/queries/0_stateless/01632_tinylog_read_write.sh
index 69f985a9d0d1..10625ec5d279 100755
--- a/tests/queries/0_stateless/01632_tinylog_read_write.sh
+++ b/tests/queries/0_stateless/01632_tinylog_read_write.sh
@@ -11,14 +11,16 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
$CLICKHOUSE_CLIENT --multiquery --query "DROP TABLE IF EXISTS test; CREATE TABLE IF NOT EXISTS test (x UInt64, s Array(Nullable(String))) ENGINE = TinyLog;"
function thread_select {
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
$CLICKHOUSE_CLIENT --local_filesystem_read_method pread --query "SELECT * FROM test FORMAT Null"
sleep 0.0$RANDOM
done
}
function thread_insert {
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
$CLICKHOUSE_CLIENT --query "INSERT INTO test VALUES (1, ['Hello'])"
sleep 0.0$RANDOM
done
@@ -30,15 +32,17 @@ export -f thread_insert
# Do randomized queries and expect nothing extraordinary happens.
-timeout 10 bash -c 'thread_select' &
-timeout 10 bash -c 'thread_select' &
-timeout 10 bash -c 'thread_select' &
-timeout 10 bash -c 'thread_select' &
+TIMEOUT=10
-timeout 10 bash -c 'thread_insert' &
-timeout 10 bash -c 'thread_insert' &
-timeout 10 bash -c 'thread_insert' &
-timeout 10 bash -c 'thread_insert' &
+thread_select $TIMEOUT &
+thread_select $TIMEOUT &
+thread_select $TIMEOUT &
+thread_select $TIMEOUT &
+
+thread_insert $TIMEOUT &
+thread_insert $TIMEOUT &
+thread_insert $TIMEOUT &
+thread_insert $TIMEOUT &
wait
echo "Done"
diff --git a/tests/queries/0_stateless/02050_client_profile_events.sh b/tests/queries/0_stateless/02050_client_profile_events.sh
index dce0c80525a6..05e48de771d0 100755
--- a/tests/queries/0_stateless/02050_client_profile_events.sh
+++ b/tests/queries/0_stateless/02050_client_profile_events.sh
@@ -25,7 +25,7 @@ profile_events="$($CLICKHOUSE_CLIENT --max_block_size 1 --print-profile-events -
test "$profile_events" -gt 1 && echo OK || echo "FAIL ($profile_events)"
echo 'print each 100 ms'
-profile_events="$($CLICKHOUSE_CLIENT --max_block_size 1 --print-profile-events --profile-events-delay-ms=100 -q 'select sleep(1) from numbers(2) format Null' |& grep -c 'SelectedRows')"
+profile_events="$($CLICKHOUSE_CLIENT --max_block_size 1 --print-profile-events --profile-events-delay-ms=100 -q 'select sleep(0.2) from numbers(10) format Null' |& grep -c 'SelectedRows')"
test "$profile_events" -gt 1 && echo OK || echo "FAIL ($profile_events)"
echo 'check that ProfileEvents is new for each query'
diff --git a/tests/queries/0_stateless/02184_default_table_engine.reference b/tests/queries/0_stateless/02184_default_table_engine.reference
index 870dff90efa6..495b9627acb0 100644
--- a/tests/queries/0_stateless/02184_default_table_engine.reference
+++ b/tests/queries/0_stateless/02184_default_table_engine.reference
@@ -27,3 +27,4 @@ CREATE TABLE default.val2\n(\n `n` Int32\n) AS values(\'n int\', 1, 2)
CREATE TABLE default.log\n(\n `n` Int32\n)\nENGINE = Log
CREATE TABLE default.kek\n(\n `n` Int32\n)\nENGINE = Memory
CREATE TABLE default.lol\n(\n `n` Int32\n)\nENGINE = MergeTree\nORDER BY n\nSETTINGS min_bytes_for_wide_part = 123, index_granularity = 8192
+CREATE TEMPORARY TABLE tmp_log\n(\n `n` Int32\n)\nENGINE = Log
diff --git a/tests/queries/0_stateless/02184_default_table_engine.sql b/tests/queries/0_stateless/02184_default_table_engine.sql
index 109875d53a5b..a984ec1b6c92 100644
--- a/tests/queries/0_stateless/02184_default_table_engine.sql
+++ b/tests/queries/0_stateless/02184_default_table_engine.sql
@@ -83,8 +83,8 @@ CREATE TEMPORARY TABLE tmp (n int);
SHOW CREATE TEMPORARY TABLE tmp;
CREATE TEMPORARY TABLE tmp1 (n int) ENGINE=Memory;
CREATE TEMPORARY TABLE tmp2 (n int) ENGINE=Log;
-CREATE TEMPORARY TABLE tmp2 (n int) ORDER BY n; -- {serverError 80}
-CREATE TEMPORARY TABLE tmp2 (n int, PRIMARY KEY (n)); -- {serverError 80}
+CREATE TEMPORARY TABLE tmp2 (n int) ORDER BY n; -- {serverError 36}
+CREATE TEMPORARY TABLE tmp2 (n int, PRIMARY KEY (n)); -- {serverError 36}
CREATE TABLE log (n int);
SHOW CREATE log;
@@ -128,3 +128,7 @@ SHOW CREATE TABLE kek;
SHOW CREATE TABLE lol;
DROP TABLE kek;
DROP TABLE lol;
+
+SET default_temporary_table_engine = 'Log';
+CREATE TEMPORARY TABLE tmp_log (n int);
+SHOW CREATE TEMPORARY TABLE tmp_log;
diff --git a/tests/queries/0_stateless/02470_mutation_sync_race.sh b/tests/queries/0_stateless/02470_mutation_sync_race.sh
index 6c259e46cb17..37e99663ab5d 100755
--- a/tests/queries/0_stateless/02470_mutation_sync_race.sh
+++ b/tests/queries/0_stateless/02470_mutation_sync_race.sh
@@ -12,7 +12,11 @@ $CLICKHOUSE_CLIENT -q "insert into src values (0)"
function thread()
{
+ local TIMELIMIT=$((SECONDS+$1))
for i in $(seq 1000); do
+ if [ $SECONDS -ge "$TIMELIMIT" ]; then
+ return
+ fi
$CLICKHOUSE_CLIENT -q "alter table src detach partition tuple()"
$CLICKHOUSE_CLIENT -q "alter table src attach partition tuple()"
$CLICKHOUSE_CLIENT -q "alter table src update A = ${i} where 1 settings mutations_sync=2"
@@ -20,8 +24,6 @@ function thread()
done
}
-export -f thread;
-
TIMEOUT=30
-timeout $TIMEOUT bash -c thread || true
+thread $TIMEOUT || true
\ No newline at end of file
diff --git a/tests/queries/0_stateless/02481_async_insert_race_long.sh b/tests/queries/0_stateless/02481_async_insert_race_long.sh
index cec9278c127d..c4b026c6abae 100755
--- a/tests/queries/0_stateless/02481_async_insert_race_long.sh
+++ b/tests/queries/0_stateless/02481_async_insert_race_long.sh
@@ -11,21 +11,24 @@ export MY_CLICKHOUSE_CLIENT="$CLICKHOUSE_CLIENT --async_insert_busy_timeout_ms 1
function insert1()
{
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT CSV 1,"a"'
done
}
function insert2()
{
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 0 -q 'INSERT INTO async_inserts_race FORMAT JSONEachRow {"id": 5, "s": "e"} {"id": 6, "s": "f"}'
done
}
function insert3()
{
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} --wait_for_async_insert 1 -q "INSERT INTO async_inserts_race VALUES (7, 'g') (8, 'h')" &
sleep 0.05
done
@@ -33,29 +36,29 @@ function insert3()
function select1()
{
- while true; do
+ local TIMELIMIT=$((SECONDS+$1))
+ while [ $SECONDS -lt "$TIMELIMIT" ]; do
${MY_CLICKHOUSE_CLIENT} -q "SELECT * FROM async_inserts_race FORMAT Null"
done
-
}
${CLICKHOUSE_CLIENT} -q "DROP TABLE IF EXISTS async_inserts_race"
${CLICKHOUSE_CLIENT} -q "CREATE TABLE async_inserts_race (id UInt32, s String) ENGINE = MergeTree ORDER BY id"
-TIMEOUT=10
-
export -f insert1
export -f insert2
export -f insert3
export -f select1
+TIMEOUT=10
+
for _ in {1..3}; do
- timeout $TIMEOUT bash -c insert1 &
- timeout $TIMEOUT bash -c insert2 &
- timeout $TIMEOUT bash -c insert3 &
+ insert1 $TIMEOUT &
+ insert2 $TIMEOUT &
+ insert3 $TIMEOUT &
done
-timeout $TIMEOUT bash -c select1 &
+select1 $TIMEOUT &
wait
echo "OK"
diff --git a/utils/keeper-data-dumper/main.cpp b/utils/keeper-data-dumper/main.cpp
index 5a6fd15d72cb..51a09b676dc3 100644
--- a/utils/keeper-data-dumper/main.cpp
+++ b/utils/keeper-data-dumper/main.cpp
@@ -64,8 +64,8 @@ int main(int argc, char *argv[])
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
KeeperContextPtr keeper_context = std::make_shared<KeeperContext>(true);
- keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", argv[2], 0));
- keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("LogDisk", argv[1], 0));
+ keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", argv[2]));
+ keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("LogDisk", argv[1]));
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr);
state_machine->init();