Skip to content

Commit

Permalink
code files cache
Browse files Browse the repository at this point in the history
  • Loading branch information
saul-data committed Oct 10, 2023
1 parent 2567ff1 commit 6f2efdd
Show file tree
Hide file tree
Showing 16 changed files with 205 additions and 137 deletions.
55 changes: 36 additions & 19 deletions app/mainapp/code_editor/dfs_cache/invalidate_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,39 @@ import (
"github.com/dataplane-app/dataplane/app/mainapp/database"
"github.com/dataplane-app/dataplane/app/mainapp/database/models"
"github.com/dataplane-app/dataplane/app/mainapp/messageq"
"gorm.io/gorm"
)

/*
New or edited file - add to database, invalidate node and file level cache and this will automatically added by reference to cache on run time.
*/
func InvalidateCacheSingle(nodeID string, environmentID string, fileID string) error {
// Write to node level cache
err := database.DBConn.Model(&models.CodeNodeCache{}).Where("node_id = ? and environment_id = ?", nodeID, environmentID).Update("cache_valid", false).Error

if err != nil {
return err
}
err := database.DBConn.Transaction(func(tx *gorm.DB) error {
// Write to node level cache
errdb := tx.Model(&models.CodeNodeCache{}).Where("node_id = ? and environment_id = ?", nodeID, environmentID).Update("cache_valid", false).Error

// Write to file level cache (file gets overwritten)
err = database.DBConn.Where("node_id = ? and environment_id = ? and file_id = ?", nodeID, environmentID, fileID).Delete(&models.CodeFilesCache{}).Error
if errdb != nil {
return errdb
}

// Pipeline: Write to file level cache (file gets overwritten)
errdb = tx.Where("node_id = ? and environment_id = ? and file_id = ?", nodeID, environmentID, fileID).Delete(&models.CodeFilesCache{}).Error

if errdb != nil {
return errdb
}

// Code editor: Write to file level cache (file gets overwritten)
errdb = tx.Where("node_id = ? and environment_id = ? and file_id = ?", nodeID, environmentID, fileID).Delete(&models.CodeRunFilesCache{}).Error

if errdb != nil {
return errdb
}

return nil

})

if err != nil {
return err
Expand All @@ -33,7 +51,7 @@ func InvalidateCacheSingle(nodeID string, environmentID string, fileID string) e
/*
Delete a file, move file or folder name change - invalidate the node cache, remove all file level cache and remove the entire folder in each worker.
*/
func InvalidateCacheNode(nodeID string, environmentID string, folderpath string) error {
func InvalidateCacheNode(nodeID string, pipelineID string, environmentID string) error {
// Write to node level cache
err := database.DBConn.Model(&models.CodeNodeCache{}).Where("node_id = ? and environment_id = ?", nodeID, environmentID).Update("cache_valid", false).Error

Expand All @@ -48,6 +66,13 @@ func InvalidateCacheNode(nodeID string, environmentID string, folderpath string)
return err
}

// Write to file level cache (file gets overwritten)
err = database.DBConn.Where("node_id = ? and environment_id = ?", nodeID, environmentID).Delete(&models.CodeRunFilesCache{}).Error

if err != nil {
return err
}

var response models.TaskResponse

getWorkerGroup := models.PipelineNodes{}
Expand All @@ -56,11 +81,7 @@ func InvalidateCacheNode(nodeID string, environmentID string, folderpath string)
log.Println("Error getting worker groups for cache delete", err)
return err
}
channel := "DisributedStorageRemoval." + getWorkerGroup.WorkerGroup

if dpconfig.Debug == "true" {
log.Println("folder to delete:", folderpath)
}
channel := "DisributedStorageRemoval." + environmentID + "." + getWorkerGroup.WorkerGroup

_, errnats := messageq.MsgReply(channel, folderpath, &response)

Expand All @@ -75,7 +96,7 @@ func InvalidateCacheNode(nodeID string, environmentID string, folderpath string)
/*
Delete or change pipeline.
*/
func InvalidateCachePipeline(environmentID string, folderpath string, pipelineID string) error {
func InvalidateCachePipeline(environmentID string, pipelineID string) error {
// Write to node level cache

updateQuery := `
Expand Down Expand Up @@ -111,15 +132,11 @@ func InvalidateCachePipeline(environmentID string, folderpath string, pipelineID
return err
}

if dpconfig.Debug == "true" {
log.Println("folder to delete:", folderpath)
}

var response models.TaskResponse
for _, x := range getWorkerGroups {
channel := "DisributedStorageRemoval." + x.WorkerGroup
// log.Println(channel)
_, errnats := messageq.MsgReply(channel, folderpath, &response)
_, errnats := messageq.MsgReply(channel, pipelineID, &response)

if errnats != nil {
log.Println("Send to worker error nats:", errnats)
Expand Down
116 changes: 65 additions & 51 deletions app/mainapp/code_editor/filesystem/file_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/dataplane-app/dataplane/app/mainapp/database/models"

"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

Expand All @@ -27,73 +28,86 @@ func CreateFile(input models.CodeFiles, Folder string, Content []byte) (models.C

createFile := dpconfig.CodeDirectory + Folder + filename

// Does the file already exists?
var existingFile models.CodeFiles
database.DBConn.Where("environment_id = ? and node_id =? and file_name = ? and folder_id =?", input.EnvironmentID, input.NodeID, input.FileName, input.FolderID).First(&existingFile)
err := database.DBConn.Transaction(func(tx *gorm.DB) error {

// -------- if LocalFile --------
if dpconfig.FSCodeFileStorage == "LocalFile" {
if _, err := os.Stat(dpconfig.CodeDirectory + Folder); os.IsNotExist(err) {
// Does the file already exists?
var existingFile models.CodeFiles
tx.Where("environment_id = ? and node_id =? and file_name = ? and folder_id =?", input.EnvironmentID, input.NodeID, input.FileName, input.FolderID).First(&existingFile)

if dpconfig.Debug == "true" {
log.Println("Directory doesnt exists: ", dpconfig.CodeDirectory+Folder)
return input, returnpath, errors.New("Directory doesnt exists")
}
// -------- if LocalFile --------
if dpconfig.FSCodeFileStorage == "LocalFile" {
if _, err := os.Stat(dpconfig.CodeDirectory + Folder); os.IsNotExist(err) {

} else {
if dpconfig.Debug == "true" {
log.Println("Directory doesnt exists: ", dpconfig.CodeDirectory+Folder)
return errors.New("Directory doesnt exists")
}

} else {

err := os.WriteFile(createFile, Content, 0644)
if err != nil {
return errors.New("Failed to write file")
}

if dpconfig.Debug == "true" {
log.Println("Created file: ", createFile)
}

err := os.WriteFile(createFile, Content, 0644)
if err != nil {
return input, returnpath, errors.New("Failed to write file")
}
}

// Create record if doesnt exist
if existingFile.FileID == "" {
id := uuid.NewString()
input.FileID = id
errdb := tx.Create(&input).Error
if errdb != nil {
if dpconfig.Debug == "true" {
log.Println("Directory create error:", errdb)
return errors.New("File create database error")
}

if dpconfig.Debug == "true" {
log.Println("Created file: ", createFile)
}
} else {
tx.Model(&models.CodeFiles{}).Where("file_id = ?", existingFile.FileID).Update("updated_at", time.Now().UTC())
input.FileID = existingFile.FileID
}

md5byte := md5.Sum(Content)
md5string := fmt.Sprintf("%x", md5byte)

codefile := models.CodeFilesStore{
FileID: input.FileID,
FileStore: Content,
RunInclude: true,
External: false,
ChecksumMD5: md5string,
EnvironmentID: input.EnvironmentID,
}
}

// Create record if doesnt exist
if existingFile.FileID == "" {
id := uuid.NewString()
input.FileID = id
errdb := database.DBConn.Create(&input).Error
errdb := tx.Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&codefile).Error
if errdb != nil {
if dpconfig.Debug == "true" {
log.Println("Directory create error:", errdb)
return input, returnpath, errors.New("File create database error")
}

log.Println("Create file in database:", errdb)
return errors.New("Create file in database error")
}
} else {
database.DBConn.Model(&models.CodeFiles{}).Where("file_id = ?", existingFile.FileID).Update("updated_at", time.Now().UTC())
input.FileID = existingFile.FileID
}

md5byte := md5.Sum(Content)
md5string := fmt.Sprintf("%x", md5byte)
// ---- Invalidate the cache cache for this file -----
errcache := dfscache.InvalidateCacheSingle(input.NodeID, input.EnvironmentID, input.FileID)
if errcache != nil {
log.Println("Create file cache invalidate:", errcache)

codefile := models.CodeFilesStore{
FileID: input.FileID,
FileStore: Content,
RunInclude: true,
External: false,
ChecksumMD5: md5string,
EnvironmentID: input.EnvironmentID,
}
return errors.New("Create file cache invalidate:" + errcache.Error())
}

errdb := database.DBConn.Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&codefile).Error
if errdb != nil {
log.Println("Create file in database:", errdb)
return input, returnpath, errors.New("Create file in database error")
}
return nil

})

errcache := dfscache.InvalidateCacheSingle(input.NodeID, input.EnvironmentID, input.FileID)
if errcache != nil {
log.Println("Create file cache invalidate:", errdb)
if err != nil {
return input, returnpath, err
}

return input, returnpath, nil
Expand Down
20 changes: 5 additions & 15 deletions app/mainapp/code_editor/runcode/run_code_file_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,34 +40,24 @@ func RunCodeFile(workerGroup string, fileID string, envID string, pipelineID str
/* ------------------------ Python node -------------------------- */
case "python":

// ------ Obtain folder structure from file id
// ------ Obtain the file name
filesdata := models.CodeFiles{}
dberror := database.DBConn.Where("file_id = ? and environment_id =? and level = ?", fileID, envID, "node_file").Find(&filesdata).Error
dberror := database.DBConn.Select("file_name").Where("file_id = ? and environment_id =? and level = ?", fileID, envID, "node_file").Find(&filesdata).Error
if dberror != nil {
rerror := "Code run obtain folder structure error:" + dberror.Error()
WSLogError(envID, runid, rerror, models.CodeRun{})
return models.CodeRun{}, errors.New(rerror)
}

parentfolderdata := envID + "/coderun/" + pipelineID + "/" + nodeID
// parentfolderdata := envID + "/coderun/" + pipelineID + "/" + nodeID
var err error
// if filesdata.FolderID != "" {

// // The folder structure will look like <environment ID>/coderun/<pipeline ID>/<node ID>
// parentfolderdata, err = filesystem.NodeLevelFolderConstructByID(database.DBConn, filesdata.FolderID, envID)
// // parentfolderdata, err = filesystem.FolderConstructByID(database.DBConn, filesdata.FolderID, envID, "pipelines")

// log.Println("parent folder code run:", parentfolderdata)

// if err != nil {
// return models.CodeRun{}, errors.New("File record not found")
// }
// } else {
// return models.CodeRun{}, errors.New("File record not found")
// }
// filesdata, parentfolderdata, filesdata.FolderID,

commands = append(commands, "python3 -u ${{nodedirectory}}"+filesdata.FileName)
runSend, err = RunCodeServerWorker(envID, nodeID, workerGroup, runid, commands, filesdata, parentfolderdata, filesdata.FolderID, replayRunID)
runSend, err = RunCodeServerWorker(envID, pipelineID, nodeID, workerGroup, runid, commands, replayRunID)
if err != nil {
/* Send back any local errors not happening on the remote worker */
WSLogError(envID, runid, err.Error(), models.CodeRun{})
Expand Down
2 changes: 1 addition & 1 deletion app/mainapp/code_editor/runcode/run_code_server_cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func RunCodeServerCancel(runid string, environmentID string) error {
}

var response models.TaskResponse
_, errnats := messageq.MsgReply("runcodefilecancel."+task.WorkerGroup+"."+task.WorkerID, tasksend, &response)
_, errnats := messageq.MsgReply("runcodefilecancel."+environmentID+"."+task.WorkerGroup+"."+task.WorkerID, tasksend, &response)

if errnats != nil {
log.Println("Send to worker error nats:", errnats)
Expand Down
18 changes: 11 additions & 7 deletions app/mainapp/code_editor/runcode/run_code_server_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import (
"gorm.io/gorm"
)

func RunCodeServerWorker(envID string, nodeID string, workerGroup string, runid string, commands []string, filesdata models.CodeFiles, folderMap string, folderIDMap string, replayRunID string) (models.CodeRun, error) {
// filesdata models.CodeFiles,
// folderMap string, folderIDMap string,

func RunCodeServerWorker(envID string, pipelineID string, nodeID string, workerGroup string, runid string, commands []string, replayRunID string) (models.CodeRun, error) {

/* Look up chosen workers -
if none, keep trying for 10 x 2 seconds
Expand Down Expand Up @@ -99,18 +102,19 @@ func RunCodeServerWorker(envID string, nodeID string, workerGroup string, runid
}

runSend = models.CodeRun{
RunID: runid,
NodeID: nodeID,
FileID: filesdata.FileID,
RunID: runid,
NodeID: nodeID,
PipelineID: pipelineID,
// FileID: filesdata.FileID,
ReplayRunID: replayRunID,
CreatedAt: time.Now().UTC(),
EnvironmentID: envID,
WorkerGroup: workerGroup,
WorkerID: loadbalanceNext,
Commands: commandJSON,
Status: "Queue",
Folder: folderMap,
FolderID: folderIDMap,
// Folder: folderMap,
// FolderID: folderIDMap,
}

err2 := database.DBConn.Create(&runSend)
Expand All @@ -129,7 +133,7 @@ func RunCodeServerWorker(envID string, nodeID string, workerGroup string, runid
*/

// log.Println("Task channel: ", "task."+workerGroup+"."+loadbalanceNext)
channel := "runcodefile." + workerGroup + "." + loadbalanceNext
channel := "runcodefile." + envID + "." + workerGroup + "." + loadbalanceNext
// log.Println(channel)
_, errnats := messageq.MsgReply(channel, runSend, &response)

Expand Down
3 changes: 2 additions & 1 deletion app/mainapp/database/migrations/db_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func Migrate() {

migrateVersion := "0.0.81"
migrateVersion := "0.0.82"

connectURL := fmt.Sprintf(
"postgres://%s:%s@%s:%s/%s?sslmode=%s",
Expand Down Expand Up @@ -124,6 +124,7 @@ func Migrate() {
&models.CodeRun{},
&models.CodeRunLock{},
&models.CodePackages{},
&models.CodeRunFilesCache{},

// Deployments
&models.DeployPipelines{},
Expand Down
18 changes: 18 additions & 0 deletions app/mainapp/database/models/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,24 @@ type DeployCodeFilesCache struct {
UpdatedAt *time.Time `json:"updated_at"`
}

func (CodeRunFilesCache) IsEntity() {}

func (CodeRunFilesCache) TableName() string {
return "code_run_files_cache"
}

type CodeRunFilesCache struct {
FileID string `gorm:"primaryKey;type:varchar(48);" json:"file_id"`
NodeID string `gorm:"primaryKey;type:varchar(48);" json:"node_id"`
Version string `gorm:"primaryKey;type:varchar(48);" json:"version"`
WorkerID string `gorm:"primaryKey;type:varchar(48);" json:"worker_id"`
WorkerGroup string `json:"worker_group"`
EnvironmentID string `json:"environment_id"`
ChecksumMD5Check bool `gorm:"default:false;" json:"checksum_md5_check"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt *time.Time `json:"updated_at"`
}

func (DeployCodeNodeCache) IsEntity() {}

func (DeployCodeNodeCache) TableName() string {
Expand Down
Loading

0 comments on commit 6f2efdd

Please sign in to comment.