Skip to content

Commit

Permalink
added async cals using worker queue
Browse files Browse the repository at this point in the history
  • Loading branch information
rotarur committed Sep 27, 2023
1 parent 49172dd commit b91deb2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 48 deletions.
121 changes: 74 additions & 47 deletions internal/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"sync"
"time"

"github.com/graphprotocol/ipfs-mgm/internal/utils"
Expand All @@ -21,6 +22,8 @@ var SyncCmd = &cobra.Command{
},
}

var workerItemCount int = 50

func init() {
SyncCmd.Flags().StringP("source", "s", "", "IPFS source endpoint")
SyncCmd.MarkFlagRequired("source")
Expand Down Expand Up @@ -95,64 +98,88 @@ func Sync(cmd *cobra.Command) {

counter := 1
length := len(cids)
for _, k := range cids {
// Get IPFS CID from source
srcCID := srcGet + k.Cid
log.Printf("%d/%d: Syncing the CID: %s\n",counter, length, k.Cid)

// Get CID from source
resG, err := utils.GetIPFS(srcCID, nil)
if err != nil {
log.Printf("%d/%d: %s",counter, length, err)
failed += 1
counter += 1
continue
}
defer resG.Body.Close()
// Adjust for the number of CID's
if length < workerItemCount {
workerItemCount = length
}

cidV := utils.GetCIDVersion(k.Cid)
// Create the API URL fo the POST on destination
apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV)
for i := 0; i < length; {
// Create a channel with buffer of workerItemCount size
workChan := make(chan utils.HTTPResult, workerItemCount)
var wg sync.WaitGroup

newBody, err := utils.GetHTTPBody(resG)
if err != nil {
log.Printf("%d/%d: %s",counter, length, err)
}
for j := 0; j < workerItemCount; j++ {
wg.Add(1)
go func(c int, cidID string) {
defer wg.Done()
AsyncPostIPFS(srcGet, dst, cidID, &c, length, &failed, &synced)

// Sync IPFS CID into destination
// TODO: implement retry backoff with pester
var m utils.IPFSResponse
resP, err := utils.PostIPFS(apiADD, newBody)
if err != nil {
log.Printf("%d/%d: %s", counter, length, err)
failed += 1
} else {
defer resP.Body.Close()
}(counter, cids[i].Cid)
counter += 1

// Generic function to parse the response and create a struct
err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m)
if err != nil {
log.Printf("%d/%d: %s", counter, length, err)
}
i++
}

// Check if the IPFS Hash is the same as the source one
// If not the syncing didn't work
ok, err := utils.TestIPFSHash(k.Cid, m.Hash)
if err != nil {
log.Printf("%d/%d: %s",counter, length, err)
failed += 1
} else {
// Print success message
log.Printf("%d/%d: %s",counter, length, ok)
synced += 1
}
counter += 1
close(workChan)
wg.Wait()
}


// Print Final statistics
log.Printf("Total number of objects: %d; Synced: %d; Failed: %d\n", len(cids), synced, failed)
log.Printf("Total time: %s\n", time.Since(timeStart))
}

func AsyncPostIPFS(src string, dst string, cidID string, counter *int, length int, failed *int, synced *int) {
// Get IPFS CID from source
srcCID := src + cidID
log.Printf("%d/%d: Syncing the CID: %s\n", *counter, length, cidID)

// Get CID from source
resG, err := utils.GetIPFS(srcCID, nil)
if err != nil {
log.Printf("%d/%d: %s; CID: %s", *counter, length, err, cidID)
*failed += 1
*counter += 1
return
}
defer resG.Body.Close()

cidV := utils.GetCIDVersion(cidID)
// Create the API URL fo the POST on destination
apiADD := fmt.Sprintf("%s%s?cid-version=%s", dst, utils.IPFS_PIN_ENDPOINT, cidV)

newBody, err := utils.GetHTTPBody(resG)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
}

// Sync IPFS CID into destination
// TODO: implement retry backoff with pester
var m utils.IPFSResponse
resP, err := utils.PostIPFS(apiADD, newBody)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
*failed += 1
} else {
defer resP.Body.Close()

// Generic function to parse the response and create a struct
err := utils.UnmarshalToStruct[utils.IPFSResponse](resP.Body, &m)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
}
}

// Check if the IPFS Hash is the same as the source one
// If not the syncing didn't work
ok, err := utils.TestIPFSHash(cidID, m.Hash)
if err != nil {
log.Printf("%d/%d: %s", *counter, length, err)
*failed += 1
} else {
// Print success message
log.Printf("%d/%d: %s", *counter, length, ok)
*synced += 1
}
}
2 changes: 1 addition & 1 deletion internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestIPFSHash(s string, d string) (string, error) {
return "", fmt.Errorf("The source Hash %s is different from the destination hash %s", s, d)
}

return fmt.Sprint("Successfully synced to destination IPFS"), nil
return fmt.Sprintf("Successfully synced to destination IPFS, CID: %s", s), nil
}

func SliceToCIDSStruct(s []string) ([]IPFSCIDResponse, error) {
Expand Down

0 comments on commit b91deb2

Please sign in to comment.