Skip to content

Commit

Permalink
sigint
Browse files Browse the repository at this point in the history
  • Loading branch information
pacoccino committed Feb 23, 2022
1 parent c3648e4 commit 0431397
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 29 deletions.
75 changes: 52 additions & 23 deletions api/src/lib/async.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,74 @@
import async from 'async'

type ExecFn<T, R> = (task: T) => Promise<R>

type ExecResult<T, R> = {
task: T
result?: R
error?: Error
}

function reflect<T, R>(fn: ExecFn<T, R>) {
return async (task: T): Promise<ExecResult<T, R>> => {
type ParrallelResult<T, R> = {
successes: ExecResult<T, R>[]
errors: ExecResult<T, R>[]
}

type ParrallelActions<T, R> = {
stop: () => Promise<void>
finished: () => Promise<ParrallelResult<T, R>>
}

export function parallel<T, R>(
tasks: T[],
limit: number,
fn: ExecFn<T, R>,
processKill = false
): ParrallelActions<T, R> {
let errors: ExecResult<T, R>[] = []
let successes: ExecResult<T, R>[] = []

const queue = async.queue(async (task: T) => {
try {
const result = await fn(task)
return {
successes = successes.concat({
task,
result,
}
})
} catch (error) {
return {
errors = errors.concat({
task,
error,
}
})
}
}, limit)

queue.push(tasks)

async function stop() {
await queue.kill()
await queue.drain()
}

if (processKill) {
process.on('SIGINT', function () {
console.log('SIGINT, stopping queue')
stop().then(() => {
console.log('queue stopped, killing process')
process.exit()
})
})
}

async function finished(): Promise<ParrallelResult<T, R>> {
await queue.drain()
return {
errors,
successes,
}
}
}

export async function parallel<T, R>(
tasks: T[],
limit: number,
fn: ExecFn<T, R>
) {
const results = await async.mapLimit(tasks, limit, reflect(fn))
const errors = results.reduce(
(acc, curr) => (curr.error ? acc.concat(curr) : acc),
[]
)
const successes = results.reduce(
(acc, curr) => (curr.result ? acc.concat(curr) : acc),
[]
)
return {
errors,
successes,
stop,
finished,
}
}
9 changes: 6 additions & 3 deletions api/src/lib/scanner/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,15 @@ export async function scanFiles(_args = {}) {
const files = await s3photos.list()
logger.debug({ filesLength: files.length }, 'importing files from s3')

const scanResult = await parallel<Task, TaskResult>(
files,
const parallelActions = parallel<Task, TaskResult>(
files.slice(0, 8),
PARALLEL_SCANS,
scanImage
scanImage,
true
)

const scanResult = await parallelActions.finished()

if (scanResult.errors.length)
logger.error({ errors: scanResult.errors }, 'errors:')

Expand Down
8 changes: 5 additions & 3 deletions api/src/lib/uploader/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { logger as parentLogger } from 'src/lib/logger'

import { ACCEPTED_EXTENSIONS } from 'src/lib/images/constants'

const logger = parentLogger.child({ module: 'SCANNER' })
const logger = parentLogger.child({ module: 'UPLOADER' })

const PARALLEL_UPLOAD = 5

Expand Down Expand Up @@ -79,12 +79,14 @@ export async function upload({
const tasks: Task[] = files.map((path) => ({ rootDir, path, prefix }))

logger.debug({ filesLength: files.length }, 'uploading files to S3')
const uploadResult = await parallel<Task, TaskResult>(
const parallelActions = await parallel<Task, TaskResult>(
tasks,
PARALLEL_UPLOAD,
uploadFile
uploadFile,
true
)

const uploadResult = await parallelActions.finished()
if (uploadResult.errors.length)
logger.error({ errors: uploadResult.errors }, 'error:')

Expand Down

0 comments on commit 0431397

Please sign in to comment.