Skip to content

Commit

Permalink
feat: provide multipiping
Browse files Browse the repository at this point in the history
  • Loading branch information
antongolub committed Oct 2, 2024
1 parent 4daae39 commit a4e0312
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 32 deletions.
64 changes: 37 additions & 27 deletions src/main/ts/mixin/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,55 @@
import { Writable } from 'node:stream'
import { assign } from '../util.js'
import { VoidWritable } from '../spawn.js'
import EventEmitter from 'node:events'
import { assign, isStringLiteral } from '../util.js'
import { VoidStream } from '../spawn.js'
import type { TShell, TMixin, TShellCtx } from '../x.js'
import { type TZurk, type TZurkPromise, isZurkAny } from '../zurk.js'

// eslint-disable-next-line sonarjs/cognitive-complexity
export const pipeMixin: TMixin = <T extends TZurk | TZurkPromise >($: TShell, result: T, ctx: TShellCtx) =>
isZurkAny(result)
? assign(result, {
pipe(...args: any[]) {
const target = args[0]
const { fulfilled, stdout, store } = ctx
if (isZurkAny(target)) {
// stream.ctx.input = fulfilled.stdout
const input = new VoidWritable()
target.ctx.input = input
pipe(...args: any) {
const [target, ...rest] = args
const { fulfilled, store, ee } = ctx
const from = new VoidStream()
const sync = !('then' in result)
const input = fulfilled ? fulfilled.stdout : from
const fill = () => {
for (const chunk of store.stdout) {
input.push(chunk)
}
if (fulfilled) {
input.push(null)
} else {
stdout.pipe(input)
from.write(chunk)
}

return target
}
let _result

if (target instanceof Writable) {
for (const chunk of store.stdout) {
target.write(chunk)
}
if (fulfilled) {
target.end()
return target
}
if (isZurkAny(target)) {
target.ctx.input = input
_result = target
} else if (target instanceof Writable) {
_result = from.pipe(target)
} else if (isStringLiteral(target, ...rest)) {
_result = $.apply({ input: input, sync}, args)
} else {
throw new Error('Unsupported pipe argument')
}

return stdout.pipe(target)
if (fulfilled) {
fill()
from.end()
} else {
const onStdout = (chunk: string | Buffer) => from.write(chunk)
ee
.once('stdout', () => {
fill()
ee.on('stdout', onStdout)
})
.once('end', () => {
ee.removeListener('stdout', onStdout)
from.end()
})
}

return $.apply({input: fulfilled?.stdout || stdout, sync: !('then' in result)}, args as any)
return _result
}
})
: result
10 changes: 5 additions & 5 deletions src/main/ts/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ export const defaults: TSpawnCtxNormalized = {
spawnOpts: {},
get store() { return createStore() },
callback: noop,
get stdin() { return new VoidWritable() },
get stdout(){ return new VoidWritable() },
get stderr(){ return new VoidWritable() },
get stdin() { return new VoidStream() },
get stdout(){ return new VoidStream() },
get stderr(){ return new VoidStream() },
stdio: ['pipe', 'pipe', 'pipe'],
run: setImmediate,
}
Expand All @@ -121,7 +121,7 @@ export const processInput = (child: TChild, input?: TInput | null) => {
}
}

export class VoidWritable extends Transform {
export class VoidStream extends Transform {
_transform(chunk: any, _: string, cb: (err?: Error) => void) {
this.emit('data', chunk)
cb()
Expand Down Expand Up @@ -181,8 +181,8 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => {
...result,
get stdout() { return c.store.stdout.join('') },
get stderr() { return c.store.stderr.join('') },
stdio,
get stdall() { return c.store.stdall.join('') },
stdio,
duration: Date.now() - now,
ctx: c
})
Expand Down
14 changes: 14 additions & 0 deletions src/test/ts/x.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,19 @@ describe('mixins', () => {

assert.equal(piped.toString(), expected)
})

it('supports multipiping', async () => {
const result = $`echo 1; sleep 1; echo 2; sleep 1; echo 3`
const piped1 = result.pipe`cat`
let piped2: any

setTimeout(() => {
piped2 = result.pipe`cat`
}, 1500)

await piped1
assert.equal((await piped1).toString(), '1\n2\n3')
assert.equal((await piped2).toString(), '1\n2\n3')
})
})
})

0 comments on commit a4e0312

Please sign in to comment.