Skip to content

Commit

Permalink
Ignore outputs for earlier commands
Browse files Browse the repository at this point in the history
when a new command is received.

Earlier, due to the use of promises, results of earlier commands were possible to be relayed to the UI/coordinator, which is not part of the spec. In this version, we ensure that responses to earlier commands are dropped (from stdout, stderr) on receipt of a new valid command.
  • Loading branch information
artfuldev committed Jan 21, 2024
1 parent 688c90b commit 312235c
Show file tree
Hide file tree
Showing 18 changed files with 269 additions and 67 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "random-step",
"version": "2.1.1",
"version": "2.2.0",
"description": "A tic tac toe engine that follows the st3p protocol but makes a random valid move every time",
"main": "src/index.ts",
"repository": {
Expand All @@ -20,6 +20,7 @@
"typescript": "^5.3.3"
},
"dependencies": {
"fp-ts": "^2.16.1"
"fp-ts": "^2.16.1",
"rxjs": "^7.8.1"
}
}
13 changes: 13 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { filter, map, mergeWith, of, switchMap } from "rxjs";
import { parse } from "./st3p";
import { ParseFailure, ParseSuccess } from "./parser";
import { Command, run } from "./st3p/command";
import { Sources } from "./sources";
import { Sinks } from "./sinks";

export const app = ({ stdin: { line$ } }: Sources): Sinks => {
const parsed$ = line$.pipe(map(parse));
const command$ = parsed$.pipe(
filter((x) => x.type === "success"),
map((x) => (x as ParseSuccess<Command>).parsed)
);
const execute$ = command$.pipe(switchMap((command) => of(run(command))));
return {
stderr: parsed$.pipe(
filter((x) => x.type === "failure"),
map((x) => (x as ParseFailure).reason),
mergeWith(execute$.pipe(switchMap((x) => x.stderr)))
),
stdout: execute$.pipe(switchMap((x) => x.stdout)),
exit: execute$.pipe(switchMap((x) => x.exit)),
};
};
5 changes: 5 additions & 0 deletions src/framework/exit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Observable } from "rxjs";

export const exit = (code$: Observable<number>) => {
code$.subscribe((code) => process.exit(code));
};
47 changes: 47 additions & 0 deletions src/framework/run.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { Observable, Subject } from "rxjs";
import {
MatchingDrivers,
MatchingMain,
SinkProxies,
Sinks,
Sources,
} from "./types";

export const run = <
D extends MatchingDrivers<D, M>,
M extends MatchingMain<D, M>
>(
main: M,
drivers: D
) => {
const subjects = (Object.keys(drivers) as Array<keyof D>).reduce(
(acc, k) => ({
...acc,
[k]: new Subject(),
}),
{} as SinkProxies<Sinks<M>>
);
const sources = (Object.keys(drivers) as Array<keyof D>).reduce(
(acc, k) => ({
...acc,
[k]: (drivers as any)[k]((subjects as any)[k]),
}),
{} as Sources<D>
);
const sinks = main(sources);
Object.keys(sinks).forEach((key) => {
const sink = sinks[key] as Observable<any>;
const proxy = (subjects as any)[key] as Subject<any>;
sink.subscribe({
next: (value: any) => {
proxy.next(value);
},
error: (err: any) => {
proxy.error(err);
},
complete: () => {
proxy.complete();
},
});
});
};
5 changes: 5 additions & 0 deletions src/framework/stderr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Observable } from "rxjs";

export const stderr = (err$: Observable<string>) => {
err$.subscribe((line) => console.error(line));
};
15 changes: 15 additions & 0 deletions src/framework/stdin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { createInterface } from "node:readline";
import { Observable, fromEvent, map, tap } from "rxjs";

type Sources = {
line$: Observable<string>;
};

export const stdin = (): Sources => {
const rl = createInterface({
input: process.stdin,
});
return {
line$: fromEvent<string>(rl, "line"),
};
};
5 changes: 5 additions & 0 deletions src/framework/stdout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Observable } from "rxjs";

export const stdout = (out$: Observable<string>) => {
out$.subscribe((line) => console.log(line));
};
52 changes: 52 additions & 0 deletions src/framework/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Observable } from "rxjs";

export type SinkProxies<Si> = { [P in keyof Si]: Observable<any> };

export type Driver<Si, So> = Si extends void ? () => So : (stream: Si) => So;

export type DisposeFunction = () => void;

export type Drivers = {
[name: string]: Driver<Observable<any>, any | void>;
};

export type Main = (...args: Array<any>) => any;

export type Sources<D extends Drivers> = { [k in keyof D]: ReturnType<D[k]> };

export type Sinks<M extends Main> = ReturnType<M>;

export type MatchingMain<D extends Drivers, M extends Main> =
| (Main & {
(so: Sources<D>): Sinks<M>;
})
| (Main & {
(): Sinks<M>;
});

