Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 069e012

Browse files
author
Jongmin Kim
committed
refactor: add a timeout settings to grpc client
Signed-off-by: Jongmin Kim <whdalsrnt@megazone.com>
1 parent 164f843 commit 069e012

File tree

3 files changed

+56
-8
lines changed

3 files changed

+56
-8
lines changed

‎src/spaceone/core/connector/space_connector.py‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ def __init__(
2525
endpoint: str = None,
2626
token: str = None,
2727
return_type: str = "dict",
28+
timeout: int = None,
2829
**kwargs,
2930
):
3031
super().__init__(*args, **kwargs)
3132
self._service = service
3233
self._endpoint = endpoint
3334
self._token = token
3435
self._return_type = return_type
36+
self._timeout = timeout
3537

3638
self._client = None
3739
self._endpoints: dict = self.config.get("endpoints", {})
@@ -95,6 +97,7 @@ def _init_client(self) -> None:
9597
endpoint=e["endpoint"],
9698
ssl_enabled=e["ssl_enabled"],
9799
max_message_length=1024 * 1024 * 256,
100+
timeout=self._timeout,
98101
)
99102

100103
@staticmethod

‎src/spaceone/core/error.py‎

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ class ERROR_GRPC_CONNECTION(ERROR_UNAVAILAVBLE):
250250
_message = "Server is unavailable. (channel = {channel}, message = {message})"
251251

252252

253+
class ERROR_GRPC_TIMEOUT(ERROR_GRPC_CONNECTION):
254+
_message = "gRPC Timeout."
255+
256+
253257
class ERROR_GRPC_TLS_HANDSHAKE(ERROR_GRPC_CONNECTION):
254258
_message = "TLS handshake failed. (reason = {reason})"
255259

‎src/spaceone/core/pygrpc/client.py‎

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
import types
33
import grpc
4+
from grpc import ClientCallDetails
45
from google.protobuf.json_format import ParseDict
56
from google.protobuf.message_factory import MessageFactory # , GetMessageClass
67
from google.protobuf.descriptor_pool import DescriptorPool
@@ -15,16 +16,28 @@
1516
_LOGGER = logging.getLogger(__name__)
1617

1718

19+
class _ClientCallDetails(ClientCallDetails):
20+
def __init__(self, method, timeout, metadata, credentials, wait_for_ready):
21+
self.method = method
22+
self.timeout = timeout
23+
self.metadata = metadata
24+
self.credentials = credentials
25+
self.wait_for_ready = wait_for_ready
26+
27+
1828
class _ClientInterceptor(
1929
grpc.UnaryUnaryClientInterceptor,
2030
grpc.UnaryStreamClientInterceptor,
2131
grpc.StreamUnaryClientInterceptor,
2232
grpc.StreamStreamClientInterceptor,
2333
):
24-
def __init__(self, options: dict, channel_key: str, request_map: dict):
34+
def __init__(
35+
self, options: dict, channel_key: str, request_map: dict, timeout: int = None
36+
):
2537
self._request_map = request_map
2638
self._channel_key = channel_key
2739
self.metadata = options.get("metadata", {})
40+
self.timeout = timeout or 60
2841

2942
def _check_message(self, client_call_details, request_or_iterator, is_stream):
3043
if client_call_details.method in self._request_map:
@@ -123,18 +136,27 @@ def _retry_call(
123136
return response_or_iterator
124137

125138
except Exception as e:
126-
if e.error_code == "ERROR_GRPC_CONNECTION":
139+
if not isinstance(e, ERROR_BASE):
140+
e = ERROR_UNKNOWN(message=str(e))
141+
142+
if (
143+
e.error_code == "ERROR_GRPC_CONNECTION"
144+
or e.status_code == "DEADLINE_EXCEEDED"
145+
):
127146
if retries >= _MAX_RETRIES:
128147
channel = e.meta.get("channel")
129148
if channel in _GRPC_CHANNEL:
130149
_LOGGER.error(
131150
f"Disconnect gRPC Endpoint. (channel = {channel})"
132151
)
133152
del _GRPC_CHANNEL[channel]
153+
154+
if e.status_code == "DEADLINE_EXCEEDED":
155+
raise ERROR_GRPC_TIMEOUT()
134156
raise e
135157
else:
136158
_LOGGER.debug(
137-
f"Retry gRPC Call: reason = {e.message}, retry = {retries + 1}"
159+
f"Retry gRPC Call: method = {client_call_details.method}, reason = {e.message}, retry = {retries + 1}"
138160
)
139161
else:
140162
raise e
@@ -160,9 +182,20 @@ def _intercept_call(
160182
is_response_stream,
161183
)
162184

185+
def _create_new_call_details(self, client_call_details):
186+
return _ClientCallDetails(
187+
method=client_call_details.method,
188+
timeout=self.timeout,
189+
metadata=client_call_details.metadata,
190+
credentials=client_call_details.credentials,
191+
wait_for_ready=client_call_details.wait_for_ready,
192+
)
193+
163194
def intercept_unary_unary(self, continuation, client_call_details, request):
195+
new_call_details = self._create_new_call_details(client_call_details)
196+
164197
return self._intercept_call(
165-
continuation, client_call_details, request, False, False
198+
continuation, new_call_details, request, False, False
166199
)
167200

168201
def intercept_unary_stream(self, continuation, client_call_details, request):
@@ -263,7 +296,7 @@ def _bind_grpc_method(
263296

264297

265298
class GRPCClient(object):
266-
def __init__(self, channel, options, channel_key):
299+
def __init__(self, channel, options, channel_key, timeout=None):
267300
self._request_map = {}
268301
self._api_resources = {}
269302

@@ -272,7 +305,7 @@ def __init__(self, channel, options, channel_key):
272305
self._init_grpc_reflection()
273306

274307
_client_interceptor = _ClientInterceptor(
275-
options, channel_key, self._request_map
308+
options, channel_key, self._request_map, timeout
276309
)
277310
_intercept_channel = grpc.intercept_channel(channel, _client_interceptor)
278311
self._bind_grpc_stub(_intercept_channel)
@@ -326,7 +359,13 @@ def _create_insecure_channel(endpoint, options):
326359
return grpc.insecure_channel(endpoint, options=options)
327360

328361

329-
def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_opts):
362+
def client(
363+
endpoint=None,
364+
ssl_enabled=False,
365+
max_message_length=None,
366+
timeout=None,
367+
**client_opts,
368+
):
330369
if endpoint is None:
331370
raise Exception("Client's endpoint is undefined.")
332371

@@ -350,7 +389,9 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
350389
)
351390

352391
try:
353-
_GRPC_CHANNEL[endpoint] = GRPCClient(channel, client_opts, endpoint)
392+
_GRPC_CHANNEL[endpoint] = GRPCClient(
393+
channel, client_opts, endpoint, timeout
394+
)
354395
except Exception as e:
355396
if hasattr(e, "details"):
356397
raise ERROR_GRPC_CONNECTION(channel=endpoint, message=e.details())

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /