fix(CLI): Add concurrency safe download of documents, refactor code into modules that can be easily tested, improve progress tracking for dataset backup
j33ty committed Feb 7, 2024
1 parent 442ea6c commit b9d56e0
Showing 10 changed files with 414 additions and 252 deletions.
2 changes: 2 additions & 0 deletions packages/sanity/package.json
@@ -233,6 +233,7 @@
"@vitejs/plugin-react": "^4.2.0",
"archiver": "^6.0.1",
"arrify": "^1.0.1",
"async-mutex": "^0.4.1",
"chalk": "^4.1.2",
"chokidar": "^3.5.3",
"classnames": "^2.2.5",
@@ -312,6 +313,7 @@
"@testing-library/jest-dom": "^5.16.5",
"@testing-library/react": "^13.4.0",
"@testing-library/user-event": "^13.0.16",
"@types/archiver": "^6.0.2",
"@types/arrify": "^1.0.4",
"@types/connect-history-api-fallback": "^1.5.2",
"@types/lodash": "^4.14.149",
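The new async-mutex dependency is what backs the "concurrency safe download of documents" part of the commit message. The module that uses it is not among the files shown here, so the snippet below is only a minimal sketch of how a mutex can serialize writes to a shared documents file while downloads run concurrently; documentWriteMutex, appendDocument and the NDJSON format are assumptions, not code from this commit.

import {appendFile} from 'fs/promises'
import {Mutex} from 'async-mutex'

// Hypothetical sketch: a single mutex guards appends to the shared documents file,
// so documents that are downloaded concurrently are still written one at a time.
const documentWriteMutex = new Mutex()

async function appendDocument(docOutFilePath: string, doc: unknown): Promise<void> {
  await documentWriteMutex.runExclusive(() => appendFile(docOutFilePath, `${JSON.stringify(doc)}\n`))
}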
54 changes: 54 additions & 0 deletions packages/sanity/src/_internal/cli/actions/backup/archiveDir.ts
@@ -0,0 +1,54 @@
import zlib from 'zlib'
import {createWriteStream} from 'fs'
import {ProgressData} from 'archiver'
import debug from './debug'

const archiver = require('archiver')

// ProgressCb is a callback that is called with the number of bytes processed so far.
type ProgressCb = (processedBytes: number) => void

// archiveDir creates a tarball of the given directory and writes it to the given file path.
function archiveDir(tmpOutDir: string, outFilePath: string, progressCb: ProgressCb): Promise<void> {
return new Promise((resolve, reject) => {
const archiveDestination = createWriteStream(outFilePath)
archiveDestination.on('error', (err: Error) => {
reject(err)
})

archiveDestination.on('close', () => {
resolve()
})

const archive = archiver('tar', {
gzip: true,
gzipOptions: {level: zlib.constants.Z_DEFAULT_COMPRESSION},
})

archive.on('error', (err: Error) => {
debug('Archiving errored!\n%s', err.stack)
reject(err)
})

// Catch warnings for non-blocking errors (stat failures and others)
archive.on('warning', (err: Error) => {
debug('Archive warning: %s', err.message)
})

archive.on('progress', (progress: ProgressData) => {
progressCb(progress.fs.processedBytes)
})

// Pipe archive data to the file
archive.pipe(archiveDestination)
archive.directory(tmpOutDir, false)
archive.finalize()
})
}

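// humanFileSize returns the given byte count as a human-readable string, e.g. "1.23 MB".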
function humanFileSize(size: number): string {
  const i = size === 0 ? 0 : Math.floor(Math.log(size) / Math.log(1024))
return `${(size / Math.pow(1024, i)).toFixed(2)} ${['B', 'kB', 'MB', 'GB', 'TB'][i]}`
}

export {archiveDir, humanFileSize}
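For reference, a minimal usage sketch of the two exports above (createTarball and the log line are illustrative, not part of this commit):

import {archiveDir, humanFileSize} from './archiveDir'

// Illustrative only: tar-gzip a populated temporary backup directory and report
// how many source bytes the archiver has processed so far.
async function createTarball(tmpOutDir: string, outFilePath: string): Promise<void> {
  await archiveDir(tmpOutDir, outFilePath, (processedBytes) => {
    console.log(`Processed ${humanFileSize(processedBytes)}`)
  })
}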
12 changes: 12 additions & 0 deletions packages/sanity/src/_internal/cli/actions/backup/cleanupTmpDir.ts
@@ -0,0 +1,12 @@
import rimraf from 'rimraf'
import debug from './debug'

function cleanupTmpDir(tmpDir: string): void {
rimraf(tmpDir, (err) => {
if (err) {
debug(`Error cleaning up temporary files: ${err.message}`)
}
})
}

export default cleanupTmpDir
52 changes: 52 additions & 0 deletions packages/sanity/src/_internal/cli/actions/backup/downloadAsset.ts
@@ -0,0 +1,52 @@
import path from 'path'
import {createWriteStream} from 'fs'
import withRetry from './withRetry'
import debug from './debug'

const {getIt} = require('get-it')
const {keepAlive, promise} = require('get-it/middleware')

const CONNECTION_TIMEOUT = 15 * 1000 // 15 seconds
const READ_TIMEOUT = 3 * 60 * 1000 // 3 minutes

const request = getIt([keepAlive(), promise()])

async function downloadAsset(
url: string,
fileName: string,
fileType: string,
outDir: string,
): Promise<void> {
  // File names that contain a path (e.g. sanity-storage/assets/file-name) break archive creation,
  // so we normalize them by taking the base name as the file name.
const normalizedFileName = path.basename(fileName)

const assetFilePath = getAssetFilePath(normalizedFileName, fileType, outDir)
await withRetry(async () => {
const response = await request({
url: url,
maxRedirects: 5,
timeout: {connect: CONNECTION_TIMEOUT, socket: READ_TIMEOUT},
stream: true,
})

debug('Received asset %s with status code %d', normalizedFileName, response?.statusCode)

response.body.pipe(createWriteStream(assetFilePath))
})
}

function getAssetFilePath(fileName: string, fileType: string, outDir: string): string {
// Set assetFilePath if we are downloading an asset file.
// If it's a JSON document, assetFilePath will be an empty string.
let assetFilePath = ''
if (fileType === 'image') {
assetFilePath = path.join(outDir, 'images', fileName)
} else if (fileType === 'file') {
assetFilePath = path.join(outDir, 'files', fileName)
}

return assetFilePath
}

export default downloadAsset
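Note that response.body.pipe(createWriteStream(assetFilePath)) lets the retry callback resolve as soon as piping starts, and errors from the write stream are not surfaced to withRetry. A minimal sketch of an awaited alternative, assuming response.body is a Node Readable (writeAssetToDisk is a hypothetical helper, not part of this commit):

import {createWriteStream} from 'fs'
import type {Readable} from 'stream'
import {pipeline} from 'stream/promises'

// Sketch: resolve only once the asset has been fully flushed to disk, and reject on
// read or write errors so withRetry can retry the whole download.
async function writeAssetToDisk(body: Readable, assetFilePath: string): Promise<void> {
  await pipeline(body, createWriteStream(assetFilePath))
}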
@@ -0,0 +1,28 @@
import type {MiddlewareResponse} from 'get-it/src/types'
import debug from './debug'
import withRetry from './withRetry'

const {getIt} = require('get-it')
const {keepAlive, promise} = require('get-it/middleware')

const CONNECTION_TIMEOUT = 15 * 1000 // 15 seconds
const READ_TIMEOUT = 3 * 60 * 1000 // 3 minutes

const request = getIt([keepAlive(), promise()])

// eslint-disable-next-line @typescript-eslint/no-explicit-any
async function downloadDocument(url: string): Promise<any> {
const response = await withRetry<MiddlewareResponse>(() =>
request({
url,
maxRedirects: 5,
timeout: {connect: CONNECTION_TIMEOUT, socket: READ_TIMEOUT},
}),
)

debug('Received document from %s with status code %d', url, response?.statusCode)

return response.body
}

export default downloadDocument
@@ -0,0 +1,87 @@
import {Readable} from 'stream'
import {SanityClient, QueryParams} from '@sanity/client'

type File = {
name: string
url: string
type: string
}

type GetBackupResponse = {
createdAt: string
totalFiles: number
files: File[]
nextCursor?: string
}

class PaginatedGetBackupStream extends Readable {
private cursor = ''
private readonly client: SanityClient
private readonly projectId: string
private readonly datasetName: string
private readonly backupId: string
private readonly token: string
public totalFiles = 0

constructor(
client: SanityClient,
projectId: string,
datasetName: string,
backupId: string,
token: string,
) {
super({objectMode: true})
this.client = client
this.projectId = projectId
this.datasetName = datasetName
this.backupId = backupId
this.token = token
}

async _read(): Promise<void> {
try {
const data = await this.fetchNextBackupPage()

// Set totalFiles when it's fetched for the first time
if (this.totalFiles === 0) {
this.totalFiles = data.totalFiles
}

data.files.forEach((file: File) => this.push(file))

if (typeof data.nextCursor === 'string' && data.nextCursor !== '') {
this.cursor = data.nextCursor
} else {
// No more pages left to fetch.
this.push(null)
}
} catch (err) {
this.destroy(err as Error)
}
}

// fetchNextBackupPage fetches the next page of backed up files from the backup API.
async fetchNextBackupPage(): Promise<GetBackupResponse> {
const query: QueryParams = this.cursor === '' ? {} : {nextCursor: this.cursor}

try {
return await this.client.request({
headers: {Authorization: `Bearer ${this.token}`},
uri: `/projects/${this.projectId}/datasets/${this.datasetName}/backups/${this.backupId}`,
query,
})
} catch (error) {
      // It would be clearer to extract this logic into a shared error-handling helper for reuse.
let msg = error.statusCode ? error.response.body.message : error.message

// If no message can be extracted, print the whole error.
if (msg === undefined) {
msg = String(error)
}
throw new Error(`Downloading dataset backup failed: ${msg}`)
}
}
}

export {PaginatedGetBackupStream}
export type {File, GetBackupResponse}
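The consumer of this stream is not part of the files shown here. Since a Readable in object mode is async-iterable, it might be consumed roughly like the sketch below; the import path is assumed (the file header for this module is missing above) and listBackupFiles is illustrative:

import {PaginatedGetBackupStream} from './PaginatedGetBackupStream'
import type {File} from './PaginatedGetBackupStream'

// Sketch: iterate the backed-up file records page by page; totalFiles is populated
// after the first page has been fetched.
async function listBackupFiles(stream: PaginatedGetBackupStream): Promise<void> {
  let count = 0
  for await (const file of stream) {
    const {name, type} = file as File
    count += 1
    console.log(`(${count}/${stream.totalFiles}) ${type}: ${name}`)
  }
}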
@@ -0,0 +1,59 @@
import type {CliOutputter} from '@sanity/cli'
import prettyMs from 'pretty-ms'

type ProgressEvent = {
step: string
update?: boolean
current?: number
total?: number
}

interface ProgressSpinner {
set: (progress: ProgressEvent) => void
update: (progress: ProgressEvent) => void
succeed: () => void
fail: () => void
}

const newProgress = (output: CliOutputter, startStep: string): ProgressSpinner => {
let spinner = output.spinner(startStep).start()
let lastProgress: ProgressEvent = {step: startStep}
let start = Date.now()

const print = (progress: ProgressEvent) => {
const elapsed = prettyMs(Date.now() - start)
if (progress.current && progress.current > 0 && progress.total && progress.total > 0) {
spinner.text = `${progress.step} (${progress.current}/${progress.total}) [${elapsed}]`
} else {
spinner.text = `${progress.step} [${elapsed}]`
}
}

return {
set: (progress: ProgressEvent) => {
if (progress.step !== lastProgress.step) {
print(lastProgress) // Print the last progress before moving on
spinner.succeed()
spinner = output.spinner(progress.step).start()
start = Date.now()
} else if (progress.step === lastProgress.step && progress.update) {
print(progress)
}
lastProgress = progress
},
update: (progress: ProgressEvent) => {
print(progress)
lastProgress = progress
},
succeed: () => {
spinner.succeed()
start = Date.now()
},
fail: () => {
spinner.fail()
start = Date.now()
},
}
}

export default newProgress
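A minimal usage sketch of the spinner helper (the import path is assumed since this module's file header is missing above, and the step names are illustrative):

import type {CliOutputter} from '@sanity/cli'
import newProgress from './progressSpinner'

// Illustrative usage: update() refreshes the current step in place, while set() with a
// new step name finishes the running spinner and starts a fresh one.
function reportProgress(output: CliOutputter): void {
  const progress = newProgress(output, 'Downloading documents')
  progress.update({step: 'Downloading documents', current: 10, total: 250})
  progress.set({step: 'Archiving dataset'})
  progress.succeed()
}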
30 changes: 30 additions & 0 deletions packages/sanity/src/_internal/cli/actions/backup/withRetry.ts
@@ -0,0 +1,30 @@
import debug from './debug'

const MAX_RETRIES = 5
const BACKOFF_DELAY_BASE = 200

const exponentialBackoff = (retryCount: number) => Math.pow(2, retryCount) * BACKOFF_DELAY_BASE

async function withRetry<T>(
operation: () => Promise<T>,
maxRetries: number = MAX_RETRIES,
): Promise<T> {
for (let retryCount = 0; retryCount < maxRetries; retryCount++) {
try {
return await operation()
} catch (err) {
// Immediately rethrow if the error is not server-related.
if (err.response && err.response.statusCode && err.response.statusCode < 500) {
throw err
}

const retryDelay = exponentialBackoff(retryCount)
debug(`Error encountered, retrying after ${retryDelay}ms: %s`, err.message)
await new Promise((resolve) => setTimeout(resolve, retryDelay))
}
}

throw new Error('Operation failed after all retries')
}

export default withRetry
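For reference, with MAX_RETRIES = 5 and BACKOFF_DELAY_BASE = 200 the delay after each failed attempt is 2^retryCount × 200 ms, i.e. 200, 400, 800, 1600 and 3200 ms, roughly 6.2 seconds of waiting in total before "Operation failed after all retries" is thrown.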