diff --git a/dxfuse.go b/dxfuse.go
index ffe9834..ea0497c 100644
--- a/dxfuse.go
+++ b/dxfuse.go
@@ -27,7 +27,7 @@ import (
 
 const (
 	// namespace for xattrs
-	XATTR_TAG = "tag"
+	XATTR_TAG  = "tag"
 	XATTR_PROP = "prop"
 	XATTR_BASE = "base"
 )
@@ -48,7 +48,7 @@ type Filesys struct {
 
 	// a pool of http clients, for short requests, such as file creation,
 	// or file describe.
-	httpClientPool chan(*http.Client)
+	httpClientPool chan (*http.Client)
 
 	// metadata database
 	mdb *MetadataDb
@@ -70,11 +70,11 @@ type Filesys struct {
 
 	// all open files
 	fhCounter uint64
-	fhTable map[fuseops.HandleID]*FileHandle
+	fhTable   map[fuseops.HandleID]*FileHandle
 
 	// all open directories
 	dhCounter uint64
-	dhTable map[fuseops.HandleID]*DirHandle
+	dhTable   map[fuseops.HandleID]*DirHandle
 
 	tmpFileCounter uint64
 
@@ -98,20 +98,20 @@ const (
 
 type FileHandle struct {
 	// a lock allowing multiple readers or a single writer.
 	accessMode int
-	inode int64
-	size int64 // this is up-to-date only for remote files
-	hid fuseops.HandleID
+	inode      int64
+	size       int64 // this is up-to-date only for remote files
+	hid        fuseops.HandleID
 
 	// URL used for downloading file ranges.
 	// Used for read-only files.
-	url  *DxDownloadURL
+	url *DxDownloadURL
 
 	// A file-descriptor for files with a local copy
-	fd  *os.File
+	fd *os.File
 }
 
 type DirHandle struct {
-	d Dir
+	d       Dir
 	entries []fuseutil.Dirent
 }
@@ -122,24 +122,24 @@ func NewDxfuse(
 
 	// initialize a pool of http-clients.
 	httpIoPool := make(chan *http.Client, HttpClientPoolSize)
-	for i:=0; i < HttpClientPoolSize; i++ {
+	for i := 0; i < HttpClientPoolSize; i++ {
 		httpIoPool <- dxda.NewHttpClient()
 	}
 	dxfuseBaseDir := MakeFSBaseDir()
 
 	fsys := &Filesys{
-		dxEnv : dxEnv,
-		options: options,
-		mutex : &sync.Mutex{},
-		httpClientPool: httpIoPool,
-		ops : NewDxOps(dxEnv, options),
-		fhCounter : 1,
-		fhTable : make(map[fuseops.HandleID]*FileHandle),
-		dhCounter : 1,
-		dhTable : make(map[fuseops.HandleID]*DirHandle),
-		tmpFileCounter : 0,
-		shutdownCalled : false,
-		createdFilesDir : dxfuseBaseDir + "/" + CreatedFilesDir,
+		dxEnv:           dxEnv,
+		options:         options,
+		mutex:           &sync.Mutex{},
+		httpClientPool:  httpIoPool,
+		ops:             NewDxOps(dxEnv, options),
+		fhCounter:       1,
+		fhTable:         make(map[fuseops.HandleID]*FileHandle),
+		dhCounter:       1,
+		dhTable:         make(map[fuseops.HandleID]*DirHandle),
+		tmpFileCounter:  0,
+		shutdownCalled:  false,
+		createdFilesDir: dxfuseBaseDir + "/" + CreatedFilesDir,
 	}
 
 	// Create a directory for new files
@@ -176,10 +176,10 @@ func NewDxfuse(
 	fsys.pgs = NewPrefetchGlobalState(options.VerboseLevel, dxEnv)
 
 	// describe all the projects, we need their upload parameters
-	httpClient := <- fsys.httpClientPool
+	httpClient := <-fsys.httpClientPool
 	defer func() {
 		fsys.httpClientPool <- httpClient
-	} ()
+	}()
 
 	if options.ReadOnly {
 		// we don't need the file upload module
@@ -264,12 +264,12 @@ func (fsys *Filesys) opOpen() *OpHandle {
 	if err != nil {
 		log.Panic("Could not open transaction")
 	}
-	httpClient := <- fsys.httpClientPool
+	httpClient := <-fsys.httpClientPool
 
 	return &OpHandle{
-		httpClient : httpClient,
-		txn : txn,
-		err : nil,
+		httpClient: httpClient,
+		txn:        txn,
+		err:        nil,
 	}
 }
 
@@ -280,9 +280,9 @@ func (fsys *Filesys) opOpenNoHttpClient() *OpHandle {
 	}
 
 	return &OpHandle{
-		httpClient : nil,
-		txn : txn,
-		err : nil,
+		httpClient: nil,
+		txn:        txn,
+		err:        nil,
 	}
 }
 
@@ -473,7 +473,7 @@ func (fsys *Filesys) SetInodeAttributes(ctx context.Context, op *fuseops.SetInod
 		attrs.Mtime = *op.Mtime
 	}
 	// we don't handle atime
-	err = fsys.mdb.UpdateFileAttrs(ctx, oph, file.Inode, int64(attrs.Size), attrs.Mtime, &attrs.Mode);
+	err = fsys.mdb.UpdateFileAttrs(ctx, oph, file.Inode, int64(attrs.Size), attrs.Mtime, &attrs.Mode)
 	if err != nil {
 		fsys.log("database error in OpenFile %s", err.Error())
 		return fuse.EIO
@@ -598,7 +598,7 @@ func (fsys *Filesys) MkDir(ctx context.Context, op *fuseops.MkDirOp) error {
 		nowSeconds,
 		nowSeconds,
 		mode,
-		parentDir.FullPath + "/" + op.Name)
+		parentDir.FullPath+"/"+op.Name)
 	if err != nil {
 		fsys.log("database error in MkDir")
 		return fuse.EIO
@@ -617,15 +617,14 @@ func (fsys *Filesys) MkDir(ctx context.Context, op *fuseops.MkDirOp) error {
 	}
 	tWindow := fsys.calcExpirationTime(childAttrs)
 	op.Entry = fuseops.ChildInodeEntry{
-		Child : fuseops.InodeID(dnode),
-		Attributes : childAttrs,
-		AttributesExpiration : tWindow,
-		EntryExpiration : tWindow,
+		Child:                fuseops.InodeID(dnode),
+		Attributes:           childAttrs,
+		AttributesExpiration: tWindow,
+		EntryExpiration:      tWindow,
 	}
 	return nil
 }
-
 func (fsys *Filesys) RmDir(ctx context.Context, op *fuseops.RmDirOp) error {
 	fsys.mutex.Lock()
 	defer fsys.mutex.Unlock()
 
@@ -795,10 +794,10 @@ func (fsys *Filesys) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) e
 	// soon change. We are writing new content into the file.
 	tWindow := fsys.calcExpirationTime(childAttrs)
 	op.Entry = fuseops.ChildInodeEntry{
-		Child : fuseops.InodeID(file.Inode),
-		Attributes : childAttrs,
-		AttributesExpiration : tWindow,
-		EntryExpiration : tWindow,
+		Child:                fuseops.InodeID(file.Inode),
+		Attributes:           childAttrs,
+		AttributesExpiration: tWindow,
+		EntryExpiration:      tWindow,
 	}
 
 	// Note: we can't open the file in exclusive mode, because another process
@@ -810,11 +809,11 @@ func (fsys *Filesys) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) e
 	}
 
 	fh := FileHandle{
-		accessMode : AM_RW_Local,
-		inode : file.Inode,
-		size : file.Size,
-		url : nil,
-		fd : writer,
+		accessMode: AM_RW_Local,
+		inode:      file.Inode,
+		size:       file.Size,
+		url:        nil,
+		fd:         writer,
 	}
 	op.Handle = fsys.insertIntoFileHandleTable(&fh)
 	return nil
@@ -1144,9 +1143,9 @@ func (fsys *Filesys) readEntireDir(ctx context.Context, oph *OpHandle, dir Dir)
 		}
 		dirEnt := fuseutil.Dirent{
 			Offset: 0,
-			Inode : fuseops.InodeID(oDesc.Inode),
-			Name : oname,
-			Type : dType,
+			Inode:  fuseops.InodeID(oDesc.Inode),
+			Name:   oname,
+			Type:   dType,
 		}
 		dEntries = append(dEntries, dirEnt)
 	}
@@ -1155,9 +1154,9 @@
 	for subDirName, dirDesc := range subdirs {
 		dirEnt := fuseutil.Dirent{
 			Offset: 0,
-			Inode : fuseops.InodeID(dirDesc.Inode),
-			Name : subDirName,
-			Type : fuseutil.DT_Directory,
+			Inode:  fuseops.InodeID(dirDesc.Inode),
+			Name:   subDirName,
+			Type:   fuseutil.DT_Directory,
 		}
 		dEntries = append(dEntries, dirEnt)
 	}
@@ -1179,7 +1178,6 @@
 
 	return dEntries, nil
 }
-
 // OpenDir return nil error allows open dir
 // COMMON for drivers
 func (fsys *Filesys) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
@@ -1206,7 +1204,7 @@ func (fsys *Filesys) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
 	}
 
 	dh := &DirHandle{
-		d : dir,
+		d:       dir,
 		entries: dentries,
 	}
 	op.Handle = fsys.insertIntoDirHandleTable(dh)
