diff --git a/pkg/appendable/csv_handler.go b/pkg/appendable/csv_handler.go index ef4d6b80..6c91fe1c 100644 --- a/pkg/appendable/csv_handler.go +++ b/pkg/appendable/csv_handler.go @@ -18,11 +18,19 @@ type CSVHandler struct { } func (c CSVHandler) Synchronize(f *IndexFile) error { - var headers []string var err error - isHeader := true + isHeader := false + + if len(f.Indexes) == 0 { + isHeader = true + } else { + for _, index := range f.Indexes { + headers = append(headers, index.FieldName) + } + } + scanner := bufio.NewScanner(f.data) for i := 0; scanner.Scan(); i++ { @@ -40,13 +48,6 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { f.Checksums = append(f.Checksums, xxhash.Sum64(line)) - if i == 0 { - fmt.Printf("Header %d - StartOffset: %d, EndOffset: %d, Checksum: %d\n\n", i, start, start+uint64(len(line))+1, xxhash.Sum64(line)) - - } else { - fmt.Printf("Line %d - StartOffset: %d, EndOffset: %d, Checksum: %d\n", i, start, start+uint64(len(line))+1, xxhash.Sum64(line)) - } - if isHeader { dec := csv.NewReader(bytes.NewReader(line)) headers, err = dec.Read() @@ -58,10 +59,9 @@ func (c CSVHandler) Synchronize(f *IndexFile) error { } dec := csv.NewReader(bytes.NewReader(line)) - f.handleCSVLine(dec, headers, []string{}, uint64(existingCount), start) + f.handleCSVLine(dec, headers, []string{}, uint64(existingCount)-1, start) } - fmt.Printf("%v\n\n", f.Checksums) return nil } @@ -135,8 +135,6 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri value, fieldType := inferCSVField(fieldValue) - fmt.Printf("Field '%s' - Offset: %d, Length: %d, Value: %v, Type: %v\n", fieldName, fieldOffset, fieldLength, value, fieldType) - switch fieldType { case protocol.FieldTypeBoolean, protocol.FieldTypeString, protocol.FieldTypeNumber: tree := i.Indexes[i.findIndex(name, value)].IndexRecords @@ -161,7 +159,5 @@ func (i *IndexFile) handleCSVLine(dec *csv.Reader, headers []string, path []stri cumulativeLength += fieldLength } - fmt.Printf("\n") - return nil } diff --git a/pkg/appendable/index_file_csv_test.go b/pkg/appendable/index_file_csv_test.go index 3b8d745b..0ad0291b 100644 --- a/pkg/appendable/index_file_csv_test.go +++ b/pkg/appendable/index_file_csv_test.go @@ -8,8 +8,8 @@ import ( func TestAppendDataRowCSV(t *testing.T) { - var mockCsv string = "test\ntest1\n" - var mockCsv2 string = "test\ntest1\ntest3\n" + var mockCsv string = "header1\ntest1\n" + var mockCsv2 string = "header1\ntest1\ntest3\n" t.Run("no schema changes", func(t *testing.T) { @@ -49,63 +49,61 @@ func TestAppendDataRowCSV(t *testing.T) { } }) - /* - t.Run("correctly sets field offset", func(t *testing.T) { - i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)}) - if err != nil { - t.Fatal(err) - } + t.Run("append index to existing", func(t *testing.T) { + i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv)}) + if err != nil { + t.Fatal(err) + } - buf := &bytes.Buffer{} + buf := &bytes.Buffer{} - if err := i.Serialize(buf); err != nil { - t.Fatal(err) - } + if err := i.Serialize(buf); err != nil { + t.Fatal(err) + } - j, err := ReadIndexFile(buf, CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)}) - if err != nil { - t.Fatal(err) - } + j, err := ReadIndexFile(buf, CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)}) + if err != nil { + t.Fatal(err) + } - // check that the index file now has the additional data ranges but same number of indices - if len(j.Indexes) != 1 { - t.Errorf("got len(j.Indexes) = %d, want 1", len(j.Indexes)) - } + // check that the index file now has the additional data ranges but same number of indices + if len(j.Indexes) != 1 { + t.Errorf("got len(j.Indexes) = %d, want 1", len(j.Indexes)) + } - t.Logf("len: %v", j.Indexes[0].IndexRecords) + if len(j.Indexes[0].IndexRecords) != 2 { + t.Errorf("got len(j.Indexes[0].IndexRecords) = %d, want 2", len(j.Indexes[0].IndexRecords)) + } - if len(j.Indexes[0].IndexRecords) != 2 { - t.Errorf("got len(j.Indexes[0].IndexRecords) = %d, want 2", len(j.Indexes[0].IndexRecords)) + if len(j.Indexes[0].IndexRecords["test1"]) != 1 { + t.Errorf("got len(j.Indexes[0].IndexRecords[\"test1\"]) = %d, want 1", len(j.Indexes[0].IndexRecords["test1"])) + } + if len(j.Indexes[0].IndexRecords["test3"]) != 1 { + for key, records := range j.Indexes[0].IndexRecords { + t.Errorf("\n\n\nKey: %v, Records: %+v", key, records) } + t.Errorf("got len(j.Indexes[0].IndexRecords[\"test3\"]) = %d, want 1", len(j.Indexes[0].IndexRecords["test3"])) + } - if len(j.Indexes[0].IndexRecords["test1"]) != 1 { - t.Errorf("got len(j.Indexes[0].IndexRecords[\"test1\"]) = %d, want 1", len(j.Indexes[0].IndexRecords["test1"])) - } - if len(j.Indexes[0].IndexRecords["test3"]) != 1 { - for key, records := range j.Indexes[0].IndexRecords { - t.Errorf("\n\n\nKey: %v, Records: %+v", key, records) - } - t.Errorf("got len(j.Indexes[0].IndexRecords[\"test3\"]) = %d, want 1", len(j.Indexes[0].IndexRecords["test3"])) - } + if j.Indexes[0].IndexRecords["test1"][0].DataNumber != 0 { + t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].DataNumber = %d, want 0", j.Indexes[0].IndexRecords["test1"][0].DataNumber) + } + if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("{\"test\":")) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset) + } - if j.Indexes[0].IndexRecords["test1"][0].DataNumber != 0 { - t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].DataNumber = %d, want 0", j.Indexes[0].IndexRecords["test1"][0].DataNumber) - } - if j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset != uint64(len("{\"test\":")) { - t.Errorf("got i.Indexes[0].IndexRecords[\"test1\"][0].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test1"][0].FieldStartByteOffset) - } + if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 0 { + t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].DataNumber = %d, want 1", j.Indexes[0].IndexRecords["test3"][0].DataNumber) + } - if j.Indexes[0].IndexRecords["test3"][0].DataNumber != 1 { - t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][1].DataNumber = %d, want 1", j.Indexes[0].IndexRecords["test3"][1].DataNumber) - } - if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("test\ntest1\n")) { - t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][1].FieldStartByteOffset = %d, want 10", j.Indexes[0].IndexRecords["test3"][1].FieldStartByteOffset) - } - }) - */ + // verify byte offset calculation + if j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset != uint64(len("header\ntest1\n")+1) { + t.Errorf("got i.Indexes[0].IndexRecords[\"test3\"][0].FieldStartByteOffset = %d, want 14", j.Indexes[0].IndexRecords["test3"][0].FieldStartByteOffset) + } + }) t.Run("generate index file", func(t *testing.T) { - i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader(mockCsv2)}) + i, err := NewIndexFile(CSVHandler{ReadSeeker: strings.NewReader("")}) if err != nil { t.Fatal(err) diff --git a/pkg/appendable/io.go b/pkg/appendable/io.go index 5f4bd9fa..36888c07 100644 --- a/pkg/appendable/io.go +++ b/pkg/appendable/io.go @@ -28,13 +28,10 @@ func NewIndexFile(data DataHandler) (*IndexFile, error) { } func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { - fmt.Println("\n\n===READ INDEX FILE ===") f := &IndexFile{} f.data = data - fmt.Printf("data looks like %v\n", data) - // read the version version, err := encoding.ReadByte(r) if err != nil { @@ -111,6 +108,7 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { switch value.(type) { case nil, bool, int, int8, int16, int32, int64, float32, float64, string: + fmt.Printf("appending: %v", value) index.IndexRecords[value] = append(index.IndexRecords[value], ir) default: return nil, fmt.Errorf("unsupported type: %T", value) @@ -140,9 +138,8 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { start = f.EndByteOffsets[0] startIndex = 1 } - for i := startIndex; i < int(ifh.DataCount); i++ { - fmt.Printf("Current start idx: %d, position: %d\n", i, start) + for i := startIndex; i < int(ifh.DataCount); i++ { if _, isCsv := data.(CSVHandler); isCsv { if i > 1 { @@ -159,7 +156,6 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { if _, err := io.CopyN(buf, data, int64(f.EndByteOffsets[i]-start-1)); err != nil { return nil, fmt.Errorf("failed to read data file: %w", err) } - fmt.Printf("string: %v and \n bytes look like: %v\n", buf.String(), buf.Bytes()) if xxhash.Sum64(buf.Bytes()) != f.Checksums[i] { return nil, fmt.Errorf("checksum mismatch a %d, b %d", xxhash.Sum64(buf.Bytes()), f.Checksums[i]) @@ -170,8 +166,6 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { return nil, fmt.Errorf("unsupported version: %d", version) } - fmt.Printf("endbyteoffsets: %v\n", f.EndByteOffsets) - // we've deserialized the underlying file, seek to the end of the last data range to prepare for appending if len(f.EndByteOffsets) > 0 { if _, err := data.Seek(int64(f.EndByteOffsets[len(f.EndByteOffsets)-1]), io.SeekStart); err != nil { @@ -184,7 +178,7 @@ func ReadIndexFile(r io.Reader, data DataHandler) (*IndexFile, error) { } } - fmt.Println("End of ReadIndexFile, calling Synchronize") + // extract headers from 0 -> endByteOffsets[0] return f, data.Synchronize(f) } diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index 2b4ca28e..78a16d54 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -136,8 +136,6 @@ func (i IndexRecord) CSVField(r io.ReadSeeker) (any, error) { return nil, fmt.Errorf("failed to decode field: %w", err) } - fmt.Printf("Fields read at offset %d: %v\n", i.FieldStartByteOffset, fields) - if _, err := r.Seek(offset, io.SeekStart); err != nil { return nil, fmt.Errorf("failed to seek to original offset: %w", err) }