From c51898a9e633c9ebf879f679267a4acdf28fb1b6 Mon Sep 17 00:00:00 2001 From: Vanessa Williams Date: Tue, 29 Mar 2016 10:53:01 -0400 Subject: [PATCH 1/2] Issue #1 Allow deletion of durable subscriptions by adding headers to the unsubscribe call --- lib/stomp-node.js | 2 +- lib/stomp.js | 134 +++++++++++++++++++++++----------------------- src/stomp.coffee | 13 +++-- 3 files changed, 75 insertions(+), 74 deletions(-) diff --git a/lib/stomp-node.js b/lib/stomp-node.js index 68660ee..542049b 100644 --- a/lib/stomp-node.js +++ b/lib/stomp-node.js @@ -1,4 +1,4 @@ -// Generated by CoffeeScript 1.7.1 +// Generated by CoffeeScript 1.10.0 /* Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 diff --git a/lib/stomp.js b/lib/stomp.js index 9c30356..095dd44 100644 --- a/lib/stomp.js +++ b/lib/stomp.js @@ -1,4 +1,4 @@ -// Generated by CoffeeScript 1.7.1 +// Generated by CoffeeScript 1.10.0 /* Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 @@ -9,8 +9,8 @@ (function() { var Byte, Client, Frame, Stomp, - __hasProp = {}.hasOwnProperty, - __slice = [].slice; + hasProp = {}.hasOwnProperty, + slice = [].slice; Byte = { LF: '\x0A', @@ -20,24 +20,24 @@ Frame = (function() { var unmarshallSingle; - function Frame(command, headers, body) { - this.command = command; - this.headers = headers != null ? headers : {}; - this.body = body != null ? body : ''; + function Frame(command1, headers1, body1) { + this.command = command1; + this.headers = headers1 != null ? headers1 : {}; + this.body = body1 != null ? body1 : ''; } Frame.prototype.toString = function() { - var lines, name, skipContentLength, value, _ref; + var lines, name, ref, skipContentLength, value; lines = [this.command]; skipContentLength = this.headers['content-length'] === false ? true : false; if (skipContentLength) { delete this.headers['content-length']; } - _ref = this.headers; - for (name in _ref) { - if (!__hasProp.call(_ref, name)) continue; - value = _ref[name]; - lines.push("" + name + ":" + value); + ref = this.headers; + for (name in ref) { + if (!hasProp.call(ref, name)) continue; + value = ref[name]; + lines.push(name + ":" + value); } if (this.body && !skipContentLength) { lines.push("content-length:" + (Frame.sizeOfUTF8(this.body))); @@ -55,7 +55,7 @@ }; unmarshallSingle = function(data) { - var body, chr, command, divider, headerLines, headers, i, idx, len, line, start, trim, _i, _j, _len, _ref, _ref1; + var body, chr, command, divider, headerLines, headers, i, idx, j, k, len, len1, line, ref, ref1, ref2, start, trim; divider = data.search(RegExp("" + Byte.LF + Byte.LF)); headerLines = data.substring(0, divider).split(Byte.LF); command = headerLines.shift(); @@ -63,9 +63,9 @@ trim = function(str) { return str.replace(/^\s+|\s+$/g, ''); }; - _ref = headerLines.reverse(); - for (_i = 0, _len = _ref.length; _i < _len; _i++) { - line = _ref[_i]; + ref = headerLines.reverse(); + for (j = 0, len1 = ref.length; j < len1; j++) { + line = ref[j]; idx = line.indexOf(':'); headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)); } @@ -76,7 +76,7 @@ body = ('' + data).substring(start, start + len); } else { chr = null; - for (i = _j = start, _ref1 = data.length; start <= _ref1 ? _j < _ref1 : _j > _ref1; i = start <= _ref1 ? ++_j : --_j) { + for (i = k = ref1 = start, ref2 = data.length; ref1 <= ref2 ? k < ref2 : k > ref2; i = ref1 <= ref2 ? ++k : --k) { chr = data.charAt(i); if (chr === Byte.NULL) { break; @@ -95,14 +95,14 @@ partial: '' }; r.frames = (function() { - var _i, _len, _ref, _results; - _ref = frames.slice(0, -1); - _results = []; - for (_i = 0, _len = _ref.length; _i < _len; _i++) { - frame = _ref[_i]; - _results.push(unmarshallSingle(frame)); + var j, len1, ref, results; + ref = frames.slice(0, -1); + results = []; + for (j = 0, len1 = ref.length; j < len1; j++) { + frame = ref[j]; + results.push(unmarshallSingle(frame)); } - return _results; + return results; })(); last_frame = frames.slice(-1)[0]; if (last_frame === Byte.LF || (last_frame.search(RegExp("" + Byte.NULL + Byte.LF + "*$"))) !== -1) { @@ -126,8 +126,8 @@ Client = (function() { var now; - function Client(ws) { - this.ws = ws; + function Client(ws1) { + this.ws = ws1; this.ws.binaryType = "arraybuffer"; this.counter = 0; this.connected = false; @@ -141,8 +141,8 @@ } Client.prototype.debug = function(message) { - var _ref; - return typeof window !== "undefined" && window !== null ? (_ref = window.console) != null ? _ref.log(message) : void 0 : void 0; + var ref; + return typeof window !== "undefined" && window !== null ? (ref = window.console) != null ? ref.log(message) : void 0 : void 0; }; now = function() { @@ -173,20 +173,20 @@ }; Client.prototype._setupHeartbeat = function(headers) { - var serverIncoming, serverOutgoing, ttl, v, _ref, _ref1; - if ((_ref = headers.version) !== Stomp.VERSIONS.V1_1 && _ref !== Stomp.VERSIONS.V1_2) { + var ref, ref1, serverIncoming, serverOutgoing, ttl, v; + if ((ref = headers.version) !== Stomp.VERSIONS.V1_1 && ref !== Stomp.VERSIONS.V1_2) { return; } - _ref1 = (function() { - var _i, _len, _ref1, _results; - _ref1 = headers['heart-beat'].split(","); - _results = []; - for (_i = 0, _len = _ref1.length; _i < _len; _i++) { - v = _ref1[_i]; - _results.push(parseInt(v)); + ref1 = (function() { + var j, len1, ref1, results; + ref1 = headers['heart-beat'].split(","); + results = []; + for (j = 0, len1 = ref1.length; j < len1; j++) { + v = ref1[j]; + results.push(parseInt(v)); } - return _results; - })(), serverOutgoing = _ref1[0], serverIncoming = _ref1[1]; + return results; + })(), serverOutgoing = ref1[0], serverIncoming = ref1[1]; if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) { ttl = Math.max(this.heartbeat.outgoing, serverIncoming); if (typeof this.debug === "function") { @@ -221,7 +221,7 @@ Client.prototype._parseConnect = function() { var args, connectCallback, errorCallback, headers; - args = 1 <= arguments.length ? __slice.call(arguments, 0) : []; + args = 1 <= arguments.length ? slice.call(arguments, 0) : []; headers = {}; switch (args.length) { case 2: @@ -245,7 +245,7 @@ Client.prototype.connect = function() { var args, errorCallback, headers, out; - args = 1 <= arguments.length ? __slice.call(arguments, 0) : []; + args = 1 <= arguments.length ? slice.call(arguments, 0) : []; out = this._parseConnect.apply(this, args); headers = out[0], this.connectCallback = out[1], errorCallback = out[2]; if (typeof this.debug === "function") { @@ -253,15 +253,15 @@ } this.ws.onmessage = (function(_this) { return function(evt) { - var arr, c, client, data, frame, messageID, onreceive, subscription, unmarshalledData, _i, _len, _ref, _results; + var arr, c, client, data, frame, j, len1, messageID, onreceive, ref, results, subscription, unmarshalledData; data = typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer ? (arr = new Uint8Array(evt.data), typeof _this.debug === "function" ? _this.debug("--- got data length: " + arr.length) : void 0, ((function() { - var _i, _len, _results; - _results = []; - for (_i = 0, _len = arr.length; _i < _len; _i++) { - c = arr[_i]; - _results.push(String.fromCharCode(c)); + var j, len1, results; + results = []; + for (j = 0, len1 = arr.length; j < len1; j++) { + c = arr[j]; + results.push(String.fromCharCode(c)); } - return _results; + return results; })()).join('')) : evt.data; _this.serverActivity = now(); if (data === Byte.LF) { @@ -275,10 +275,10 @@ } unmarshalledData = Frame.unmarshall(_this.partialData + data); _this.partialData = unmarshalledData.partial; - _ref = unmarshalledData.frames; - _results = []; - for (_i = 0, _len = _ref.length; _i < _len; _i++) { - frame = _ref[_i]; + ref = unmarshalledData.frames; + results = []; + for (j = 0, len1 = ref.length; j < len1; j++) { + frame = ref[j]; switch (frame.command) { case "CONNECTED": if (typeof _this.debug === "function") { @@ -286,7 +286,7 @@ } _this.connected = true; _this._setupHeartbeat(frame.headers); - _results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0); + results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0); break; case "MESSAGE": subscription = frame.headers.subscription; @@ -306,22 +306,22 @@ } return client.nack(messageID, subscription, headers); }; - _results.push(onreceive(frame)); + results.push(onreceive(frame)); } else { - _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0); + results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0); } break; case "RECEIPT": - _results.push(typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0); + results.push(typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0); break; case "ERROR": - _results.push(typeof errorCallback === "function" ? errorCallback(frame) : void 0); + results.push(typeof errorCallback === "function" ? errorCallback(frame) : void 0); break; default: - _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0); + results.push(typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0); } } - return _results; + return results; }; })(this); this.ws.onclose = (function(_this) { @@ -393,17 +393,19 @@ client = this; return { id: headers.id, - unsubscribe: function() { - return client.unsubscribe(headers.id); + unsubscribe: function(hdrs) { + return client.unsubscribe(headers.id, hdrs); } }; }; - Client.prototype.unsubscribe = function(id) { + Client.prototype.unsubscribe = function(id, headers) { + if (headers == null) { + headers = {}; + } delete this.subscriptions[id]; - return this._transmit("UNSUBSCRIBE", { - id: id - }); + headers.id = id; + return this._transmit("UNSUBSCRIBE", headers); }; Client.prototype.begin = function(transaction) { diff --git a/src/stomp.coffee b/src/stomp.coffee index 69788a9..33bf77a 100644 --- a/src/stomp.coffee +++ b/src/stomp.coffee @@ -362,8 +362,8 @@ class Client return { id: headers.id - unsubscribe: -> - client.unsubscribe headers.id + unsubscribe: (hdrs) -> + client.unsubscribe headers.id, hdrs } # [UNSUBSCRIBE Frame](http://stomp.github.com/stomp-specification-1.1.html#UNSUBSCRIBE) @@ -375,12 +375,11 @@ class Client # # var subscription = client.subscribe(destination, onmessage); # ... - # subscription.unsubscribe(); - unsubscribe: (id) -> + # subscription.unsubscribe(headers); + unsubscribe: (id, headers={}) -> delete @subscriptions[id] - @_transmit "UNSUBSCRIBE", { - id: id - } + headers.id = id + @_transmit "UNSUBSCRIBE", headers # [BEGIN Frame](http://stomp.github.com/stomp-specification-1.1.html#BEGIN) # From 1378907c96e93298c5216026a65bcb3a261fc964 Mon Sep 17 00:00:00 2001 From: Vanessa Williams Date: Tue, 29 Mar 2016 11:17:35 -0400 Subject: [PATCH 2/2] Issue #2 Support STOMP 1.2. ACK and NACK headers. This change is backward-compatible with STOMP 1.1 and 1.0 --- lib/stomp.js | 23 ++++++++++++++++++----- lib/stomp.min.js | 4 ++-- src/stomp.coffee | 20 +++++++++++++++----- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/lib/stomp.js b/lib/stomp.js index 095dd44..190360a 100644 --- a/lib/stomp.js +++ b/lib/stomp.js @@ -285,6 +285,7 @@ _this.debug("connected to server " + frame.headers.server); } _this.connected = true; + _this.version = frame.headers.version; _this._setupHeartbeat(frame.headers); results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0); break; @@ -293,7 +294,11 @@ onreceive = _this.subscriptions[subscription] || _this.onreceive; if (onreceive) { client = _this; - messageID = frame.headers["message-id"]; + if (_this.version === Stomp.VERSIONS.V1_2) { + messageID = frame.headers["ack"]; + } else { + messageID = frame.headers["message-id"]; + } frame.ack = function(headers) { if (headers == null) { headers = {}; @@ -442,7 +447,11 @@ if (headers == null) { headers = {}; } - headers["message-id"] = messageID; + if (this.version === Stomp.VERSIONS.V1_2) { + headers["id"] = messageID; + } else { + headers["message-id"] = messageID; + } headers.subscription = subscription; return this._transmit("ACK", headers); }; @@ -451,7 +460,11 @@ if (headers == null) { headers = {}; } - headers["message-id"] = messageID; + if (this.version === Stomp.VERSIONS.V1_2) { + headers["id"] = messageID; + } else { + headers["message-id"] = messageID; + } headers.subscription = subscription; return this._transmit("NACK", headers); }; @@ -466,13 +479,13 @@ V1_1: '1.1', V1_2: '1.2', supportedVersions: function() { - return '1.1,1.0'; + return '1.2,1.1,1.0'; } }, client: function(url, protocols) { var klass, ws; if (protocols == null) { - protocols = ['v10.stomp', 'v11.stomp']; + protocols = ['v10.stomp', 'v11.stomp', 'v12.stomp']; } klass = Stomp.WebSocketClass || WebSocket; ws = new klass(url, protocols); diff --git a/lib/stomp.min.js b/lib/stomp.min.js index dd0b1fa..762e059 100644 --- a/lib/stomp.min.js +++ b/lib/stomp.min.js @@ -1,8 +1,8 @@ -// Generated by CoffeeScript 1.7.1 +// Generated by CoffeeScript 1.10.0 /* Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/) Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com) */ -(function(){var t,e,n,i,r={}.hasOwnProperty,o=[].slice;t={LF:"\n",NULL:"\x00"};n=function(){var e;function n(t,e,n){this.command=t;this.headers=e!=null?e:{};this.body=n!=null?n:""}n.prototype.toString=function(){var e,i,o,s,u;e=[this.command];o=this.headers["content-length"]===false?true:false;if(o){delete this.headers["content-length"]}u=this.headers;for(i in u){if(!r.call(u,i))continue;s=u[i];e.push(""+i+":"+s)}if(this.body&&!o){e.push("content-length:"+n.sizeOfUTF8(this.body))}e.push(t.LF+this.body);return e.join(t.LF)};n.sizeOfUTF8=function(t){if(t){return encodeURI(t).match(/%..|./g).length}else{return 0}};e=function(e){var i,r,o,s,u,a,c,f,h,l,p,d,g,b,m,v,y;s=e.search(RegExp(""+t.LF+t.LF));u=e.substring(0,s).split(t.LF);o=u.shift();a={};d=function(t){return t.replace(/^\s+|\s+$/g,"")};v=u.reverse();for(g=0,m=v.length;gy;c=p<=y?++b:--b){r=e.charAt(c);if(r===t.NULL){break}i+=r}}return new n(o,a,i)};n.unmarshall=function(n){var i,r,o,s;r=n.split(RegExp(""+t.NULL+t.LF+"*"));s={frames:[],partial:""};s.frames=function(){var t,n,o,s;o=r.slice(0,-1);s=[];for(t=0,n=o.length;t>> "+r)}while(true){if(r.length>this.maxWebSocketFrameSize){this.ws.send(r.substring(0,this.maxWebSocketFrameSize));r=r.substring(this.maxWebSocketFrameSize);if(typeof this.debug==="function"){this.debug("remaining = "+r.length)}}else{return this.ws.send(r)}}};r.prototype._setupHeartbeat=function(n){var r,o,s,u,a,c;if((a=n.version)!==i.VERSIONS.V1_1&&a!==i.VERSIONS.V1_2){return}c=function(){var t,e,i,r;i=n["heart-beat"].split(",");r=[];for(t=0,e=i.length;t>> PING"):void 0}}(this))}if(!(this.heartbeat.incoming===0||o===0)){s=Math.max(this.heartbeat.incoming,o);if(typeof this.debug==="function"){this.debug("check PONG every "+s+"ms")}return this.ponger=i.setInterval(s,function(t){return function(){var n;n=e()-t.serverActivity;if(n>s*2){if(typeof t.debug==="function"){t.debug("did not receive server activity for the last "+n+"ms")}return t.ws.close()}}}(this))}};r.prototype._parseConnect=function(){var t,e,n,i;t=1<=arguments.length?o.call(arguments,0):[];i={};switch(t.length){case 2:i=t[0],e=t[1];break;case 3:if(t[1]instanceof Function){i=t[0],e=t[1],n=t[2]}else{i.login=t[0],i.passcode=t[1],e=t[2]}break;case 4:i.login=t[0],i.passcode=t[1],e=t[2],n=t[3];break;default:i.login=t[0],i.passcode=t[1],e=t[2],n=t[3],i.host=t[4]}return[i,e,n]};r.prototype.connect=function(){var r,s,u,a;r=1<=arguments.length?o.call(arguments,0):[];a=this._parseConnect.apply(this,r);u=a[0],this.connectCallback=a[1],s=a[2];if(typeof this.debug==="function"){this.debug("Opening Web Socket...")}this.ws.onmessage=function(i){return function(r){var o,u,a,c,f,h,l,p,d,g,b,m,v;c=typeof ArrayBuffer!=="undefined"&&r.data instanceof ArrayBuffer?(o=new Uint8Array(r.data),typeof i.debug==="function"?i.debug("--- got data length: "+o.length):void 0,function(){var t,e,n;n=[];for(t=0,e=o.length;tv;c=m<=v?++l:--l){r=e.charAt(c);if(r===t.NULL){break}i+=r}}return new n(o,a,i)};n.unmarshall=function(n){var i,r,o,s;r=n.split(RegExp(""+t.NULL+t.LF+"*"));s={frames:[],partial:""};s.frames=function(){var t,n,o,s;o=r.slice(0,-1);s=[];for(t=0,n=o.length;t>> "+r)}while(true){if(r.length>this.maxWebSocketFrameSize){this.ws.send(r.substring(0,this.maxWebSocketFrameSize));r=r.substring(this.maxWebSocketFrameSize);if(typeof this.debug==="function"){this.debug("remaining = "+r.length)}}else{return this.ws.send(r)}}};r.prototype._setupHeartbeat=function(n){var r,o,s,u,a,c;if((r=n.version)!==i.VERSIONS.V1_1&&r!==i.VERSIONS.V1_2){return}o=function(){var t,e,i,r;i=n["heart-beat"].split(",");r=[];for(t=0,e=i.length;t>> PING"):void 0}}(this))}if(!(this.heartbeat.incoming===0||u===0)){a=Math.max(this.heartbeat.incoming,u);if(typeof this.debug==="function"){this.debug("check PONG every "+a+"ms")}return this.ponger=i.setInterval(a,function(t){return function(){var n;n=e()-t.serverActivity;if(n>a*2){if(typeof t.debug==="function"){t.debug("did not receive server activity for the last "+n+"ms")}return t.ws.close()}}}(this))}};r.prototype._parseConnect=function(){var t,e,n,i;t=1<=arguments.length?o.call(arguments,0):[];i={};switch(t.length){case 2:i=t[0],e=t[1];break;case 3:if(t[1]instanceof Function){i=t[0],e=t[1],n=t[2]}else{i.login=t[0],i.passcode=t[1],e=t[2]}break;case 4:i.login=t[0],i.passcode=t[1],e=t[2],n=t[3];break;default:i.login=t[0],i.passcode=t[1],e=t[2],n=t[3],i.host=t[4]}return[i,e,n]};r.prototype.connect=function(){var r,s,u,a;r=1<=arguments.length?o.call(arguments,0):[];a=this._parseConnect.apply(this,r);u=a[0],this.connectCallback=a[1],s=a[2];if(typeof this.debug==="function"){this.debug("Opening Web Socket...")}this.ws.onmessage=function(i){return function(r){var o,u,a,c,f,h,l,p,d,g,b,m,v;c=typeof ArrayBuffer!=="undefined"&&r.data instanceof ArrayBuffer?(o=new Uint8Array(r.data),typeof i.debug==="function"?i.debug("--- got data length: "+o.length):void 0,function(){var t,e,n;n=[];for(t=0,e=o.length;t @@ -444,7 +448,10 @@ class Client # {'ack': 'client'} # ); ack: (messageID, subscription, headers = {}) -> - headers["message-id"] = messageID + if (@version == Stomp.VERSIONS.V1_2) + headers["id"] = messageID + else + headers["message-id"] = messageID headers.subscription = subscription @_transmit "ACK", headers @@ -464,7 +471,10 @@ class Client # {'ack': 'client'} # ); nack: (messageID, subscription, headers = {}) -> - headers["message-id"] = messageID + if (@version == Stomp.VERSIONS.V1_2) + headers["id"] = messageID + else + headers["message-id"] = messageID headers.subscription = subscription @_transmit "NACK", headers @@ -477,11 +487,11 @@ Stomp = # Versions of STOMP specifications supported supportedVersions: -> - '1.1,1.0' + '1.2,1.1,1.0' # This method creates a WebSocket client that is connected to # the STOMP server located at the url. - client: (url, protocols = ['v10.stomp', 'v11.stomp']) -> + client: (url, protocols = ['v10.stomp', 'v11.stomp', 'v12.stomp']) -> # This is a hack to allow another implementation than the standard # HTML5 WebSocket class. #