From efbbd551cb5876d5dc73e2f1b88964e1e264f8a0 Mon Sep 17 00:00:00 2001 From: wjblanke Date: Thu, 4 Jun 2020 13:33:30 -0700 Subject: [PATCH] Speedywjb (#35) * make sure fstream reader and writer properly call seek. increased FileDisk Read and Write to 64 bit for large buffers * remove unneeded memory copies in write plot * rename instead of copy if parent path is the same * be nicer to ubuntu filesystem * combine WriteParkToFile writes * added command line buffer size * corrected file info --- python-bindings/chiapos.cpp | 5 +- src/cli.cpp | 4 +- src/plotter_disk.hpp | 611 +++++++++++++++++++++------------- src/sort_on_disk.hpp | 115 +++---- tests/test.cpp | 4 +- tests/test_python_bindings.py | 2 +- 6 files changed, 427 insertions(+), 314 deletions(-) diff --git a/python-bindings/chiapos.cpp b/python-bindings/chiapos.cpp index 9841e4841..4ac6d06a1 100644 --- a/python-bindings/chiapos.cpp +++ b/python-bindings/chiapos.cpp @@ -38,12 +38,13 @@ PYBIND11_MODULE(chiapos, m) { const std::string tmp2_dir, const std::string final_dir, const std::string filename, uint8_t k, - const py::bytes &memo, const py::bytes &id) { + const py::bytes &memo, const py::bytes &id, + uint32_t buffmegabytes) { std::string memo_str(memo); const uint8_t* memo_ptr = reinterpret_cast(memo_str.data()); std::string id_str(id); const uint8_t* id_ptr = reinterpret_cast(id_str.data()); - dp.CreatePlotDisk(tmp_dir, tmp2_dir, final_dir, filename, k, memo_ptr, len(memo), id_ptr, len(id)); + dp.CreatePlotDisk(tmp_dir, tmp2_dir, final_dir, filename, k, memo_ptr, len(memo), id_ptr, len(id), buffmegabytes); }); py::class_(m, "DiskProver") diff --git a/src/cli.cpp b/src/cli.cpp index dfcc1ec11..89c917548 100644 --- a/src/cli.cpp +++ b/src/cli.cpp @@ -70,6 +70,7 @@ int main(int argc, char *argv[]) { string operation = "help"; string memo = "0102030405"; string id = "022fb42c08c12de3a6af053880199806532e79515f94e83461612101f9412f9e"; + uint32_t buffmegabytes = 2*1024; // 2 gigabytes options.allow_unrecognised_options() .add_options() @@ -80,6 +81,7 @@ int main(int argc, char *argv[]) { ("f, file", "Filename", cxxopts::value(filename)) ("m, memo", "Memo to insert into the plot", cxxopts::value(memo)) ("i, id", "Unique 32-byte seed for the plot", cxxopts::value(id)) + ("b, buffer", "Megabytes to be used as buffer for sorting and plotting", cxxopts::value(buffmegabytes)) ("help", "Print help"); auto result = options.parse(argc, argv); @@ -108,7 +110,7 @@ int main(int argc, char *argv[]) { HexToBytes(id, id_bytes); DiskPlotter plotter = DiskPlotter(); - plotter.CreatePlotDisk(tempdir, tempdir2, finaldir, filename, k, memo_bytes, memo.size() / 2, id_bytes, 32); + plotter.CreatePlotDisk(tempdir, tempdir2, finaldir, filename, k, memo_bytes, memo.size() / 2, id_bytes, 32, buffmegabytes); delete[] memo_bytes; } else if (operation == "prove") { if (argc < 3) { diff --git a/src/plotter_disk.hpp b/src/plotter_disk.hpp index 647372f5d..f8cc9cf47 100644 --- a/src/plotter_disk.hpp +++ b/src/plotter_disk.hpp @@ -44,7 +44,6 @@ namespace fs = ghc::filesystem; // Constants that are only relevant for the plotting process. // Other constants can be found in pos_constants.hpp -const uint64_t kMemorySize = 2147483648; // 2^31, or 2GB // Number of buckets to use for SortOnDisk. const uint32_t kNumSortBuckets = 16; @@ -84,17 +83,20 @@ class DiskPlotter { // the process. 
void CreatePlotDisk(std::string tmp_dirname, std::string tmp2_dirname, std::string final_dirname, std::string filename, uint8_t k, const uint8_t* memo, - uint32_t memo_len, const uint8_t* id, uint32_t id_len) { + uint32_t memo_len, const uint8_t* id, uint32_t id_len, uint32_t buffmegabytes = 2*1024) { if (k < kMinPlotSize || k > kMaxPlotSize) { std::string err_string = "Plot size k=" + std::to_string(k) + " is invalid"; std::cerr << err_string << std::endl; throw err_string; } + memorySize=((uint64_t)buffmegabytes)*1024*1024; + std::cout << std::endl << "Starting plotting progress into temporary dirs: " << tmp_dirname << " and " << tmp2_dirname << std::endl; std::cout << "Memo: " << Util::HexStr(memo, memo_len) << std::endl; std::cout << "ID: " << Util::HexStr(id, id_len) << std::endl; std::cout << "Plot size is: " << static_cast(k) << std::endl; + std::cout << "Buffer size is: " << memorySize << std::endl; // Cross platform way to concatenate paths, gulrak library. fs::path tmp_1_filename = fs::path(tmp_dirname) / fs::path(filename + ".tmp"); @@ -120,55 +122,78 @@ class DiskPlotter { throw err_string; } - // These variables are used in the WriteParkToFile method. They are preallocatted here - // to save time. - first_line_point_bytes = new uint8_t[CalculateLinePointSize(k)]; - park_stubs_bytes = new uint8_t[CalculateStubsSize(k)]; - park_deltas_bytes = new uint8_t[CalculateMaxDeltasSize(k, 1)]; + remove(tmp_1_filename); + remove(tmp_2_filename); + remove(final_filename); + + std::ios_base::sync_with_stdio(false); + std::ostream *prevstr = std::cin.tie(NULL); + + { + // Scope for FileDisk + FileDisk tmp1_disk(tmp_1_filename); + FileDisk tmp2_disk(tmp_2_filename); + + // These variables are used in the WriteParkToFile method. They are preallocatted here + // to save time. + parkToFileBytes = new uint8_t[CalculateLinePointSize(k)+CalculateStubsSize(k)+2+CalculateMaxDeltasSize(k, 1)]; - assert(id_len == kIdLen); + assert(id_len == kIdLen); - std::cout << std::endl << "Starting phase 1/4: Forward Propagation... " << Timer::GetNow(); + // Memory to be used for sorting and buffers + uint8_t* memory = new uint8_t[memorySize]; - Timer p1; - Timer all_phases; - std::vector results = WritePlotFile(tmp_1_filename.string(), k, id, memo, memo_len); - p1.PrintElapsed("Time for phase 1 ="); + std::cout << std::endl << "Starting phase 1/4: Forward Propagation into " << tmp_1_filename << " ... " << Timer::GetNow(); - std::cout << std::endl << "Starting phase 2/4: Backpropagation into " << tmp_1_filename << " and " << tmp_2_filename << " ..." << Timer::GetNow(); + Timer p1; + Timer all_phases; + std::vector results = WritePlotFile(memory, tmp1_disk, k, id, memo, memo_len); + p1.PrintElapsed("Time for phase 1 ="); - Timer p2; - Backpropagate(tmp_2_filename.string(), tmp_1_filename.string(), k, id, memo, memo_len, results); - p2.PrintElapsed("Time for phase 2 ="); + std::cout << std::endl << "Starting phase 2/4: Backpropagation into " << tmp_1_filename << " ... " << Timer::GetNow(); - std::cout << std::endl << "Starting phase 3/4: Compression... " << Timer::GetNow(); - Timer p3; - Phase3Results res = CompressTables(k, results, tmp_2_filename.string(), tmp_1_filename.string(), id, memo, memo_len); - p3.PrintElapsed("Time for phase 3 ="); + Timer p2; + Backpropagate(memory, tmp1_disk, k, id, memo, memo_len, results); + p2.PrintElapsed("Time for phase 2 ="); - std::cout << std::endl << "Starting phase 4/4: Write Checkpoint tables... 
" << Timer::GetNow(); - Timer p4; - WriteCTables(k, k + 1, tmp_2_filename.string(), tmp_1_filename.string(), res); - p4.PrintElapsed("Time for phase 4 ="); + std::cout << std::endl << "Starting phase 3/4: Compression from " << tmp_1_filename << " into " << tmp_2_filename << " ... " << Timer::GetNow(); + Timer p3; + Phase3Results res = CompressTables(memory, k, results, tmp2_disk, tmp1_disk, id, memo, memo_len); + p3.PrintElapsed("Time for phase 3 ="); - std::cout << "Approximate working space used: " << + std::cout << std::endl << "Starting phase 4/4: Write Checkpoint tables from " << tmp_1_filename << " into " << tmp_2_filename << " ... " << Timer::GetNow(); + Timer p4; + WriteCTables(k, k + 1, tmp2_disk, tmp1_disk, res); + p4.PrintElapsed("Time for phase 4 ="); + + std::cout << "Approximate working space used: " << static_cast(res.plot_table_begin_pointers[8])/(1024*1024*1024) << " GB" << std::endl; - std::cout << "Final File size: " << + std::cout << "Final File size: " << static_cast(res.final_table_begin_pointers[11])/(1024*1024*1024) << " GB" << std::endl; - all_phases.PrintElapsed("Total time ="); + all_phases.PrintElapsed("Total time ="); - bool removed_1 = fs::remove(tmp_1_filename); - fs::copy(tmp_2_filename, final_filename, fs::copy_options::overwrite_existing); + delete[] memory; + delete[] parkToFileBytes; + } - bool removed_2 = fs::remove(tmp_2_filename); + std::cin.tie (prevstr); + std::ios_base::sync_with_stdio(true); + bool removed_1 = fs::remove(tmp_1_filename); std::cout << "Removed " << tmp_1_filename << "? " << removed_1 << std::endl; - std::cout << "Removed " << tmp_2_filename << "? " << removed_2 << std::endl; - std::cout << "Copied final file to " << final_filename << std::endl; - delete[] first_line_point_bytes; - delete[] park_stubs_bytes; - delete[] park_deltas_bytes; + if(tmp_2_filename.parent_path() == final_filename.parent_path()) { + fs::rename(tmp_2_filename, final_filename); + std::cout << "Moved final file to " << final_filename << std::endl; + } + else { + fs::copy(tmp_2_filename, final_filename, fs::copy_options::overwrite_existing); + bool removed_2 = fs::remove(tmp_2_filename); + std::cout << "Copied final file to " << final_filename << std::endl; + std::cout << "Removed " << tmp_2_filename << "? 
" << removed_2 << std::endl; + + } + } static uint32_t GetMaxEntrySize(uint8_t k, uint8_t table_index, bool phase_1_size) { @@ -233,12 +258,11 @@ class DiskPlotter { } private: - uint8_t* first_line_point_bytes; - uint8_t* park_stubs_bytes; - uint8_t* park_deltas_bytes; + uint64_t memorySize; + uint8_t* parkToFileBytes; // Writes the plot file header to a file - uint32_t WriteHeader(std::ofstream &plot_file, uint8_t k, const uint8_t* id, const uint8_t* memo, + uint32_t WriteHeader(FileDisk& plot_Disk, uint8_t k, const uint8_t* id, const uint8_t* memo, uint32_t memo_len) { // 19 bytes - "Proof of Space Plot" (utf-8) // 32 bytes - unique plot id @@ -249,26 +273,34 @@ class DiskPlotter { // x bytes - memo std::string header_text = "Proof of Space Plot"; - plot_file.write(header_text.data(), header_text.size()); - - plot_file.write(reinterpret_cast(id), kIdLen); + uint64_t write_pos=0; + plot_Disk.Write(write_pos,(uint8_t *)header_text.data(), header_text.size()); + write_pos+=header_text.size(); + plot_Disk.Write(write_pos, (id), kIdLen); + write_pos+=kIdLen; uint8_t k_buffer[1]; k_buffer[0] = k; - plot_file.write(reinterpret_cast(k_buffer), 1); + plot_Disk.Write(write_pos, (k_buffer), 1); + write_pos+=1; uint8_t size_buffer[2]; Bits(kFormatDescription.size(), 16).ToBytes(size_buffer); - plot_file.write(reinterpret_cast(size_buffer), 2); - plot_file.write(kFormatDescription.data(), kFormatDescription.size()); + plot_Disk.Write(write_pos, (size_buffer), 2); + write_pos+=2; + plot_Disk.Write(write_pos,(uint8_t *)kFormatDescription.data(), kFormatDescription.size()); + write_pos+=kFormatDescription.size(); Bits(memo_len, 16).ToBytes(size_buffer); - plot_file.write(reinterpret_cast(size_buffer), 2); - plot_file.write(reinterpret_cast(memo), memo_len); + plot_Disk.Write(write_pos, (size_buffer), 2); + write_pos+=2; + plot_Disk.Write(write_pos, (memo), memo_len); + write_pos+=memo_len; uint8_t pointers[10*8]; memset(pointers, 0, 10*8); - plot_file.write(reinterpret_cast(pointers), 10*8); + plot_Disk.Write(write_pos, (pointers), 10*8); + write_pos+=10*8; uint32_t bytes_written = header_text.size() + kIdLen + 1 + 2 + kFormatDescription.size() + 2 + memo_len + 10*8; @@ -282,15 +314,11 @@ class DiskPlotter { // proofs of space in it. First, F1 is computed, which is special since it uses // AES256, and each encrption provides multiple output values. Then, the rest of the // f functions are computed, and a sort on disk happens for each table. - std::vector WritePlotFile(std::string plot_filename, uint8_t k, const uint8_t* id, + std::vector WritePlotFile(uint8_t* memory, FileDisk& tmp1_disk, uint8_t k, const uint8_t* id, const uint8_t* memo, uint8_t memo_len) { - // Note that the plot file is not the final file that will be stored on disk, - // it is only present during plotting. 
- std::ofstream plot_file(plot_filename, std::ios::out | std::ios::trunc | std::ios::binary); - if (!plot_file.is_open()) { - throw std::string("File not opened correct"); - } - uint32_t header_size = WriteHeader(plot_file, k, id, memo, memo_len); + uint32_t header_size = WriteHeader(tmp1_disk, k, id, memo, memo_len); + + uint64_t plot_file=header_size; std::cout << "Computing table 1" << std::endl; Timer f1_start_time; @@ -317,7 +345,8 @@ class DiskPlotter { (std::get<0>(kv) + std::get<1>(kv)).ToBytes(buf); // We write the x, y pair - plot_file.write(reinterpret_cast(buf), entry_size_bytes); + tmp1_disk.Write(plot_file, (buf), entry_size_bytes); + plot_file+=entry_size_bytes; bucket_sizes[SortOnDiskUtils::ExtractNum(buf, entry_size_bytes, 0, kLogNumSortBuckets)] += 1; @@ -330,10 +359,11 @@ class DiskPlotter { break; } } - delete[] buf; // A zero entry is the end of table symbol. - Util::WriteZeroesStack(plot_file, entry_size_bytes); - plot_file.close(); + memset(buf, 0x00, entry_size_bytes); + tmp1_disk.Write(plot_file, (buf), entry_size_bytes); + plot_file+=entry_size_bytes; + delete[] buf; f1_start_time.PrintElapsed("F1 complete, Time = "); @@ -354,9 +384,6 @@ class DiskPlotter { // Number of buckets that y values will be put into. double num_buckets = ((uint64_t)1 << (k + kExtraBits)) / static_cast(kBC) + 1; - // Memory to be used for sorting - uint8_t* memory = new uint8_t[kMemorySize]; - // For tables 1 through 6, sort the table, calculate matches, and write // the next table. This is the left table index. for (uint8_t table_index = 1; table_index < 7; table_index++) { @@ -378,10 +405,8 @@ class DiskPlotter { // Performs a sort on the left table, Timer sort_timer; - FileDisk d(plot_filename); - Sorting::SortOnDisk(d, begin_byte, begin_byte_next, entry_size_bytes, - 0, bucket_sizes, memory, kMemorySize); - d.Close(); + Sorting::SortOnDisk(tmp1_disk, begin_byte, begin_byte_next, entry_size_bytes, + 0, bucket_sizes, memory, memorySize); sort_timer.PrintElapsed("\tSort time:"); Timer computation_pass_timer; @@ -389,11 +414,11 @@ class DiskPlotter { // Streams to read and right to tables. We will have handles to two tables. We will // read through the left table, compute matches, and evaluate f for matching entries, // writing results to the right table. - std::ifstream left_reader(plot_filename, std::fstream::in | std::fstream::binary); - std::fstream right_writer(plot_filename, std::fstream::out | std::fstream::in | std::fstream::binary); - - left_reader.seekg(begin_byte); - right_writer.seekp(begin_byte_next); + uint64_t left_reader=begin_byte; + uint64_t right_writer=begin_byte_next; + uint8_t *right_writer_buf=memory; + uint64_t right_buf_entries=memorySize/right_entry_size_bytes; + uint64_t right_writer_count=0; FxCalculator f(k, table_index + 1, id); @@ -409,7 +434,7 @@ class DiskPlotter { // Buffers for storing a left or a right entry, used for disk IO uint8_t* left_buf = new uint8_t[entry_size_bytes]; - uint8_t* right_buf = new uint8_t[right_entry_size_bytes]; + uint8_t* right_buf; Bits zero_bits(0, metadata_size); // Start at left table pos = 0 and iterate through the whole table. 
Note that the left table @@ -418,7 +443,9 @@ class DiskPlotter { PlotEntry left_entry; left_entry.right_metadata = 0; // Reads a left entry from disk - left_reader.read(reinterpret_cast(left_buf), entry_size_bytes); + tmp1_disk.Read(left_reader, left_buf, entry_size_bytes); + left_reader+=entry_size_bytes; + if (table_index == 1) { // For table 1, we only have y and metadata left_entry.y = Util::SliceInt64FromBytes(left_buf, entry_size_bytes, @@ -501,9 +528,18 @@ class DiskPlotter { new_entry += std::get<1>(f_output); // Fill with 0s if entry is not long enough new_entry.AppendValue(0, right_entry_size_bytes * 8 - new_entry.GetSize()); + + right_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; + new_entry.ToBytes(right_buf); // Writes the new entry into the right table - right_writer.write(reinterpret_cast(right_buf), right_entry_size_bytes); + + if(right_writer_count%right_buf_entries==0) { + tmp1_disk.Write(right_writer, right_writer_buf, + right_buf_entries*right_entry_size_bytes); + right_writer+=right_buf_entries*right_entry_size_bytes; + } // Computes sort bucket, so we can sort the table by y later, more easily right_bucket_sizes[SortOnDiskUtils::ExtractNum(right_buf, right_entry_size_bytes, 0, @@ -532,17 +568,24 @@ class DiskPlotter { std::cout << "\tTotal matches: " << matches << ". Per bucket: " << (matches / num_buckets) << std::endl; + right_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; + // Writes the 0 entry (EOT) - memset(right_buf, 0, right_entry_size_bytes); - Bits(0, right_entry_size_bytes * 8).ToBytes(right_buf); - right_writer.write(reinterpret_cast(right_buf), right_entry_size_bytes); + memset(right_buf, 0x00, right_entry_size_bytes); + + tmp1_disk.Write(right_writer, right_writer_buf, + (right_writer_count%right_buf_entries)*right_entry_size_bytes); + right_writer+=(right_writer_count%right_buf_entries)*right_entry_size_bytes; + // Writes the start of the table to the header, so we can resume plotting if it // interrups. - right_writer.seekp(header_size - 8 * (12 - table_index)); + right_writer=header_size - 8 * (12 - table_index); uint8_t pointer_buf[8]; Bits(begin_byte_next, 8*8).ToBytes(pointer_buf); - right_writer.write(reinterpret_cast(pointer_buf), 8); + tmp1_disk.Write(right_writer, (pointer_buf), 8); + right_writer+=8; // Resets variables plot_table_begin_pointers[table_index + 1] = begin_byte_next; @@ -550,18 +593,16 @@ class DiskPlotter { bucket_sizes = right_bucket_sizes; right_bucket_sizes = std::vector(kNumSortBuckets, 0); - left_reader.close(); - right_writer.close(); delete[] left_buf; - delete[] right_buf; computation_pass_timer.PrintElapsed("\tComputation pass time:"); table_timer.PrintElapsed("Forward propagation table time:"); } + // Pointer to the end of the last table + 1, used for spare space for disk sorting plot_table_begin_pointers[8] = plot_table_begin_pointers[7] + (right_entry_size_bytes * (total_table_entries + 1)); - delete[] memory; + std::cout << "Final plot table begin pointers: " << std::endl; for (uint8_t i = 1; i <= 8; i++) { std::cout << "\tTable " << int{i} << " 0x" @@ -575,7 +616,7 @@ class DiskPlotter { // The purpose of backpropagate is to eliminate any dead entries that don't contribute // to final values in f7, to minimize disk usage. A sort on disk is applied to each table, // so that they are sorted by position. 
- void Backpropagate(std::string filename, std::string plot_filename, uint8_t k, + void Backpropagate(uint8_t* memory, FileDisk& tmp1_disk, uint8_t k, const uint8_t* id, const uint8_t* memo, uint32_t memo_len, const std::vector& results) { std::vector plot_table_begin_pointers = results; @@ -589,20 +630,11 @@ class DiskPlotter { // The end of the table 7, is spare space that we can use for sorting uint64_t spare_pointer = plot_table_begin_pointers[8]; - // Memory to be used for sorting - uint8_t* memory = new uint8_t[kMemorySize]; - // Iterates through each table (with a left and right pointer), starting at 6 & 7. for (uint8_t table_index = 7; table_index > 1; --table_index) { //std::vector > match_positions; Timer table_timer; - // We will have reader and writer for both tables. - std::ifstream left_reader(plot_filename, std::ios::in | std::ios::binary); - std::ofstream left_writer(plot_filename, std::ios::in | std::ios::out | std::ios::binary); - std::ifstream right_reader(plot_filename, std::ios::in | std::ios::binary); - std::ofstream right_writer(plot_filename, std::ios::in | std::ios::out | std::ios::binary); - std::cout << "Backpropagating on table " << int{table_index} << std::endl; std::vector new_bucket_sizes_pos(kNumSortBuckets, 0); @@ -619,9 +651,6 @@ class DiskPlotter { // been pruned in previous iteration) uint16_t right_entry_size_bytes = GetMaxEntrySize(k, table_index, false); - left_writer.flush(); - right_writer.flush(); - // Doesn't sort table 7, since it's already sorted by pos6 (position into table 6). // The reason we sort, is so we can iterate through both tables at once. For example, // if we read a right entry (pos, offset) = (456, 2), the next one might be (458, 19), @@ -630,21 +659,29 @@ class DiskPlotter { std::cout << "\tSorting table " << int{table_index} << " starting at " << plot_table_begin_pointers[table_index] << std::endl; Timer sort_timer; - FileDisk d(plot_filename); - Sorting::SortOnDisk(d, plot_table_begin_pointers[table_index], spare_pointer, + Sorting::SortOnDisk(tmp1_disk, plot_table_begin_pointers[table_index], spare_pointer, right_entry_size_bytes, - 0, bucket_sizes_pos, memory, kMemorySize); - d.Close(); + 0, bucket_sizes_pos, memory, memorySize); + sort_timer.PrintElapsed("\tSort time:"); } Timer computation_pass_timer; - left_reader.seekg(plot_table_begin_pointers[table_index - 1]); - left_writer.seekp(plot_table_begin_pointers[table_index - 1]); - right_reader.seekg(plot_table_begin_pointers[table_index]); - right_writer.seekp(plot_table_begin_pointers[table_index]); - left_writer.flush(); - right_writer.flush(); + uint64_t left_reader=plot_table_begin_pointers[table_index - 1]; + uint64_t left_writer=plot_table_begin_pointers[table_index - 1]; + uint64_t right_reader=plot_table_begin_pointers[table_index]; + uint64_t right_writer=plot_table_begin_pointers[table_index]; + uint8_t *left_reader_buf=&(memory[0]); + uint8_t *left_writer_buf=&(memory[memorySize/4]); + uint8_t *right_reader_buf=&(memory[memorySize/2]); + uint8_t *right_writer_buf=&(memory[3*memorySize/4]); + uint64_t left_buf_entries=memorySize/4/left_entry_size_bytes; + uint64_t new_left_buf_entries=memorySize/4/new_left_entry_size_bytes; + uint64_t right_buf_entries=memorySize/4/right_entry_size_bytes; + uint64_t left_reader_count=0; + uint64_t right_reader_count=0; + uint64_t left_writer_count=0; + uint64_t right_writer_count=0; // We will divide by 2, so it must be even. 
assert(kCachedPositionsSize % 2 == 0); @@ -684,9 +721,9 @@ class DiskPlotter { uint64_t greatest_pos = 0; // This is the greatest position we have seen in R table // Buffers for reading and writing to disk - uint8_t* left_entry_buf = new uint8_t[left_entry_size_bytes]; - uint8_t* new_left_entry_buf = new uint8_t[new_left_entry_size_bytes]; - uint8_t* right_entry_buf = new uint8_t[right_entry_size_bytes]; + uint8_t* left_entry_buf; + uint8_t* new_left_entry_buf; + uint8_t* right_entry_buf; // Go through all right entries, and keep going since write pointer is behind read pointer while (!end_of_right_table || (current_pos - end_of_table_pos <= kReadMinusWrite)) { @@ -715,8 +752,17 @@ class DiskPlotter { while (!end_of_right_table) { if (should_read_entry) { // Need to read another entry at the current position - right_reader.read(reinterpret_cast(right_entry_buf), - right_entry_size_bytes); + if(right_reader_count%right_buf_entries==0) { + uint64_t readAmt=std::min(right_buf_entries*right_entry_size_bytes, + plot_table_begin_pointers[table_index+1]-plot_table_begin_pointers[table_index]-right_reader_count*right_entry_size_bytes); + + tmp1_disk.Read(right_reader, right_reader_buf, + readAmt); + right_reader+=readAmt; + } + right_entry_buf=right_reader_buf+(right_reader_count%right_buf_entries)*right_entry_size_bytes; + right_reader_count++; + if (table_index == 7) { // This is actually y for table 7 entry_sort_key = Util::SliceInt64FromBytes(right_entry_buf, right_entry_size_bytes, @@ -778,9 +824,16 @@ class DiskPlotter { break; } } - // Reads a left entry - left_reader.read(reinterpret_cast(left_entry_buf), - left_entry_size_bytes); + // ***Reads a left entry + if(left_reader_count%left_buf_entries==0) { + uint64_t readAmt=std::min(left_buf_entries*left_entry_size_bytes, + plot_table_begin_pointers[table_index]-plot_table_begin_pointers[table_index-1]-left_reader_count*left_entry_size_bytes); + tmp1_disk.Read(left_reader, left_reader_buf, + readAmt); + left_reader+=readAmt; + } + left_entry_buf=left_reader_buf+(left_reader_count%left_buf_entries)*left_entry_size_bytes; + left_reader_count++; // If this left entry is used, we rewrite it. If it's not used, we ignore it. if (used_positions[current_pos % kCachedPositionsSize]) { @@ -799,6 +852,10 @@ class DiskPlotter { entry_metadata = Util::SliceInt128FromBytes(left_entry_buf, left_entry_size_bytes, k + kExtraBits, left_metadata_size); } + + new_left_entry_buf=left_writer_buf+(left_writer_count%new_left_buf_entries)*new_left_entry_size_bytes; + left_writer_count++; + Bits new_left_entry; if (table_index > 2) { // The new left entry is slightly different. 
Metadata is dropped, to save space, @@ -819,7 +876,12 @@ class DiskPlotter { // std::cout << "Writing X:" << entry_metadata.GetValue() << std::endl; } new_left_entry.ToBytes(new_left_entry_buf); - left_writer.write(reinterpret_cast(new_left_entry_buf), new_left_entry_size_bytes); + + if(left_writer_count%new_left_buf_entries==0) { + tmp1_disk.Write(left_writer, left_writer_buf, + new_left_buf_entries*new_left_entry_size_bytes); + left_writer+=new_left_buf_entries*new_left_entry_size_bytes; + } new_bucket_sizes_pos[SortOnDiskUtils::ExtractNum(new_left_entry_buf, new_left_entry_size_bytes, 0, kLogNumSortBuckets)] += 1; @@ -848,12 +910,23 @@ class DiskPlotter { new_right_entry += new_pos_bin; //match_positions.push_back(std::make_pair(new_pos, new_offset_pos)); new_right_entry.AppendValue(new_offset_pos - new_pos, kOffsetSize); + + // Calculate right entry pointer for output + right_entry_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; + if (Util::ByteAlign(new_right_entry.GetSize()) < right_entry_size_bytes * 8) { memset(right_entry_buf, 0, right_entry_size_bytes); } new_right_entry.ToBytes(right_entry_buf); - right_writer.write(reinterpret_cast(right_entry_buf), - right_entry_size_bytes); + + // Check for write out to disk + if(right_writer_count%right_buf_entries==0) { + tmp1_disk.Write(right_writer, right_writer_buf, + right_buf_entries*right_entry_size_bytes); + right_writer+=right_buf_entries*right_entry_size_bytes; + } + } } ++current_pos; @@ -863,23 +936,27 @@ class DiskPlotter { computation_pass_timer.PrintElapsed("\tComputation pass time:"); table_timer.PrintElapsed("Total backpropagation time::"); - Bits(0, right_entry_size_bytes * 8).ToBytes(right_entry_buf); - right_writer.write(reinterpret_cast(right_entry_buf), right_entry_size_bytes); - Bits(0, new_left_entry_size_bytes * 8).ToBytes(new_left_entry_buf); - left_writer.write(reinterpret_cast(new_left_entry_buf), new_left_entry_size_bytes); + right_entry_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; - left_reader.close(); - left_writer.close(); - right_reader.close(); - right_writer.close(); + memset(right_entry_buf, 0x00, right_entry_size_bytes); + tmp1_disk.Write(right_writer, right_writer_buf, + (right_writer_count%right_buf_entries)*right_entry_size_bytes); + right_writer+=(right_writer_count%right_buf_entries)*right_entry_size_bytes; + + new_left_entry_buf=left_writer_buf+(left_writer_count%new_left_buf_entries)*new_left_entry_size_bytes; + left_writer_count++; + + memset(new_left_entry_buf,0x00,new_left_entry_size_bytes); + + tmp1_disk.Write(left_writer, left_writer_buf, + (left_writer_count%new_left_buf_entries)*new_left_entry_size_bytes); + left_writer+=(left_writer_count%new_left_buf_entries)*new_left_entry_size_bytes; + bucket_sizes_pos = new_bucket_sizes_pos; - delete[] left_entry_buf; - delete[] new_left_entry_buf; - delete[] right_entry_buf; } - delete[] memory; } // This writes a number of entries into a file, in the final, optimized format. The park contains @@ -888,16 +965,18 @@ class DiskPlotter { // the delta bits are optimized into a variable encoding scheme. Since we have many entries in each // park, we can approximate how much space each park with take. // Format is: [2k bits of first_line_point] [EPP-1 stubs] [Deltas size] [EPP-1 deltas].... [first_line_point] ... 
- void WriteParkToFile(std::ofstream &writer, uint64_t table_start, uint64_t park_index, uint32_t park_size_bytes, + void WriteParkToFile(FileDisk& final_disk, uint64_t table_start, uint64_t park_index, uint32_t park_size_bytes, uint128_t first_line_point, const std::vector& park_deltas, const std::vector& park_stubs, uint8_t k, uint8_t table_index) { // Parks are fixed size, so we know where to start writing. The deltas will not go over // into the next park. - writer.seekp(table_start + park_index * park_size_bytes); + uint64_t writer=table_start + park_index * park_size_bytes; + uint8_t *index = parkToFileBytes; + Bits first_line_point_bits(first_line_point, 2*k); - memset(first_line_point_bytes, 0, CalculateLinePointSize(k)); - first_line_point_bits.ToBytes(first_line_point_bytes); - writer.write((const char*)first_line_point_bytes, CalculateLinePointSize(k)); + memset(parkToFileBytes, 0, CalculateLinePointSize(k)); + first_line_point_bits.ToBytes(index); + index+=CalculateLinePointSize(k); // We use ParkBits insted of Bits since it allows storing more data ParkBits park_stubs_bits; @@ -905,9 +984,9 @@ class DiskPlotter { park_stubs_bits.AppendValue(stub, (k - kStubMinusBits)); } uint32_t stubs_size = CalculateStubsSize(k); - memset(park_stubs_bytes, 0, stubs_size); - park_stubs_bits.ToBytes(park_stubs_bytes); - writer.write((const char*)park_stubs_bytes, stubs_size); + memset(index, 0, stubs_size); + park_stubs_bits.ToBytes(index); + index+=stubs_size; // The stubs are random so they don't need encoding. But deltas are more likely to // be small, so we can compress them @@ -918,20 +997,29 @@ class DiskPlotter { { // Uncompressed uint16_t unencoded_size=0x8000|park_deltas.size(); - writer.write((const char*)&unencoded_size, 2); - writer.write((const char*)park_deltas.data(), park_deltas.size()); + + index[0]=unencoded_size&0xff; + index[1]=unencoded_size>>8; + index+=2; + + memcpy(index,park_deltas.data(),park_deltas.size()); + index+=park_deltas.size(); } else { // Compressed - deltas_bits.ToBytes(park_deltas_bytes); - uint16_t encoded_size = deltas_bits.GetSize() / 8; - assert((uint32_t)(encoded_size + 2) < CalculateMaxDeltasSize(k, table_index)); - writer.write((const char*)&encoded_size, 2); - writer.write((const char*)park_deltas_bytes, encoded_size); + index[0]=encoded_size&0xff; + index[1]=encoded_size>>8; + index+=2; + + deltas_bits.ToBytes(index); + index+=encoded_size; } + + assert((uint32_t)(index-parkToFileBytes) <= parkToFileBytes.size()); + final_disk.Write(writer, (uint8_t *)parkToFileBytes, index-parkToFileBytes); } // Compresses the plot file tables into the final file. In order to do this, entries must be @@ -945,15 +1033,11 @@ class DiskPlotter { // Converting into this format requires a few passes and sorts on disk. It also assumes that the // backpropagation step happened, so there will be no more dropped entries. See the design // document for more details on the algorithm. - Phase3Results CompressTables(uint8_t k, vector plot_table_begin_pointers, std::string filename, - std::string plot_filename, const uint8_t* id, const uint8_t* memo, + Phase3Results CompressTables(uint8_t* memory, uint8_t k, vector plot_table_begin_pointers, FileDisk& tmp2_disk /*filename*/, + FileDisk& tmp1_disk /*plot_filename*/, const uint8_t* id, const uint8_t* memo, uint32_t memo_len) { // In this phase we open a new file, where the final contents of the plot will be stored. 
- std::ofstream header_writer(filename, std::ios::out | std::ios::trunc | std::ios::binary); - if (!header_writer.is_open()) { - throw std::string("Final file not opened correct"); - } - uint32_t header_size = WriteHeader(header_writer, k, id, memo, memo_len); + uint32_t header_size = WriteHeader(tmp2_disk, k, id, memo, memo_len); uint8_t pos_size = k + 1; @@ -961,16 +1045,12 @@ class DiskPlotter { std::vector final_table_begin_pointers(12, 0); final_table_begin_pointers[1] = header_size; - header_writer.seekp(header_size - 10*8); uint8_t table_1_pointer_bytes[8*8]; Bits(final_table_begin_pointers[1], 8*8).ToBytes(table_1_pointer_bytes); - header_writer.write((const char*)table_1_pointer_bytes, 8); - header_writer.close(); + tmp2_disk.Write(header_size - 10*8, table_1_pointer_bytes, 8); uint64_t spare_pointer = plot_table_begin_pointers[8]; - uint8_t* memory = new uint8_t[kMemorySize]; - uint64_t final_entries_written = 0; uint32_t right_entry_size_bytes = 0; @@ -984,9 +1064,6 @@ class DiskPlotter { Timer table_timer; Timer computation_pass_1_timer; std::cout << "Compressing tables " << int{table_index} << " and " << int{table_index + 1} << std::endl; - std::ifstream left_reader(plot_filename, std::ios::in | std::ios::binary); - std::ifstream right_reader(plot_filename, std::ios::in | std::ios::binary); - std::ofstream right_writer(plot_filename, std::ios::in | std::ios::out | std::ios::binary); // The park size must be constant, for simplicity, but must be big enough to store EPP entries. // entry deltas are encoded with variable length, and thus there is no guarantee that they @@ -1004,9 +1081,17 @@ class DiskPlotter { uint32_t left_entry_size_bytes = GetMaxEntrySize(k, table_index, false); right_entry_size_bytes = GetMaxEntrySize(k, table_index + 1, false); - left_reader.seekg(plot_table_begin_pointers[table_index]); - right_reader.seekg(plot_table_begin_pointers[table_index + 1]); - right_writer.seekp(plot_table_begin_pointers[table_index + 1]); + uint64_t left_reader=plot_table_begin_pointers[table_index]; + uint64_t right_reader=plot_table_begin_pointers[table_index + 1]; + uint64_t right_writer=plot_table_begin_pointers[table_index + 1]; + uint8_t *left_reader_buf=&(memory[0]); + uint8_t *right_reader_buf=&(memory[memorySize/3]); + uint8_t *right_writer_buf=&(memory[2*memorySize/3]); + uint64_t left_buf_entries=memorySize/3/left_entry_size_bytes; + uint64_t right_buf_entries=memorySize/3/right_entry_size_bytes; + uint64_t left_reader_count=0; + uint64_t right_reader_count=0; + uint64_t right_writer_count=0; bool should_read_entry = true; std::vector left_new_pos(kCachedPositionsSize); @@ -1022,8 +1107,9 @@ class DiskPlotter { uint64_t end_of_table_pos = 0; uint64_t greatest_pos = 0; - uint8_t* right_entry_buf = new uint8_t[right_entry_size_bytes]; - uint8_t* left_entry_disk_buf = new uint8_t[left_entry_size_bytes]; + uint8_t* right_entry_buf; + uint8_t* right_entry_buf_out; + uint8_t* left_entry_disk_buf; uint64_t entry_sort_key, entry_pos, entry_offset; uint64_t cached_entry_sort_key = 0; uint64_t cached_entry_pos = 0; @@ -1037,8 +1123,17 @@ class DiskPlotter { while (!end_of_right_table) { if (should_read_entry) { // The right entries are in the format from backprop, (sort_key, pos, offset) - right_reader.read(reinterpret_cast(right_entry_buf), - right_entry_size_bytes); + if(right_reader_count%right_buf_entries==0) { + uint64_t readAmt=std::min(right_buf_entries*right_entry_size_bytes, + 
plot_table_begin_pointers[table_index+2]-plot_table_begin_pointers[table_index+1]-right_reader_count*right_entry_size_bytes); + + tmp1_disk.Read(right_reader, right_reader_buf, + readAmt); + right_reader+=readAmt; + } + right_entry_buf=right_reader_buf+(right_reader_count%right_buf_entries)*right_entry_size_bytes; + right_reader_count++; + entry_sort_key = Util::SliceInt64FromBytes(right_entry_buf, right_entry_size_bytes, 0, right_sort_key_size); entry_pos = Util::SliceInt64FromBytes(right_entry_buf, right_entry_size_bytes, @@ -1077,7 +1172,17 @@ class DiskPlotter { } } // The left entries are in the new format: (sort_key, new_pos), except for table 1: (y, x). - left_reader.read(reinterpret_cast(left_entry_disk_buf), left_entry_size_bytes); + if(left_reader_count%left_buf_entries==0) { + uint64_t readAmt=std::min(left_buf_entries*left_entry_size_bytes, + plot_table_begin_pointers[table_index+1]-plot_table_begin_pointers[table_index]-left_reader_count*left_entry_size_bytes); + + tmp1_disk.Read(left_reader, left_reader_buf, + readAmt); + left_reader+=readAmt; + } + left_entry_disk_buf=left_reader_buf+(left_reader_count%left_buf_entries)*left_entry_size_bytes; + left_reader_count++; + // We read the "new_pos" from the L table, which for table 1 is just x. For other tables, // the new_pos if (table_index == 1) { @@ -1116,40 +1221,51 @@ class DiskPlotter { Bits to_write = Bits(line_point, 2*k); to_write += Bits(old_sort_keys[write_pointer_pos % kReadMinusWrite][counter], right_sort_key_size); + right_entry_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; + to_write.ToBytes(right_entry_buf); - right_writer.write((const char*)right_entry_buf, right_entry_size_bytes); + + if(right_writer_count%right_buf_entries==0) { + tmp1_disk.Write(right_writer, right_writer_buf, + right_buf_entries*right_entry_size_bytes); + right_writer+=right_buf_entries*right_entry_size_bytes; + } + bucket_sizes[SortOnDiskUtils::ExtractNum(right_entry_buf, right_entry_size_bytes, 0, kLogNumSortBuckets)] += 1; } } current_pos += 1; } - memset(right_entry_buf, 0, right_entry_size_bytes); - right_writer.write(reinterpret_cast(right_entry_buf), - right_entry_size_bytes); + right_entry_buf=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; - left_reader.close(); - right_writer.close(); - right_reader.close(); + memset(right_entry_buf, 0, right_entry_size_bytes); + tmp1_disk.Write(right_writer, right_writer_buf, + (right_writer_count%right_buf_entries)*right_entry_size_bytes); + right_writer+=(right_writer_count%right_buf_entries)*right_entry_size_bytes; + computation_pass_1_timer.PrintElapsed("\tFirst computation pass time:"); Timer sort_timer; std::cout << "\tSorting table " << int{table_index + 1} << std::endl; - FileDisk d = FileDisk(plot_filename); - Sorting::SortOnDisk(d, plot_table_begin_pointers[table_index + 1], spare_pointer, - right_entry_size_bytes, 0, bucket_sizes, memory, kMemorySize, /*quicksort=*/1); - d.Close(); + Sorting::SortOnDisk(tmp1_disk, plot_table_begin_pointers[table_index + 1], spare_pointer, + right_entry_size_bytes, 0, bucket_sizes, memory, memorySize, /*quicksort=*/1); + sort_timer.PrintElapsed("\tSort time:"); Timer computation_pass_2_timer; - std::ifstream right_reader_2(plot_filename, std::ios::in | std::ios::binary); - std::ofstream right_writer_2(plot_filename, std::ios::in | std::ios::out | std::ios::binary); - right_reader_2.seekg(plot_table_begin_pointers[table_index + 1]); - 
right_writer_2.seekp(plot_table_begin_pointers[table_index + 1]); + right_reader=plot_table_begin_pointers[table_index + 1]; + right_writer=plot_table_begin_pointers[table_index + 1]; + right_reader_buf=memory; + right_writer_buf=&(memory[memorySize/2]); + right_buf_entries=memorySize/2/right_entry_size_bytes; + right_reader_count=0; + right_writer_count=0; + uint64_t final_table_writer=final_table_begin_pointers[table_index]; - std::ofstream final_table_writer(filename, std::ios::in | std::ios::out | std::ios::binary); - final_table_writer.seekp(final_table_begin_pointers[table_index]); final_entries_written = 0; std::vector new_bucket_sizes(kNumSortBuckets, 0); @@ -1168,8 +1284,17 @@ class DiskPlotter { // checkpoint in each group. Bits right_entry_bits; for (uint64_t index = 0; index < total_r_entries; index++) { - right_reader_2.read(reinterpret_cast(right_entry_buf), - right_entry_size_bytes); + if(right_reader_count%right_buf_entries==0) { + uint64_t readAmt=std::min(right_buf_entries*right_entry_size_bytes, + (total_r_entries-right_reader_count)*right_entry_size_bytes); + + tmp1_disk.Read(right_reader, right_reader_buf, + readAmt); + right_reader+=readAmt; + } + right_entry_buf=right_reader_buf+(right_reader_count%right_buf_entries)*right_entry_size_bytes; + right_reader_count++; + // Right entry is read as (line_point, sort_key) uint128_t line_point = Util::SliceInt128FromBytes(right_entry_buf, right_entry_size_bytes, 0, 2*k); @@ -1179,16 +1304,27 @@ class DiskPlotter { // Write the new position (index) and the sort key Bits to_write = Bits(sort_key, right_sort_key_size); to_write += Bits(index, k + 1); - memset(right_entry_buf, 0, right_entry_size_bytes); - to_write.ToBytes(right_entry_buf); - right_writer_2.write(reinterpret_cast(right_entry_buf), right_entry_size_bytes); - new_bucket_sizes[SortOnDiskUtils::ExtractNum(right_entry_buf, right_entry_size_bytes, 0, + // Calculate right entry pointer for output + right_entry_buf_out=right_writer_buf+(right_writer_count%right_buf_entries)*right_entry_size_bytes; + right_writer_count++; + + memset(right_entry_buf_out, 0, right_entry_size_bytes); + to_write.ToBytes(right_entry_buf_out); + + // Check for write out to disk + if(right_writer_count%right_buf_entries==0) { + tmp1_disk.Write(right_writer, right_writer_buf, + right_buf_entries*right_entry_size_bytes); + right_writer+=right_buf_entries*right_entry_size_bytes; + } + + new_bucket_sizes[SortOnDiskUtils::ExtractNum(right_entry_buf_out, right_entry_size_bytes, 0, kLogNumSortBuckets)] += 1; // Every EPP entries, writes a park if (index % kEntriesPerPark == 0) { if (index != 0) { - WriteParkToFile(final_table_writer, final_table_begin_pointers[table_index], + WriteParkToFile(tmp2_disk, final_table_begin_pointers[table_index], park_index, park_size_bytes, checkpoint_line_point, park_deltas, park_stubs, k, table_index); park_index += 1; @@ -1222,12 +1358,15 @@ class DiskPlotter { } last_line_point = line_point; } - right_reader_2.close(); - right_writer_2.close(); + + tmp1_disk.Write(right_writer, right_writer_buf, + (right_writer_count%right_buf_entries)*right_entry_size_bytes); + right_writer+=(right_writer_count%right_buf_entries)*right_entry_size_bytes; + if (park_deltas.size() > 0) { // Since we don't have a perfect multiple of EPP entries, this writes the last ones - WriteParkToFile(final_table_writer, final_table_begin_pointers[table_index], + WriteParkToFile(tmp2_disk, final_table_begin_pointers[table_index], park_index, park_size_bytes, checkpoint_line_point, park_deltas, 
park_stubs, k, table_index); final_entries_written += (park_stubs.size() + 1); @@ -1238,33 +1377,26 @@ class DiskPlotter { final_table_begin_pointers[table_index + 1] = final_table_begin_pointers[table_index] + (park_index + 1) * park_size_bytes; - final_table_writer.seekp(header_size - 8 * (10 - table_index)); + final_table_writer=header_size - 8 * (10 - table_index); uint8_t table_pointer_bytes[8*8]; Bits(final_table_begin_pointers[table_index + 1], 8*8).ToBytes(table_pointer_bytes); - final_table_writer.write(reinterpret_cast(table_pointer_bytes), 8); - - final_table_writer.close(); + tmp2_disk.Write(final_table_writer, (table_pointer_bytes), 8); + final_table_writer+=8; computation_pass_2_timer.PrintElapsed("\tSecond computation pass time:"); Timer sort_timer_2; std::cout << "\tRe-Sorting table " << int{table_index + 1} << std::endl; - FileDisk d_2 = FileDisk(plot_filename); // This sort is needed so that in the next iteration, we can iterate through both tables // at ones. Note that sort_key represents y ordering, and the pos, offset coordinates from // forward/backprop represent positions in y ordered tables. - Sorting::SortOnDisk(d_2, plot_table_begin_pointers[table_index + 1], spare_pointer, - right_entry_size_bytes, 0, new_bucket_sizes, memory, kMemorySize); - d_2.Close(); - sort_timer_2.PrintElapsed("\tSort time:"); + Sorting::SortOnDisk(tmp1_disk, plot_table_begin_pointers[table_index + 1], spare_pointer, + right_entry_size_bytes, 0, new_bucket_sizes, memory, memorySize); - delete[] right_entry_buf; - delete[] left_entry_disk_buf; + sort_timer_2.PrintElapsed("\tSort time:"); table_timer.PrintElapsed("Total compress table time:"); } - delete[] memory; - // These results will be used to write table P7 and the checkpoint tables in phase 4. return Phase3Results{plot_table_begin_pointers, final_table_begin_pointers, final_entries_written, right_entry_size_bytes * 8, header_size}; @@ -1285,12 +1417,8 @@ class DiskPlotter { // C1 (checkpoint values) // C2 (checkpoint values into) // C3 (deltas of f7s between C1 checkpoints) - void WriteCTables(uint8_t k, uint8_t pos_size, std::string filename, std::string plot_filename, + void WriteCTables(uint8_t k, uint8_t pos_size, FileDisk& tmp2_disk /*filename*/, FileDisk& tmp1_disk /*plot_filename*/, Phase3Results& res) { - std::ofstream final_file_writer_1(filename, std::ios::in | std::ios::out | std::ios::binary); - std::ofstream final_file_writer_2(filename, std::ios::in | std::ios::out | std::ios::binary); - std::ofstream final_file_writer_3(filename, std::ios::in | std::ios::out | std::ios::binary); - std::ifstream plot_file_reader(plot_filename, std::ios::in | std::ios::binary); uint32_t P7_park_size = Util::ByteAlign((k+1) * kEntriesPerPark)/8; uint64_t number_of_p7_parks = ((res.final_entries_written == 0 ? 
0 : res.final_entries_written - 1) @@ -1312,10 +1440,10 @@ class DiskPlotter { res.final_table_begin_pointers[10] = begin_byte_C3; res.final_table_begin_pointers[11] = end_byte; - plot_file_reader.seekg(res.plot_table_begin_pointers[7]); - final_file_writer_1.seekp(begin_byte_C1); - final_file_writer_2.seekp(begin_byte_C3); - final_file_writer_3.seekp(res.final_table_begin_pointers[7]); + uint64_t plot_file_reader=res.plot_table_begin_pointers[7]; + uint64_t final_file_writer_1=begin_byte_C1; + uint64_t final_file_writer_2=begin_byte_C3; + uint64_t final_file_writer_3=res.final_table_begin_pointers[7]; uint64_t prev_y = 0; std::vector C2; @@ -1335,8 +1463,9 @@ class DiskPlotter { // We read each table7 entry, which is sorted by f7, but we don't need f7 anymore. Instead, // we will just store pos6, and the deltas in table C3, and checkpoints in tables C1 and C2. for (uint64_t f7_position = 0; f7_position < res.final_entries_written; f7_position++) { - plot_file_reader.read(reinterpret_cast(right_entry_buf), + tmp1_disk.Read(plot_file_reader, (right_entry_buf), right_entry_size_bytes); + plot_file_reader+=right_entry_size_bytes; uint64_t entry_y = Util::SliceInt64FromBytes(right_entry_buf, right_entry_size_bytes, 0, k); uint64_t entry_new_pos = Util::SliceInt64FromBytes(right_entry_buf, right_entry_size_bytes, k, pos_size); @@ -1345,7 +1474,8 @@ class DiskPlotter { if (f7_position % kEntriesPerPark == 0 && f7_position > 0) { memset(P7_entry_buf, 0, P7_park_size); to_write_p7.ToBytes(P7_entry_buf); - final_file_writer_3.write(reinterpret_cast(P7_entry_buf), P7_park_size); + tmp2_disk.Write(final_file_writer_3, (P7_entry_buf), P7_park_size); + final_file_writer_3+=P7_park_size; to_write_p7 = ParkBits(); } @@ -1353,10 +1483,11 @@ class DiskPlotter { if (f7_position % kCheckpoint1Interval == 0) { entry_y_bits.ToBytes(C1_entry_buf); - final_file_writer_1.write(reinterpret_cast(C1_entry_buf), + tmp2_disk.Write(final_file_writer_1, (C1_entry_buf), Util::ByteAlign(k) / 8); + final_file_writer_1+= Util::ByteAlign(k) / 8; if (num_C1_entries > 0) { - final_file_writer_2.seekp(begin_byte_C3 + (num_C1_entries - 1) * size_C3); + final_file_writer_2=begin_byte_C3 + (num_C1_entries - 1) * size_C3; ParkBits to_write = Encoding::ANSEncodeDeltas(deltas_to_write, kC3R); // We need to be careful because deltas are variable sized, and they need to fit @@ -1367,7 +1498,8 @@ class DiskPlotter { Bits(to_write.GetSize() / 8, 16).ToBytes(C3_entry_buf); to_write.ToBytes(C3_entry_buf + 2); - final_file_writer_2.write(reinterpret_cast(C3_entry_buf), num_bytes); + tmp2_disk.Write(final_file_writer_2, (C3_entry_buf), num_bytes); + final_file_writer_2+=num_bytes; } prev_y = entry_y; if (f7_position % (kCheckpoint1Interval * kCheckpoint2Interval) == 0) { @@ -1389,35 +1521,40 @@ class DiskPlotter { memset(P7_entry_buf, 0, P7_park_size); to_write_p7.ToBytes(P7_entry_buf); - final_file_writer_3.write(reinterpret_cast(P7_entry_buf), P7_park_size); + tmp2_disk.Write(final_file_writer_3, (P7_entry_buf), P7_park_size); + final_file_writer_3+=P7_park_size; if (deltas_to_write.size() != 0) { ParkBits to_write = Encoding::ANSEncodeDeltas(deltas_to_write, kC3R); memset(C3_entry_buf, 0, size_C3); - final_file_writer_2.seekp(begin_byte_C3 + (num_C1_entries - 1) * size_C3); + final_file_writer_2=begin_byte_C3 + (num_C1_entries - 1) * size_C3; // Writes the size, and then the data Bits(to_write.GetSize() / 8, 16).ToBytes(C3_entry_buf); to_write.ToBytes(C3_entry_buf + 2); - final_file_writer_2.write(reinterpret_cast(C3_entry_buf), size_C3); 
+ tmp2_disk.Write(final_file_writer_2, (C3_entry_buf), size_C3); + final_file_writer_2+=size_C3; } Bits(0, Util::ByteAlign(k)).ToBytes(C1_entry_buf); - final_file_writer_1.write(reinterpret_cast(C1_entry_buf), + tmp2_disk.Write(final_file_writer_1, (C1_entry_buf), Util::ByteAlign(k)/8); + final_file_writer_1+=Util::ByteAlign(k)/8; std::cout << "\tFinished writing C1 and C3 tables" << std::endl; std::cout << "\tWriting C2 table" << std::endl; for (Bits& C2_entry : C2) { C2_entry.ToBytes(C1_entry_buf); - final_file_writer_1.write(reinterpret_cast(C1_entry_buf), + tmp2_disk.Write(final_file_writer_1, (C1_entry_buf), Util::ByteAlign(k)/8); + final_file_writer_1+=Util::ByteAlign(k)/8; } Bits(0, Util::ByteAlign(k)).ToBytes(C1_entry_buf); - final_file_writer_1.write(reinterpret_cast(C1_entry_buf), + tmp2_disk.Write(final_file_writer_1, (C1_entry_buf), Util::ByteAlign(k)/8); + final_file_writer_1+=Util::ByteAlign(k)/8; std::cout << "\tFinished writing C2 table" << std::endl; delete[] C3_entry_buf; @@ -1425,21 +1562,19 @@ class DiskPlotter { delete[] P7_entry_buf; delete[] right_entry_buf; - final_file_writer_1.seekp(res.header_size - 8 * 3); + final_file_writer_1=res.header_size - 8 * 3; uint8_t table_pointer_bytes[8*8]; // Writes the pointers to the start of the tables, for proving Bits(res.final_table_begin_pointers[8], 8*8).ToBytes(table_pointer_bytes); - final_file_writer_1.write(reinterpret_cast(table_pointer_bytes), 8); + tmp2_disk.Write(final_file_writer_1, (table_pointer_bytes), 8); + final_file_writer_1+=8; Bits(res.final_table_begin_pointers[9], 8*8).ToBytes(table_pointer_bytes); - final_file_writer_1.write(reinterpret_cast(table_pointer_bytes), 8); + tmp2_disk.Write(final_file_writer_1, (table_pointer_bytes), 8); + final_file_writer_1+=8; Bits(res.final_table_begin_pointers[10], 8*8).ToBytes(table_pointer_bytes); - final_file_writer_1.write(reinterpret_cast(table_pointer_bytes), 8); - - plot_file_reader.close(); - final_file_writer_1.close(); - final_file_writer_2.close(); - final_file_writer_3.close(); + tmp2_disk.Write(final_file_writer_1, (table_pointer_bytes), 8); + final_file_writer_1+=8; std::cout << "\tFinal table pointers:" << std::endl; diff --git a/src/sort_on_disk.hpp b/src/sort_on_disk.hpp index 3c69e1680..3c78645ba 100644 --- a/src/sort_on_disk.hpp +++ b/src/sort_on_disk.hpp @@ -85,41 +85,45 @@ class SortOnDiskUtils { class Disk { public: virtual void Read(uint64_t begin, uint8_t* memcache, uint64_t length) = 0; - virtual void Write(uint64_t begin, uint8_t* memcache, uint64_t length) = 0; + virtual void Write(uint64_t begin, const uint8_t* memcache, uint64_t length) = 0; }; class FileDisk : public Disk { public: inline explicit FileDisk(const std::string& filename) { - Initialize(filename); + filename_ = filename; + + // Opens the file for reading and writing + f_=fopen(filename.c_str(), "w+b"); + + if (f_==NULL) { + std::cout << "Failed to open" << std::endl; + throw std::string("File not opened correct"); + } } - inline void Close() { - f_.close(); + ~FileDisk() { + fclose(f_); } inline void Read(uint64_t begin, uint8_t* memcache, uint64_t length) override { // Seek, read, and replace into memcache - if((!bReading)||(begin!=readPos)) - { - // std::cout << &f_ << ": Read seek " << begin << " for " << length << std::endl; - f_.seekg(begin); - bReading=true; - } - f_.read(reinterpret_cast(memcache), length); - readPos=begin+length; + if((!bReading)||(begin!=readPos)) { + fseek(f_,begin,SEEK_SET); + bReading=true; + } + fread(reinterpret_cast(memcache), sizeof(uint8_t), 
length, f_); + readPos=begin+length; } - inline void Write(uint64_t begin, uint8_t* memcache, uint64_t length) override { + inline void Write(uint64_t begin, const uint8_t* memcache, uint64_t length) override { // Seek and write from memcache - if((bReading)||(begin!=writePos)) - { - // std::cout << &f_ << ": Write seek " << begin << " for " << length << std::endl; - f_.seekp(begin); - bReading=false; - } - f_.write(reinterpret_cast(memcache), length); - writePos=begin+length; + if((bReading)||(begin!=writePos)) { + fseek(f_,begin,SEEK_SET); + bReading=false; + } + fwrite(reinterpret_cast(memcache), sizeof(uint8_t), length, f_); + writePos=begin+length; } inline std::string GetFileName() const noexcept { @@ -127,31 +131,12 @@ class FileDisk : public Disk { } private: - void Initialize(const std::string& filename) { - filename_ = filename; - - // Creates the file if it does not exist - std::fstream f; - - f.open(filename, std::fstream::out | std::fstream::app); - f << std::flush; - f.close(); - - // Opens the file for reading and writing - f_.open(filename, std::fstream::out | std::fstream::in | std::fstream::binary); - - if (!f_.is_open()) { - std::cout << "Fialed to open" << std::endl; - throw std::string("File not opened correct"); - } - } - uint64_t readPos=0; uint64_t writePos=0; bool bReading=true; std::string filename_; - std::fstream f_; + FILE *f_; }; // Store values bucketed by their leading bits into an array-like memcache. @@ -446,6 +431,7 @@ class Sorting { inline static void SortOnDisk(Disk& disk, uint64_t disk_begin, uint64_t spare_begin, uint32_t entry_len, uint32_t bits_begin, std::vector bucket_sizes, uint8_t* mem, uint64_t mem_len, int quicksort = 0) { + uint64_t length = mem_len / entry_len; uint64_t total_size = 0; // bucket_sizes[i] represent how many entries start with the prefix i (from 0000 to 1111). @@ -508,9 +494,9 @@ class Sorting { while (to_consume > 0) { uint64_t next_amount = std::min(length, to_consume); disk.Read(disk_begin + (bucket_begins[i] + consumed_per_bucket[i]) * entry_len, - mem, next_amount * entry_len); + mem, next_amount * entry_len); disk.Write(spare_begin + spare_written * entry_len, - mem, next_amount * entry_len); + mem, next_amount * entry_len); to_consume -= next_amount; spare_written += next_amount; consumed_per_bucket[i] += next_amount; @@ -526,7 +512,7 @@ class Sorting { // Populate BucketStore from spare. while (!bstore.IsFull() && spare_consumed < spare_written) { disk.Read(read_pos, buf, entry_len); - read_pos+=entry_len; + read_pos+=entry_len; bstore.Store(buf, entry_len); spare_consumed += 1; } @@ -549,7 +535,7 @@ class Sorting { } // Write the content of the bucket in the right spot (beginning of the bucket + number of entries already written // in that bucket). - uint64_t write_pos=disk_begin + (bucket_begins[b] + written_per_bucket[b]) * entry_len; + uint64_t write_pos=disk_begin + (bucket_begins[b] + written_per_bucket[b]) * entry_len; uint64_t size; // Don't extract from the bucket more entries than the difference between read and written entries (this avoids // overwritting entries that were not read yet). 
@@ -563,8 +549,8 @@ class Sorting { for (uint64_t i = 0; i < size; i += entry_size) { EntryToBytes(bucket_handle, i, i + entry_size, last_size, buf); disk.Write(write_pos, buf, entry_len); - write_pos+=entry_len; - written_per_bucket[b] += 1; + write_pos+=entry_len; + written_per_bucket[b] += 1; subbucket_sizes[b][SortOnDiskUtils::ExtractNum(buf, entry_len, bits_begin + bucket_log, bucket_log)] += 1; } @@ -590,7 +576,7 @@ class Sorting { uint64_t read_pos=disk_begin + (bucket_begins[i] + consumed_per_bucket[i]) * entry_len; while (!bstore.IsFull() && consumed_per_bucket[i] < bucket_sizes[i]) { disk.Read(read_pos, buf, entry_len); - read_pos+=entry_len; + read_pos+=entry_len; bstore.Store(buf, entry_len); consumed_per_bucket[i] += 1; } @@ -602,10 +588,10 @@ class Sorting { // If BucketStore still isn't full and we've read all entries from buckets, start populating from the spare space. if (!broke) { - uint64_t read_pos=spare_begin + spare_consumed * entry_len; + uint64_t read_pos=spare_begin + spare_consumed * entry_len; while (!bstore.IsFull() && spare_consumed < spare_written) { disk.Read(read_pos, buf, entry_len); - read_pos+=entry_len; + read_pos+=entry_len; bstore.Store(buf, entry_len); spare_consumed += 1; } @@ -673,7 +659,7 @@ class Sorting { buf_size = std::min((uint64_t) BUF_SIZE / entry_len, num_entries - i); buf_ptr = 0; disk.Read(read_pos, buffer, buf_size * entry_len); - read_pos+=buf_size * entry_len; + read_pos+=buf_size * entry_len; if (set_prefix == false) { // We don't store the common prefix of all entries in memory, instead just append it every time // in write buffer. @@ -712,8 +698,8 @@ class Sorting { if (buf_size + entry_len >= BUF_SIZE) { // Write buffer is full, write it and clean it. disk.Write(write_pos, buffer, buf_size); - write_pos+=buf_size; - entries_written += buf_size / entry_len; + write_pos+=buf_size; + entries_written += buf_size / entry_len; buf_size = 0; } // Write first the common prefix of all entries. @@ -726,8 +712,8 @@ class Sorting { // We still have some entries left in the write buffer, write them as well. if (buf_size > 0) { disk.Write(write_pos, buffer, buf_size); - write_pos+=buf_size; - entries_written += buf_size / entry_len; + write_pos+=buf_size; + entries_written += buf_size / entry_len; } assert(entries_written == num_entries); @@ -752,22 +738,11 @@ class Sorting { for (auto& n : bucket_sizes) total_size += n; cout << "CheckSortOnDisk entry_len: " << entry_len << " length: " << length << " total size: " << total_size << endl; - for(uint64_t chunkindex=0;chunkindex<(total_size+length-1)/length;chunkindex++) - { - disk.Read(disk_begin+(chunkindex*length*entry_len), mem, length * entry_len); - uint64_t i=1; - while(((chunkindex*length+i)= 0) - { - cout << "Bad sort!" 
<< endl; - } - i++; - } + disk.Read(disk_begin, mem, 20 * entry_len); + + for(uint64_t i=0;i<20;i++) { + cout << i << ": " << Util::HexStr(mem + i * entry_len, entry_len) << endl; } - cout << "CheckSortOnDisk OK" << endl; } private: diff --git a/tests/test.cpp b/tests/test.cpp index c9fcedf9e..f302b3479 100644 --- a/tests/test.cpp +++ b/tests/test.cpp @@ -181,9 +181,9 @@ class FakeDisk : public Disk { f_.read(reinterpret_cast(memcache), length); } - void Write(uint64_t begin, uint8_t* memcache, uint64_t length) { + void Write(uint64_t begin, const uint8_t* memcache, uint64_t length) { f_.seekp(begin); - f_.write(reinterpret_cast(memcache), length); + f_.write(reinterpret_cast(memcache), length); } private: diff --git a/tests/test_python_bindings.py b/tests/test_python_bindings.py index 2d02aafe7..4686df7a9 100644 --- a/tests/test_python_bindings.py +++ b/tests/test_python_bindings.py @@ -13,7 +13,7 @@ def test_k_21(self): 10, 11, 129, 139, 171, 15, 23]) pl = DiskPlotter() - pl.create_plot_disk(".", ".", ".", "myplot.dat", 21, bytes([1, 2, 3, 4, 5]), plot_seed) + pl.create_plot_disk(".", ".", ".", "myplot.dat", 21, bytes([1, 2, 3, 4, 5]), plot_seed, 2*1024) pl = None pr = DiskProver(str(Path("myplot.dat")))