diff --git a/app/connection-manager.js b/app/connection-manager.js index 44470f7..55a3dff 100644 --- a/app/connection-manager.js +++ b/app/connection-manager.js @@ -64,7 +64,8 @@ export class ConnectionManager { this.connectionInfo.bucketName, this.connectionInfo.bucketPassword); - return new IndexManager(this.connectionInfo.bucketName, this.bucket); + return new IndexManager(this.connectionInfo.bucketName, this.bucket, + this.cluster); } /** diff --git a/app/index-manager.js b/app/index-manager.js index e844693..87960a4 100644 --- a/app/index-manager.js +++ b/app/index-manager.js @@ -1,5 +1,4 @@ import {N1qlQuery} from 'couchbase'; -import {extend} from 'lodash'; const WAIT_TICK_INTERVAL = 10000; // in milliseconds @@ -42,18 +41,41 @@ function _respRead(callback) { }; } -/** Extension methods injected into BucketManager */ -const extensions = { - cbim_getIndexStatus: function(callback) { - this._mgmtRequest('indexStatus', 'GET', (err, httpReq) => { - if (err) { - return callback(err, null); - } +/** + * Manages Couchbase indexes + * + * @property {!string} bucketName + * @property {!boolean} is4XCluster + */ +export class IndexManager { + /** + * @param {string} bucketName + * @param {CouchbaseBucket} bucket + * @param {Cluster} cluster + */ + constructor(bucketName, bucket, cluster) { + this.bucketName = bucketName; + this.bucket = bucket; + this.manager = bucket.manager(); + this.clusterManager = cluster.manager(); + } + + /** + * @private + * Gets index statuses for the bucket via the cluster manager + * + * @return {Promise.array} + */ + getIndexStatuses() { + return new Promise((resolve, reject) => { + let httpReq = this.clusterManager._mgmtRequest( + 'indexStatus', 'GET'); - httpReq.on('error', callback); + httpReq.on('error', reject); httpReq.on('response', _respRead((err, resp, data) => { if (err) { - return callback(err); + reject(err); + return; } if (resp.statusCode !== 200) { @@ -65,45 +87,24 @@ const extensions = { } if (!errData) { - callback(new Error( - 'operation failed (' + resp.statusCode +')'), null); + reject(new Error( + 'operation failed (' + resp.statusCode +')')); return; } - callback(new Error(errData.reason), null); + reject(new Error(errData.reason)); return; } let indexStatusData = JSON.parse(data); let indexStatuses = indexStatusData.indexes.filter((index) => { - return index.bucket === this._bucket._name; + return index.bucket === this.bucketName; }); - callback(null, indexStatuses); + resolve(indexStatuses); })); httpReq.end(); }); - }, -}; - -/** - * Manages Couchbase indexes - * - * @property {!string} bucketName - * @property {!boolean} is4XCluster - */ -export class IndexManager { - /** - * @param {string} bucketName - * @param {CouchbaseBucket} bucket - * @param {boolean} is4XCluster - */ - constructor(bucketName, bucket) { - this.bucketName = bucketName; - this.bucket = bucket; - this.manager = bucket.manager(); - - extend(this.manager, extensions); } /** @@ -121,15 +122,7 @@ export class IndexManager { }); // Get additional info from the index status API - let statuses = await new Promise((resolve, reject) => { - this.manager.cbim_getIndexStatus((err, statuses) => { - if (err) { - reject(err); - } else { - resolve(statuses); - } - }); - }); + let statuses = await this.getIndexStatuses(); // Apply hosts from index status API to index information statuses.forEach((status) => { @@ -273,53 +266,50 @@ export class IndexManager { async getClusterVersion() { // Get additional info from the index status API let clusterCompatibility = await new Promise((resolve, reject) => { - this.manager._mgmtRequest('pools/default', 'GET', - (err, httpReq) => { + let httpReq = this.clusterManager._mgmtRequest( + 'pools/default', 'GET'); + + httpReq.on('error', reject); + httpReq.on('response', _respRead((err, resp, data) => { if (err) { - return reject(err); + reject(err); + return; } - httpReq.on('error', reject); - httpReq.on('response', _respRead((err, resp, data) => { - if (err) { - return reject(err); + if (resp.statusCode !== 200) { + let errData = null; + try { + errData = JSON.parse(data); + } catch (e) { + // ignore } - if (resp.statusCode !== 200) { - let errData = null; - try { - errData = JSON.parse(data); - } catch (e) { - // ignore - } + if (!errData) { + reject(new Error( + 'operation failed (' + resp.statusCode +')'), + null); + return; + } - if (!errData) { - reject(new Error( - 'operation failed (' + resp.statusCode +')'), - null); - return; + reject(new Error(errData.reason)); + return; + } + + let poolData = JSON.parse(data); + let minCompatibility = poolData.nodes.reduce( + (accum, value) => { + if (value.clusterCompatibility < accum) { + accum = value.clusterCompatibility; } - reject(new Error(errData.reason)); - return; - } + return accum; + }, 65535 * 65536); - let poolData = JSON.parse(data); - let minCompatibility = poolData.nodes.reduce( - (accum, value) => { - if (value.clusterCompatibility < accum) { - accum = value.clusterCompatibility; - } - - return accum; - }, 65535 * 65536); - - resolve(minCompatibility < 65535 * 65536 ? - minCompatibility : - 0); - })); - httpReq.end(); - }); + resolve(minCompatibility < 65535 * 65536 ? + minCompatibility : + 0); + })); + httpReq.end(); }); return { diff --git a/app/sync.js b/app/sync.js index 721f6f2..b65144e 100644 --- a/app/sync.js +++ b/app/sync.js @@ -94,8 +94,6 @@ export class Sync { clusterVersion: await this.manager.getClusterVersion(), }; - console.log(mutationContext.clusterVersion); - if (mutationContext.clusterVersion.major < 5) { // Force all definitions to use manual replica management definitions.forEach((def) => {