Skip to content

Commit

Permalink
Handle backwards/zero index in watch (#136)
Browse files Browse the repository at this point in the history
Fixes #134 #135
  • Loading branch information
silas authored Dec 15, 2022
1 parent bff24e1 commit 6b399f7
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 52 deletions.
3 changes: 1 addition & 2 deletions .jshintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"node": true,
"esversion": 9,
"esversion": 11,
"globals": {
// Mocha
"describe": false,
Expand All @@ -15,7 +15,6 @@
"eqnull": true,
"indent": 2,
"latedef": true,
"newcap": true,
"quotmark": "double",
"trailing": true,
"undef": true,
Expand Down
22 changes: 19 additions & 3 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ function parseDuration(value) {
return (n * constants.DURATION_UNITS[m[2]]) / 1e6;
}

/**
* Return BigInt or undefined if parse failed.
*/
function safeBigInt(value) {
if (value === undefined || value === null || value === "") {
return undefined;
}

try {
return BigInt(value);
} catch (err) {
return undefined;
}
}

/**
* Common options
*/
Expand Down Expand Up @@ -556,9 +571,9 @@ function createCheck(src) {
* Has the Consul index changed.
*/
function hasIndexChanged(index, prevIndex) {
if (typeof index !== "string" || !index) return false;
if (typeof prevIndex !== "string" || !prevIndex) return true;
return index !== prevIndex;
if (typeof index !== "bigint" || index <= 0) return false;
if (typeof prevIndex !== "bigint") return true;
return index > prevIndex;
}

/**
Expand Down Expand Up @@ -599,6 +614,7 @@ exports.options = options;
exports.defaultCommonOptions = defaultCommonOptions;
exports.clone = clone;
exports.parseDuration = parseDuration;
exports.safeBigInt = safeBigInt;
exports.setTimeoutContext = setTimeoutContext;
exports.createServiceCheck = createServiceCheck;
exports.createService = createService;
Expand Down
18 changes: 13 additions & 5 deletions lib/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Watch extends events.EventEmitter {
let options = utils.normalizeKeys(opts.options || {});
options = utils.defaults(options, consul._defaults);
options.wait = options.wait || "30s";
options.index = options.index || "0";
options.index = utils.safeBigInt(options.index || "0");

if (
typeof options.timeout !== "string" &&
Expand Down Expand Up @@ -150,14 +150,22 @@ class Watch extends events.EventEmitter {
this._attempts = 0;
this._attemptsMaxed = false;

const newIndex = res.headers["x-consul-index"];

const newIndex = utils.safeBigInt(res.headers["x-consul-index"]);
if (newIndex === undefined) {
return this._err(errors.Validation("Watch not supported"), res);
}
if (newIndex === 0n) {
return this._err(
errors.Consul("Consul returned zero index value"),
res
);
}

const prevIndex = this._options.index;
const reset = prevIndex !== undefined && newIndex < prevIndex;

if (utils.hasIndexChanged(newIndex, this._options.index)) {
this._options.index = newIndex;
if (reset || utils.hasIndexChanged(newIndex, prevIndex)) {
this._options.index = reset ? 0n : newIndex;

this.emit("change", data, res);
}
Expand Down
44 changes: 31 additions & 13 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,28 @@ describe("utils", function () {
should(utils.parseDuration("1.5s")).equal(1500);
should(utils.parseDuration("10.03m")).equal(601800);

should(utils.parseDuration()).be.undefined;
should(utils.parseDuration("")).be.undefined;
should(utils.parseDuration(".")).be.undefined;
should(utils.parseDuration("10x")).be.undefined;
should(utils.parseDuration(".ms")).be.undefined;
should(utils.parseDuration()).be.undefined();
should(utils.parseDuration("")).be.undefined();
should(utils.parseDuration(".")).be.undefined();
should(utils.parseDuration("10x")).be.undefined();
should(utils.parseDuration(".ms")).be.undefined();
});
});

describe("safeBigInt", function () {
it("should work", function () {
should(utils.safeBigInt(0)).equal(0n);
should(utils.safeBigInt(-1)).equal(-1n);
should(utils.safeBigInt(500)).equal(500n);
should(utils.safeBigInt("0")).equal(0n);
should(utils.safeBigInt("-1")).equal(-1n);
should(utils.safeBigInt("500")).equal(500n);

should(utils.safeBigInt("")).be.undefined();
should(utils.safeBigInt("a")).be.undefined();
should(utils.safeBigInt("1.0")).be.undefined();
should(utils.safeBigInt(null)).be.undefined();
should(utils.safeBigInt({})).be.undefined();
});
});

Expand Down Expand Up @@ -843,14 +860,15 @@ describe("utils", function () {
it("should work", function () {
should(utils.hasIndexChanged()).equal(false);
should(utils.hasIndexChanged("")).equal(false);
should(utils.hasIndexChanged("1")).equal(true);
should(utils.hasIndexChanged("1", "")).equal(true);
should(utils.hasIndexChanged("10", "1")).equal(true);
should(utils.hasIndexChanged("0", "1")).equal(true);
should(utils.hasIndexChanged("1", "1")).equal(false);
should(utils.hasIndexChanged("1", "0")).equal(true);
should(utils.hasIndexChanged("2", "1")).equal(true);
should(utils.hasIndexChanged("2", "2")).equal(false);
should(utils.hasIndexChanged(0n)).equal(false);
should(utils.hasIndexChanged(1n)).equal(true);
should(utils.hasIndexChanged(1n, "")).equal(true);
should(utils.hasIndexChanged(10n, 1n)).equal(true);
should(utils.hasIndexChanged(0n, 1n)).equal(false);
should(utils.hasIndexChanged(1n, 1n)).equal(false);
should(utils.hasIndexChanged(1n, 0n)).equal(true);
should(utils.hasIndexChanged(2n, 1n)).equal(true);
should(utils.hasIndexChanged(2n, 2n)).equal(false);
});
});
});
94 changes: 65 additions & 29 deletions test/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,27 @@ describe("Watch", function () {
.get("/v1/kv/key1?index=10&wait=30s")
.reply(200, [{ n: 5 }], { "X-Consul-Index": "15" })
.get("/v1/kv/key1?index=15&wait=30s")
.reply(200, [{ n: 6 }], { "X-Consul-Index": "14" })
.get("/v1/kv/key1?index=0&wait=30s")
.reply(200, [{ n: 7 }], { "X-Consul-Index": "6" })
.get("/v1/kv/key1?index=6&wait=30s")
.reply(200, [{ n: 8 }], { "X-Consul-Index": "0" })
.get("/v1/kv/key1?index=6&wait=30s")
.reply(400);

const watch = this.consul.watch({
method: this.consul.kv.get,
options: { key: "key1" },
});

let doneCalled = false;
const safeDone = (err) => {
if (doneCalled) return;
doneCalled = true;
done(err);
watch.end();
};

should(watch.isRunning()).be.true();
should(watch.updateTime()).be.undefined();

Expand All @@ -40,6 +54,10 @@ describe("Watch", function () {
const called = {};

watch.on("error", (err) => {
if (err.message.includes("Nock")) {
return safeDone(err);
}

called.error = true;

errors.push(err);
Expand All @@ -48,48 +66,66 @@ describe("Watch", function () {
watch.on("cancel", () => {
called.cancel = true;

should(list).eql([1, 4, 5]);
try {
should(list).eql([1, 4, 5, 6, 7]);

watch._run();
watch._err();
watch._run();
watch._err();

watch.end();
should(watch.isRunning()).be.false();
watch.end();
should(watch.isRunning()).be.false();
} catch (err) {
safeDone(err);
}
});

watch.on("change", (data, res) => {
called.change = true;

list.push(data.n);

switch (res.headers["x-consul-index"]) {
case "5":
should(watch.isRunning()).be.true();
should(watch.updateTime()).be.a.number();
should(errors).be.empty();
break;
case "10":
should(watch.isRunning()).be.true();
should(watch.updateTime()).be.a.number();
should(errors).have.length(1);
should(errors[0]).have.property(
"message",
"consul: kv.get: internal server error"
);
break;
case "15":
break;
default:
break;
try {
switch (res.headers["x-consul-index"]) {
case "5":
should(watch.isRunning()).be.true();
should(watch.updateTime()).be.a.Number();
should(errors).be.empty();
break;
case "10":
should(watch.isRunning()).be.true();
should(watch.updateTime()).be.a.Number();
should(errors).have.length(1);
should(errors[0]).have.property(
"message",
"consul: kv.get: internal server error"
);
break;
case "15":
break;
default:
break;
}
} catch (err) {
safeDone(err);
}
});

watch.on("end", () => {
should(called).have.property("cancel", true);
should(called).have.property("change", true);
should(called).have.property("error", true);

done();
try {
should(called).have.property("cancel", true);
should(called).have.property("change", true);
should(called).have.property("error", true);

should(errors).have.length(3);
should(errors[1]).have.property(
"message",
"Consul returned zero index value"
);

safeDone();
} catch (err) {
safeDone(err);
}
});
});

Expand Down

0 comments on commit 6b399f7

Please sign in to comment.