Skip to content

Commit

Permalink
Merge pull request #229 from trheyi/main
Browse files Browse the repository at this point in the history
Fix memory leaks, Add ReadCloser and WriteCloser methods to FileSystem interface; refactor Process to manage execution results
  • Loading branch information
trheyi authored Oct 30, 2024
2 parents e0a7d2f + b39978d commit e6131b3
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 31 deletions.
41 changes: 30 additions & 11 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,27 @@ func (path Path) defaultHandler(getArgs argsHandler) func(c *gin.Context) {
}
}

// Release Memory
defer func() { resp = nil; body = nil }()
switch data := body.(type) {
case maps.Map, map[string]interface{}, []interface{}, []maps.Map, []map[string]interface{}:
defer func() { data = nil }()
c.JSON(status, data)
c.Done()
return

case []byte:
defer func() { data = nil }()
c.Data(status, contentType, data)
c.Done()
return

case io.ReadCloser:
defer data.Close()
c.DataFromReader(status, -1, contentType, data, nil)
c.Done()
return

case error:
ex := exception.Err(data, 500)
c.JSON(ex.Code, gin.H{"message": ex.Message, "code": ex.Code})
Expand Down Expand Up @@ -281,13 +291,13 @@ func (path Path) execProcess(ctx context.Context, chRes chan<- interface{}, c *g
}

process.WithContext(ctx)
res, err := process.Exec()
err = process.Execute()
if err != nil {
log.Error("[Path] %s %s", path.Path, err.Error())
chRes <- err
return
}
chRes <- res
chRes <- process.Value()
}

func (path Path) runProcess(ctx context.Context, c *gin.Context, getArgs argsHandler) interface{} {
Expand All @@ -308,7 +318,12 @@ func (path Path) runProcess(ctx context.Context, c *gin.Context, getArgs argsHan
}

process.WithContext(ctx)
return process.Run()
err := process.Execute()
if err != nil {
log.Error("[Path] %s %s", path.Path, err.Error())
exception.Err(err, 500).Throw()
}
return process.Value()
}

func (path Path) reqContentType(c *gin.Context) string {
Expand All @@ -322,23 +337,27 @@ func (path Path) reqContentType(c *gin.Context) string {
func (path Path) setResponseHeaders(c *gin.Context, resp interface{}, contentType string) string {
if len(path.Out.Headers) > 0 {
res := any.Of(resp)
if res.IsMap() { // 处理变量

// Parse Headers
if res.IsMap() {
data := res.Map().MapStrAny.Dot()
for name, value := range path.Out.Headers {
v := helper.Bind(value, data)
if v != nil {
c.Writer.Header().Set(name, fmt.Sprintf("%v", v))
path.Out.Headers[name] = fmt.Sprintf("%v", v)
}
}
} else {
for name, value := range path.Out.Headers {
c.Writer.Header().Set(name, value)
if name == "Content-Type" {
contentType = value
}
}

// Set Headers and replace Content-Type if exists
for name, value := range path.Out.Headers {
c.Writer.Header().Set(name, value)
if name == "Content-Type" {
contentType = value
}
}
}

return contentType
}

Expand Down
11 changes: 7 additions & 4 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} {
}

ext := filepath.Ext(file.Filename)
dir, err := os.MkdirTemp(os.TempDir(), "upload")
dir, err := os.MkdirTemp("", "upload")
if err != nil {
return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())}
}
Expand All @@ -366,13 +366,13 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} {
if err != nil {
return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())}
}
defer tmpfile.Close()

