Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Colossus] Intelligent operator url selection (based on previous download speeds) for syncing data objects #5015

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions storage-node/.eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ module.exports = {
'no-unused-vars': 'off', // Required by the typescript rule below
'@typescript-eslint/no-unused-vars': ['error'],
'@typescript-eslint/no-floating-promises': 'error',
'no-useless-constructor': 'off',
},
}
1 change: 1 addition & 0 deletions storage-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"sleep-promise": "^9.1.0",
"subscriptions-transport-ws": "^0.11.0",
"superagent": "^6.1.0",
"superagent-node-http-timings": "1.0.1",
"tslib": "^1",
"url-join": "^4.0.1",
"uuid": "^8.3.2",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module 'superagent-node-http-timings'
11 changes: 1 addition & 10 deletions storage-node/src/commands/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,16 +388,7 @@ async function runSyncWithInterval(
while (true) {
try {
logger.info(`Resume syncing....`)
await performSync(
api,
buckets,
syncWorkersNumber,
syncWorkersTimeout,
qnApi,
uploadsDirectory,
tempDirectory,
hostId
)
await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
await sleep(sleepInterval)
} catch (err) {
Expand Down
1 change: 0 additions & 1 deletion storage-node/src/commands/util/fetch-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export default class FetchBucket extends Command {

try {
await performSync(
undefined,
[bucketId],
flags.syncWorkersNumber,
flags.syncWorkersTimeout,
Expand Down
2 changes: 2 additions & 0 deletions storage-node/src/services/queryNode/queries/queries.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {

fragment DataObjectByBagIdsDetails on StorageDataObject {
id
size
ipfsHash
storageBagId
}

Expand Down
63 changes: 0 additions & 63 deletions storage-node/src/services/sync/remoteStorageData.ts

This file was deleted.

12 changes: 12 additions & 0 deletions storage-node/src/services/sync/storageObligations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ type DataObject = {
* Assigned bag ID
*/
bagId: string

/**
* Data Object size (in bytes)
*/
size: string

/**
* Data Object hash
*/
ipfsHash: string
}

/**
Expand Down Expand Up @@ -109,6 +119,8 @@ export async function getStorageObligationsFromRuntime(
})),
dataObjects: assignedDataObjects.map((dataObject) => ({
id: dataObject.id,
size: dataObject.size,
ipfsHash: dataObject.ipfsHash,
bagId: dataObject.storageBagId,
})),
}
Expand Down
104 changes: 35 additions & 69 deletions storage-node/src/services/sync/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { ApiPromise } from '@polkadot/api'
import _ from 'lodash'
import { getDataObjectIDs } from '../../services/caching/localDataObjects'
import logger from '../../services/logger'
import { QueryNodeApi } from '../queryNode/api'
import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
import { DownloadFileTask, PrepareDownloadFileTask, SyncTask } from './tasks'
import { TaskProcessorSpawner, TaskSink, WorkingStack } from './workingProcess'
import { DownloadFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'

/**
* Temporary directory name for data uploading.
Expand All @@ -31,50 +30,43 @@ export const PendingDirName = 'pending'
* @param qnApi - Query Node API
* @param uploadDirectory - local directory to get file names from
* @param tempDirectory - local directory for temporary data uploading
* @param operatorUrl - (optional) defines the data source URL. If not set
* @param selectedOperatorUrl - (optional) defines the data source URL. If not set
* the source URL is resolved for each data object separately using the Query
* Node information about the storage providers.
*/
export async function performSync(
api: ApiPromise | undefined,
buckets: string[],
asyncWorkersNumber: number,
asyncWorkersTimeout: number,
qnApi: QueryNodeApi,
uploadDirectory: string,
tempDirectory: string,
hostId: string,
operatorUrl?: string
selectedOperatorUrl?: string
): Promise<void> {
logger.info('Started syncing...')
const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()])

const requiredIds = model.dataObjects.map((obj) => obj.id)
const required = model.dataObjects

const added = _.difference(requiredIds, files)
const removed = _.difference(files, requiredIds)
const added = _.differenceWith(required, files, (required, file) => required.id === file)
const removed = _.differenceWith(files, required, (file, required) => file === required.id)

logger.debug(`Sync - new objects: ${added.length}`)
logger.debug(`Sync - obsolete objects: ${removed.length}`)

const workingStack = new WorkingStack()

let addedTasks: SyncTask[]
if (operatorUrl === undefined) {
addedTasks = await getPrepareDownloadTasks(
api,
model,
buckets,
added,
uploadDirectory,
tempDirectory,
workingStack,
asyncWorkersTimeout,
hostId
)
} else {
addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory, asyncWorkersTimeout, hostId)
}
const addedTasks = await getDownloadTasks(
model,
buckets,
added,
uploadDirectory,
tempDirectory,
asyncWorkersTimeout,
hostId,
selectedOperatorUrl
)

logger.debug(`Sync - started processing...`)

Expand All @@ -87,28 +79,28 @@ export async function performSync(
}

/**
* Creates the download preparation tasks.
* Creates the download tasks.
*
* @param api - Runtime API promise
* @param ownBuckets - list of bucket ids operated by this node
* @param dataObligations - defines the current data obligations for the node
* @param ownBuckets - list of bucket ids operated by this node
* @param addedIds - data object IDs to download
* @param uploadDirectory - local directory for data uploading
* @param tempDirectory - local directory for temporary data uploading
* @param taskSink - a destination for the newly created tasks
* @param asyncWorkersTimeout - downloading asset timeout
* @param hostId - Random host UUID assigned to each node during bootstrap
* @param selectedOperatorUrl - operator URL selected for syncing objects
*/
async function getPrepareDownloadTasks(
api: ApiPromise | undefined,
async function getDownloadTasks(
dataObligations: DataObligations,
ownBuckets: string[],
addedIds: string[],
added: DataObligations['dataObjects'],
uploadDirectory: string,
tempDirectory: string,
taskSink: TaskSink,
asyncWorkersTimeout: number,
hostId: string
): Promise<PrepareDownloadFileTask[]> {
hostId: string,
selectedOperatorUrl?: string
): Promise<DownloadFileTask[]> {
const bagIdByDataObjectId = new Map()
for (const entry of dataObligations.dataObjects) {
bagIdByDataObjectId.set(entry.id, entry.bagId)
Expand Down Expand Up @@ -148,53 +140,27 @@ async function getPrepareDownloadTasks(
bagOperatorsUrlsById.set(entry.id, operatorUrls)
}

const tasks = addedIds.map((id) => {
const tasks = added.map((dataObject) => {
let operatorUrls: string[] = [] // can be empty after look up
let bagId = null
if (bagIdByDataObjectId.has(id)) {
bagId = bagIdByDataObjectId.get(id)
if (bagIdByDataObjectId.has(dataObject.id)) {
bagId = bagIdByDataObjectId.get(dataObject.id)
if (bagOperatorsUrlsById.has(bagId)) {
operatorUrls = bagOperatorsUrlsById.get(bagId)
}
}

return new PrepareDownloadFileTask(
operatorUrls,
hostId,
bagId,
id,
return new DownloadFileTask(
selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls,
dataObject.id,
dataObject.size,
dataObject.ipfsHash,
uploadDirectory,
tempDirectory,
taskSink,
asyncWorkersTimeout,
api
hostId
)
})

return tasks
}

/**
* Creates the download file tasks.
*
* @param operatorUrl - defines the data source URL.
* @param addedIds - data object IDs to download
* @param uploadDirectory - local directory for data uploading
* @param tempDirectory - local directory for temporary data uploading
* @param downloadTimeout - asset downloading timeout (in minutes)
*/
async function getDownloadTasks(
operatorUrl: string,
addedIds: string[],
uploadDirectory: string,
tempDirectory: string,
downloadTimeout: number,
hostId: string
): Promise<DownloadFileTask[]> {
const addedTasks = addedIds.map(
(fileName) =>
new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory, downloadTimeout, hostId)
)

return addedTasks
}
Loading
Loading