Skip to content

Commit

Permalink
Merge pull request containerd#192 from yuchen0cc/main
Browse files Browse the repository at this point in the history
trace: support config prefetch concurrency
  • Loading branch information
liulanzheng authored Apr 6, 2023
2 parents 903c942 + 8a41bcc commit acbf437
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ Default configure file `overlaybd.json` is installed to `/etc/overlaybd/`.
| enableThread | Enable overlaybd device run in seprate thread or not. Note `cacheType` should be `ocf`. `false` is default. |
| auditPath | The path for audit file, `/var/log/overlaybd-audit.log` is the default value. |
| registryFsVersion | registry client version, 'v1' libcurl based, 'v2' is photon http based. 'v1' is the default value. |
| prefetchConfig.concurrency | Prefetch concurrency for reloading trace, `16` is default |

> NOTE: `download` is the config for background downloading. After an overlaybd device is lauched, a background task will be running to fetch the whole blobs into local directories. After downloading, I/O requests are directed to local files. Unlike other options, download config is reloaded when a device launching.
Expand Down
7 changes: 7 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ struct LogConfig : public ConfigUtils::Config {
APPCFG_PARA(logRotateNum, int, 3);
};

struct PrefetchConfig : public ConfigUtils::Config {
APPCFG_CLASS

APPCFG_PARA(concurrency, int, 16);
};

