From 564c4a6ff96225a1d1849dcdfea11bf9fb3b7e82 Mon Sep 17 00:00:00 2001 From: Joe Child Date: Mon, 2 Oct 2023 14:52:12 +0100 Subject: [PATCH 1/5] Add lazy connection option --- nameko_grpc/client.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/nameko_grpc/client.py b/nameko_grpc/client.py index f28bb75..0868fc3 100644 --- a/nameko_grpc/client.py +++ b/nameko_grpc/client.py @@ -121,12 +121,15 @@ def __init__( compression_algorithm="none", compression_level="high", ssl=False, + lazy=False, ): self.target = target self.stub = stub self.compression_algorithm = compression_algorithm self.compression_level = compression_level # NOTE not used self.ssl = SslConfig(ssl) + self.lazy = lazy + self._channel = None def spawn_thread(self, target, args=(), kwargs=None, name=None): raise NotImplementedError @@ -138,11 +141,22 @@ def default_compression(self): return "identity" def start(self): - self.channel = ClientChannel(self.target, self.ssl, self.spawn_thread) - self.channel.start() + if not self.lazy: + self._channel = self._get_channel() + + def _get_channel(self): + channel = ClientChannel(self.target, self.ssl, self.spawn_thread) + channel.start() + return channel def stop(self): - self.channel.stop() + self._channel.stop() + + @property + def channel(self): + if self._channel is None: + self._channel = self._get_channel() + return self._channel def timeout(self, send_stream, response_stream, deadline): start = time.time() From 23962e5b8e54b12db8475298f1dc0e90f98e77cd Mon Sep 17 00:00:00 2001 From: Joe Child Date: Mon, 2 Oct 2023 15:56:04 +0100 Subject: [PATCH 2/5] Add lock --- .pre-commit-config.yaml | 8 +++++--- nameko_grpc/client.py | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 060aa06..bff082a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ repos: -- repo: git://github.com/pre-commit/pre-commit-hooks +- repo: https://github.com/pre-commit/pre-commit-hooks rev: v1.3.0 hooks: - id: trailing-whitespace @@ -7,11 +7,13 @@ repos: - id: check-merge-conflict - id: fix-encoding-pragma - id: flake8 -- repo: https://github.com/mattbennett/mirrors-isort.git - rev: master +- repo: https://github.com/PyCQA/isort + rev: 4.3.21 hooks: - id: isort - repo: https://github.com/ambv/black rev: 19.10b0 hooks: - id: black + # TODO remove this when black is updated + additional_dependencies: [ 'click==8.0.4' ] diff --git a/nameko_grpc/client.py b/nameko_grpc/client.py index 0868fc3..0f6d53a 100644 --- a/nameko_grpc/client.py +++ b/nameko_grpc/client.py @@ -129,6 +129,7 @@ def __init__( self.compression_level = compression_level # NOTE not used self.ssl = SslConfig(ssl) self.lazy = lazy + self._channel_creation_lock = threading.Lock() self._channel = None def spawn_thread(self, target, args=(), kwargs=None, name=None): @@ -145,9 +146,10 @@ def start(self): self._channel = self._get_channel() def _get_channel(self): - channel = ClientChannel(self.target, self.ssl, self.spawn_thread) - channel.start() - return channel + with self._channel_creation_lock: + channel = ClientChannel(self.target, self.ssl, self.spawn_thread) + channel.start() + return channel def stop(self): self._channel.stop() From 21ebede21550898c635bd5d4c173e6f19d45f996 Mon Sep 17 00:00:00 2001 From: Joe Child Date: Mon, 2 Oct 2023 16:44:32 +0100 Subject: [PATCH 3/5] add test --- nameko_grpc/client.py | 4 +++- test/conftest.py | 5 ++++- test/test_basic.py | 27 +++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/nameko_grpc/client.py b/nameko_grpc/client.py index 0f6d53a..224943d 100644 --- a/nameko_grpc/client.py +++ b/nameko_grpc/client.py @@ -152,7 +152,9 @@ def _get_channel(self): return channel def stop(self): - self._channel.stop() + if self._channel is not None: + self._channel.stop() + self._channel = None @property def channel(self): diff --git a/test/conftest.py b/test/conftest.py index 3fa9aac..7701b01 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -352,6 +352,8 @@ def make( proto_name=None, compression_algorithm="none", compression_level="high", + lazy=False, + service_url="//localhost:{}".format(grpc_port), ): if proto_name is None: proto_name = service_name @@ -359,11 +361,12 @@ def make( stubs = load_stubs(proto_name) stub_cls = getattr(stubs, "{}Stub".format(service_name)) client = Client( - "//localhost:{}".format(grpc_port), + service_url, stub_cls, compression_algorithm, compression_level, ssl_options, + lazy=lazy, ) clients.append(client) return client.start() diff --git a/test/test_basic.py b/test/test_basic.py index b24038e..7069bc3 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -109,3 +109,30 @@ def generate_requests(): ("A", 1), ("B", 2), ] + + +class TestLazy: + def test_lazy_client(self, start_nameko_client, server, protobufs): + client = start_nameko_client("example", lazy=True) + response = client.unary_unary(protobufs.ExampleRequest(value="A")) + assert response.message == "A" + + # Note lack of server fixture + def test_lazy_client_does_not_connect_on_start( + self, start_nameko_client, protobufs, start_grpc_server + ): + client = start_nameko_client("example", lazy=True) + + with pytest.raises(ConnectionRefusedError): + client.unary_unary(protobufs.ExampleRequest(value="A")) + + start_grpc_server("example") + + # After starting the server, should now work + response = client.unary_unary(protobufs.ExampleRequest(value="A")) + assert response.message == "A" + + # Note lack of server fixture + def test_nonlazy_client_connects_on_start(self, start_nameko_client, protobufs): + with pytest.raises(ConnectionRefusedError): + start_nameko_client("example", lazy=False) From 05467ac5d432b825f3f314f1bcb3d596316e15e1 Mon Sep 17 00:00:00 2001 From: Joe Child Date: Mon, 2 Oct 2023 18:01:06 +0100 Subject: [PATCH 4/5] fix locking --- nameko_grpc/client.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nameko_grpc/client.py b/nameko_grpc/client.py index 224943d..3ffb88f 100644 --- a/nameko_grpc/client.py +++ b/nameko_grpc/client.py @@ -143,13 +143,14 @@ def default_compression(self): def start(self): if not self.lazy: - self._channel = self._get_channel() + self._start_channel() - def _get_channel(self): + def _start_channel(self): with self._channel_creation_lock: - channel = ClientChannel(self.target, self.ssl, self.spawn_thread) - channel.start() - return channel + if self._channel is None: + channel = ClientChannel(self.target, self.ssl, self.spawn_thread) + channel.start() + self._channel = channel def stop(self): if self._channel is not None: @@ -159,7 +160,7 @@ def stop(self): @property def channel(self): if self._channel is None: - self._channel = self._get_channel() + self._start_channel() return self._channel def timeout(self, send_stream, response_stream, deadline): From 50c3bff1233ecbe4925e9c7630445197a4df3bb9 Mon Sep 17 00:00:00 2001 From: Joe Child Date: Tue, 3 Oct 2023 10:24:30 +0100 Subject: [PATCH 5/5] Rename option to lazy_startup --- nameko_grpc/client.py | 6 +++--- test/conftest.py | 4 ++-- test/test_basic.py | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/nameko_grpc/client.py b/nameko_grpc/client.py index 3ffb88f..704ca3e 100644 --- a/nameko_grpc/client.py +++ b/nameko_grpc/client.py @@ -121,14 +121,14 @@ def __init__( compression_algorithm="none", compression_level="high", ssl=False, - lazy=False, + lazy_startup=False, ): self.target = target self.stub = stub self.compression_algorithm = compression_algorithm self.compression_level = compression_level # NOTE not used self.ssl = SslConfig(ssl) - self.lazy = lazy + self.lazy_startup = lazy_startup self._channel_creation_lock = threading.Lock() self._channel = None @@ -142,7 +142,7 @@ def default_compression(self): return "identity" def start(self): - if not self.lazy: + if not self.lazy_startup: self._start_channel() def _start_channel(self): diff --git a/test/conftest.py b/test/conftest.py index 7701b01..2b22c0e 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -352,7 +352,7 @@ def make( proto_name=None, compression_algorithm="none", compression_level="high", - lazy=False, + lazy_startup=False, service_url="//localhost:{}".format(grpc_port), ): if proto_name is None: @@ -366,7 +366,7 @@ def make( compression_algorithm, compression_level, ssl_options, - lazy=lazy, + lazy_startup=lazy_startup, ) clients.append(client) return client.start() diff --git a/test/test_basic.py b/test/test_basic.py index 7069bc3..dc5eb30 100644 --- a/test/test_basic.py +++ b/test/test_basic.py @@ -113,7 +113,7 @@ def generate_requests(): class TestLazy: def test_lazy_client(self, start_nameko_client, server, protobufs): - client = start_nameko_client("example", lazy=True) + client = start_nameko_client("example", lazy_startup=True) response = client.unary_unary(protobufs.ExampleRequest(value="A")) assert response.message == "A" @@ -121,7 +121,7 @@ def test_lazy_client(self, start_nameko_client, server, protobufs): def test_lazy_client_does_not_connect_on_start( self, start_nameko_client, protobufs, start_grpc_server ): - client = start_nameko_client("example", lazy=True) + client = start_nameko_client("example", lazy_startup=True) with pytest.raises(ConnectionRefusedError): client.unary_unary(protobufs.ExampleRequest(value="A")) @@ -135,4 +135,4 @@ def test_lazy_client_does_not_connect_on_start( # Note lack of server fixture def test_nonlazy_client_connects_on_start(self, start_nameko_client, protobufs): with pytest.raises(ConnectionRefusedError): - start_nameko_client("example", lazy=False) + start_nameko_client("example", lazy_startup=False)