err = c.SaveUploadedFile(file, tmpfile.Name())
if err != nil {
if err := c.SaveUploadedFile(file, tmpfile.Name()); err != nil {
return types.UploadFile{Error: fmt.Sprintf("%s %s", arg[1], err.Error())}
}

return types.UploadFile{
uploadFile := types.UploadFile{
UID: c.GetHeader("Content-Uid"),
Range: c.GetHeader("Content-Range"),
Sync: c.GetHeader("Content-Sync") == "true", // sync upload or not
Expand All @@ -381,6 +381,9 @@ func (http HTTP) parseIn(in []interface{}) func(c *gin.Context) []interface{} {
Size: file.Size,
Header: file.Header,
}
file = nil
tmpfile = nil
return uploadFile
})
} else { // 原始数值
new := v
Expand Down
10 changes: 10 additions & 0 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,23 @@ func ReadFile(xfs FileSystem, file string) ([]byte, error) {
return xfs.ReadFile(file)
}

// ReadCloser returns a ReadCloser to read the named file.
func ReadCloser(xfs FileSystem, file string) (io.ReadCloser, error) {
return xfs.ReadCloser(file)
}

// WriteFile writes data to the named file, creating it if necessary.
//
// If the file does not exist, WriteFile creates it with permissions perm (before umask); otherwise WriteFile truncates it before writing, without changing permissions.
func WriteFile(xfs FileSystem, file string, data []byte, perm uint32) (int, error) {
return xfs.WriteFile(file, data, perm)
}

// WriteCloser returns a WriteCloser that writes to the named file.
func WriteCloser(xfs FileSystem, file string, perm uint32) (io.WriteCloser, error) {
return xfs.WriteCloser(file, perm)
}

// Write writes the content of reader to the named file, creating it if necessary.
func Write(xfs FileSystem, file string, reader io.Reader, perm uint32) (int, error) {
return xfs.Write(file, reader, perm)
Expand Down
47 changes: 47 additions & 0 deletions fs/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"fmt"
"image"
"io"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -286,6 +287,52 @@ func TestWrite(t *testing.T) {
}
}

func TestReadCloser(t *testing.T) {
stores := testStores(t)
f := testFiles(t)
for name, stor := range stores {
clear(stor, t)
data := testData(t)

// Write
length, err := WriteFile(stor, f["F1"], data, 0644)
assert.Nil(t, err, name)
checkFileExists(stor, t, f["F1"], name)
checkFileSize(stor, t, f["F1"], length, name)
checkFileMode(stor, t, f["F1"], 0644, name)

// ReadCloser
rc, err := ReadCloser(stor, f["F1"])
assert.Nil(t, err, name)
content, err := io.ReadAll(rc)
assert.Nil(t, err, name)
assert.Equal(t, data, content, name)
rc.Close()
}
}

func TestWriteCloser(t *testing.T) {
stores := testStores(t)
f := testFiles(t)
for name, stor := range stores {
clear(stor, t)

// WriteCloser
wc, err := WriteCloser(stor, f["F1"], 0644)
assert.Nil(t, err, name)
data := testData(t)
n, err := wc.Write(data)
assert.Nil(t, err, name)
assert.Equal(t, len(data), n, name)
wc.Close()

// Check the content
fileContent, err := ReadFile(stor, f["F1"])
assert.Nil(t, err, name)
assert.Equal(t, data, fileContent, name)
}
}

func TestAppendFile(t *testing.T) {
stores := testStores(t)
f := testFiles(t)
Expand Down
4 changes: 2 additions & 2 deletions fs/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func processDownload(process *process.Process) interface{} {
process.ValidateArgNums(1)
stor := stor(process)
file := process.ArgsString(0)
data, err := ReadFile(stor, file)
reader, err := ReadCloser(stor, file)
if err != nil {
exception.New(err.Error(), 500).Throw()
}
Expand All @@ -586,7 +586,7 @@ func processDownload(process *process.Process) interface{} {
}

return map[string]interface{}{
"content": data,
"content": reader,
"type": mimeType,
}
}
Expand Down
25 changes: 25 additions & 0 deletions fs/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,31 @@ func (f *File) ReadFile(file string) ([]byte, error) {
return os.ReadFile(file)
}

// ReadCloser returns a ReadCloser with the file content
func (f *File) ReadCloser(file string) (io.ReadCloser, error) {
file, err := f.absPath(file)
if err != nil {
return nil, err
}
return os.OpenFile(file, os.O_RDONLY, 0)
}

// WriteCloser returns a WriteCloser with the file content
func (f *File) WriteCloser(file string, perm uint32) (io.WriteCloser, error) {
file, err := f.absPath(file)
if err != nil {
return nil, err
}

dir := filepath.Dir(file)
err = os.MkdirAll(dir, os.ModePerm)
if err != nil && !os.IsExist(err) {
return nil, err
}

return os.OpenFile(file, os.O_CREATE|os.O_WRONLY, fs.FileMode(perm))
}

// WriteFile writes data to the named file, creating it if necessary.
//
// If the file does not exist, WriteFile creates it with permissions perm (before umask); otherwise WriteFile truncates it before writing, without changing permissions.
Expand Down
3 changes: 3 additions & 0 deletions fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type FileSystem interface {
MkdirTemp(dir string, pattern string) (string, error)
Glob(pattern string) ([]string, error)

ReadCloser(file string) (io.ReadCloser, error)
WriteCloser(file string, perm uint32) (io.WriteCloser, error)

Remove(name string) error
RemoveAll(name string) error

Expand Down
5 changes: 4 additions & 1 deletion http/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,15 @@ func processHTTPStream(p *process.Process) interface{} {
return HandlerReturnError
}

res, err := procesHandler.WithSID(p.Sid).WithGlobal(p.Global).Exec()
err = procesHandler.WithSID(p.Sid).WithGlobal(p.Global).Execute()
if err != nil {
log.Error("[http.Stream] %s %s", handler, err.Error())
return HandlerReturnError
}
defer procesHandler.Release()

// Get the result
res := procesHandler.Value()
if v, ok := res.(int); ok {
return v
}
Expand Down
76 changes: 63 additions & 13 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,54 @@ func Of(name string, args ...interface{}) (*Process, error) {
return process, nil
}

// Execute execute the process and return error only
func (process *Process) Execute() error {
var hd Handler
hd, err := process.handler()
if err != nil {
return err
}

defer func() { err = exception.Catch(recover()) }()
value := hd(process)
process._val = &value
return err
}

// Release the value of the process
func (process *Process) Release() {
process._val = nil
}

// Dispose the process after run success
func (process *Process) Dispose() {
if process.Runtime != nil {
process.Runtime.Dispose()
}

process.Args = nil
process.Global = nil
process.Context = nil
process.Runtime = nil
process._val = nil
process = nil
}

// Value get the result of the process
func (process *Process) Value() interface{} {
if process._val != nil {
return *process._val
}
return nil
}

// Run the process
// ****
//
// This function causes a memory leak, will be disposed in the future,
// Use Execute() instead
//
// ****
func (process *Process) Run() interface{} {
hd, err := process.handler()
if err != nil {
Expand All @@ -42,6 +89,22 @@ func (process *Process) Run() interface{} {
}

// Exec execute the process and return error
//
// ****
//
// This function causes a memory leak, will be disposed in the future,
// Use Execute() instead
// Example:
//
// process := Of("models.user.pet.Find", 1, {})
// err := process.Execute();
// if err != nil {
// // handle error
// }
// defer process.Release() // or process.Dispose() if you want to relese the runtime isolate after run success
// result := process.Value() // Get the result
//
// ****
func (process *Process) Exec() (value interface{}, err error) {

var hd Handler
Expand Down Expand Up @@ -104,19 +167,6 @@ func (process *Process) WithRuntime(runtime Runtime) *Process {
return process
}

// Dispose the process after run success
func (process *Process) Dispose() {
if process.Runtime != nil {
process.Runtime.Dispose()
}

process.Args = nil
process.Global = nil
process.Context = nil
process.Runtime = nil
process = nil
}

// handler get the process handler
func (process *Process) handler() (Handler, error) {
if hander, has := Handlers[process.Handler]; has && hander != nil {
Expand Down
Loading

0 comments on commit e6131b3

Please sign in to comment.