/**
* For whatever reason, this does not work with RxJS observables,
* this for this reason, `MatchingDrivers` has to be redefined
* in @cycle/rxjs-run-
*/
export type ToStream<S> = S extends Observable<infer T> ? Observable<T> : S;

type WidenStream<S, U> = S extends Observable<infer T>
? T extends U
? U
: never
: any;

type GetValidInputs<D extends Driver<any, any>> = D extends Driver<infer S, any>
? S extends Observable<infer T>
? T
: never
: never;

export type MatchingDrivers<D extends Drivers, M extends Main> = Drivers & {
[k in string & keyof Sinks<M>]:
| (() => Sources<D>[k])
| ((
si: Observable<WidenStream<ToStream<Sinks<M>[k]>, GetValidInputs<D[k]>>>
) => Sources<D>[k]);
};
22 changes: 11 additions & 11 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import * as readline from "node:readline";
import { parse, run } from "./st3p";
import { app } from "./app";
import { exit } from "./framework/exit";
import { run } from "./framework/run";
import { stderr } from "./framework/stderr";
import { stdin } from "./framework/stdin";
import { stdout } from "./framework/stdout";

const rl = readline.createInterface({
input: process.stdin,
});

rl.on("line", (line) => {
if (process.env.DEBUG) console.log('>', line);
const result = parse(line);
if (result.type === "success") run(result.parsed);
else console.log(`unknown: ${line}, reason: ${result.reason}`);
run(app, {
stdin,
stdout,
stderr,
exit,
});
1 change: 1 addition & 0 deletions src/parser/integer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isExternal } from "util/types";
import { Parser, failure, success } from "./parser";

export const integer: Parser<number> = (str: string) => {
Expand Down
7 changes: 7 additions & 0 deletions src/sinks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { Observable } from "rxjs";

export type Sinks = {
stdout: Observable<string>;
stderr: Observable<string>;
exit: Observable<number>;
};
5 changes: 5 additions & 0 deletions src/sources.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { stdin } from "./framework/stdin";

export type Sources = {
stdin: ReturnType<typeof stdin>;
};
12 changes: 9 additions & 3 deletions src/st3p/command.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { EMPTY, of } from "rxjs";
import { Parser, or } from "../parser";
import { Sinks } from "../sinks";
import * as H from "./handshake";
import * as I from "./identify";
import * as M from "./move";
Expand All @@ -8,7 +10,7 @@ export type Command = H.Handshake | I.Identify | M.Move | Q.Quit;

export const parse: Parser<Command> = or(H.parse, I.parse, M.parse, Q.parse);

export const run = async (command: Command) => {
export const run = (command: Command): Sinks => {
switch (command[0]) {
case "handshake":
return H.handshake(command);
Expand All @@ -19,6 +21,10 @@ export const run = async (command: Command) => {
case "quit":
return Q.quit(command);
default:
return console.error(new Error("unknown command"));
return {
stdout: EMPTY,
stderr: of(new Error("unknown command").message),
exit: EMPTY,
};
}
}
};
10 changes: 8 additions & 2 deletions src/st3p/handshake.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { pipe } from "fp-ts/lib/function";
import { and, token, one_or_more, map, whitespace } from "../parser";
import { Sinks } from "../sinks";
import { EMPTY, of } from "rxjs";

type Version = string;
export type Handshake = ["handshake", Version];
Expand All @@ -16,6 +18,10 @@ export const parse = pipe(
map(([, , , , version]) => Handshake(version))
);

export const handshake = async ([, version]: Handshake) => {
console.log(`st3p version ${version} ok`);
export const handshake = ([, version]: Handshake): Sinks => {
return {
stderr: EMPTY,
stdout: of(`st3p version ${version} ok`),
exit: EMPTY,
};
};
28 changes: 17 additions & 11 deletions src/st3p/identify.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import { token, map } from "../parser";
import * as P from "../parser";
import { name, author, version, repository } from "../../package.json";
import { pipe } from "fp-ts/lib/function";
import { Sinks } from "../sinks";
import { EMPTY, from, map } from "rxjs";

export type Identify = ["identify"];
const Identify: Identify = ["identify"];

export const parse = pipe(
token("identify"),
map(() => Identify)
P.token("identify"),
P.map(() => Identify)
);

const write = (str: string) => console.log('identify', str);

export const identify = async (_: Identify) => {
write(`name ${name}`);
write(`author ${author}`);
write(`version ${version}`);
write(`url ${repository.url}`);
write("ok");
export const identify = (_: Identify): Sinks => {
return {
stderr: EMPTY,
exit: EMPTY,
stdout: from([
`name ${name}`,
`author ${author}`,
`version ${version}`,
`url ${repository.url}`,
"ok",
]).pipe(map((str) => `identify ${str}`)),
};
};
Loading

0 comments on commit 312235c

Please sign in to comment.