diff --git a/src/main/ts/mixin/pipe.ts b/src/main/ts/mixin/pipe.ts index 4226f0c..7683a5a 100644 --- a/src/main/ts/mixin/pipe.ts +++ b/src/main/ts/mixin/pipe.ts @@ -1,6 +1,7 @@ import { Writable } from 'node:stream' -import { assign } from '../util.js' -import { VoidWritable } from '../spawn.js' +import EventEmitter from 'node:events' +import { assign, isStringLiteral } from '../util.js' +import { VoidStream } from '../spawn.js' import type { TShell, TMixin, TShellCtx } from '../x.js' import { type TZurk, type TZurkPromise, isZurkAny } from '../zurk.js' @@ -8,38 +9,47 @@ import { type TZurk, type TZurkPromise, isZurkAny } from '../zurk.js' export const pipeMixin: TMixin = ($: TShell, result: T, ctx: TShellCtx) => isZurkAny(result) ? assign(result, { - pipe(...args: any[]) { - const target = args[0] - const { fulfilled, stdout, store } = ctx - if (isZurkAny(target)) { - // stream.ctx.input = fulfilled.stdout - const input = new VoidWritable() - target.ctx.input = input + pipe(...args: any) { + const [target, ...rest] = args + const { fulfilled, store, ee } = ctx + const from = new VoidStream() + const sync = !('then' in result) + const input = fulfilled ? fulfilled.stdout : from + const fill = () => { for (const chunk of store.stdout) { - input.push(chunk) - } - if (fulfilled) { - input.push(null) - } else { - stdout.pipe(input) + from.write(chunk) } - - return target } + let _result - if (target instanceof Writable) { - for (const chunk of store.stdout) { - target.write(chunk) - } - if (fulfilled) { - target.end() - return target - } + if (isZurkAny(target)) { + target.ctx.input = input + _result = target + } else if (target instanceof Writable) { + _result = from.pipe(target) + } else if (isStringLiteral(target, ...rest)) { + _result = $.apply({ input: input, sync}, args) + } else { + throw new Error('Unsupported pipe argument') + } - return stdout.pipe(target) + if (fulfilled) { + fill() + from.end() + } else { + const onStdout = (chunk: string | Buffer) => from.write(chunk) + ee + .once('stdout', () => { + fill() + ee.on('stdout', onStdout) + }) + .once('end', () => { + ee.removeListener('stdout', onStdout) + from.end() + }) } - return $.apply({input: fulfilled?.stdout || stdout, sync: !('then' in result)}, args as any) + return _result } }) : result diff --git a/src/main/ts/spawn.ts b/src/main/ts/spawn.ts index 0f7481c..6497d19 100644 --- a/src/main/ts/spawn.ts +++ b/src/main/ts/spawn.ts @@ -98,9 +98,9 @@ export const defaults: TSpawnCtxNormalized = { spawnOpts: {}, get store() { return createStore() }, callback: noop, - get stdin() { return new VoidWritable() }, - get stdout(){ return new VoidWritable() }, - get stderr(){ return new VoidWritable() }, + get stdin() { return new VoidStream() }, + get stdout(){ return new VoidStream() }, + get stderr(){ return new VoidStream() }, stdio: ['pipe', 'pipe', 'pipe'], run: setImmediate, } @@ -121,7 +121,7 @@ export const processInput = (child: TChild, input?: TInput | null) => { } } -export class VoidWritable extends Transform { +export class VoidStream extends Transform { _transform(chunk: any, _: string, cb: (err?: Error) => void) { this.emit('data', chunk) cb() @@ -181,8 +181,8 @@ export const invoke = (c: TSpawnCtxNormalized): TSpawnCtxNormalized => { ...result, get stdout() { return c.store.stdout.join('') }, get stderr() { return c.store.stderr.join('') }, - stdio, get stdall() { return c.store.stdall.join('') }, + stdio, duration: Date.now() - now, ctx: c }) diff --git a/src/test/ts/x.test.ts b/src/test/ts/x.test.ts index cb80046..84bc2ad 100644 --- a/src/test/ts/x.test.ts +++ b/src/test/ts/x.test.ts @@ -170,5 +170,19 @@ describe('mixins', () => { assert.equal(piped.toString(), expected) }) + + it('supports multipiping', async () => { + const result = $`echo 1; sleep 1; echo 2; sleep 1; echo 3` + const piped1 = result.pipe`cat` + let piped2: any + + setTimeout(() => { + piped2 = result.pipe`cat` + }, 1500) + + await piped1 + assert.equal((await piped1).toString(), '1\n2\n3') + assert.equal((await piped2).toString(), '1\n2\n3') + }) }) })