diff --git a/.gitignore b/.gitignore index e3c2fc2..d4db283 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ cmd/xb/xb # default compression test file enwik8* +/fox diff --git a/format.go b/format.go index edfec9a..9986dab 100644 --- a/format.go +++ b/format.go @@ -101,8 +101,8 @@ func newHashFunc(flags byte) (newHash func() hash.Hash, err error) { return } -// header provides the actual content of the xz file header: the flags. -type header struct { +// streamHeader provides the actual content of the xz stream: the flags. +type streamHeader struct { flags byte } @@ -112,18 +112,36 @@ var errHeaderMagic = errors.New("xz: invalid header magic bytes") // ValidHeader checks whether data is a correct xz file header. The // length of data must be HeaderLen. func ValidHeader(data []byte) bool { - var h header + var h streamHeader err := h.UnmarshalBinary(data) return err == nil } // String returns a string representation of the flags. -func (h header) String() string { +func (h streamHeader) String() string { return flagString(h.flags) } +func (h *streamHeader) UnmarshalReader(xz io.Reader) error { + data := make([]byte, HeaderLen) + if _, err := io.ReadFull(xz, data[:4]); err != nil { + return err + } + if bytes.Equal(data[:4], []byte{0, 0, 0, 0}) { + return errPadding + } + if _, err := io.ReadFull(xz, data[4:]); err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return err + } + + return h.UnmarshalBinary(data) +} + // UnmarshalBinary reads header from the provided data slice. -func (h *header) UnmarshalBinary(data []byte) error { +func (h *streamHeader) UnmarshalBinary(data []byte) error { // header length if len(data) != HeaderLen { return errors.New("xz: wrong file header length") @@ -155,7 +173,7 @@ func (h *header) UnmarshalBinary(data []byte) error { } // MarshalBinary generates the xz file header. 
-func (h *header) MarshalBinary() (data []byte, err error) { +func (h *streamHeader) MarshalBinary() (data []byte, err error) { if err = verifyFlags(h.flags); err != nil { return nil, err } @@ -193,7 +211,7 @@ func (f footer) String() string { // Minimum and maximum for the size of the index (backward size). const ( minIndexSize = 4 - maxIndexSize = (1 << 32) * 4 + maxIndexSize = 1 << 32 * 4 ) // MarshalBinary converts footer values into an xz file footer. Note @@ -213,7 +231,7 @@ func (f *footer) MarshalBinary() (data []byte, err error) { data = make([]byte, footerLen) // backward size (index size) - s := (f.indexSize / 4) - 1 + s := f.indexSize/4 - 1 putUint32LE(data[4:], uint32(s)) // flags data[9] = f.flags @@ -228,8 +246,7 @@ func (f *footer) MarshalBinary() (data []byte, err error) { return data, nil } -// UnmarshalBinary sets the footer value by unmarshalling an xz file -// footer. +// UnmarshalBinary sets the footer value by unmarshalling an xz file footer. func (f *footer) UnmarshalBinary(data []byte) error { if len(data) != footerLen { return errors.New("xz: wrong footer length") @@ -627,6 +644,11 @@ func (rec *record) MarshalBinary() (data []byte, err error) { return p[:n], nil } +// paddedLen returns the padded length of the compressed record. +func (rec *record) paddedLen() int64 { + return int64(padLen(rec.unpaddedSize)) + rec.unpaddedSize +} + // writeIndex writes the index, a sequence of records. 
func writeIndex(w io.Writer, index []record) (n int64, err error) { crc := crc32.NewIEEE() diff --git a/format_test.go b/format_test.go index 0b875d3..a336a80 100644 --- a/format_test.go +++ b/format_test.go @@ -10,12 +10,12 @@ import ( ) func TestHeader(t *testing.T) { - h := header{flags: CRC32} + h := streamHeader{flags: CRC32} data, err := h.MarshalBinary() if err != nil { t.Fatalf("MarshalBinary error %s", err) } - var g header + var g streamHeader if err = g.UnmarshalBinary(data); err != nil { t.Fatalf("UnmarshalBinary error %s", err) } diff --git a/reader.go b/reader.go index 22cd6d5..53a2125 100644 --- a/reader.go +++ b/reader.go @@ -61,7 +61,7 @@ type streamReader struct { xz io.Reader br *blockReader newHash func() hash.Hash - h header + h streamHeader index []record } @@ -137,27 +137,17 @@ func (c ReaderConfig) newStreamReader(xz io.Reader) (r *streamReader, err error) if err = c.Verify(); err != nil { return nil, err } - data := make([]byte, HeaderLen) - if _, err := io.ReadFull(xz, data[:4]); err != nil { - return nil, err - } - if bytes.Equal(data[:4], []byte{0, 0, 0, 0}) { - return nil, errPadding - } - if _, err = io.ReadFull(xz, data[4:]); err != nil { - if err == io.EOF { - err = io.ErrUnexpectedEOF - } - return nil, err - } + r = &streamReader{ ReaderConfig: c, xz: xz, index: make([]record, 0, 4), } - if err = r.h.UnmarshalBinary(data); err != nil { + + if err := r.h.UnmarshalReader(xz); err != nil { return nil, err } + xlog.Debugf("xz header %s", r.h) if r.newHash, err = newHashFunc(r.h.flags); err != nil { return nil, err @@ -188,15 +178,8 @@ func (r *streamReader) readTail() error { } } - p := make([]byte, footerLen) - if _, err = io.ReadFull(r.xz, p); err != nil { - if err == io.EOF { - err = io.ErrUnexpectedEOF - } - return err - } - var f footer - if err = f.UnmarshalBinary(p); err != nil { + f, err := readFooter(r.xz) + if err != nil { return err } xlog.Debugf("xz footer %s", f) @@ -209,6 +192,22 @@ func (r *streamReader) readTail() error 
{ return nil } +func readFooter(r io.Reader) (*footer, error) { + p := make([]byte, footerLen) + if _, err := io.ReadFull(r, p); err != nil { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + return nil, err + } + var f footer + if err := f.UnmarshalBinary(p); err != nil { + return nil, err + } + + return &f, nil +} + // Read reads actual data from the xz stream. func (r *streamReader) Read(p []byte) (n int, err error) { for n < len(p) { diff --git a/reader_at.go b/reader_at.go new file mode 100644 index 0000000..25a86a8 --- /dev/null +++ b/reader_at.go @@ -0,0 +1,257 @@ +package xz + +import ( + "fmt" + "io" + "sync" + + "github.com/ulikunitz/xz/internal/xlog" +) + +// ReaderAtConfig defines the parameters for the xz readerat. +type ReaderAtConfig struct { + Len int64 +} + +// Verify checks the reader config for validity. Zero values will be replaced by +// default values. +func (c *ReaderAtConfig) Verify() error { + // if c == nil { + // return errors.New("xz: reader parameters are nil") + // } + return nil +} + +// ReaderAt supports the reading of one or multiple xz streams. +type ReaderAt struct { + conf ReaderAtConfig + + indices []index + + // len of the contents of the underlying xz data + len int64 + xz io.ReaderAt +} + +// NewReader creates a new xz reader using the default parameters. +// The function reads and checks the header of the first XZ stream. The +// reader will process multiple streams including padding. +func NewReaderAt(xz io.ReaderAt) (r *ReaderAt, err error) { + return ReaderAtConfig{}.NewReaderAt(xz) +} + +// NewReaderAt creates an xz stream reader. 
+func (c ReaderAtConfig) NewReaderAt(xz io.ReaderAt) (*ReaderAt, error) {
+	if err := c.Verify(); err != nil {
+		return nil, err
+	}
+
+	r := &ReaderAt{
+		conf:    c,
+		len:     0,
+		indices: []index{},
+		xz:      xz,
+	}
+
+	r.len = r.conf.Len
+	if r.len < 1 {
+		// Library code must not panic; size probing is not implemented, so
+		// an unset Len is reported as an error to the caller.
+		return nil, fmt.Errorf("xz: ReaderAtConfig.Len must be set (size probing not implemented)")
+	}
+
+	streamEnd := r.len - 1
+
+	for streamEnd > 0 {
+		streamStart, err := r.setupIndexAt(streamEnd)
+		if err != nil {
+			return nil, fmt.Errorf("trouble creating indices: %v", err)
+		}
+
+		// the end of the next stream reading backwards is one before the start
+		// of the one we just processed.
+		streamEnd = streamStart - 1
+	}
+
+	return r, nil
+}
+
+// An index carries all the information necessary for reading randomly into a
+// single stream.
+type index struct {
+	blockStartOffset int64
+	streamHeader     streamHeader
+	records          []record
+}
+
+func (i index) compressedBufferedSize() int64 {
+	size := int64(0)
+	for _, r := range i.records {
+		size += r.paddedLen()
+	}
+	return size
+}
+
+func (i index) uncompressedSize() int64 {
+	size := int64(0)
+	for _, r := range i.records {
+		size += r.uncompressedSize
+	}
+	return size
+}
+
+// setupIndexAt takes the offset of the end of a stream, or null bytes following
+// the end of a stream. It builds an index for that stream, adds it to the
+// beginning of the ReaderAt and returns the offset to the beginning of the stream.
+func (r *ReaderAt) setupIndexAt(endOffset int64) (int64, error) {
+	// read backwards past potential null bytes until we find the end of the
+	// footer
+	for endOffset > 0 {
+		probe := make([]byte, 1)
+		n, err := r.xz.ReadAt(probe, endOffset)
+		if err != nil {
+			return 0, err
+		}
+		if n != len(probe) {
+			return 0, fmt.Errorf("read %d bytes", n)
+		}
+		if probe[0] != 0 {
+			break
+		}
+		endOffset--
+	}
+	endOffset++
+
+	footerOffset := endOffset - footerLen
+	f, err := readFooter(newRat(r.xz, footerOffset))
+	if err != nil {
+		return 0, err
+	}
+
+	indexStartOffset := footerOffset - f.indexSize
+
+	// readIndexBody assumes the indicator byte has already been read
+	indexRecs, _, err := readIndexBody(newRat(r.xz, indexStartOffset+1))
+	if err != nil {
+		return 0, err
+	}
+
+	ix := index{
+		records: indexRecs,
+	}
+	ix.blockStartOffset = indexStartOffset - ix.compressedBufferedSize()
+
+	sh := streamHeader{}
+	headerStartOffset := ix.blockStartOffset - HeaderLen
+	err = sh.UnmarshalReader(newRat(r.xz, headerStartOffset))
+	if err != nil {
+		return 0, fmt.Errorf("trouble reading stream header at offset %d: %v", headerStartOffset, err)
+	}
+	// index is a value type: the stream header must be set on ix BEFORE the
+	// copy is appended to r.indices, otherwise the stored index carries zero
+	// flags and block checksums are later verified with the wrong hash.
+	ix.streamHeader = sh
+	r.indices = append([]index{ix}, r.indices...)
+
+	xlog.Debugf("xz indices %+v", r.indices)
+
+	return headerStartOffset, nil
+}
+
+func (r *ReaderAt) Size() int64 {
+	total := int64(0)
+	for _, ix := range r.indices {
+		total += ix.uncompressedSize()
+	}
+	return total
+}
+
+func (r *ReaderAt) ReadAt(p []byte, bufferPos int64) (int, error) {
+	lenRequested := len(p)
+
+	indicesPos := int64(0)
+
+	for _, index := range r.indices {
+		blockOffset := index.blockStartOffset
+
+		for _, block := range index.records {
+			if indicesPos <= bufferPos && bufferPos < indicesPos+block.uncompressedSize {
+				blockStartPos := bufferPos - indicesPos
+				blockEndPos := blockStartPos + int64(len(p))
+				if blockEndPos > block.uncompressedSize {
+					blockEndPos = block.uncompressedSize
+				}
+				blockAmtToRead := blockEndPos - blockStartPos
+
+				err := r.readBlockAt(
+					p[:blockAmtToRead], blockStartPos,
+					blockOffset, block.unpaddedSize, index.streamHeader.flags)
+				if err != nil {
+					return lenRequested - len(p), err
+				}
+				p = p[blockAmtToRead:]
+				bufferPos += blockAmtToRead
+			}
+			blockOffset += block.paddedLen()
+			indicesPos += block.uncompressedSize
+		}
+	}
+
+	var err error
+	if len(p) != 0 {
+		err = io.EOF
+	}
+	return lenRequested - len(p), err
+}
+
+func (r *ReaderAt) readBlockAt(
+	p []byte, bufferPos int64,
+	blockOffset, blockLen int64, streamFlags byte,
+) error {
+	viewStart := rat{
+		Mutex:  &sync.Mutex{},
+		offset: blockOffset,
+		reader: r.xz,
+	}
+	view := io.LimitReader(&viewStart, blockLen)
+	blockHeader, hlen, err := readBlockHeader(view)
+	if err != nil {
+		return err
+	}
+	readerConfig := ReaderConfig{}
+	hashFn, err := newHashFunc(streamFlags)
+	if err != nil {
+		return err
+	}
+	blockReader, err := readerConfig.newBlockReader(view, blockHeader, hlen, hashFn())
+	if err != nil {
+		return err
+	}
+
+	trash := make([]byte, bufferPos)
+	_, err = io.ReadFull(blockReader, trash)
+	if err != nil {
+		return err
+	}
+
+	_, err = io.ReadFull(blockReader, p)
+	return err
+}
+
+// rat wraps a ReaderAt to fulfill the io.Reader interface.
+type rat struct { + *sync.Mutex + offset int64 + reader io.ReaderAt +} + +func (r *rat) Read(p []byte) (int, error) { + r.Lock() + defer r.Unlock() + + n, err := r.reader.ReadAt(p, r.offset) + r.offset += int64(n) + return n, err +} + +func newRat(ra io.ReaderAt, offset int64) *rat { + return &rat{ + Mutex: &sync.Mutex{}, + offset: offset, + reader: ra, + } +} diff --git a/reader_at_test.go b/reader_at_test.go new file mode 100644 index 0000000..e2f3193 --- /dev/null +++ b/reader_at_test.go @@ -0,0 +1,86 @@ +package xz + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "testing" +) + +const foxSentenceConst = "The quick brown fox jumps over the lazy dog.\n" + +func TestReaderAtBlocks(t *testing.T) { + f, fileSize := testOpenFile(t, "testfiles/fox.blocks.xz") + testFilePart(t, f, fileSize, foxSentenceConst, 0, len(foxSentenceConst)) +} + +func BenchmarkBlocks(b *testing.B) { + f, fileSize := testOpenFile(b, "testfiles/fox.blocks.xz") + + b.ResetTimer() + for i := 0; i < b.N; i++ { + testFilePart(b, f, fileSize, foxSentenceConst, 0, len(foxSentenceConst)) + } +} + +func TestReaderAtSimple(t *testing.T) { + f, fileSize := testOpenFile(t, "testfiles/fox.xz") + testFilePart(t, f, fileSize, foxSentenceConst, 0, 10) +} + +func TestReaderAtMS(t *testing.T) { + expect := foxSentenceConst + foxSentenceConst + foxSentenceConst + foxSentenceConst + + filePath := "testfiles/fox.blocks.xz" + + f, _ := testOpenFile(t, filePath) + fData, err := ioutil.ReadAll(f) + if err != nil { + t.Fatalf("Error reading file %s", err) + } + msBytes := testMultiStreams(fData) + msB := bytes.NewReader(msBytes) + + start := len(foxSentenceConst) + testFilePart(t, msB, int64(len(msBytes)), expect, start, len(expect)-start) +} + +func testOpenFile(t testing.TB, filePath string) (*os.File, int64) { + xz, err := os.Open(filePath) + if err != nil { + t.Fatalf("os.Open(%q) error %s", filePath, err) + } + + info, err := os.Stat(filePath) + if err != nil { + t.Fatalf("os.Stat(%q) error %s", filePath, 
err) + } + + return xz, info.Size() +} + +func testFilePart(t testing.TB, f io.ReaderAt, fileSize int64, expected string, start, size int) { + conf := ReaderAtConfig{ + Len: fileSize, + } + r, err := conf.NewReaderAt(f) + if err != nil { + t.Fatalf("NewReader error %s", err) + } + + decompressedBytes := make([]byte, size) + n, err := r.ReadAt(decompressedBytes, int64(start)) + if err != nil { + t.Fatalf("error while reading at: %v", err) + } + if n != len(decompressedBytes) { + t.Fatalf("unexpectedly didn't read all") + } + + subsetExpected := expected[start : start+size] + if string(decompressedBytes) != subsetExpected { + t.Fatalf("Unexpected decompression output. \"%s\" != \"%s\"", + string(decompressedBytes), subsetExpected) + } +} diff --git a/reader_test.go b/reader_test.go index 45e725b..8db8c11 100644 --- a/reader_test.go +++ b/reader_test.go @@ -13,7 +13,7 @@ import ( ) func TestReaderSimple(t *testing.T) { - const file = "fox.xz" + const file = "testfiles/fox.xz" xz, err := os.Open(file) if err != nil { t.Fatalf("os.Open(%q) error %s", file, err) @@ -29,7 +29,7 @@ func TestReaderSimple(t *testing.T) { } func TestReaderSingleStream(t *testing.T) { - data, err := ioutil.ReadFile("fox.xz") + data, err := ioutil.ReadFile("testfiles/fox.xz") if err != nil { t.Fatalf("ReadFile error %s", err) } @@ -56,20 +56,13 @@ func TestReaderSingleStream(t *testing.T) { } func TestReaderMultipleStreams(t *testing.T) { - data, err := ioutil.ReadFile("fox.xz") + data, err := ioutil.ReadFile("testfiles/fox.xz") if err != nil { t.Fatalf("ReadFile error %s", err) } - m := make([]byte, 0, 4*len(data)+4*4) - m = append(m, data...) - m = append(m, data...) - m = append(m, 0, 0, 0, 0) - m = append(m, data...) - m = append(m, 0, 0, 0, 0) - m = append(m, 0, 0, 0, 0) - m = append(m, data...) 
- m = append(m, 0, 0, 0, 0) - xz := bytes.NewReader(m) + + multiStream := testMultiStreams(data) + xz := bytes.NewReader(multiStream) r, err := NewReader(xz) if err != nil { t.Fatalf("NewReader error %s", err) @@ -80,8 +73,21 @@ func TestReaderMultipleStreams(t *testing.T) { } } +func testMultiStreams(singleStream []byte) []byte { + multiStream := make([]byte, 0, 4*len(singleStream)+4*4) + multiStream = append(multiStream, singleStream...) + multiStream = append(multiStream, singleStream...) + multiStream = append(multiStream, 0, 0, 0, 0) + multiStream = append(multiStream, singleStream...) + multiStream = append(multiStream, 0, 0, 0, 0) + multiStream = append(multiStream, 0, 0, 0, 0) + multiStream = append(multiStream, singleStream...) + multiStream = append(multiStream, 0, 0, 0, 0) + return multiStream +} + func TestCheckNone(t *testing.T) { - const file = "fox-check-none.xz" + const file = "testfiles/fox-check-none.xz" xz, err := os.Open(file) if err != nil { t.Fatalf("os.Open(%q) error %s", file, err) diff --git a/fox-check-none.xz b/testfiles/fox-check-none.xz similarity index 100% rename from fox-check-none.xz rename to testfiles/fox-check-none.xz diff --git a/testfiles/fox.blocks.xz b/testfiles/fox.blocks.xz new file mode 100644 index 0000000..f40cf8a Binary files /dev/null and b/testfiles/fox.blocks.xz differ diff --git a/fox.xz b/testfiles/fox.xz similarity index 100% rename from fox.xz rename to testfiles/fox.xz diff --git a/writer.go b/writer.go index aec10df..241d560 100644 --- a/writer.go +++ b/writer.go @@ -141,7 +141,7 @@ type Writer struct { xz io.Writer bw *blockWriter newHash func() hash.Hash - h header + h streamHeader index []record closed bool } @@ -183,7 +183,7 @@ func (c WriterConfig) NewWriter(xz io.Writer) (w *Writer, err error) { w = &Writer{ WriterConfig: c, xz: xz, - h: header{c.CheckSum}, + h: streamHeader{c.CheckSum}, index: make([]record, 0, 4), } if w.newHash, err = newHashFunc(c.CheckSum); err != nil {