Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ContentUploaddService: Retry uploads #339

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/services/runtime/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export type StorageNodeInfo = {

export type AssetUploadInput = {
dataObjectId: DataObjectId
file: Readable
file: () => Promise<Readable>
}

export type VideoFFProbeMetadata = {
Expand Down
106 changes: 62 additions & 44 deletions src/services/storage-node/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ import { LoggingService } from '../logging'
import { QueryNodeApi } from '../query-node/api'
import { getThumbnailAsset } from '../runtime/client'
import { AssetUploadInput, StorageNodeInfo } from '../runtime/types'
import { Readable } from 'stream'
import sleep from 'sleep-promise'

export type OperatorInfo = { id: string; endpoint: string }
export type OperatorsMapping = Record<string, OperatorInfo>
export type VideoUploadResponse = {
id: string // hash of dataObject uploaded
}

const UPLOAD_MAX_ATTEMPTS = 3
const UPLOAD_RETRY_INTERVAL = 10

export class StorageNodeApi {
private logger: Logger

Expand All @@ -29,65 +34,57 @@ export class StorageNodeApi {
const assetsInput: AssetUploadInput[] = [
{
dataObjectId: createType('u64', new BN(video.joystreamVideo.assetIds[0])),
file: fs.createReadStream(videoFilePath),
file: async () => { return fs.createReadStream(videoFilePath) },
},
{
dataObjectId: createType('u64', new BN(video.joystreamVideo.assetIds[1])),
file: await getThumbnailAsset(video.thumbnails),
file: async () => { return getThumbnailAsset(video.thumbnails) },
},
]
return this.upload(bagId, assetsInput)
}

private async upload(bagId: string, assets: AssetUploadInput[]) {
// Get a random active storage node for given bag
const operator = await this.getRandomActiveStorageNodeInfo(bagId)
if (!operator) {
throw new StorageApiError(
ExitCodes.StorageApi.NO_ACTIVE_STORAGE_PROVIDER,
`No active storage node found for bagId: ${bagId}`
)
}

for (const { dataObjectId, file } of assets) {
try {
this.logger.debug('Uploading asset', { dataObjectId: dataObjectId.toString() })
for (const attempt of _.range(1, UPLOAD_MAX_ATTEMPTS)) {
// Randomly select one active storage node for given bag
const operator = await this.getRandomActiveStorageNodeInfo(bagId)
if (!operator) {
throw new StorageApiError(
ExitCodes.StorageApi.NO_ACTIVE_STORAGE_PROVIDER,
`No active storage node found for bagId: ${bagId}`
)
}
const fileStream = await file()
try {
await this.uploadAsset(operator, bagId, dataObjectId.toString(), fileStream)
break // upload successfull, continue with next asset
} catch (error) {
fileStream.destroy()

const formData = new FormData()
formData.append('file', file, 'video.mp4')
await axios.post<VideoUploadResponse>(`${operator.apiEndpoint}/files`, formData, {
params: {
dataObjectId: dataObjectId.toString(),
storageBucketId: operator.bucketId,
bagId,
},
maxBodyLength: Infinity,
maxContentLength: Infinity,
maxRedirects: 0,
headers: {
'content-type': 'multipart/form-data',
...formData.getHeaders(),
},
})
} catch (error) {
// destroy the file stream
file.destroy()
if (axios.isAxiosError(error) && error.response) {
const storageNodeUrl = error.config?.url
const { status, data } = error.response
error = new Error(data?.message)

if (axios.isAxiosError(error) && error.response) {
const storageNodeUrl = error.config?.url
const { status, data } = error.response
this.logger.error(`${storageNodeUrl} - errorCode: ${status}, msg: ${data?.message}`)

if (data?.message?.includes(`Data object ${dataObjectId} already exist`)) {
// No need to throw an error, we can continue with the next asset
continue
}
if (data?.message?.includes(`Data object ${dataObjectId} already exist`)) {
// No need to throw an error, we can continue with the next asset
break
}

this.logger.error(`${storageNodeUrl} - errorCode: ${status}, msg: ${data?.message}`)
if (data?.message?.includes(`Data object ${dataObjectId} doesn't exist in storage bag ${bagId}`)) {
if (attempt < UPLOAD_MAX_ATTEMPTS) {
this.logger.error(`Will retry upload of asset ${dataObjectId} in ${UPLOAD_RETRY_INTERVAL} seconds.`)
await sleep(UPLOAD_RETRY_INTERVAL * 1000)
continue // try again
}
}
}

throw new Error(data?.message)
throw error
}

throw error
}
}
}
Expand Down Expand Up @@ -115,10 +112,31 @@ export class StorageNodeApi {
this.logger.debug(
`No storage provider can serve the request yet, retrying in ${retryTime}s (${i + 1}/${retryCount})...`
)
await new Promise((resolve) => setTimeout(resolve, retryTime * 1000))
await sleep(retryTime * 1000)
}
}

return null
}

async uploadAsset(operator: StorageNodeInfo, bagId: string, dataObjectId: string, file: Readable) {
this.logger.debug('Uploading asset', { dataObjectId })

const formData = new FormData()
formData.append('file', file, 'video.mp4')
await axios.post<VideoUploadResponse>(`${operator.apiEndpoint}/files`, formData, {
params: {
dataObjectId,
storageBucketId: operator.bucketId,
bagId,
},
maxBodyLength: Infinity,
maxContentLength: Infinity,
maxRedirects: 0,
headers: {
'content-type': 'multipart/form-data',
...formData.getHeaders(),
},
})
}
}
3 changes: 1 addition & 2 deletions src/services/syncProcessing/ContentUploadService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ export class ContentUploadService {
if (
error instanceof Error &&
(error.message.includes(`File multihash doesn't match the data object's ipfsContentId`) ||
error.message.includes(`File size doesn't match the data object's`) ||
error.message.includes(`doesn't exist in storage bag`))
error.message.includes(`File size doesn't match the data object's`))
) {
console.log('VideoUnavailable::Other', job.data.id)
await this.dynamodbService.videos.updateState(job.data, 'VideoUnavailable::Other')
Expand Down
4 changes: 2 additions & 2 deletions src/services/syncProcessing/PriorityQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { DynamodbService } from '../../repository'
import { ReadonlyConfig } from '../../types'
import { YtChannel, YtVideo } from '../../types/youtube'
import { SyncUtils } from './utils'
import sleep from 'sleep-promise'

export const QUEUE_NAME_PREFIXES = ['Upload', 'Creation', 'Metadata', 'Download'] as const

Expand Down Expand Up @@ -192,8 +193,7 @@ class PriorityJobQueue<
await this.asyncLock.acquire(this.RECALCULATE_PRIORITY_LOCK_KEY, async () => {
// Wait until all processing tasks have completed
while (this.processingCount > 0) {
console.log('recalculateJobsPriority', this.queue.name, this.processingCount, this.processingTasks)
await new Promise((resolve) => setTimeout(resolve, 1000))
await sleep(1000)
}

const jobs = await this.queue.getJobs(['prioritized'])
Expand Down
2 changes: 1 addition & 1 deletion src/services/syncProcessing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export class ContentProcessingService extends ContentProcessingClient implements
// add job flow to the flow producer
const jobNode = await this.flowManager.addFlowJob(flowJob)

jobNode.job
return jobNode.job
}
}
})
Expand Down
Loading