diff --git a/doc/environment_variables.md b/doc/environment_variables.md index 70b32e715..1b5ad26af 100644 --- a/doc/environment_variables.md +++ b/doc/environment_variables.md @@ -42,7 +42,6 @@ can be set. - `subchannel_internals` - Traces HTTP/2 session state. Includes per-call logs. - `channel_stacktrace` - Traces channel construction events with stack traces. - `keepalive` - Traces gRPC keepalive pings - - `index` - Traces module loading - `outlier_detection` - Traces outlier detection events The following tracers are added by the `@grpc/grpc-js-xds` library: @@ -62,4 +61,4 @@ can be set. - DEBUG - log all gRPC messages - INFO - log INFO and ERROR message - ERROR - log only errors (default) - - NONE - won't log any \ No newline at end of file + - NONE - won't log any diff --git a/packages/grpc-js-xds/README.md b/packages/grpc-js-xds/README.md index 793e0c0d7..c1db440cf 100644 --- a/packages/grpc-js-xds/README.md +++ b/packages/grpc-js-xds/README.md @@ -1,6 +1,6 @@ # @grpc/grpc-js xDS plugin -This package provides support for the `xds://` URL scheme to the `@grpc/grpc-js` library. The latest version of this package is compatible with `@grpc/grpc-js` version 1.2.x. +This package provides support for the `xds://` URL scheme to the `@grpc/grpc-js` library. The latest version of this package is compatible with `@grpc/grpc-js` version 1.9.x. ## Installation @@ -29,4 +29,6 @@ const client = new MyServiceClient('xds:///example.com:123'); - [xDS Client-Side Fault Injection](https://github.com/grpc/proposal/blob/master/A33-Fault-Injection.md) - [Client Status Discovery Service](https://github.com/grpc/proposal/blob/master/A40-csds-support.md) - [Outlier Detection](https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md) - - [xDS Retry Support](https://github.com/grpc/proposal/blob/master/A44-xds-retry.md) \ No newline at end of file + - [xDS Retry Support](https://github.com/grpc/proposal/blob/master/A44-xds-retry.md) + - [xDS Aggregate and Logical DNS Clusters](https://github.com/grpc/proposal/blob/master/A37-xds-aggregate-and-logical-dns-clusters.md)' + - [xDS Federation](https://github.com/grpc/proposal/blob/master/A47-xds-federation.md) (Currently experimental, enabled by environment variable `GRPC_EXPERIMENTAL_XDS_FEDERATION`) diff --git a/packages/grpc-js-xds/package.json b/packages/grpc-js-xds/package.json index 389f2b731..c2f324b99 100644 --- a/packages/grpc-js-xds/package.json +++ b/packages/grpc-js-xds/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js-xds", - "version": "1.8.2", + "version": "1.9.2", "description": "Plugin for @grpc/grpc-js. Adds the xds:// URL scheme and associated features.", "main": "build/src/index.js", "scripts": { @@ -9,7 +9,7 @@ "clean": "gts clean", "compile": "tsc", "fix": "gts fix", - "prepare": "npm run compile", + "prepare": "npm run generate-types && npm run compile", "pretest": "npm run compile", "posttest": "npm run check", "generate-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --includeDirs deps/envoy-api/ deps/xds/ deps/googleapis/ deps/protoc-gen-validate/ -O src/generated/ --grpcLib @grpc/grpc-js envoy/service/discovery/v3/ads.proto envoy/service/load_stats/v3/lrs.proto envoy/config/listener/v3/listener.proto envoy/config/route/v3/route.proto envoy/config/cluster/v3/cluster.proto envoy/config/endpoint/v3/endpoint.proto envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.proto udpa/type/v1/typed_struct.proto xds/type/v3/typed_struct.proto envoy/extensions/filters/http/fault/v3/fault.proto envoy/service/status/v3/csds.proto envoy/extensions/load_balancing_policies/wrr_locality/v3/wrr_locality.proto envoy/extensions/load_balancing_policies/ring_hash/v3/ring_hash.proto envoy/extensions/load_balancing_policies/pick_first/v3/pick_first.proto", @@ -50,7 +50,7 @@ "xxhash-wasm": "^1.0.2" }, "peerDependencies": { - "@grpc/grpc-js": "~1.8.0" + "@grpc/grpc-js": "~1.9.0" }, "engines": { "node": ">=10.10.0" diff --git a/packages/grpc-js-xds/src/resolver-xds.ts b/packages/grpc-js-xds/src/resolver-xds.ts index 5182e1005..b02764a79b 100644 --- a/packages/grpc-js-xds/src/resolver-xds.ts +++ b/packages/grpc-js-xds/src/resolver-xds.ts @@ -677,9 +677,11 @@ class XdsResolver implements Resolver { destroy() { if (this.listenerResourceName) { ListenerResourceType.cancelWatch(this.xdsClient, this.listenerResourceName, this.ldsWatcher); + this.isLdsWatcherActive = false; } if (this.latestRouteConfigName) { RouteConfigurationResourceType.cancelWatch(this.xdsClient, this.latestRouteConfigName, this.rdsWatcher); + this.latestRouteConfigName = null; } } diff --git a/packages/grpc-js-xds/src/xds-client.ts b/packages/grpc-js-xds/src/xds-client.ts index d1aed9265..540d946c1 100644 --- a/packages/grpc-js-xds/src/xds-client.ts +++ b/packages/grpc-js-xds/src/xds-client.ts @@ -924,8 +924,8 @@ class XdsSingleServerClient { } onLrsStreamReceivedMessage() { - this.adsBackoff.stop(); - this.adsBackoff.reset(); + this.lrsBackoff.stop(); + this.lrsBackoff.reset(); } handleLrsStreamEnd() { diff --git a/packages/grpc-js-xds/test/client.ts b/packages/grpc-js-xds/test/client.ts index 6d346f918..0779702bb 100644 --- a/packages/grpc-js-xds/test/client.ts +++ b/packages/grpc-js-xds/test/client.ts @@ -15,7 +15,7 @@ * */ -import { credentials, loadPackageDefinition, ServiceError } from "@grpc/grpc-js"; +import { ChannelOptions, credentials, loadPackageDefinition, ServiceError } from "@grpc/grpc-js"; import { loadSync } from "@grpc/proto-loader"; import { ProtoGrpcType } from "./generated/echo"; import { EchoTestServiceClient } from "./generated/grpc/testing/EchoTestService"; @@ -44,14 +44,14 @@ export class XdsTestClient { private client: EchoTestServiceClient; private callInterval: NodeJS.Timer; - constructor(target: string, bootstrapInfo: string) { - this.client = new loadedProtos.grpc.testing.EchoTestService(target, credentials.createInsecure(), {[BOOTSTRAP_CONFIG_KEY]: bootstrapInfo}); + constructor(target: string, bootstrapInfo: string, options?: ChannelOptions) { + this.client = new loadedProtos.grpc.testing.EchoTestService(target, credentials.createInsecure(), {...options, [BOOTSTRAP_CONFIG_KEY]: bootstrapInfo}); this.callInterval = setInterval(() => {}, 0); clearInterval(this.callInterval); } - static createFromServer(targetName: string, xdsServer: XdsServer) { - return new XdsTestClient(`xds:///${targetName}`, xdsServer.getBootstrapInfoString()); + static createFromServer(targetName: string, xdsServer: XdsServer, options?: ChannelOptions) { + return new XdsTestClient(`xds:///${targetName}`, xdsServer.getBootstrapInfoString(), options); } startCalls(interval: number) { @@ -98,4 +98,8 @@ export class XdsTestClient { } sendInner(count, callback); } + + getConnectivityState() { + return this.client.getChannel().getConnectivityState(false); + } } diff --git a/packages/grpc-js-xds/test/test-core.ts b/packages/grpc-js-xds/test/test-core.ts index cb145eb81..f48ab6c11 100644 --- a/packages/grpc-js-xds/test/test-core.ts +++ b/packages/grpc-js-xds/test/test-core.ts @@ -22,6 +22,7 @@ import { XdsServer } from "./xds-server"; import { register } from "../src"; import assert = require("assert"); +import { connectivityState } from "@grpc/grpc-js"; register(); @@ -60,4 +61,34 @@ describe('core xDS functionality', () => { }, reason => done(reason)); }, reason => done(reason)); }); + it('should be able to enter and exit idle', function(done) { + this.timeout(5000); + const cluster = new FakeEdsCluster('cluster1', 'endpoint1', [{backends: [new Backend()], locality:{region: 'region1'}}]); + const routeGroup = new FakeRouteGroup('listener1', 'route1', [{cluster: cluster}]); + routeGroup.startAllBackends().then(() => { + xdsServer.setEdsResource(cluster.getEndpointConfig()); + xdsServer.setCdsResource(cluster.getClusterConfig()); + xdsServer.setRdsResource(routeGroup.getRouteConfiguration()); + xdsServer.setLdsResource(routeGroup.getListener()); + xdsServer.addResponseListener((typeUrl, responseState) => { + if (responseState.state === 'NACKED') { + client.stopCalls(); + assert.fail(`Client NACKED ${typeUrl} resource with message ${responseState.errorMessage}`); + } + }) + client = XdsTestClient.createFromServer('listener1', xdsServer, { + 'grpc.client_idle_timeout_ms': 1000, + }); + client.sendOneCall(error => { + assert.ifError(error); + assert.strictEqual(client.getConnectivityState(), connectivityState.READY); + setTimeout(() => { + assert.strictEqual(client.getConnectivityState(), connectivityState.IDLE); + client.sendOneCall(error => { + done(error); + }) + }, 1100); + }); + }, reason => done(reason)); + }); }); diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md index 112b99932..eb04ece2f 100644 --- a/packages/grpc-js/README.md +++ b/packages/grpc-js/README.md @@ -65,6 +65,7 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. - `grpc.service_config_disable_resolution` - `grpc.client_idle_timeout_ms` - `grpc-node.max_session_memory` + - `grpc-node.tls_enable_trace` - `channelOverride` - `channelFactoryOverride` diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index f9f5629d5..8d8f4fd90 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.21", + "version": "1.9.14", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", @@ -65,7 +65,7 @@ "generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto" }, "dependencies": { - "@grpc/proto-loader": "^0.7.0", + "@grpc/proto-loader": "^0.7.8", "@types/node": ">=12.12.47" }, "files": [ diff --git a/packages/grpc-js/src/backoff-timeout.ts b/packages/grpc-js/src/backoff-timeout.ts index 3ffd26064..78318d1e8 100644 --- a/packages/grpc-js/src/backoff-timeout.ts +++ b/packages/grpc-js/src/backoff-timeout.ts @@ -78,6 +78,11 @@ export class BackoffTimeout { * running is true. */ private startTime: Date = new Date(); + /** + * The approximate time that the currently running timer will end. Only valid + * if running is true. + */ + private endTime: Date = new Date(); constructor(private callback: () => void, options?: BackoffOptions) { if (options) { @@ -100,6 +105,8 @@ export class BackoffTimeout { } private runTimer(delay: number) { + this.endTime = this.startTime; + this.endTime.setMilliseconds(this.endTime.getMilliseconds() + this.nextDelay); clearTimeout(this.timerId); this.timerId = setTimeout(() => { this.callback(); @@ -178,4 +185,12 @@ export class BackoffTimeout { this.hasRef = false; this.timerId.unref?.(); } + + /** + * Get the approximate timestamp of when the timer will fire. Only valid if + * this.isRunning() is true. + */ + getEndTime() { + return this.endTime; + } } diff --git a/packages/grpc-js/src/http_proxy.ts b/packages/grpc-js/src/http_proxy.ts index 3aed28c85..3e905c488 100644 --- a/packages/grpc-js/src/http_proxy.ts +++ b/packages/grpc-js/src/http_proxy.ts @@ -30,6 +30,7 @@ import { import { ChannelOptions } from './channel-options'; import { GrpcUri, parseUri, splitHostPort, uriToString } from './uri-parser'; import { URL } from 'url'; +import { DEFAULT_PORT } from './resolver-dns'; const TRACER_NAME = 'proxy'; @@ -189,12 +190,19 @@ export function getProxiedConnection( if (parsedTarget === null) { return Promise.resolve({}); } + const splitHostPost = splitHostPort(parsedTarget.path); + if (splitHostPost === null) { + return Promise.resolve({}); + } + const hostPort = `${splitHostPost.host}:${ + splitHostPost.port ?? DEFAULT_PORT + }`; const options: http.RequestOptions = { method: 'CONNECT', - path: parsedTarget.path, + path: hostPort, }; const headers: http.OutgoingHttpHeaders = { - Host: parsedTarget.path, + Host: hostPort, }; // Connect to the subchannel address as a proxy if (isTcpSubchannelAddress(address)) { diff --git a/packages/grpc-js/src/index.ts b/packages/grpc-js/src/index.ts index 6733246ba..50671b01c 100644 --- a/packages/grpc-js/src/index.ts +++ b/packages/grpc-js/src/index.ts @@ -276,14 +276,7 @@ import * as load_balancer_outlier_detection from './load-balancer-outlier-detect import * as channelz from './channelz'; import { Deadline } from './deadline'; -const clientVersion = require('../../package.json').version; - (() => { - logging.trace( - LogVerbosity.DEBUG, - 'index', - 'Loading @grpc/grpc-js version ' + clientVersion - ); resolver_dns.setup(); resolver_uds.setup(); resolver_ip.setup(); diff --git a/packages/grpc-js/src/internal-channel.ts b/packages/grpc-js/src/internal-channel.ts index 10d865cef..be140522b 100644 --- a/packages/grpc-js/src/internal-channel.ts +++ b/packages/grpc-js/src/internal-channel.ts @@ -184,6 +184,7 @@ export class InternalChannel { private callCount = 0; private idleTimer: NodeJS.Timeout | null = null; private readonly idleTimeoutMs: number; + private lastActivityTimestamp: Date; // Channelz info private readonly channelzEnabled: boolean = true; @@ -305,7 +306,9 @@ export class InternalChannel { this.currentPicker = picker; const queueCopy = this.pickQueue.slice(); this.pickQueue = []; - this.callRefTimerUnref(); + if (queueCopy.length > 0) { + this.callRefTimerUnref(); + } for (const call of queueCopy) { call.doPick(); } @@ -358,11 +361,12 @@ export class InternalChannel { process.nextTick(() => { const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; - this.callRefTimerUnref(); + if (localQueue.length > 0) { + this.callRefTimerUnref(); + } for (const call of localQueue) { call.getConfig(); } - this.configSelectionQueue = []; }); }, status => { @@ -389,7 +393,9 @@ export class InternalChannel { } const localQueue = this.configSelectionQueue; this.configSelectionQueue = []; - this.callRefTimerUnref(); + if (localQueue.length > 0) { + this.callRefTimerUnref(); + } for (const call of localQueue) { call.reportResolverError(status); } @@ -413,6 +419,7 @@ export class InternalChannel { 'Channel constructed \n' + error.stack?.substring(error.stack.indexOf('\n') + 1) ); + this.lastActivityTimestamp = new Date(); } private getChannelzInfo(): ChannelInfo { @@ -486,9 +493,7 @@ export class InternalChannel { if (this.channelzEnabled) { this.channelzTrace.addTrace( 'CT_INFO', - ConnectivityState[this.connectivityState] + - ' -> ' + - ConnectivityState[newState] + 'Connectivity state change to ' + ConnectivityState[newState] ); } this.connectivityState = newState; @@ -562,19 +567,44 @@ export class InternalChannel { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.IDLE); this.currentPicker = new QueuePicker(this.resolvingLoadBalancer); + if (this.idleTimer) { + clearTimeout(this.idleTimer); + this.idleTimer = null; + } } - private maybeStartIdleTimer() { - if (this.callCount === 0) { - this.idleTimer = setTimeout(() => { + private startIdleTimeout(timeoutMs: number) { + this.idleTimer = setTimeout(() => { + if (this.callCount > 0) { + /* If there is currently a call, the channel will not go idle for a + * period of at least idleTimeoutMs, so check again after that time. + */ + this.startIdleTimeout(this.idleTimeoutMs); + return; + } + const now = new Date(); + const timeSinceLastActivity = now.valueOf() - this.lastActivityTimestamp.valueOf(); + if (timeSinceLastActivity >= this.idleTimeoutMs) { this.trace( 'Idle timer triggered after ' + this.idleTimeoutMs + 'ms of inactivity' ); this.enterIdle(); - }, this.idleTimeoutMs); - this.idleTimer.unref?.(); + } else { + /* Whenever the timer fires with the latest activity being too recent, + * set the timer again for the time when the time since the last + * activity is equal to the timeout. This should result in the timer + * firing no more than once every idleTimeoutMs/2 on average. */ + this.startIdleTimeout(this.idleTimeoutMs - timeSinceLastActivity); + } + }, timeoutMs); + this.idleTimer.unref?.(); + } + + private maybeStartIdleTimer() { + if (this.connectivityState !== ConnectivityState.SHUTDOWN && !this.idleTimer) { + this.startIdleTimeout(this.idleTimeoutMs); } } @@ -583,10 +613,6 @@ export class InternalChannel { this.callTracker.addCallStarted(); } this.callCount += 1; - if (this.idleTimer) { - clearTimeout(this.idleTimer); - this.idleTimer = null; - } } private onCallEnd(status: StatusObject) { @@ -598,6 +624,7 @@ export class InternalChannel { } } this.callCount -= 1; + this.lastActivityTimestamp = new Date(); this.maybeStartIdleTimer(); } @@ -717,6 +744,9 @@ export class InternalChannel { this.resolvingLoadBalancer.destroy(); this.updateState(ConnectivityState.SHUTDOWN); clearInterval(this.callRefTimer); + if (this.idleTimer) { + clearTimeout(this.idleTimer); + } if (this.channelzEnabled) { unregisterChannelzRef(this.channelzRef); } @@ -732,6 +762,7 @@ export class InternalChannel { const connectivityState = this.connectivityState; if (tryToConnect) { this.resolvingLoadBalancer.exitIdle(); + this.lastActivityTimestamp = new Date(); this.maybeStartIdleTimer(); } return connectivityState; diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 5cbd11cf3..c9224de6b 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -194,9 +194,11 @@ export class PickFirstLoadBalancer implements LoadBalancer { private subchannelStateListener: ConnectivityStateListener = ( subchannel, previousState, - newState + newState, + keepaliveTime, + errorMessage ) => { - this.onSubchannelStateUpdate(subchannel, previousState, newState); + this.onSubchannelStateUpdate(subchannel, previousState, newState, errorMessage); }; private pickedSubchannelHealthListener: HealthListener = () => @@ -218,6 +220,20 @@ export class PickFirstLoadBalancer implements LoadBalancer { private reportHealthStatus: boolean; + /** + * Indicates whether we called channelControlHelper.requestReresolution since + * the last call to updateAddressList + */ + private requestedResolutionSinceLastUpdate = false; + + /** + * The most recent error reported by any subchannel as it transitioned to + * TRANSIENT_FAILURE. + */ + private lastError: string | null = null; + + private latestAddressList: SubchannelAddress[] | null = null; + /** * Load balancer that attempts to connect to each backend in the address list * in order, and picks the first one that connects, using it for every @@ -259,7 +275,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { if (this.stickyTransientFailureMode) { this.updateState( ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker() + new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`}) ); } else { this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); @@ -267,15 +283,28 @@ export class PickFirstLoadBalancer implements LoadBalancer { } } + private requestReresolution() { + this.requestedResolutionSinceLastUpdate = true; + this.channelControlHelper.requestReresolution(); + } + private maybeEnterStickyTransientFailureMode() { - if (this.stickyTransientFailureMode) { + if (!this.allChildrenHaveReportedTF()) { return; } - if (!this.allChildrenHaveReportedTF()) { + if (!this.requestedResolutionSinceLastUpdate) { + /* Each time we get an update we reset each subchannel's + * hasReportedTransientFailure flag, so the next time we get to this + * point after that, each subchannel has reported TRANSIENT_FAILURE + * at least once since then. That is the trigger for requesting + * reresolution, whether or not the LB policy is already in sticky TF + * mode. */ + this.requestReresolution(); + } + if (this.stickyTransientFailureMode) { return; } this.stickyTransientFailureMode = true; - this.channelControlHelper.requestReresolution(); for (const { subchannel } of this.children) { subchannel.startConnecting(); } @@ -305,13 +334,14 @@ export class PickFirstLoadBalancer implements LoadBalancer { private onSubchannelStateUpdate( subchannel: SubchannelInterface, previousState: ConnectivityState, - newState: ConnectivityState + newState: ConnectivityState, + errorMessage?: string ) { if (this.currentPick?.realSubchannelEquals(subchannel)) { if (newState !== ConnectivityState.READY) { this.removeCurrentPick(); this.calculateAndReportNewState(); - this.channelControlHelper.requestReresolution(); + this.requestReresolution(); } return; } @@ -322,6 +352,9 @@ export class PickFirstLoadBalancer implements LoadBalancer { } if (newState === ConnectivityState.TRANSIENT_FAILURE) { child.hasReportedTransientFailure = true; + if (errorMessage) { + this.lastError = errorMessage; + } this.maybeEnterStickyTransientFailureMode(); if (index === this.currentSubchannelIndex) { this.startNextSubchannelConnecting(index + 1); @@ -335,7 +368,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { private startNextSubchannelConnecting(startIndex: number) { clearTimeout(this.connectionDelayTimeout); - if (this.triedAllSubchannels || this.stickyTransientFailureMode) { + if (this.triedAllSubchannels) { return; } for (const [index, child] of this.children.entries()) { @@ -408,7 +441,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { private resetSubchannelList() { for (const child of this.children) { - if (child.subchannel !== this.currentPick) { + if (!(this.currentPick && child.subchannel.realSubchannelEquals(this.currentPick))) { /* The connectivity state listener is the same whether the subchannel * is in the list of children or it is the currentPick, so if it is in * both, removing it here would cause problems. In particular, that @@ -429,28 +462,10 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.currentSubchannelIndex = 0; this.children = []; this.triedAllSubchannels = false; + this.requestedResolutionSinceLastUpdate = false; } - updateAddressList( - endpointList: Endpoint[], - lbConfig: TypedLoadBalancingConfig - ): void { - if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { - return; - } - /* Previously, an update would be discarded if it was identical to the - * previous update, to minimize churn. Now the DNS resolver is - * rate-limited, so that is less of a concern. */ - if (lbConfig.getShuffleAddressList()) { - endpointList = shuffled(endpointList); - } - const rawAddressList = ([] as SubchannelAddress[]).concat( - ...endpointList.map(endpoint => endpoint.addresses) - ); - if (rawAddressList.length === 0) { - throw new Error('No addresses in endpoint list passed to pick_first'); - } - const addressList = interleaveAddressFamilies(rawAddressList); + private connectToAddressList(addressList: SubchannelAddress[]) { const newChildrenList = addressList.map(address => ({ subchannel: this.channelControlHelper.createSubchannel(address, {}), hasReportedTransientFailure: false, @@ -483,10 +498,34 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.calculateAndReportNewState(); } + updateAddressList( + endpointList: Endpoint[], + lbConfig: TypedLoadBalancingConfig + ): void { + if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { + return; + } + /* Previously, an update would be discarded if it was identical to the + * previous update, to minimize churn. Now the DNS resolver is + * rate-limited, so that is less of a concern. */ + if (lbConfig.getShuffleAddressList()) { + endpointList = shuffled(endpointList); + } + const rawAddressList = ([] as SubchannelAddress[]).concat( + ...endpointList.map(endpoint => endpoint.addresses) + ); + if (rawAddressList.length === 0) { + throw new Error('No addresses in endpoint list passed to pick_first'); + } + const addressList = interleaveAddressFamilies(rawAddressList); + this.latestAddressList = addressList; + this.connectToAddressList(addressList); + } + exitIdle() { - /* The pick_first LB policy is only in the IDLE state if it has no - * addresses to try to connect to and it has no picked subchannel. - * In that case, there is no meaningful action that can be taken here. */ + if (this.currentState === ConnectivityState.IDLE && this.latestAddressList) { + this.connectToAddressList(this.latestAddressList); + } } resetBackoff() { diff --git a/packages/grpc-js/src/load-balancer-round-robin.ts b/packages/grpc-js/src/load-balancer-round-robin.ts index 9f093596a..5ed26c9a5 100644 --- a/packages/grpc-js/src/load-balancer-round-robin.ts +++ b/packages/grpc-js/src/load-balancer-round-robin.ts @@ -100,6 +100,8 @@ export class RoundRobinLoadBalancer implements LoadBalancer { private childChannelControlHelper: ChannelControlHelper; + private lastError: string | null = null; + constructor( private readonly channelControlHelper: ChannelControlHelper, private readonly options: ChannelOptions @@ -154,7 +156,7 @@ export class RoundRobinLoadBalancer implements LoadBalancer { ) { this.updateState( ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker() + new UnavailablePicker({details: `No connection established. Last error: ${this.lastError}`}) ); } else { this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); diff --git a/packages/grpc-js/src/load-balancing-call.ts b/packages/grpc-js/src/load-balancing-call.ts index 69dc518f8..87ef02497 100644 --- a/packages/grpc-js/src/load-balancing-call.ts +++ b/packages/grpc-js/src/load-balancing-call.ts @@ -141,6 +141,13 @@ export class LoadBalancingCall implements Call { .generateMetadata({ service_url: this.serviceUrl }) .then( credsMetadata => { + /* If this call was cancelled (e.g. by the deadline) before + * metadata generation finished, we shouldn't do anything with + * it. */ + if (this.ended) { + this.trace('Credentials metadata generation finished after call ended'); + return; + } finalMetadata.merge(credsMetadata); if (finalMetadata.get('authorization').length > 1) { this.outputStatus( diff --git a/packages/grpc-js/src/logging.ts b/packages/grpc-js/src/logging.ts index 83438ef73..e1b396fff 100644 --- a/packages/grpc-js/src/logging.ts +++ b/packages/grpc-js/src/logging.ts @@ -16,6 +16,9 @@ */ import { LogVerbosity } from './constants'; +import { pid } from 'process'; + +const clientVersion = require('../../package.json').version; const DEFAULT_LOGGER: Partial = { error: (message?: any, ...optionalParams: any[]) => { @@ -109,7 +112,7 @@ export function trace( text: string ): void { if (isTracerEnabled(tracer)) { - log(severity, new Date().toISOString() + ' | ' + tracer + ' | ' + text); + log(severity, new Date().toISOString() + ' | v' + clientVersion + ' ' + pid + ' | ' + tracer + ' | ' + text); } } diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 0956b460c..978f1442a 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -43,7 +43,7 @@ function trace(text: string): void { /** * The default TCP port to connect to if not explicitly specified in the target. */ -const DEFAULT_PORT = 443; +export const DEFAULT_PORT = 443; const DEFAULT_MIN_TIME_BETWEEN_RESOLUTIONS_MS = 30_000; @@ -75,6 +75,7 @@ class DnsResolver implements Resolver { private nextResolutionTimer: NodeJS.Timeout; private isNextResolutionTimerRunning = false; private isServiceConfigEnabled = true; + private returnedIpResult = false; constructor( private target: GrpcUri, private listener: ResolverListener, @@ -143,18 +144,22 @@ class DnsResolver implements Resolver { */ private startResolution() { if (this.ipResult !== null) { - trace('Returning IP address for target ' + uriToString(this.target)); - setImmediate(() => { - this.listener.onSuccessfulResolution( - this.ipResult!, - null, - null, - null, - {} - ); - }); + if (!this.returnedIpResult) { + trace('Returning IP address for target ' + uriToString(this.target)); + setImmediate(() => { + this.listener.onSuccessfulResolution( + this.ipResult!, + null, + null, + null, + {} + ); + }); + this.returnedIpResult = true; + } this.backoff.stop(); this.backoff.reset(); + this.stopNextResolutionTimer(); return; } if (this.dnsHostname === null) { @@ -316,9 +321,9 @@ class DnsResolver implements Resolver { private startResolutionWithBackoff() { if (this.pendingLookupPromise === null) { this.continueResolving = false; - this.startResolution(); this.backoff.runOnce(); this.startNextResolutionTimer(); + this.startResolution(); } } @@ -329,6 +334,11 @@ class DnsResolver implements Resolver { * fires. Otherwise, start resolving immediately. */ if (this.pendingLookupPromise === null) { if (this.isNextResolutionTimerRunning || this.backoff.isRunning()) { + if (this.isNextResolutionTimerRunning) { + trace('resolution update delayed by "min time between resolutions" rate limit'); + } else { + trace('resolution update delayed by backoff timer until ' + this.backoff.getEndTime().toISOString()); + } this.continueResolving = true; } else { this.startResolutionWithBackoff(); @@ -351,6 +361,7 @@ class DnsResolver implements Resolver { this.latestLookupResult = null; this.latestServiceConfig = null; this.latestServiceConfigError = null; + this.returnedIpResult = false; } /** diff --git a/packages/grpc-js/src/resolver-ip.ts b/packages/grpc-js/src/resolver-ip.ts index cda35d3b9..8fed35bd1 100644 --- a/packages/grpc-js/src/resolver-ip.ts +++ b/packages/grpc-js/src/resolver-ip.ts @@ -41,6 +41,7 @@ const DEFAULT_PORT = 443; class IpResolver implements Resolver { private endpoints: Endpoint[] = []; private error: StatusObject | null = null; + private hasReturnedResult = false; constructor( target: GrpcUri, private listener: ResolverListener, @@ -87,22 +88,25 @@ class IpResolver implements Resolver { trace('Parsed ' + target.scheme + ' address list ' + addresses); } updateResolution(): void { - process.nextTick(() => { - if (this.error) { - this.listener.onError(this.error); - } else { - this.listener.onSuccessfulResolution( - this.endpoints, - null, - null, - null, - {} - ); - } - }); + if (!this.hasReturnedResult) { + this.hasReturnedResult = true; + process.nextTick(() => { + if (this.error) { + this.listener.onError(this.error); + } else { + this.listener.onSuccessfulResolution( + this.endpoints, + null, + null, + null, + {} + ); + } + }); + } } destroy(): void { - // This resolver owns no resources, so we do nothing here. + this.hasReturnedResult = false; } static getDefaultAuthority(target: GrpcUri): string { diff --git a/packages/grpc-js/src/resolver-uds.ts b/packages/grpc-js/src/resolver-uds.ts index 4fa1944b1..3a42b18c4 100644 --- a/packages/grpc-js/src/resolver-uds.ts +++ b/packages/grpc-js/src/resolver-uds.ts @@ -20,6 +20,7 @@ import { GrpcUri } from './uri-parser'; import { ChannelOptions } from './channel-options'; class UdsResolver implements Resolver { + private hasReturnedResult = false; private endpoints: Endpoint[] = []; constructor( target: GrpcUri, @@ -35,14 +36,17 @@ class UdsResolver implements Resolver { this.endpoints = [{ addresses: [{ path }] }]; } updateResolution(): void { - process.nextTick( - this.listener.onSuccessfulResolution, - this.endpoints, - null, - null, - null, - {} - ); + if (!this.hasReturnedResult) { + this.hasReturnedResult = true; + process.nextTick( + this.listener.onSuccessfulResolution, + this.endpoints, + null, + null, + null, + {} + ); + } } destroy() { diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 3f52093a1..a8de2019a 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -21,7 +21,11 @@ import { TypedLoadBalancingConfig, selectLbConfigFromList, } from './load-balancer'; -import { ServiceConfig, validateServiceConfig } from './service-config'; +import { + MethodConfig, + ServiceConfig, + validateServiceConfig, +} from './service-config'; import { ConnectivityState } from './connectivity-state'; import { ConfigSelector, createResolver, Resolver } from './resolver'; import { ServiceError } from './call'; @@ -43,6 +47,59 @@ function trace(text: string): void { logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); } +type NameMatchLevel = 'EMPTY' | 'SERVICE' | 'SERVICE_AND_METHOD'; + +/** + * Name match levels in order from most to least specific. This is the order in + * which searches will be performed. + */ +const NAME_MATCH_LEVEL_ORDER: NameMatchLevel[] = [ + 'SERVICE_AND_METHOD', + 'SERVICE', + 'EMPTY', +]; + +function hasMatchingName( + service: string, + method: string, + methodConfig: MethodConfig, + matchLevel: NameMatchLevel +): boolean { + for (const name of methodConfig.name) { + switch (matchLevel) { + case 'EMPTY': + if (!name.service && !name.method) { + return true; + } + break; + case 'SERVICE': + if (name.service === service && !name.method) { + return true; + } + break; + case 'SERVICE_AND_METHOD': + if (name.service === service && name.method === method) { + return true; + } + } + } + return false; +} + +function findMatchingConfig( + service: string, + method: string, + methodConfigs: MethodConfig[], + matchLevel: NameMatchLevel +): MethodConfig | null { + for (const config of methodConfigs) { + if (hasMatchingName(service, method, config, matchLevel)) { + return config; + } + } + return null; +} + function getDefaultConfigSelector( serviceConfig: ServiceConfig | null ): ConfigSelector { @@ -54,19 +111,26 @@ function getDefaultConfigSelector( const service = splitName[0] ?? ''; const method = splitName[1] ?? ''; if (serviceConfig && serviceConfig.methodConfig) { - for (const methodConfig of serviceConfig.methodConfig) { - for (const name of methodConfig.name) { - if ( - name.service === service && - (name.method === undefined || name.method === method) - ) { - return { - methodConfig: methodConfig, - pickInformation: {}, - status: Status.OK, - dynamicFilterFactories: [], - }; - } + /* Check for the following in order, and return the first method + * config that matches: + * 1. A name that exactly matches the service and method + * 2. A name with no method set that matches the service + * 3. An empty name + */ + for (const matchLevel of NAME_MATCH_LEVEL_ORDER) { + const matchingConfig = findMatchingConfig( + service, + method, + serviceConfig.methodConfig, + matchLevel + ); + if (matchingConfig) { + return { + methodConfig: matchingConfig, + pickInformation: {}, + status: Status.OK, + dynamicFilterFactories: [], + }; } } } @@ -159,7 +223,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { * In that case, the backoff timer callback will call * updateResolution */ if (this.backoffTimeout.isRunning()) { - this.continueResolving = true; + trace('requestReresolution delayed by backoff timer until ' + this.backoffTimeout.getEndTime().toISOString()); + this.continueResolving = true; } else { this.updateResolution(); } @@ -186,6 +251,8 @@ export class ResolvingLoadBalancer implements LoadBalancer { configSelector: ConfigSelector | null, attributes: { [key: string]: unknown } ) => { + this.backoffTimeout.stop(); + this.backoffTimeout.reset(); let workingServiceConfig: ServiceConfig | null = null; /* This first group of conditionals implements the algorithm described * in https://github.com/grpc/proposal/blob/master/A21-service-config-error-handling.md diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 95f928350..107c2e3ef 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -558,7 +558,7 @@ export class Http2ServerCallStream< return metadata; } - receiveUnaryMessage(encoding: string): Promise { + receiveUnaryMessage(encoding: string): Promise { return new Promise((resolve, reject) => { const { stream } = this; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 9bd71fd42..adc7f299f 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -233,6 +233,7 @@ export class Server { * it is called twice, as it did previously. */ private started = false; + private shutdown = false; private options: ChannelOptions; private serverAddressString = 'null'; @@ -699,6 +700,9 @@ export class Server { creds: ServerCredentials, callback: (error: Error | null, port: number) => void ): void { + if (this.shutdown) { + throw new Error('bindAsync called after shutdown'); + } if (typeof port !== 'string') { throw new TypeError('port must be a string'); } @@ -920,6 +924,8 @@ export class Server { if (this.channelzEnabled) { unregisterChannelzRef(this.channelzRef); } + + this.shutdown = true; } register( @@ -983,6 +989,7 @@ export class Server { wrappedCallback(); } } + this.shutdown = true; for (const server of this.http2Servers.keys()) { pendingChecks++; diff --git a/packages/grpc-js/src/service-config.ts b/packages/grpc-js/src/service-config.ts index 168f28c78..5c2ca0d06 100644 --- a/packages/grpc-js/src/service-config.ts +++ b/packages/grpc-js/src/service-config.ts @@ -31,7 +31,7 @@ import { Status } from './constants'; import { Duration } from './duration'; export interface MethodConfigName { - service: string; + service?: string; method?: string; } @@ -95,20 +95,36 @@ const DURATION_REGEX = /^\d+(\.\d{1,9})?s$/; const CLIENT_LANGUAGE_STRING = 'node'; function validateName(obj: any): MethodConfigName { - if (!('service' in obj) || typeof obj.service !== 'string') { - throw new Error('Invalid method config name: invalid service'); - } - const result: MethodConfigName = { - service: obj.service, - }; - if ('method' in obj) { - if (typeof obj.method === 'string') { - result.method = obj.method; + // In this context, and unset field and '' are considered the same + if ('service' in obj && obj.service !== '') { + if (typeof obj.service !== 'string') { + throw new Error( + `Invalid method config name: invalid service: expected type string, got ${typeof obj.service}` + ); + } + if ('method' in obj && obj.method !== '') { + if (typeof obj.method !== 'string') { + throw new Error( + `Invalid method config name: invalid method: expected type string, got ${typeof obj.service}` + ); + } + return { + service: obj.service, + method: obj.method, + }; } else { - throw new Error('Invalid method config name: invalid method'); + return { + service: obj.service, + }; } + } else { + if ('method' in obj && obj.method !== undefined) { + throw new Error( + `Invalid method config name: method set with empty or unset service` + ); + } + return {}; } - return result; } function validateRetryPolicy(obj: any): RetryPolicy { diff --git a/packages/grpc-js/src/subchannel-call.ts b/packages/grpc-js/src/subchannel-call.ts index e06ece388..3b9b6152f 100644 --- a/packages/grpc-js/src/subchannel-call.ts +++ b/packages/grpc-js/src/subchannel-call.ts @@ -501,16 +501,22 @@ export class Http2SubchannelCall implements SubchannelCall { sendMessageWithContext(context: MessageContext, message: Buffer) { this.trace('write() called with message of length ' + message.length); const cb: WriteCallback = (error?: Error | null) => { - let code: Status = Status.UNAVAILABLE; - if ( - (error as NodeJS.ErrnoException)?.code === 'ERR_STREAM_WRITE_AFTER_END' - ) { - code = Status.INTERNAL; - } - if (error) { - this.cancelWithStatus(code, `Write error: ${error.message}`); - } - context.callback?.(); + /* nextTick here ensures that no stream action can be taken in the call + * stack of the write callback, in order to hopefully work around + * https://github.com/nodejs/node/issues/49147 */ + process.nextTick(() => { + let code: Status = Status.UNAVAILABLE; + if ( + (error as NodeJS.ErrnoException)?.code === + 'ERR_STREAM_WRITE_AFTER_END' + ) { + code = Status.INTERNAL; + } + if (error) { + this.cancelWithStatus(code, `Write error: ${error.message}`); + } + context.callback?.(); + }); }; this.trace('sending data chunk of length ' + message.length); this.callEventTracker.addMessageSent(); diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index c7fdca4fd..c26669ba3 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -23,7 +23,8 @@ export type ConnectivityStateListener = ( subchannel: SubchannelInterface, previousState: ConnectivityState, newState: ConnectivityState, - keepaliveTime: number + keepaliveTime: number, + errorMessage?: string ) => void; export type HealthListener = (healthy: boolean) => void; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index d9a2dbd80..63e254cf3 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -245,12 +245,17 @@ export class Subchannel { ); } }); + } else { + /* If we can't transition from CONNECTING to READY here, we will + * not be using this transport, so release its resources. */ + transport.shutdown(); } }, error => { this.transitionToState( [ConnectivityState.CONNECTING], - ConnectivityState.TRANSIENT_FAILURE + ConnectivityState.TRANSIENT_FAILURE, + `${error}` ); } ); @@ -265,7 +270,8 @@ export class Subchannel { */ private transitionToState( oldStates: ConnectivityState[], - newState: ConnectivityState + newState: ConnectivityState, + errorMessage?: string ): boolean { if (oldStates.indexOf(this.connectivityState) === -1) { return false; @@ -278,9 +284,7 @@ export class Subchannel { if (this.channelzEnabled) { this.channelzTrace.addTrace( 'CT_INFO', - ConnectivityState[this.connectivityState] + - ' -> ' + - ConnectivityState[newState] + 'Connectivity state change to ' + ConnectivityState[newState] ); } const previousState = this.connectivityState; @@ -320,7 +324,7 @@ export class Subchannel { throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); } for (const listener of this.stateListeners) { - listener(this, previousState, newState, this.keepaliveTime); + listener(this, previousState, newState, this.keepaliveTime, errorMessage); } return true; } diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 18d83cbfe..39ca69383 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -194,17 +194,18 @@ class Http2Transport implements Transport { }); session.once( 'goaway', - (errorCode: number, lastStreamID: number, opaqueData: Buffer) => { + (errorCode: number, lastStreamID: number, opaqueData?: Buffer) => { let tooManyPings = false; /* See the last paragraph of * https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md#basic-keepalive */ if ( errorCode === http2.constants.NGHTTP2_ENHANCE_YOUR_CALM && + opaqueData && opaqueData.equals(tooManyPingsData) ) { tooManyPings = true; } - this.trace('connection closed by GOAWAY with code ' + errorCode); + this.trace('connection closed by GOAWAY with code ' + errorCode + ' and data ' + opaqueData?.toString()); this.reportDisconnectToOwner(tooManyPings); } ); @@ -426,6 +427,10 @@ class Http2Transport implements Transport { try { this.session!.ping( (err: Error | null, duration: number, payload: Buffer) => { + if (err) { + this.keepaliveTrace('Ping failed with error ' + err.message); + this.handleDisconnect(); + } this.keepaliveTrace('Received ping response'); this.clearKeepaliveTimeout(); this.maybeStartKeepalivePingTimer(); @@ -737,6 +742,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { connectionOptions ); this.session = session; + let errorMessage = 'Failed to connect'; session.unref(); session.once('connect', () => { session.removeAllListeners(); @@ -745,10 +751,14 @@ export class Http2SubchannelConnector implements SubchannelConnector { }); session.once('close', () => { this.session = null; - reject(); + // Leave time for error event to happen before rejecting + setImmediate(() => { + reject(`${errorMessage} (${new Date().toISOString()})`); + }); }); session.once('error', error => { - this.trace('connection failed with error ' + (error as Error).message); + errorMessage = (error as Error).message; + this.trace('connection failed with error ' + errorMessage); }); }); } diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts index 18dbf9d2d..df7a3c741 100644 --- a/packages/grpc-js/test/test-pick-first.ts +++ b/packages/grpc-js/test/test-pick-first.ts @@ -38,7 +38,11 @@ function updateStateCallBackForExpectedStateSequence( ) { const actualStateSequence: ConnectivityState[] = []; let lastPicker: Picker | null = null; + let finished = false; return (connectivityState: ConnectivityState, picker: Picker) => { + if (finished) { + return; + } // Ignore duplicate state transitions if ( connectivityState === actualStateSequence[actualStateSequence.length - 1] @@ -57,6 +61,7 @@ function updateStateCallBackForExpectedStateSequence( if ( expectedStateSequence[actualStateSequence.length] !== connectivityState ) { + finished = true; done( new Error( `Unexpected state ${ @@ -66,10 +71,12 @@ function updateStateCallBackForExpectedStateSequence( )}]` ) ); + return; } actualStateSequence.push(connectivityState); lastPicker = picker; if (actualStateSequence.length === expectedStateSequence.length) { + finished = true; done(); } }; @@ -537,6 +544,115 @@ describe('pick_first load balancing policy', () => { }); }); }); + it('Should request reresolution every time each child reports TF', done => { + let reresolutionRequestCount = 0; + const targetReresolutionRequestCount = 3; + const currentStartState = ConnectivityState.IDLE; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE], + err => setImmediate(() => { + assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount); + done(err); + }) + ), + requestReresolution: () => { + reresolutionRequestCount += 1; + } + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + process.nextTick(() => { + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + process.nextTick(() => { + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 3 }]}], config); + process.nextTick(() => { + subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + }); + }); + }); + }); + }); + }); + it('Should request reresolution if the new subchannels are already in TF', done => { + let reresolutionRequestCount = 0; + const targetReresolutionRequestCount = 3; + const currentStartState = ConnectivityState.TRANSIENT_FAILURE; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE], + err => setImmediate(() => { + assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount); + done(err); + }) + ), + requestReresolution: () => { + reresolutionRequestCount += 1; + } + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + process.nextTick(() => { + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + process.nextTick(() => { + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 2 }]}], config); + }); + }); + }); + it('Should reconnect to the same address list if exitIdle is called', done => { + const currentStartState = ConnectivityState.READY; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY, ConnectivityState.IDLE, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper, {}); + pickFirst.updateAddressList([{ addresses: [{ host: 'localhost', port: 1 }]}], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.IDLE); + process.nextTick(() => { + pickFirst.exitIdle(); + }); + }); + }); describe('Address list randomization', () => { const shuffleConfig = new PickFirstLoadBalancingConfig(true); it('Should pick different subchannels after multiple updates', done => { diff --git a/packages/grpc-js/test/test-server.ts b/packages/grpc-js/test/test-server.ts index c497b8e27..48b305ef4 100644 --- a/packages/grpc-js/test/test-server.ts +++ b/packages/grpc-js/test/test-server.ts @@ -211,7 +211,7 @@ describe('Server', () => { client!.makeUnaryRequest('/math.Math/Div', x => x, x => x, Buffer.from('abc'), {deadline: deadline}, (callError2, result) => { assert(callError2); // DEADLINE_EXCEEDED means that the server is unreachable - assert.strictEqual(callError2.code, grpc.status.DEADLINE_EXCEEDED); + assert(callError2.code === grpc.status.DEADLINE_EXCEEDED || callError2.code === grpc.status.UNAVAILABLE); done(); }); }); @@ -228,7 +228,7 @@ describe('Server', () => { }); }); - describe.only('drain', () => { + describe('drain', () => { let client: ServiceClient; let portNumber: number; const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');