@@ -1253,7 +1251,6 @@ func (fsys *Filesys) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDi
 
 	return nil
 }
-
 // ===
 // File handling
 //
@@ -1272,10 +1269,10 @@ func (fsys *Filesys) openRegularFile(
 	}
 	fh := &FileHandle{
 		accessMode: AM_RW_Local,
-		inode : f.Inode,
-		size : f.Size,
-		url: nil,
-		fd : reader,
+		inode:      f.Inode,
+		size:       f.Size,
+		url:        nil,
+		fd:         reader,
 	}
 
 	return fh, nil
@@ -1297,10 +1294,10 @@ func (fsys *Filesys) openRegularFile(
 
 	fh := &FileHandle{
 		accessMode: AM_RO_Remote,
-		inode : f.Inode,
-		size : f.Size,
-		url: &u,
-		fd : nil,
+		inode:      f.Inode,
+		size:       f.Size,
+		url:        &u,
+		fd:         nil,
 	}
 
 	return fh, nil
@@ -1364,14 +1361,14 @@ func (fsys *Filesys) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error
 		// directly. There is no need to generate a preauthenticated
 		// URL.
 		fh = &FileHandle{
-			accessMode : AM_RO_Remote,
-			inode : file.Inode,
-			size : file.Size,
-			url : &DxDownloadURL{
-				URL : file.Symlink,
-				Headers : nil,
+			accessMode: AM_RO_Remote,
+			inode:      file.Inode,
+			size:       file.Size,
+			url: &DxDownloadURL{
+				URL:     file.Symlink,
+				Headers: nil,
 			},
-			fd : nil,
+			fd: nil,
 		}
 	default:
 		// can't open an applet/workflow/etc.
@@ -1391,7 +1388,6 @@ func (fsys *Filesys) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error
 
 	return nil
 }
-
 func (fsys *Filesys) getWritableFD(ctx context.Context, handle fuseops.HandleID) (*os.File, error) {
 	fsys.mutex.Lock()
 	defer fsys.mutex.Unlock()
@@ -1412,7 +1408,6 @@ func (fsys *Filesys) getWritableFD(ctx context.Context, handle fuseops.HandleID)
 
 	return fh.fd, nil
 }
-
 // read a remote immutable file
 //
 func (fsys *Filesys) readRemoteFile(ctx context.Context, op *fuseops.ReadFileOp, fh *FileHandle) error {
@@ -1458,7 +1453,7 @@ func (fsys *Filesys) readRemoteFile(ctx context.Context, op *fuseops.ReadFileOp,
 	}
 
 	// Take an http client from the pool. Return it when done.
-	httpClient := <- fsys.httpClientPool
+	httpClient := <-fsys.httpClientPool
 	err := dxda.DxHttpRequestData(
 		ctx,
 		httpClient,
@@ -1476,7 +1471,7 @@ func (fsys *Filesys) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error
 
 	// Here, we start from the file handle
 	fsys.mutex.Lock()
-	fh,ok := fsys.fhTable[op.Handle]
+	fh, ok := fsys.fhTable[op.Handle]
 	if !ok {
 		// invalid file handle. It doesn't exist in the table
 		fsys.mutex.Unlock()
@@ -1518,7 +1513,7 @@ func (fsys *Filesys) prepareFileForWrite(ctx context.Context, op *fuseops.WriteF
 	fsys.mutex.Lock()
 	defer fsys.mutex.Unlock()
 
-	fh,ok := fsys.fhTable[op.Handle]
+	fh, ok := fsys.fhTable[op.Handle]
 	if !ok {
 		// invalid file handle. It doesn't exist in the table
 		return nil, fuse.EINVAL
@@ -1596,7 +1591,7 @@ func (fsys *Filesys) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) err
 
 	// Try to efficiently calculate the size and mtime, instead
 	// of doing a filesystem call.
-	fSize := MaxInt64(op.Offset + int64(nBytes), fh.size)
+	fSize := MaxInt64(op.Offset+int64(nBytes), fh.size)
 	mtime := time.Now()
 
 	// Update the file attributes in the database (size, mtime)
@@ -1729,7 +1724,7 @@ func (fsys *Filesys) xattrParseName(name string) (string, string, error) {
 		return "", "", fuse.EINVAL
 	}
 	namespace := name[:prefixLen]
-	attrName := name[len(namespace) + 1 : ]
+	attrName := name[len(namespace)+1:]
 	return namespace, attrName, nil
 }
 
@@ -1809,7 +1804,6 @@ func (fsys *Filesys) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp)
 
 	return nil
 }
-
 func (fsys *Filesys) getXattrFill(op *fuseops.GetXattrOp, val_str string) error {
 	value := []byte(val_str)
 	op.BytesRead = len(value)
@@ -1829,7 +1823,7 @@ func (fsys *Filesys) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error
 	oph := fsys.opOpen()
 	defer fsys.opClose(oph)
 
-	if fsys.options.Verbose {
+	if fsys.options.VerboseLevel > 1 {
 		fsys.log("GetXattr %d", op.Inode)
 	}
@@ -1874,7 +1868,7 @@ func (fsys *Filesys) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error
 			return fsys.getXattrFill(op, file.State)
 		case "archivalState":
 			return fsys.getXattrFill(op, file.ArchivalState)
-		case "id" :
+		case "id":
 			return fsys.getXattrFill(op, file.Id)
 		}
 	}
@@ -1903,14 +1897,14 @@ func (fsys *Filesys) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) err
 	// collect all the properties into one array
 	var xattrKeys []string
 	for _, tag := range file.Tags {
-		xattrKeys = append(xattrKeys, XATTR_TAG + "." + tag)
+		xattrKeys = append(xattrKeys, XATTR_TAG+"."+tag)
 	}
 	for key, _ := range file.Properties {
-		xattrKeys = append(xattrKeys, XATTR_PROP + "." + key)
+		xattrKeys = append(xattrKeys, XATTR_PROP+"."+key)
 	}
 	// Special attributes
-	for _, key := range []string{ "state", "archivalState", "id"} {
-		xattrKeys = append(xattrKeys, XATTR_BASE + "." + key)
+	for _, key := range []string{"state", "archivalState", "id"} {
+		xattrKeys = append(xattrKeys, XATTR_BASE+"."+key)
 	}
 	if fsys.options.Verbose {
 		fsys.log("attribute keys: %v", xattrKeys)
@@ -1924,7 +1918,7 @@
 
 		if len(dst) >= keyLen {
 			copy(dst, key)
-			dst[keyLen-1] = 0  // null terminate the string
+			dst[keyLen-1] = 0 // null terminate the string
 			dst = dst[keyLen:]
 		} else if len(op.Dst) != 0 {
 			return syscall.ERANGE
diff --git a/metadata_db.go b/metadata_db.go
index 9e3ab3f..c0be1af 100644
--- a/metadata_db.go
+++ b/metadata_db.go
@@ -8,33 +8,32 @@ import (
 	"fmt"
 	"log"
 	"math"
-	"path/filepath"
 	"os"
+	"path/filepath"
 	"strings"
 	"time"
 
 	"github.com/dnanexus/dxda"
 )
-
 
 const (
-	nsDirType = 1
+	nsDirType     = 1
 	nsDataObjType = 2
 )
 
 type MetadataDb struct {
 	// an open handle to the database
-	db *sql.DB
-	dbFullPath string
+	db         *sql.DB
+	dbFullPath string
 
 	// configuration information for accessing dnanexus servers
-	dxEnv  dxda.DXEnvironment
+	dxEnv dxda.DXEnvironment
 
 	// mapping from mounted directory to project ID
 	baseDir2ProjectId map[string]string
 
-	inodeCnt int64
-	options Options
+	inodeCnt int64
+	options  Options
 }
 
 func NewMetadataDb(
@@ -42,18 +41,18 @@
 	dxEnv dxda.DXEnvironment,
 	options Options) (*MetadataDb, error) {
 	// create a connection to the database, that will be kept open
-	db, err := sql.Open("sqlite3", dbFullPath + "?mode=rwc")
+	db, err := sql.Open("sqlite3", dbFullPath+"?mode=rwc")
 	if err != nil {
 		return nil, fmt.Errorf("Could not open the database %s", dbFullPath)
 	}
 
 	return &MetadataDb{
-		db : db,
-		dbFullPath : dbFullPath,
-		dxEnv : dxEnv,
+		db:                db,
+		dbFullPath:        dbFullPath,
+		dxEnv:             dxEnv,
 		baseDir2ProjectId: make(map[string]string),
-		inodeCnt : InodeRoot + 1,
-		options : options,
+		inodeCnt:          InodeRoot + 1,
+		options:           options,
 	}, nil
 }
 
@@ -73,9 +72,9 @@ func (mdb *MetadataDb) opOpen() *OpHandle {
 	}
 
 	return &OpHandle{
-		httpClient : nil,
-		txn : txn,
-		err : nil,
+		httpClient: nil,
+		txn:        txn,
+		err:        nil,
 	}
 }
 
@@ -93,7 +92,6 @@ func (mdb *MetadataDb) opClose(oph *OpHandle) {
 
 	}
 }
-
 // Construct a local sql database that holds metadata for
 // a large number of dx:files. This metadata_db will be consulted
 // when performing dxfuse operations. For example, a read-dir is
@@ -128,7 +126,7 @@ func tagsMarshal(tags []string) string {
 		return ""
 	}
 	payload, err := json.Marshal(MTags{
-		Elements : tags,
+		Elements: tags,
 	})
 	if err != nil {
 		log.Panicf("failed to marshal tags (%v), %s", tags, err.Error())
@@ -150,13 +148,12 @@ func tagsUnmarshal(buf string) []string {
 
 	var coded MTags
 	err = json.Unmarshal(originalBytes, &coded)
 	if err != nil {
-		log.Panicf("failed to unmarshal tags (%s), %s",  buf, err.Error())
+		log.Panicf("failed to unmarshal tags (%s), %s", buf, err.Error())
 		return nil
 	}
 	return coded.Elements
 }
-
 // Marshal a DNAx object properties to/from a string that
 // is stored in a database table. We use base64 encoding for the
 // same reason as tags (see above).
@@ -169,7 +166,7 @@ func propertiesMarshal(props map[string]string) string {
 		return ""
 	}
 	payload, err := json.Marshal(MProperties{
-		Elements : props,
+		Elements: props,
 	})
 	if err != nil {
 		log.Panicf("failed to marshal properties (%v), %s", props, err.Error())
@@ -465,7 +462,7 @@ func (mdb *MetadataDb) lookupDirByInode(oph *OpHandle, parent string, dname stri
 	var populated int
 	var ctime int64
 	var mtime int64
-	var mode  int
+	var mode int
 
 	rows.Scan(&d.ProjId, &d.ProjFolder, &populated, &ctime, &mtime, &mode)
@@ -498,7 +495,6 @@ func (mdb *MetadataDb) lookupDirByInode(oph *OpHandle, parent string, dname stri
 
 	return d, true, nil
 }
-
 // search for a file with a particular inode.
 //
 func (mdb *MetadataDb) LookupByInode(ctx context.Context, oph *OpHandle, inode int64) (Node, bool, error) {
@@ -615,7 +611,6 @@ func (mdb *MetadataDb) UpdateInodeFileId(inode int64, fileId string) error {
 
 	return nil
 }
-
 // The directory is in the database, read it in its entirety.
 func (mdb *MetadataDb) directoryReadAllEntries(
 	oph *OpHandle,
@@ -649,13 +644,13 @@ func (mdb *MetadataDb) directoryReadAllEntries(
 
 		rows.Scan(&inode, &projId, &dname, &ctime, &mtime, &mode)
 		subdirs[dname] = Dir{
-			Parent : dirFullName,
-			Dname : dname,
-			FullPath : filepath.Clean(filepath.Join(dirFullName, dname)),
-			Inode : inode,
-			Ctime : SecondsToTime(ctime),
-			Mtime : SecondsToTime(mtime),
-			Mode : os.FileMode(mode),
+			Parent:   dirFullName,
+			Dname:    dname,
+			FullPath: filepath.Clean(filepath.Join(dirFullName, dname)),
+			Inode:    inode,
+			Ctime:    SecondsToTime(ctime),
+			Mtime:    SecondsToTime(mtime),
+			Mode:     os.FileMode(mode),
 		}
 	}
 	rows.Close()
@@ -686,7 +681,7 @@ func (mdb *MetadataDb) directoryReadAllEntries(
 		var mode int
 		var dirtyData int
 		var dirtyMetadata int
-		rows.Scan(&f.Kind,&f.Id, &f.ProjId, &f.State, &f.ArchivalState, &f.Inode,
+		rows.Scan(&f.Kind, &f.Id, &f.ProjId, &f.State, &f.ArchivalState, &f.Inode,
 			&f.Size, &ctime, &mtime, &mode, &tags, &props, &f.Symlink, &f.LocalPath,
 			&dirtyData, &dirtyMetadata, &f.Name)
 		f.Ctime = SecondsToTime(ctime)
@@ -723,7 +718,7 @@ func (mdb *MetadataDb) createDataObject(
 	size int64,
 	ctime int64,
 	mtime int64,
-	tags  []string,
+	tags []string,
 	properties map[string]string,
 	mode os.FileMode,
 	parentDir string,
@@ -732,7 +727,7 @@
 	localPath string) (int64, error) {
 	if mdb.options.VerboseLevel > 1 {
 		mdb.log("createDataObject %s:%s %s", projId, objId,
-			filepath.Clean(parentDir + "/" + fname))
+			filepath.Clean(parentDir+"/"+fname))
 	}
 	// File doesn't exist, we need to choose a new inode number.
 	// Note: it is on stable storage, and will not change.
@@ -795,7 +790,7 @@ func (mdb *MetadataDb) createEmptyDir(
 	sqlStmt := fmt.Sprintf(`
 		INSERT INTO namespace
 		VALUES ('%s', '%s', '%d', '%d');`,
-		parentDir, basename, nsDirType, inode)
+		parentDir, basename, nsDirType, inode)
 	if _, err := oph.txn.Exec(sqlStmt); err != nil {
 		mdb.log("createEmptyDir: error inserting into namespace table %s/%s, err=%s",
 			parentDir, basename, err.Error())
@@ -903,7 +898,7 @@ func symlinkOfFile(kind int, o DxDescribeDataObject) string {
 		kind = FK_Symlink
 	}
 
-	switch (kind) {
+	switch kind {
 	case FK_Symlink:
 		return o.SymlinkPath
 	default:
@@ -934,7 +929,12 @@
 	if mdb.options.VerboseLevel > 1 {
 		mdb.log("inserting files")
 	}
-
+	var fileMode os.FileMode
+	if mdb.options.ReadOnly {
+		fileMode = fileReadOnlyMode
+	} else {
+		fileMode = fileReadWriteMode
+	}
 	for _, o := range dxObjs {
 		kind := mdb.kindOfFile(o)
 		symlink := symlinkOfFile(kind, o)
@@ -953,7 +953,7 @@
 			o.MtimeSeconds,
 			o.Tags,
 			o.Properties,
-			fileReadWriteMode,
+			fileMode,
 			dirPath,
 			o.Name,
 			symlink,
@@ -973,14 +973,14 @@
 		// is false.
 		_, err := mdb.createEmptyDir(
 			oph,
-			projId, filepath.Clean(projFolder + "/" + subDirName),
+			projId, filepath.Clean(projFolder+"/"+subDirName),
 			ctime, mtime, dirReadWriteMode,
-			filepath.Clean(dirPath + "/" + subDirName),
+			filepath.Clean(dirPath+"/"+subDirName),
 			false)
 		if err != nil {
 			mdb.log("Error creating empty directory %s while populating directory %s",
-				filepath.Clean(projFolder + "/" + subDirName), dirPath)
+				filepath.Clean(projFolder+"/"+subDirName), dirPath)
 			return err
 		}
 	}
@@ -1092,7 +1092,6 @@
 
 	return nil
 }
-
 // Add a directory with its contents to an exisiting database
 func (mdb *MetadataDb) ReadDirAll(ctx context.Context, oph *OpHandle, dir *Dir) (map[string]File, map[string]Dir, error) {
 	if mdb.options.Verbose {
@@ -1119,7 +1118,6 @@
 
 	return mdb.directoryReadAllEntries(oph, dir.FullPath)
 }
-
 // Search for a file/subdir in a directory
 // Look for file [filename] in directory [parent]/[dname].
 //
@@ -1196,7 +1194,7 @@ func (mdb *MetadataDb) PopulateRoot(ctx context.Context, oph *OpHandle, manifest
 	for _, d := range dirSkel {
 		_, err := mdb.createEmptyDir(
 			oph,
-			"", "",   // There is no backing project/folder
+			"", "", // There is no backing project/folder
 			nowSeconds, nowSeconds,
 			dirReadOnlyMode, // skeleton directories are scaffolding, they cannot be modified.
 			d, true)
@@ -1283,16 +1281,16 @@ func (mdb *MetadataDb) CreateFile(
 	inode, err := mdb.createDataObject(
 		oph,
 		FK_Regular,
-		true,      // file is dirty, it should be uploaded.
+		true, // file is dirty, it should be uploaded.
 		false,
 		dir.ProjId,
 		"closed",
 		"live",
-		"",        // no file-id yet
-		0,         /* the file is empty */
+		"", // no file-id yet
+		0,  /* the file is empty */
 		nowSeconds,
 		nowSeconds,
-		nil,       // A local file initially doesn't have tags or properties
+		nil, // A local file initially doesn't have tags or properties
 		nil,
 		mode,
 		dir.FullPath,
@@ -1306,18 +1304,18 @@
 
 	// 3. return a File structure
 	return File{
-		Kind: FK_Regular,
-		Id : "",
-		ProjId : dir.ProjId,
-		ArchivalState : "live",
-		Name : fname,
-		Size : 0,
-		Inode : inode,
-		Ctime : SecondsToTime(nowSeconds),
-		Mtime : SecondsToTime(nowSeconds),
-		Mode : mode,
-		Symlink: "",
-		LocalPath : localPath,
+		Kind:          FK_Regular,
+		Id:            "",
+		ProjId:        dir.ProjId,
+		ArchivalState: "live",
+		Name:          fname,
+		Size:          0,
+		Inode:         inode,
+		Ctime:         SecondsToTime(nowSeconds),
+		Mtime:         SecondsToTime(nowSeconds),
+		Mode:          mode,
+		Symlink:       "",
+		LocalPath:     localPath,
 	}, nil
 }
 
@@ -1410,8 +1408,6 @@ func (mdb *MetadataDb) UpdateFileLocalPath(
 
 	return nil
 }
-
-
 // Move a file
 // 1) Can move a file from one directory to another,
 //    or leave it in the same directory
@@ -1439,8 +1435,7 @@ func (mdb *MetadataDb) MoveFile(
 
 	return nil
 }
-
-type  MoveRecord struct {
+type MoveRecord struct {
 	oldFullPath string
 	name        string
 	newParent   string
@@ -1529,7 +1524,7 @@ func (mdb *MetadataDb) MoveDir(
 		SELECT parent, name, inode, obj_type
 		FROM namespace
 		WHERE parent LIKE '%s';`,
-		oldDir.FullPath + "%")
+		oldDir.FullPath+"%")
 	rows, err := oph.txn.Query(sqlStmt)
 	if err != nil {
 		return oph.RecordError(err)
@@ -1558,15 +1553,15 @@ func (mdb *MetadataDb) MoveDir(
 		midPath := newName + "/" + strings.TrimPrefix(parent, oldDir.FullPath)
 		var newProjFolder string
 		if nsObjType == nsDirType {
-			newProjFolder =  filepath.Clean(newParentDir.ProjFolder + "/" + midPath + "/" + name)
+			newProjFolder = filepath.Clean(newParentDir.ProjFolder + "/" + midPath + "/" + name)
 		}
 		mr := MoveRecord{
-			oldFullPath : parent + "/" + name,
-			name : name,
-			newParent : filepath.Clean(newParentDir.FullPath + "/" + midPath),
-			newProjFolder : newProjFolder,
-			inode : inode,
-			nsObjType : nsObjType,
+			oldFullPath:   parent + "/" + name,
+			name:          name,
+			newParent:     filepath.Clean(newParentDir.FullPath + "/" + midPath),
+			newProjFolder: newProjFolder,
+			inode:         inode,
+			nsObjType:     nsObjType,
 		}
 		records = append(records, mr)
 
@@ -1575,12 +1570,12 @@ func (mdb *MetadataDb) MoveDir(
 	// add the top level directory (A) to be moved. Note, that the top level directory may
 	// change name.
 	records = append(records, MoveRecord{
-		oldFullPath : oldDir.FullPath,
-		name : newName,
-		newParent : filepath.Clean(newParentDir.FullPath),
-		newProjFolder : filepath.Clean(filepath.Join(newParentDir.ProjFolder, newName)),
-		inode : oldDir.Inode,
-		nsObjType: nsDirType,
+		oldFullPath:   oldDir.FullPath,
+		name:          newName,
+		newParent:     filepath.Clean(newParentDir.FullPath),
+		newProjFolder: filepath.Clean(filepath.Join(newParentDir.ProjFolder, newName)),
+		inode:         oldDir.Inode,
+		nsObjType:     nsDirType,
 	})
 	if mdb.options.Verbose {
 		mdb.log("found %d records under directory %s: %v", len(records), oldDir.FullPath, records)
@@ -1694,7 +1689,7 @@ func (mdb *MetadataDb) DirtyFilesGetAndReset(flag int) ([]DirtyFileInfo, error)
 	rows.Close()
 
 	// Figure out the project folder for each file
-	for i, _ := range(fAr) {
+	for i, _ := range fAr {
 		projId, projFolder, err := mdb.lookupDirByName(oph, fAr[i].Directory)
 		if err != nil {
 			return nil, err
diff --git a/posix.go b/posix.go
index c7c2064..5b4887b 100644
--- a/posix.go
+++ b/posix.go
@@ -32,13 +32,13 @@ import (
 
 //	zoo     regular file
 //
 type PosixDir struct {
-	path string // entire directory path
+	path        string // entire directory path
 	dataObjects []DxDescribeDataObject
 	subdirs     []string
 
 	// additional subdirectories holding files that have multiple versions,
 	// and could not be placed in the original location.
-	fauxSubdirs  map[string]([]DxDescribeDataObject)
+	fauxSubdirs map[string]([]DxDescribeDataObject)
 }
 
@@ -47,7 +47,7 @@ type Posix struct {
 
 func NewPosix(options Options) *Posix {
 	return &Posix{
-		options : options,
+		options: options,
 	}
 }
 
@@ -102,7 +102,6 @@ func (px *Posix) pickFauxDirNames(subdirs []string, uniqueFileNames []string, nu
 
 	return dirNames
 }
-
 // Find all the unique file names.
 //
 func (px *Posix) uniqueFileNames(dxObjs []DxDescribeDataObject) []string {
@@ -127,7 +126,7 @@ func (px *Posix) chooseAllObjectsWithName(
 	name string) []DxDescribeDataObject {
 	objs := make([]DxDescribeDataObject, 0)
-	for _, o := range(dxObjs) {
+	for _, o := range dxObjs {
 		if o.Name == name {
 			objs = append(objs, o)
 		}
@@ -138,7 +137,6 @@
 
 	return objs
 }
-
 // main entry point
 //
 // 1. Keep directory names fixed
@@ -159,7 +157,7 @@ func (px *Posix) FixDir(dxFolder *DxFolder) (*PosixDir, error) {
 	for _, subDirName := range dxFolder.subdirs {
 		// Make SURE that the subdirectory does not contain a slash.
 		lastPart := strings.TrimPrefix(subDirName, dxFolder.path)
-		lastPart = strings.TrimPrefix(lastPart,"/")
+		lastPart = strings.TrimPrefix(lastPart, "/")
 		if strings.Contains(lastPart, "/") {
 			px.log("Dropping subdirectory %s, it contains a slash", lastPart)
 			continue
@@ -216,40 +214,44 @@
 
 	// Take all the data-objects that have names that aren't already taken
 	// up by subdirs. They go in the top level
 	var topLevelObjs []DxDescribeDataObject
+	if len(allDxObjs) == len(uniqueFileNames) && len(subdirSet) < 1 {
+		px.log("Dir contains no duplicate filenames or subdirs with the same name as a filename, all objects are top level")
+		topLevelObjs = allDxObjs
+	} else {
+		for _, oName := range uniqueFileNames {
+			dxObjs := px.chooseAllObjectsWithName(allDxObjs, oName)
+			if px.options.VerboseLevel > 1 {
+				px.log("name=%s len(objs)=%d", oName, len(dxObjs))
+			}
-	for _, oName := range(uniqueFileNames) {
-		dxObjs := px.chooseAllObjectsWithName(allDxObjs, oName)
-		if px.options.VerboseLevel > 1 {
-			px.log("name=%s len(objs)=%d", oName, len(dxObjs))
-		}
-
-		_, ok := subdirSet[oName]
-		if !ok {
-			// There is no directory with this name, we
-			// place the object at the toplevel
-			topLevelObjs = append(topLevelObjs, dxObjs[0])
-			dxObjs = dxObjs[1:]
-		}
-
-		// spread the remaining copies across the faux subdirectories
-		for i, obj := range(dxObjs) {
-			dName := fauxDirNames[i]
-			vec, ok := fauxSubDirs[dName]
+			_, ok := subdirSet[oName]
 			if !ok {
-				// need to start a new faux subdir called "dName"
-				v := make([]DxDescribeDataObject, 1)
-				v[0] = obj
-				fauxSubDirs[dName] = v
-			} else {
-				fauxSubDirs[dName] = append(vec, obj)
+				// There is no directory with this name, we
+				// place the object at the toplevel
+				topLevelObjs = append(topLevelObjs, dxObjs[0])
+				dxObjs = dxObjs[1:]
+			}
+
+			// spread the remaining copies across the faux subdirectories
+			for i, obj := range dxObjs {
+				dName := fauxDirNames[i]
+				vec, ok := fauxSubDirs[dName]
+				if !ok {
+					// need to start a new faux subdir called "dName"
+					v := make([]DxDescribeDataObject, 1)
+					v[0] = obj
+					fauxSubDirs[dName] = v
+				} else {
+					fauxSubDirs[dName] = append(vec, obj)
+				}
 			}
 		}
 	}
 
 	posixDxFolder := &PosixDir{
-		path: dxFolder.path,
+		path:        dxFolder.path,
 		dataObjects: topLevelObjs,
-		subdirs: subdirs,
+		subdirs:     subdirs,
 		fauxSubdirs: fauxSubDirs,
 	}
 	if px.options.VerboseLevel > 1 {
diff --git a/prefetch.go b/prefetch.go
index 940eb50..97ee45b 100644
--- a/prefetch.go
+++ b/prefetch.go
@@ -7,8 +7,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"log"
 	"io/ioutil"
+	"log"
 	"math/bits"
 	"net/http"
 	"os"
@@ -26,8 +26,8 @@ const (
 	periodicTime = 30 * time.Second
 	slowIoThresh = 60 // when does a slow IO become worth reporting
 
-	prefetchMinIoSize = (256 * KiB)  // threshold for deciding the file is sequentially accessed
-	prefetchIoFactor = 4
+	prefetchMinIoSize = (256 * KiB) // threshold for deciding the file is sequentially accessed
+	prefetchIoFactor  = 4
 
 	numSlotsInChunk = 64
 
@@ -38,7 +38,7 @@
 	// maximum number of prefetch threads, regardless of machine size
 	maxNumPrefetchThreads = 32
 
-	minFileSize = 1 * MiB  // do not track files smaller than this size
+	minFileSize = 1 * MiB // do not track files smaller than this size
 
 	// An prefetch request time limit
 	readRequestTimeout = 90 * time.Second
@@ -46,95 +46,95 @@ const (
 
 // enumerated type for the state of a PFM (file metadata)
 const (
-	PFM_NIL = 1 // No IOs have been seen yet, cache is empty
-	PFM_DETECT_SEQ = 2 // First accesses, detecting if access is sequential
+	PFM_NIL                  = 1 // No IOs have been seen yet, cache is empty
+	PFM_DETECT_SEQ           = 2 // First accesses, detecting if access is sequential
 	PFM_PREFETCH_IN_PROGRESS = 3 // prefetch is ongoing
-	PFM_EOF = 4 // reached the end of the file
+	PFM_EOF                  = 4 // reached the end of the file
 )
 
 // state of an io-vector
 const (
-	IOV_HOLE = 1 // empty
+	IOV_HOLE      = 1 // empty
 	IOV_IN_FLIGHT = 2 // in progress
-	IOV_DONE = 3 // completed successfully
-	IOV_ERRORED = 4 // completed with an error
+	IOV_DONE      = 3 // completed successfully
+	IOV_ERRORED   = 4 // completed with an error
 )
 
 // A request that one of the IO-threads will pick up
 type IoReq struct {
-	hid fuseops.HandleID
-	inode int64
-	size int64
-	url DxDownloadURL
+	hid   fuseops.HandleID
+	inode int64
+	size  int64
+	url   DxDownloadURL
 
-	ioSize int64 // The io size
-	startByte int64 // start byte, counting from the beginning of the file.
-	endByte int64
+	ioSize    int64 // The io size
+	startByte int64 // start byte, counting from the beginning of the file.
+	endByte   int64
 
-	id uint64  // a unique id for this IO
+	id uint64 // a unique id for this IO
 }
 
 type Iovec struct {
-	ioSize int64 // The io size
-	startByte int64 // start byte, counting from the beginning of the file.
-	endByte int64
-	touched uint64 // mark the areas that have been accessed by the user
-	data []byte
+	ioSize    int64  // The io size
+	startByte int64  // start byte, counting from the beginning of the file.
+	endByte   int64
+	touched   uint64 // mark the areas that have been accessed by the user
+	data      []byte
 
 	// io-vector statue (ongoing, done, errored)
-	state  int
+	state int
 
 	// Allow user reads to wait until prefetch IO complete
-	cond  *sync.Cond
+	cond *sync.Cond
 }
 
 // A cache of all the data retrieved from the platform, for one file.
 // It is a contiguous range of chunks. All IOs are the same size.
 type Cache struct {
-	prefetchIoSize int64 // size of the IO to issue when prefetching
-	maxNumIovecs int
+	prefetchIoSize int64 // size of the IO to issue when prefetching
+	maxNumIovecs   int
 
-	startByte int64
-	endByte int64
-	iovecs [](*Iovec)
+	startByte int64
+	endByte   int64
+	iovecs    [](*Iovec)
 }
 
 type MeasureWindow struct {
-	timestamp time.Time
-	numIOs int
-	numBytesPrefetched int64
-	numPrefetchIOs int
+	timestamp          time.Time
+	numIOs             int
+	numBytesPrefetched int64
+	numPrefetchIOs     int
 }
 
 type PrefetchFileMetadata struct {
-	mutex  sync.Mutex
+	mutex sync.Mutex
 
 	// the file being tracked
-	hid fuseops.HandleID
-	inode int64
-	id string
-	size int64
-	url DxDownloadURL
-	state int
+	hid   fuseops.HandleID
+	inode int64
+	id    string
+	size  int64
+	url   DxDownloadURL
+	state int
 
-	lastIoTimestamp time.Time // Last time an IO hit this file
-	hiUserAccessOfs int64 // highest file offset accessed by the user
-	mw MeasureWindow // statistics for stream
+	lastIoTimestamp time.Time     // Last time an IO hit this file
+	hiUserAccessOfs int64         // highest file offset accessed by the user
+	mw              MeasureWindow // statistics for stream
 
 	// cached io vectors.
 	// The assumption is that the user is accessing the last io-vector.
 	// If this assumption isn't true, prefetch is ineffective. The algorithm
 	// should detect and stop it.
-	cache  Cache
+	cache Cache
 }
 
 // global limits
 type PrefetchGlobalState struct {
-	mutex sync.Mutex // Lock used to control the files table
+	mutex        sync.Mutex // Lock used to control the files table
 	verbose      bool
 	verboseLevel int
 	handlesInfo  map[fuseops.HandleID](*PrefetchFileMetadata) // tracking state per handle
-	ioQueue chan IoReq // queue of IOs to prefetch
+	ioQueue      chan IoReq                                   // queue of IOs to prefetch
 	wg           sync.WaitGroup
 	prefetchMaxIoSize int64
 	numPrefetchThreads int
@@ -157,10 +157,14 @@ func (iov Iovec) intersectBuffer(startOfs int64, endOfs int64) []byte {
 
 func (iov Iovec) stateString() string {
 	switch iov.state {
-	case IOV_HOLE: return "HOLE"
-	case IOV_IN_FLIGHT: return "IN_FLIGHT"
-	case IOV_DONE: return "DONE"
-	case IOV_ERRORED: return "ERRORED"
+	case IOV_HOLE:
+		return "HOLE"
+	case IOV_IN_FLIGHT:
+		return "IN_FLIGHT"
+	case IOV_DONE:
+		return "DONE"
+	case IOV_ERRORED:
+		return "ERRORED"
 	default:
 		panic(fmt.Sprintf("bad state for iovec %d", iov.state))
 	}
@@ -174,10 +178,14 @@ func (pfm *PrefetchFileMetadata) log(a string, args ...interface{}) {
 
 func (pfm *PrefetchFileMetadata) stateString() string {
 	switch pfm.state {
-	case PFM_NIL: return "NIL"
-	case PFM_DETECT_SEQ: return "DETECT_SEQ"
-	case PFM_PREFETCH_IN_PROGRESS: return "PREFETCHING"
-	case PFM_EOF: return "EOF"
+	case PFM_NIL:
+		return "NIL"
+	case PFM_DETECT_SEQ:
+		return "DETECT_SEQ"
+	case PFM_PREFETCH_IN_PROGRESS:
+		return "PREFETCHING"
+	case PFM_EOF:
+		return "EOF"
 	default:
 		panic(fmt.Sprintf("bad state for pfm %d", pfm.state))
 	}
@@ -223,7 +231,7 @@ func NewPrefetchGlobalState(verboseLevel int, dxEnv dxda.DXEnvironment) *Prefetc
 	// 2) not have more than two workers per CPU
 	// 3) not go over an overall limit, regardless of machine size
 	numCPUs := runtime.NumCPU()
-	numPrefetchThreads := MinInt(numCPUs * 2, maxNumPrefetchThreads)
+	numPrefetchThreads := MinInt(numCPUs*2, maxNumPrefetchThreads)
 	log.Printf("Number of prefetch threads=%d", numPrefetchThreads)
 
 	// The number of read-ahead should be limited to 8
@@ -252,18 +260,18 @@
 
 	totalMemoryBytes := 2 * maxNumEntriesInTable * prefetchMaxIoSize
 	totalMemoryBytes += int64(maxNumChunksReadAhead) * prefetchMaxIoSize
-	log.Printf("maximal memory usage: %dMiB", totalMemoryBytes / MiB)
+	log.Printf("maximal memory usage: %dMiB", totalMemoryBytes/MiB)
 	log.Printf("number of prefetch worker threads: %d", numPrefetchThreads)
 	log.Printf("maximal number of read-ahead chunks: %d", maxNumChunksReadAhead)
 
 	pgs := &PrefetchGlobalState{
-		verbose : verboseLevel >= 1,
-		verboseLevel : verboseLevel,
-		handlesInfo : make(map[fuseops.HandleID](*PrefetchFileMetadata)),
-		ioQueue : make(chan IoReq),
-		prefetchMaxIoSize : prefetchMaxIoSize,
-		numPrefetchThreads: numPrefetchThreads,
-		maxNumChunksReadAhead : maxNumChunksReadAhead,
+		verbose:               verboseLevel >= 1,
+		verboseLevel:          verboseLevel,
+		handlesInfo:           make(map[fuseops.HandleID](*PrefetchFileMetadata)),
+		ioQueue:               make(chan IoReq),
+		prefetchMaxIoSize:     prefetchMaxIoSize,
+		numPrefetchThreads:    numPrefetchThreads,
+		maxNumChunksReadAhead: maxNumChunksReadAhead,
 	}
 
 	// limit the number of prefetch IOs
@@ -406,20 +414,20 @@ func (pgs *PrefetchGlobalState) DownloadEntireFile(
 	endOfs := size - 1
 	startByte := int64(0)
 	for startByte <= endOfs {
-		endByte := MinInt64(startByte + pgs.prefetchMaxIoSize - 1, endOfs)
+		endByte := MinInt64(startByte+pgs.prefetchMaxIoSize-1, endOfs)
 		iovLen := endByte - startByte + 1
 
 		// read one chunk of the file
 		uniqueId := atomic.AddUint64(&pgs.ioCounter, 1)
 		ioReq := IoReq{
-			hid : 0, // Invalid handle, shouldn't be in a table
-			inode : inode,
-			size : size,
-			url : url,
-			ioSize : iovLen,
-			startByte : startByte,
-			endByte : endByte,
-			id : uniqueId,
+			hid:       0, // Invalid handle, shouldn't be in a table
+			inode:     inode,
+			size:      size,
+			url:       url,
+			ioSize:    iovLen,
+			startByte: startByte,
+			endByte:   endByte,
+			id:        uniqueId,
 		}
 
 		data, err := pgs.readData(client, ioReq)
@@ -480,7 +488,6 @@ func (pgs *PrefetchGlobalState) addIoReqToCache(pfm *PrefetchFileMetadata, ioReq
 
 	pfm.cache.iovecs[iovIdx].cond.Broadcast()
}
-
 func (pgs *PrefetchGlobalState) getAndLockPfm(hid fuseops.HandleID) *PrefetchFileMetadata {
 	pgs.mutex.Lock()
 
@@ -534,7 +541,6 @@ func (pgs *PrefetchGlobalState) prefetchIoWorker() {
 
 	}
 }
-
 // Check if a file is worth tracking.
 func (pgs *PrefetchGlobalState) isWorthIt(pfm *PrefetchFileMetadata, now time.Time) bool {
 	if now.After(pfm.lastIoTimestamp.Add(maxDeltaTime)) {
@@ -592,20 +598,20 @@ func (pgs *PrefetchGlobalState) newPrefetchFileMetadata(
 	url DxDownloadURL) *PrefetchFileMetadata {
 	now := time.Now()
 	return &PrefetchFileMetadata{
-		mutex : sync.Mutex{},
-		hid : hid,
-		inode : f.Inode,
-		id : f.Id,
-		size : f.Size,
-		url : url,
-		state : PFM_NIL, // Initial state of the file; no IOs were detected yet
-		lastIoTimestamp : now,
-		hiUserAccessOfs : 0,
-		mw : MeasureWindow{
-			timestamp : now,
-			numIOs : 0,
-			numBytesPrefetched : 0,
-			numPrefetchIOs : 0,
+		mutex:           sync.Mutex{},
+		hid:             hid,
+		inode:           f.Inode,
+		id:              f.Id,
+		size:            f.Size,
+		url:             url,
+		state:           PFM_NIL, // Initial state of the file; no IOs were detected yet
+		lastIoTimestamp: now,
+		hiUserAccessOfs: 0,
+		mw: MeasureWindow{
+			timestamp:          now,
+			numIOs:             0,
+			numBytesPrefetched: 0,
+			numPrefetchIOs:     0,
 		},
 	}
 }
@@ -623,29 +629,29 @@ func (pgs *PrefetchGlobalState) firstAccessToStream(pfm *PrefetchFileMetadata, o
 
 	startOfs := (ofs / pageSize) * pageSize
 	iov1 := &Iovec{
-		ioSize : prefetchMinIoSize,
-		startByte : startOfs,
-		endByte : startOfs + prefetchMinIoSize - 1,
-		touched : 0,
-		data : nil,
-		state : IOV_HOLE,
-		cond : sync.NewCond(&pfm.mutex),
+		ioSize:    prefetchMinIoSize,
+		startByte: startOfs,
+		endByte:   startOfs + prefetchMinIoSize - 1,
+		touched:   0,
+		data:      nil,
+		state:     IOV_HOLE,
+		cond:      sync.NewCond(&pfm.mutex),
 	}
 	iov2 := &Iovec{
-		ioSize : prefetchMinIoSize,
-		startByte : iov1.startByte + prefetchMinIoSize,
-		endByte : iov1.endByte + prefetchMinIoSize,
-		touched : 0,
-		data : nil,
-		state : IOV_HOLE,
-		cond : sync.NewCond(&pfm.mutex),
+		ioSize:    prefetchMinIoSize,
+		startByte: iov1.startByte + prefetchMinIoSize,
+		endByte:   iov1.endByte + prefetchMinIoSize,
+		touched:   0,
+		data:      nil,
+		state:     IOV_HOLE,
+		cond:      sync.NewCond(&pfm.mutex),
 	}
 	pfm.cache = Cache{
-		prefetchIoSize : prefetchMinIoSize,
-		maxNumIovecs : 2,
-		startByte : iov1.startByte,
-		endByte : iov2.endByte,
-		iovecs : make([]*Iovec, 2),
+		prefetchIoSize: prefetchMinIoSize,
+		maxNumIovecs:   2,
+		startByte:      iov1.startByte,
+		endByte:        iov2.endByte,
+		iovecs:         make([]*Iovec, 2),
 	}
 	pfm.cache.iovecs[0] = iov1
 	pfm.cache.iovecs[1] = iov2
@@ -707,7 +713,7 @@ func (pfm *PrefetchFileMetadata) markRangeInIovec(iovec *Iovec, startOfs int64,
 	}
 	check(endSlot >= 0 && endSlot <= numSlotsInChunk)
 
-	for slot := startSlot; slot <= endSlot ; slot++ {
+	for slot := startSlot; slot <= endSlot; slot++ {
 		// Sets the bit at position [slot]
 		iovec.touched |= (1 << uint(slot))
 	}
@@ -755,7 +761,6 @@ func (pgs *PrefetchGlobalState) findCoveredRange(
 
 	return first, last
 }
-
 // Setup cache state for the next prefetch.
 //
 // 1) we want there to be several chunks ahead of us.
@@ -787,33 +792,33 @@ func (pgs *PrefetchGlobalState) moveCacheWindow(pfm *PrefetchFileMetadata, iovIn
 		if startByte > lastByteInFile {
 			break
 		}
-		endByte := MinInt64(startByte + int64(pfm.cache.prefetchIoSize) - 1, lastByteInFile)
+		endByte := MinInt64(startByte+int64(pfm.cache.prefetchIoSize)-1, lastByteInFile)
 		iov := &Iovec{
-			ioSize : endByte - startByte + 1,
-			startByte : startByte,
-			endByte : endByte,
-			touched : 0,
-			data : nil,
-			state : IOV_IN_FLIGHT,
-			cond : sync.NewCond(&pfm.mutex),
+			ioSize:    endByte - startByte + 1,
+			startByte: startByte,
+			endByte:   endByte,
+			touched:   0,
+			data:      nil,
+			state:     IOV_IN_FLIGHT,
+			cond:      sync.NewCond(&pfm.mutex),
 		}
 		uniqueId := atomic.AddUint64(&pgs.ioCounter, 1)
 		pgs.ioQueue <- IoReq{
-			hid : pfm.hid,
-			inode : pfm.inode,
-			size : pfm.size,
-			url : pfm.url,
-			ioSize : iov.ioSize,
-			startByte : iov.startByte,
-			endByte : iov.endByte,
-			id : uniqueId,
+			hid:       pfm.hid,
+			inode:     pfm.inode,
+			size:      pfm.size,
+			url:       pfm.url,
+			ioSize:    iov.ioSize,
+			startByte: iov.startByte,
+			endByte:   iov.endByte,
+			id:        uniqueId,
 		}
 		check(iov.ioSize <= pgs.prefetchMaxIoSize)
 		pfm.cache.iovecs = append(pfm.cache.iovecs, iov)
 		if pgs.verbose {
 			pfm.log("Adding chunk %d [%d -- %d]",
-				len(pfm.cache.iovecs) - 1,
+				len(pfm.cache.iovecs)-1,
 				iov.startByte,
 				iov.endByte)
 		}
 
@@ -835,7 +840,7 @@
 		}
 	}
 	if nRemoved > 0 {
-		pfm.cache.iovecs = pfm.cache.iovecs[nRemoved : ]
+		pfm.cache.iovecs = pfm.cache.iovecs[nRemoved:]
 		if pgs.verbose {
 			pfm.log("Removed %d chunks", nRemoved)
 		}
@@ -857,7 +862,6 @@
 
 	}
 }
-
 func (pgs *PrefetchGlobalState) markAccessedAndMaybeStartPrefetch(
 	pfm *PrefetchFileMetadata,
 	startOfs int64,
@@ -877,9 +881,9 @@
 	numAccessed := bits.OnesCount64(currentIovec.touched)
 	if pgs.verboseLevel >= 2 {
 		pfm.log("touch: ofs=%d len=%d numAccessed=%d",
-			startOfs, endOfs - startOfs, numAccessed)
+			startOfs, endOfs-startOfs, numAccessed)
 	}
-	if numAccessed < numSlotsInChunk {
+	if numAccessed < (numSlotsInChunk - 32) {
 		return true
 	}
 	// A sufficient number of the slots were accessed. Start a prefetch for
@@ -892,7 +896,7 @@
 	// increase io size, using a bounded exponential formula
 	if pfm.cache.prefetchIoSize < pgs.prefetchMaxIoSize {
 		pfm.cache.prefetchIoSize =
-			MinInt64(pgs.prefetchMaxIoSize, pfm.cache.prefetchIoSize * prefetchIoFactor)
+			MinInt64(pgs.prefetchMaxIoSize, pfm.cache.prefetchIoSize*prefetchIoFactor)
 	}
 	if pfm.cache.prefetchIoSize == pgs.prefetchMaxIoSize {
 		// Give each stream at least one read-ahead request. If there
@@ -908,14 +912,13 @@
 
 		pgs.moveCacheWindow(pfm, last)
 
 		// Have we reached the end of the file?
-		if pfm.cache.endByte >= pfm.size - 1 {
+		if pfm.cache.endByte >= pfm.size-1 {
 			pfm.state = PFM_EOF
 		}
 	}
 	return true
 }
-
 const (
 	DATA_OUTSIDE_CACHE = 1 // data not in cache
 	DATA_IN_CACHE      = 2 // data is in cache
@@ -925,11 +928,16 @@
 
 func cacheCode2string(retCode int) string {
 	switch retCode {
-	case DATA_OUTSIDE_CACHE: return "OUTSIDE_CACHE"
-	case DATA_IN_CACHE: return "IN_CACHE"
-	case DATA_HOLE: return "HOLE"
-	case DATA_WAIT : return "WAIT"
-	default: panic(fmt.Sprintf("unknown cache code %d", retCode))
+	case DATA_OUTSIDE_CACHE:
+		return "OUTSIDE_CACHE"
+	case DATA_IN_CACHE:
+		return "IN_CACHE"
+	case DATA_HOLE:
+		return "HOLE"
+	case DATA_WAIT:
+		return "WAIT"
+	default:
+		panic(fmt.Sprintf("unknown cache code %d", retCode))
 	}
 }
 
@@ -972,20 +980,19 @@
 	return DATA_IN_CACHE
 }
-
 // The IO is entirely in the cache, and all the data
 // is there.
 func (pgs *PrefetchGlobalState) copyDataFromCache(
 	pfm *PrefetchFileMetadata,
 	startOfs int64,
 	endOfs int64,
-	data []byte)  int {
+	data []byte) int {
 	first, last := pgs.findCoveredRange(pfm, startOfs, endOfs)
 	check(0 <= first && 0 <= last)
 
 	cursor := 0
 	for i := first; i <= last; i++ {
 		iov := pfm.cache.iovecs[i]
-		check (iov.state == IOV_DONE)
+		check(iov.state == IOV_DONE)
 		subBuf := iov.intersectBuffer(startOfs, endOfs)
 		len := copy(data[cursor:], subBuf)
 		cursor += len
@@ -1001,7 +1008,7 @@ func (pgs *PrefetchGlobalState) getDataFromCache(
 	pfm *PrefetchFileMetadata,
 	startOfs int64,
 	endOfs int64,
-	data []byte)  (int, int) {
+	data []byte) (int, int) {
 	numTries := 3
 	for i := 0; i < numTries; i++ {
 		retCode := pgs.isDataInCache(pfm, startOfs, endOfs)
@@ -1081,9 +1088,13 @@
 
 	case PFM_EOF:
 		// don't issue any more prefetch IOs, we have reached the end of the file
-		_, len := pgs.getDataFromCache(pfm, startOfs, endOfs, data)
+		retCode, len := pgs.getDataFromCache(pfm, startOfs, endOfs, data)
+		if retCode == DATA_OUTSIDE_CACHE {
+			// The file is being accessed again, perhaps reading from a different region
+			// reset the cache and start over
+			pgs.resetPfm(pfm)
+		}
 		return len
-
 	default:
 		log.Panicf("bad state %d for fileId=%s", pfm.state, pfm.id)
 		return 0
diff --git a/util.go b/util.go
index 8cb900a..fc690b4 100644
--- a/util.go
+++ b/util.go
@@ -26,10 +26,10 @@ const (
 	HttpClientPoolSize        = 4
 	FileWriteInactivityThresh = 5 * time.Minute
 	WritableFileSizeLimit     = 16 * MiB
-	MaxDirSize                = 10 * 1000
+	MaxDirSize                = 255 * 1000
 	MaxNumFileHandles         = 1000 * 1000
 	NumRetriesDefault         = 3
-	Version                   = "v0.22.4"
+	Version                   = "v0.23.0"
 )
 const (
 	InodeInvalid = 0