diff --git a/__tests__/HierarchicalSingleEvent.test.ts b/__tests__/HierarchicalSingleEvent.test.ts index 45e7b9989..77eb80349 100644 --- a/__tests__/HierarchicalSingleEvent.test.ts +++ b/__tests__/HierarchicalSingleEvent.test.ts @@ -4,7 +4,8 @@ import { Parameter, OutPort, InPort, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; import {SingleEvent} from "../src/share/SingleEvent"; @@ -93,7 +94,7 @@ describe("HierarchicalSingleEvent", function () { seTest.seContainer.child.o, seTest.logContainer.child.i ) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); seTest._start(); }); diff --git a/__tests__/InvalidMutations.ts b/__tests__/InvalidMutations.ts index 17b62f6bf..977cad095 100644 --- a/__tests__/InvalidMutations.ts +++ b/__tests__/InvalidMutations.ts @@ -38,25 +38,20 @@ class R1 extends Reactor { [this.in1], [this.in1, this.in2, this.out1, this.out2], function (this, __in1, __in2, __out1, __out2) { - test("expect error on creating creating direct feed through", () => { - expect(() => { - this.connect(__in2, __out2); - }).toThrowError("New connection introduces direct feed through."); - }); test("expect error when creating connection outside container", () => { expect(() => { - this.connect(__out2, __in2); + this.connect(__out2.asConnectable(), __in2.asConnectable()); }).toThrowError("New connection is outside of container."); }); const R2 = new R1(this.getReactor()); test("expect error on mutation creating race condition on an output port", () => { expect(() => { - this.connect(R2.out1, __out1); + this.connect(R2.out1.asConnectable(), __out1.asConnectable()); }).toThrowError("Destination port is already occupied."); }); test("expect error on spawning and creating loop within a reactor", () => { expect(() => { - this.connect(R2.out1, R2.in1); + this.connect(R2.out1.asConnectable(), R2.in1.asConnectable()); }).toThrowError("New connection introduces cycle."); }); } diff --git a/__tests__/SimpleMutation.test.ts b/__tests__/SimpleMutation.test.ts index e85d06766..bd1345584 100644 --- a/__tests__/SimpleMutation.test.ts +++ b/__tests__/SimpleMutation.test.ts @@ -46,7 +46,7 @@ class R2 extends Reactor { function (this, __in, __out) { test("expect error to be thrown", () => { expect(() => { - this.connect(__out, __in); + this.connect(__out.asConnectable(), __in.asConnectable()); }).toThrowError("New connection is outside of container."); }); } diff --git a/__tests__/SingleEvent.test.ts b/__tests__/SingleEvent.test.ts index ac89fb1f3..ab49c55fd 100644 --- a/__tests__/SingleEvent.test.ts +++ b/__tests__/SingleEvent.test.ts @@ -32,12 +32,8 @@ describe("SingleEvent", function () { expect(expect(seTest.singleEvent).toBeInstanceOf(SingleEvent)); expect(expect(seTest.logger).toBeInstanceOf(Logger)); - expect(function () { - seTest.canConnect(seTest.singleEvent.o, seTest.logger.i); - }).toThrow(new Error("Destination port is already occupied.")); - expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBe( - false - ); + expect(seTest.canConnect(seTest.singleEvent.o, seTest.logger.i)).toBeTruthy(); + expect(seTest.canConnect(seTest.logger.i, seTest.singleEvent.o)).toBeTruthy(); seTest._start(); }); diff --git a/__tests__/connection.bonus.test.ts b/__tests__/connection.bonus.test.ts new file mode 100644 index 000000000..b63a71070 --- /dev/null +++ b/__tests__/connection.bonus.test.ts @@ -0,0 +1,99 @@ +import {App, InPort, OutPort, Reactor} from "../src/core/internal"; + +// Readers might wonder why this test case exist; +// This is mainly because in the past, we assume direct feedthrough is forbidden, and will not establish the connection if we try to do so. However, it is possible that such a thing happen without introducing any causality issue. +describe("Direct feedthrough without causality issue", () => { + class BigReactor extends App { + public children: SmallReactor; + + constructor() { + super(undefined, false, false); + this.children = new SmallReactor(this); + } + } + + class SmallReactor extends Reactor { + public inp: InPort; + public outp: OutPort; + + constructor(parent: Reactor) { + super(parent); + this.inp = new InPort(this); + this.outp = new OutPort(this); + this.addMutation( + [this.startup], + [this.inp, this.outp], + function (this, inp, outp) { + it("test", () => { + expect(this.getReactor().canConnect(inp, outp)).toBeFalsy(); + }); + } + ); + } + } + + const root = new BigReactor(); + root._start(); +}); + +describe("Causality loop that can't be detected by only checking local graph", () => { + class FeedThrougher extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addReaction( + [this.inp], + [this.inp, this.writable(this.outp)], + function (this, inp, outp) { + // nop troll + } + ); + } + } + + class Looper extends Reactor { + public leftChild = new FeedThrougher(this); + public rightChild = new FeedThrougher(this); + + public inp = new InPort(this); + public outp = new OutPort(this); + + constructor(parent: Reactor) { + super(parent); + this.addMutation( + [this.startup], + [ + this.inp.asConnectable(), + this.outp.asConnectable(), + this.leftChild.inp.asConnectable(), + this.leftChild.outp.asConnectable(), + this.rightChild.inp.asConnectable(), + this.rightChild.outp.asConnectable() + ], + function (this, inp, outp, inp_lc, outp_lc, inp_rc, outp_rc) { + this.connect(inp, inp_lc); + this.connect(outp_rc, outp); + } + ); + } + } + + class TestApp extends App { + public child = new Looper(this); + + constructor() { + super(undefined, undefined, undefined, () => {}); + this._connect(this.child.outp, this.child.inp); + it("Test a connection that would create zero delay loop cannot be made", () => { + expect( + this.canConnect(this.child.leftChild.outp, this.child.rightChild.inp) + ).toBeTruthy(); + }); + } + } + + const app = new TestApp(); + app._start(); +}); diff --git a/__tests__/connection.test.ts b/__tests__/connection.test.ts index 3d8725e30..4798b7bc3 100644 --- a/__tests__/connection.test.ts +++ b/__tests__/connection.test.ts @@ -5,7 +5,8 @@ import { OutPort, InPort, TimeUnit, - TimeValue + TimeValue, + CanConnectResult } from "../src/core/internal"; describe("Check canConnect", () => { @@ -30,19 +31,19 @@ describe("Check canConnect", () => { it("canConnect success out->in", () => { expect(this.canConnect(this.source.out, this.destination.in)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect success out->out", () => { expect(this.canConnect(this.source.out, this.destination.out)).toBe( - true + CanConnectResult.SUCCESS ); }); it("canConnect failure", () => { expect(this.canConnect(this.destination.in, this.source.out)).toBe( - false + CanConnectResult.NOT_IN_SCOPE ); }); } diff --git a/__tests__/disconnect.test.ts b/__tests__/disconnect.test.ts index b25b31cb2..c876c3365 100644 --- a/__tests__/disconnect.test.ts +++ b/__tests__/disconnect.test.ts @@ -54,11 +54,11 @@ class R1 extends Reactor { const R2 = new R1(this.getReactor()); test("expect that disconnecting an existing connection will not result in an error being thrown", () => { expect(() => { - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2, R2.in2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); this.disconnect(R2.out2); - this.connect(R2.out2, R2.in2); + this.connect(R2.out2.asConnectable(), R2.in2.asConnectable()); }).not.toThrow(); }); } diff --git a/__tests__/hierarchy.ts b/__tests__/hierarchy.ts index ec7fda22f..0515e861e 100644 --- a/__tests__/hierarchy.ts +++ b/__tests__/hierarchy.ts @@ -1,4 +1,4 @@ -import {Reactor, App, InPort, OutPort} from "../src/core/internal"; +import {Reactor, App, InPort, OutPort, CanConnectResult} from "../src/core/internal"; class InOut extends Reactor { a = new InPort(this); @@ -36,65 +36,57 @@ describe("Container to Contained", () => { it("testing canConnect", () => { expect( app.container.canConnect(app.container.a, app.container.contained.a) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect(app.container.contained.a, app.container.a) - ).toBe(false); + ).toBe(CanConnectResult.NOT_IN_SCOPE); expect( app.container.canConnect( app.container.a, app.container.b ) - ).toBe(true); + ).toBe(CanConnectResult.SUCCESS); expect( app.container.canConnect( app.container.contained.a, app.container.contained.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect( app.container.contained.b, app.container.contained.a ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.canConnect(app.container.a, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.a) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.a, app.container.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.b, app.container.contained.b) - ).toBe(false); + ).toBeTruthy(); expect( app.container.canConnect(app.container.contained.b, app.container.b) - ).toBe(true); - - expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBe( - false - ); - expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBe( - false - ); - - expect(app.container.canConnect(app.foo.a, app.container.b)).toBe(false); - expect(app.container.canConnect(app.foo.a, app.container.a)).toBe(false); + ).toBeFalsy(); + + expect(app.container.canConnect(app.container.contained.a, app.foo.a)).toBeTruthy(); + expect(app.container.canConnect(app.container.contained.a, app.foo.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.contained.a)).toBeTruthy(); + + expect(app.container.canConnect(app.foo.a, app.container.b)).toBeTruthy(); + expect(app.container.canConnect(app.foo.a, app.container.a)).toBeTruthy(); // expect(app.container.contained).toBeDefined(); @@ -104,49 +96,49 @@ describe("Container to Contained", () => { app.container.contained.containedAgain.a, app.container.contained.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.contained.b ) - ).toBe(true); + ).toBeFalsy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.container.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.container.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.a, app.foo.a ) - ).toBe(false); + ).toBeTruthy(); expect( app.container.contained.canConnect( app.container.contained.containedAgain.b, app.foo.b ) - ).toBe(false); + ).toBeTruthy(); // } }); }); diff --git a/__tests__/mutations.test.ts b/__tests__/mutations.test.ts index 090e90053..93f64af7a 100644 --- a/__tests__/mutations.test.ts +++ b/__tests__/mutations.test.ts @@ -4,7 +4,8 @@ import { Timer, OutPort, InPort, - TimeValue + TimeValue, + IOPort } from "../src/core/internal"; class Source extends Reactor { @@ -92,7 +93,7 @@ class Computer extends Reactor { continue; } const x = new AddOne(this.getReactor(), id); - this.connect(src, x.input); + this.connect(src.asConnectable(), x.input.asConnectable()); } } }); @@ -189,3 +190,158 @@ describe("Creating reactors at runtime", function () { // }); // }); +describe("Test the result from refactor-canconnect: referencing ConnectablePort should not introduce any change in the causality graph", () => { + class InnocentReactor extends Reactor { + public inp = new InPort(this); + public outp = new OutPort(this); + public cip = new InPort(this); + public oip = new OutPort(this); + public child = new (class InnocentChild extends Reactor { + public oip = new OutPort(this); + constructor(parent: InnocentReactor) { + super(parent); + } + })(this); + + constructor(parent: TestApp) { + super(parent); + + this.addMutation( + [this.startup], + [ + this.inp, + this.writable(this.outp), + this.cip.asConnectable(), + this.oip.asConnectable(), + this.child.oip.asConnectable() + ], + function (this, a0, a1, a2, a3, a4) { + // This is current failing as we disallow direct feedthrough. Nevertheless I'm unsure why that is the case as of now? + // this.connect(a2, a3); + this.connect(a2, a4); + } + ); + } + } + + class TestApp extends App { + public child = new InnocentReactor(this); + constructor(done: () => void) { + super(undefined, undefined, undefined, () => { + const mut = this.child["_mutations"][1]; // M0 is the shutdown mutation; M1 is our mutation + const pg = this._getPrecedenceGraph(); + // child.oip should be an island in the causality graph + expect(pg.getUpstreamNeighbors(this.child.oip).size).toBe(0); + expect(pg.getDownstreamNeighbors(this.child.oip).size).toBe(0); + // The only dependency child.child.oip should have is child.cip + expect(pg.getUpstreamNeighbors(this.child.child.oip).size).toBe(1); + expect( + pg.getUpstreamNeighbors(this.child.child.oip).has(this.child.cip) + ).toBeTruthy(); + done(); + }); + } + } + test("test dependencies", (done) => { + const tapp = new TestApp(done); + tapp._start(); + }); +}); + +/* ++-----------+ +------------------------+ +| Useless | | +--------------+ | +| Reactor |------>--->| Mutation 1 | | +| | | +--------------+ | ++-----------+ | Mutation | | + | Holder \ | + | +--------v-----+ | + | | Mutation 2 | | + | +--------------+ | + +------------------------+ +*/ + +describe("Mutation phase 1: a mutation should not be able to make connection that directly lays on their upstream", () => { + class MutationHolder extends Reactor { + public inport: InPort; + public mut1inport: InPort; + public mut2inport: InPort; + public trigger: InPort; + + constructor(parent: Reactor) { + super(parent); + + this.inport = new InPort(this); + this.mut1inport = new InPort(this); + this.mut2inport = new InPort(this); + this.trigger = new InPort(this); + + // M1 + this.addMutation( + [this.trigger, this.mut1inport], + [ + this.inport.asConnectable(), + this.mut1inport.asConnectable(), + this.mut2inport.asConnectable() + ], + function (this, inp, m1, m2) { + expect(() => { + this.connect(inp, m1); + }).toThrow(); + expect(() => { + this.connect(inp, m2); + }).toReturn(); + } + ); + + // M2 + this.addMutation( + [this.trigger, this.mut2inport], + [ + this.inport.asConnectable(), + this.mut1inport.asConnectable(), + this.mut2inport.asConnectable() + ], + function (this, inp, m1, m2) { + expect(() => { + this.connect(inp, m1); + }).toThrow(); + expect(() => { + this.connect(inp, m2); + }).toThrow(); + } + ); + } + } + + class TestApp extends App { + public child = new MutationHolder(this); + public trigger = new OutPort(this); + + constructor(done: () => void) { + super(undefined, undefined, undefined, () => { + done(); + }); + + this.addMutation( + [this.startup], + [this.trigger.asConnectable(), this.child.trigger.asConnectable()], + function (this, myTrigger, childTrigger) { + this.connect(myTrigger, childTrigger); + } + ); + this.addReaction( + [this.startup], + [this.writable(this.trigger)], + function (this, trigger) { + trigger.set(true); + } + ); + } + } + + test("test mutation", (done) => { + const tapp = new TestApp(done); + tapp._start(); + }); +}); diff --git a/src/benchmark/FacilityLocation.ts b/src/benchmark/FacilityLocation.ts index bf72e0bd0..54eb637d8 100644 --- a/src/benchmark/FacilityLocation.ts +++ b/src/benchmark/FacilityLocation.ts @@ -4,7 +4,7 @@ * * @author Hokeun Kim (hokeunkim@berkeley.edu) */ -import type {WritablePort} from "../core/internal"; +import type {IOPort} from "../core/internal"; import { Log, TimeValue, @@ -428,6 +428,7 @@ export class Quadrant extends Reactor { childrenBoundaries = new State(new Array()); totalCost = new State(0); + trollPort = new OutPort(this); constructor( parent: Reactor, @@ -525,14 +526,19 @@ export class Quadrant extends Reactor { this.writable(this.toSecondChild), this.writable(this.toThirdChild), this.writable(this.toFourthChild), - this.writable(this.toAccumulator), + this.toFirstChild.asConnectable(), + this.toSecondChild.asConnectable(), + this.toThirdChild.asConnectable(), + this.toFourthChild.asConnectable(), + this.toAccumulator.asConnectable(), this.localFacilities, this.knownFacilities, this.maxDepthOfKnownOpenFacility, this.supportCustomers, this.hasChildren, this.childrenBoundaries, - this.totalCost + this.totalCost, + this.writable(this.trollPort) ], function ( this, @@ -544,20 +550,26 @@ export class Quadrant extends Reactor { depth, fromProducer, toProducer, - toFirstChild, - toSecondChild, - toThirdChild, - toFourthChild, - toAccumulator, + toFirstChildW, + toSecondChildW, + toThirdChildW, + toFourthChildW, + toFirstChildC, + toSecondChildC, + toThirdChildC, + toFourthChildC, + toAccumulatorC, localFacilities, knownFacilities, maxDepthOfKnownOpenFacility, supportCustomers, hasChildren, childrenBoundaries, - totalCost + totalCost, + trollPort ) { const thisReactor = this.getReactor(); + const toAccumulatorUpstreams: Array> = []; // Helper functions for mutation reaction. const notifyParentOfFacility = function (p: Point): void { @@ -622,11 +634,12 @@ export class Quadrant extends Reactor { // console.log(`Children boundaries: ${childrenBoundaries.get()[0]}, ${childrenBoundaries.get()[1]}, ${childrenBoundaries.get()[2]}, ${childrenBoundaries.get()[3]}`) const accumulator = new Accumulator(thisReactor); - const toAccumulatorOfQuadrant = ( - toAccumulator as unknown as WritablePort - ).getPort(); // Connect Accumulator's output to Quadrant's output. - this.connect(accumulator.toNextAccumulator, toAccumulatorOfQuadrant); + toAccumulatorUpstreams.push(accumulator.toNextAccumulator); + this.connect( + accumulator.toNextAccumulator.asConnectable(), + toAccumulatorC + ); const firstChild = new Quadrant( thisReactor, @@ -640,11 +653,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFirstChildPort = ( - toFirstChild as unknown as WritablePort - ).getPort(); - this.connect(toFirstChildPort, firstChild.fromProducer); - this.connect(firstChild.toAccumulator, accumulator.fromFirstQuadrant); + this.connect(toFirstChildC, firstChild.fromProducer.asConnectable()); + this.connect( + firstChild.toAccumulator.asConnectable(), + accumulator.fromFirstQuadrant.asConnectable() + ); const secondChild = new Quadrant( thisReactor, @@ -658,13 +671,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toSecondChildPort = ( - toSecondChild as unknown as WritablePort - ).getPort(); - this.connect(toSecondChildPort, secondChild.fromProducer); this.connect( - secondChild.toAccumulator, - accumulator.fromSecondQuadrant + toSecondChildC, + secondChild.fromProducer.asConnectable() + ); + this.connect( + secondChild.toAccumulator.asConnectable(), + accumulator.fromSecondQuadrant.asConnectable() ); const thirdChild = new Quadrant( @@ -679,11 +692,11 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toThirdChildPort = ( - toThirdChild as unknown as WritablePort - ).getPort(); - this.connect(toThirdChildPort, thirdChild.fromProducer); - this.connect(thirdChild.toAccumulator, accumulator.fromThirdQuadrant); + this.connect(toThirdChildC, thirdChild.fromProducer.asConnectable()); + this.connect( + thirdChild.toAccumulator.asConnectable(), + accumulator.fromThirdQuadrant.asConnectable() + ); const fourthChild = new Quadrant( thisReactor, @@ -697,13 +710,13 @@ export class Quadrant extends Reactor { maxDepthOfKnownOpenFacility.get(), Point.arrayClone(supportCustomers.get()) ); - const toFourthChildPort = ( - toFourthChild as unknown as WritablePort - ).getPort(); - this.connect(toFourthChildPort, fourthChild.fromProducer); this.connect( - fourthChild.toAccumulator, - accumulator.fromFourthQuadrant + toFourthChildC, + fourthChild.fromProducer.asConnectable() + ); + this.connect( + fourthChild.toAccumulator.asConnectable(), + accumulator.fromFourthQuadrant.asConnectable() ); supportCustomers.set(new Array()); @@ -733,16 +746,16 @@ export class Quadrant extends Reactor { if (childrenBoundaries.get()[i].contains(point)) { switch (i) { case 0: - toFirstChild.set(msg); + toFirstChildW.set(msg); break; case 1: - toSecondChild.set(msg); + toSecondChildW.set(msg); break; case 2: - toThirdChild.set(msg); + toThirdChildW.set(msg); break; case 3: - toFourthChild.set(msg); + toFourthChildW.set(msg); break; } break; @@ -754,7 +767,12 @@ export class Quadrant extends Reactor { case RequestExitMsg: if (!hasChildren.get()) { // No children, number of facilities will be counted on parent's side. - toAccumulator.set( + toAccumulatorUpstreams.forEach((val) => { + this.disconnect(val, toAccumulatorC.getPort()); + }); + this.connect(trollPort.getPort().asConnectable(), toAccumulatorC); + + trollPort.set( new ConfirmExitMsg( 0, // facilities supportCustomers.get().length, // supportCustomers @@ -762,10 +780,10 @@ export class Quadrant extends Reactor { ) ); } else { - toFirstChild.set(msg); - toSecondChild.set(msg); - toThirdChild.set(msg); - toFourthChild.set(msg); + toFirstChildW.set(msg); + toSecondChildW.set(msg); + toThirdChildW.set(msg); + toFourthChildW.set(msg); } break; default: diff --git a/src/benchmark/Sieve.ts b/src/benchmark/Sieve.ts index 4b88776ed..e64327b2d 100644 --- a/src/benchmark/Sieve.ts +++ b/src/benchmark/Sieve.ts @@ -1,5 +1,4 @@ import { - type WritablePort, Parameter, InPort, OutPort, @@ -66,11 +65,12 @@ class Filter extends Reactor { [ this.inp, this.writable(this.out), + this.out.asConnectable(), this.startPrime, this.hasChild, this.localPrimes ], - function (this, inp, out, prime, hasChild, localPrimes) { + function (this, inp, outW, outC, prime, hasChild, localPrimes) { const p = inp.get(); if (p !== undefined) { const seen = localPrimes.get(); @@ -96,14 +96,13 @@ class Filter extends Reactor { // let x = this.create(Filter, [this.getReactor(), p]) // console.log("CREATED: " + x._getFullyQualifiedName()) // FIXME: weird hack. Maybe just accept writable ports as well? - const port = (out as unknown as WritablePort).getPort(); - this.connect(port, n.inp); + this.connect(outC, n.inp.asConnectable()); // FIXME: this updates the dependency graph, but it doesn't redo the topological sort // For a pipeline like this one, it is not necessary, but in general it is. // Can we avoid redoing the entire sort? hasChild.set(true); } else { - out.set(p); + outW.set(p); } } } diff --git a/src/core/port.ts b/src/core/port.ts index e6206e821..8870d1cd0 100644 --- a/src/core/port.ts +++ b/src/core/port.ts @@ -7,7 +7,8 @@ import type { Absent, MultiReadWrite, ReadWrite, - Variable + Variable, + Read } from "./internal"; import {Trigger, Log} from "./internal"; @@ -59,6 +60,13 @@ export abstract class Port extends Trigger { } } +export class ConnectablePort implements Read { + public get = (): Absent => undefined; + public getPort = (): IOPort => this.port; + + constructor(public port: IOPort) {} +} + /** * Abstract class for a writable port. It is intended as a wrapper for a * regular port. In addition to a get method, it also has a set method and @@ -103,6 +111,10 @@ export abstract class IOPort extends Port { } } + public asConnectable(): ConnectablePort { + return new ConnectablePort(this); + } + /** * Only the holder of the key may obtain a writable port. * @param key diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 075e863ac..765b9117b 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -42,7 +42,8 @@ import { Startup, Shutdown, WritableMultiPort, - Dummy + Dummy, + ConnectablePort } from "./internal"; import {v4 as uuidv4} from "uuid"; import {Bank} from "./bank"; @@ -61,6 +62,18 @@ export interface Call extends Write, Read { invoke: (args: A) => R | undefined; } +export enum CanConnectResult { + SUCCESS = 0, + SELF_LOOP = "Source port and destination port are the same.", + DESTINATION_OCCUPIED = "Destination port is already occupied.", + DOWNSTREAM_WRITE_CONFLICT = "Write conflict: port is already occupied.", + NOT_IN_SCOPE = "Source and destination ports are not in scope.", + RT_CONNECTION_OUTSIDE_CONTAINER = "New connection is outside of container.", + RT_DIRECT_FEED_THROUGH = "New connection introduces direct feed through.", + RT_CYCLE = "New connection introduces cycle.", + MUTATION_CAUSALITY_LOOP = "New connection will change the causal effect of the mutation that triggered this connection." +} + /** * Abstract class for a schedulable action. It is intended as a wrapper for a * regular action. In addition to a get method, it also has a schedule method @@ -153,6 +166,11 @@ export abstract class Reactor extends Component { */ private readonly _keyChain = new Map(); + // This is the keychain for creation, i.e. if Reactor R's mutation created reactor B, + // then R is B's creator, even if they are siblings. R should have access to B, + // at least semantically......? + private readonly _creatorKeyChain = new Map(); + /** * This graph has in it all the dependencies implied by this container's * ports, reactions, and connections. @@ -387,6 +405,9 @@ export abstract class Reactor extends Component { return owner._getKey(component, this._keyChain.get(owner)); } } + return component + .getContainer() + ._getKey(component, this._creatorKeyChain.get(component.getContainer())); } /** @@ -428,16 +449,31 @@ export abstract class Reactor extends Component { * @param src * @param dst */ + + public connect( + src: ConnectablePort, + dst: ConnectablePort + ): void; + public connect( + src: CallerPort, + dst: CalleePort + ): void; public connect( - src: CallerPort | IOPort, - dst: CalleePort | IOPort + ...[src, dst]: + | [ConnectablePort, ConnectablePort] + | [CallerPort, CalleePort] ): void { if (src instanceof CallerPort && dst instanceof CalleePort) { this.reactor._connectCall(src, dst); - } else if (src instanceof IOPort && dst instanceof IOPort) { - this.reactor._connect(src, dst); + } else if ( + src instanceof ConnectablePort && + dst instanceof ConnectablePort + ) { + this.reactor._connect(src.getPort(), dst.getPort()); } else { - // ERROR + throw Error( + "Logically unreachable code: src and dst type mismatch, Caller(ee) port cannot be connected to IOPort." + ); } } @@ -467,6 +503,20 @@ export abstract class Reactor extends Component { public delete(reactor: Reactor): void { reactor._delete(); } + + public addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addChild(constructor, ...args); + } + + public addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addSibling(constructor, ...args); + } }; /** @@ -1091,10 +1141,13 @@ export abstract class Reactor extends Component { * @param src The start point of a new connection. * @param dst The end point of a new connection. */ - public canConnect(src: IOPort, dst: IOPort): boolean { + public canConnect( + src: IOPort, + dst: IOPort + ): CanConnectResult { // Immediate rule out trivial self loops. if (src === dst) { - throw Error("Source port and destination port are the same."); + return CanConnectResult.SELF_LOOP; } // Check the race condition @@ -1102,7 +1155,7 @@ export abstract class Reactor extends Component { // in addReaction) const deps = this._dependencyGraph.getUpstreamNeighbors(dst); // FIXME this will change with multiplex ports if (deps !== undefined && deps.size > 0) { - throw Error("Destination port is already occupied."); + return CanConnectResult.DESTINATION_OCCUPIED; } if (!this._runtime.isRunning()) { @@ -1114,10 +1167,13 @@ export abstract class Reactor extends Component { // Rule out write conflicts. // - (between reactors) if (this._dependencyGraph.getDownstreamNeighbors(dst).size > 0) { - return false; + return CanConnectResult.DOWNSTREAM_WRITE_CONFLICT; } - return this._isInScope(src, dst); + if (!this._isInScope(src, dst)) { + return CanConnectResult.NOT_IN_SCOPE; + } + return CanConnectResult.SUCCESS; } else { // Attempt to make a connection while executing. // Check the local dependency graph to figure out whether this change @@ -1131,10 +1187,27 @@ export abstract class Reactor extends Component { src._isContainedBy(this) && dst._isContainedBy(this) ) { - throw Error("New connection is outside of container."); + return CanConnectResult.RT_CONNECTION_OUTSIDE_CONTAINER; + } + + /** + * TODO (axmmisaka): The following code is commented for multiple reasons: + * The causality interface check is not fully implemented so new checks are failing + * Second, direct feedthrough itself would not cause any problem *per se*. + * To ensure there is no cycle, the safest way is to check against the global dependency graph. + */ + + let app = this as Reactor; + while (app._getContainer() !== app) { + app = app._getContainer(); + } + const graph = app._getPrecedenceGraph(); + graph.addEdge(src, dst); + if (graph.hasCycle()) { + return CanConnectResult.RT_CYCLE; } - // Take the local graph and merge in all the causality interfaces + /* // Take the local graph and merge in all the causality interfaces // of contained reactors. Then: const graph = new PrecedenceGraph | Reaction>(); graph.addAll(this._dependencyGraph); @@ -1148,23 +1221,21 @@ export abstract class Reactor extends Component { // 1) check for loops const hasCycle = graph.hasCycle(); + if (hasCycle) { + return CanConnectResult.RT_CYCLE; + } // 2) check for direct feed through. // FIXME: This doesn't handle while direct feed thorugh cases. - let hasDirectFeedThrough = false; - if (src instanceof InPort && dst instanceof OutPort) { - hasDirectFeedThrough = dst.getContainer() === src.getContainer(); - } - // Throw error cases - if (hasDirectFeedThrough && hasCycle) { - throw Error("New connection introduces direct feed through and cycle."); - } else if (hasCycle) { - throw Error("New connection introduces cycle."); - } else if (hasDirectFeedThrough) { - throw Error("New connection introduces direct feed through."); - } + if ( + src instanceof InPort && + dst instanceof OutPort && + dst.getContainer() === src.getContainer() + ) { + return CanConnectResult.RT_DIRECT_FEED_THROUGH; + } */ - return true; + return CanConnectResult.SUCCESS; } } @@ -1258,11 +1329,14 @@ export abstract class Reactor extends Component { if (dst === undefined || dst === null) { throw new Error("Cannot connect unspecified destination"); } - if (this.canConnect(src, dst)) { - this._uncheckedConnect(src, dst); - } else { - throw new Error(`ERROR connecting ${src} to ${dst}`); + const canConnectResult = this.canConnect(src, dst); + // I know, this looks a bit weird. But + if (canConnectResult !== CanConnectResult.SUCCESS) { + throw new Error( + `ERROR connecting ${src} to ${dst}. Reason is ${canConnectResult.valueOf()}` + ); } + this._uncheckedConnect(src, dst); } protected _connectMulti( @@ -1316,7 +1390,8 @@ export abstract class Reactor extends Component { } for (let i = 0; i < leftPorts.length && i < rightPorts.length; i++) { - if (!this.canConnect(leftPorts[i], rightPorts[i])) { + const canConnectResult = this.canConnect(leftPorts[i], rightPorts[i]); + if (canConnectResult !== CanConnectResult.SUCCESS) { throw new Error( `ERROR connecting ${leftPorts[i]} to ${rightPorts[i]} @@ -1555,6 +1630,33 @@ export abstract class Reactor extends Component { toString(): string { return this._getFullyQualifiedName(); } + + protected _addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + const newReactor = new constructor(this, ...args); + return newReactor; + } + + protected _addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + if (this._getContainer() == null) { + throw new Error( + `Reactor ${this} does not have a parent. Sibling is not well-defined.` + ); + } + if (this._getContainer() === this) { + throw new Error( + `Reactor ${this} is self-contained. Adding sibling creates logical issue.` + ); + } + const newReactor = this._getContainer()._addChild(constructor, ...args); + this._creatorKeyChain.set(newReactor, newReactor._key); + return newReactor; + } } /* @@ -1784,10 +1886,13 @@ interface UtilityFunctions { } export interface MutationSandbox extends ReactionSandbox { - connect: ( - src: CallerPort | IOPort, - dst: CalleePort | IOPort - ) => void; + connect: { + (src: ConnectablePort, dst: ConnectablePort): void; + ( + src: CallerPort, + dst: CalleePort + ): void; + }; disconnect: (src: IOPort, dst?: IOPort) => void; @@ -1795,6 +1900,16 @@ export interface MutationSandbox extends ReactionSandbox { getReactor: () => Reactor; // Container + addChild: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + + addSibling: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + // FIXME: // forkJoin(constructor: new () => Reactor, ): void; }