struct GlobalConfig : public ConfigUtils::Config {
APPCFG_CLASS

Expand All @@ -137,6 +143,7 @@ struct GlobalConfig : public ConfigUtils::Config {
APPCFG_PARA(cacheConfig, CacheConfig);
APPCFG_PARA(gzipCacheConfig, GzipCacheConfig);
APPCFG_PARA(logConfig, LogConfig);
APPCFG_PARA(prefetchConfig, PrefetchConfig);
};

struct AuthConfig : public ConfigUtils::Config {
Expand Down
5 changes: 3 additions & 2 deletions src/image_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ int ImageFile::init_image_file() {
bool record_no_download = false;
bool has_error = false;
auto lowers = conf.lowers();
auto concurrency = image_service.global_conf.prefetchConfig().concurrency();

if (conf.accelerationLayer() && !conf.recordTracePath().empty()) {
LOG_ERROR("Cannot record trace while acceleration layer exists");
Expand All @@ -439,7 +440,7 @@ int ImageFile::init_image_file() {
std::string trace_file = accel_layer + "/trace";
if (Prefetcher::detect_mode(trace_file) ==
Prefetcher::Mode::Replay) {
m_prefetcher = new_prefetcher(trace_file);
m_prefetcher = new_prefetcher(trace_file, concurrency);
}

} else if (!conf.recordTracePath().empty()) {
Expand All @@ -448,7 +449,7 @@ int ImageFile::init_image_file() {
LOG_ERROR("Prefetch: incorrect mode ` for prefetching", mode);
goto ERROR_EXIT;
}
m_prefetcher = new_prefetcher(conf.recordTracePath());
m_prefetcher = new_prefetcher(conf.recordTracePath(), concurrency);
if (mode == Prefetcher::Mode::Record) {
record_no_download = true;
}
Expand Down
48 changes: 34 additions & 14 deletions src/prefetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class PrefetchFile : public ForwardFile_Ownership {

class PrefetcherImpl : public Prefetcher {
public:
explicit PrefetcherImpl(const string &trace_file_path) {
explicit PrefetcherImpl(const string &trace_file_path, int concurrency) : m_concurrency(concurrency) {
// Detect mode
size_t file_size = 0;
m_mode = detect_mode(trace_file_path, &file_size);
Expand Down Expand Up @@ -87,9 +87,13 @@ class PrefetcherImpl : public Prefetcher {

} else if (m_mode == Mode::Replay) {
m_replay_stopped = true;
for (auto th : m_replay_threads) {
photon::thread_shutdown((photon::thread *)th);
photon::thread_join(th);
if (m_replay_thread) {
for (auto th : m_replay_threads) {
if (th) {
photon::thread_shutdown((photon::thread *)th);
}
}
photon::thread_join(m_replay_thread);
}
}

Expand All @@ -110,20 +114,35 @@ class PrefetcherImpl : public Prefetcher {
m_record_array.push_back(trace);
}

void do_replay() {
struct timeval start;
gettimeofday(&start, NULL);
LOG_INFO("Prefetch: Replay ` records from ` layers, concurrency `",
m_replay_queue.size(), m_src_files.size(), m_concurrency);
for (int i = 0; i < m_concurrency; ++i) {
auto th = photon::thread_create11(&PrefetcherImpl::replay_worker_thread, this);
auto join_handle = photon::thread_enable_join(th);
m_replay_threads.push_back(join_handle);
}
for (auto &th : m_replay_threads) {
photon::thread_join(th);
th = nullptr;
}
struct timeval end;
gettimeofday(&end, NULL);
uint64_t elapsed = 1000000UL * (end.tv_sec - start.tv_sec) + end.tv_usec - start.tv_usec;
LOG_INFO("Prefetch: Replay done, time cost ` ms", elapsed / 1000);
}

void replay() override {
if (m_mode != Mode::Replay) {
return;
}
if (m_replay_queue.empty() || m_src_files.empty()) {
return;
}
LOG_INFO("Prefetch: Replay ` records from ` layers", m_replay_queue.size(),
m_src_files.size());
for (int i = 0; i < REPLAY_CONCURRENCY; ++i) {
auto th = photon::thread_create11(&PrefetcherImpl::replay_worker_thread, this);
auto join_handle = photon::thread_enable_join(th);
m_replay_threads.push_back(join_handle);
}
auto th = photon::thread_create11(&PrefetcherImpl::do_replay, this);
m_replay_thread = photon::thread_enable_join(th);
}

int replay_worker_thread() {
Expand Down Expand Up @@ -166,13 +185,13 @@ class PrefetcherImpl : public Prefetcher {
};

static const int MAX_IO_SIZE = 1024 * 1024;
static const int REPLAY_CONCURRENCY = 16;
static const uint32_t TRACE_MAGIC = 3270449184; // CRC32 of `Container Image Trace Format`

vector<TraceFormat> m_record_array;
queue<TraceFormat> m_replay_queue;
map<uint32_t, IFile *> m_src_files;
vector<photon::join_handle *> m_replay_threads;
photon::join_handle *m_replay_thread = nullptr;
photon::join_handle *m_detect_thread = nullptr;
bool m_detect_thread_interruptible = false;
string m_lock_file_path;
Expand All @@ -181,6 +200,7 @@ class PrefetcherImpl : public Prefetcher {
bool m_replay_stopped = false;
bool m_record_stopped = false;
bool m_buffer_released = false;
int m_concurrency;

int dump() {
if (m_trace_file == nullptr) {
Expand Down Expand Up @@ -314,8 +334,8 @@ ssize_t PrefetchFile::pread(void *buf, size_t count, off_t offset) {
return n_read;
}

Prefetcher *new_prefetcher(const string &trace_file_path) {
return new PrefetcherImpl(trace_file_path);
Prefetcher *new_prefetcher(const string &trace_file_path, int concurrency) {
return new PrefetcherImpl(trace_file_path, concurrency);
}

Prefetcher::Mode Prefetcher::detect_mode(const string &trace_file_path, size_t *file_size) {
Expand Down
2 changes: 1 addition & 1 deletion src/prefetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ class Prefetcher : public Object {
Mode m_mode;
};

Prefetcher *new_prefetcher(const std::string &trace_file_path);
Prefetcher *new_prefetcher(const std::string &trace_file_path, int concurrency);

0 comments on commit acbf437

Please sign in to comment.