This repository was archived by the owner on Oct 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 522
Fixes/msgpack default #783
Open
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
9b08282
Fix : uninitialized data using invalid protocol.
jgspiro 4bbe67b
Unit test both with and without msgpack. Retention policy replication...
jgspiro 088da3c
Fix client tests that are failing.
jgspiro 065ed44
Make msgpack default option, fix demonstrator test.
jgspiro 9a8b66d
Fix server test test_write_points_mixed_type
jgspiro 51d0213
Remove check for JSON result in server test test_write_points_mixed_type
jgspiro 44a26eb
Add use_msgpack docstring.
jgspiro File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
from influxdb.line_protocol import make_lines, quote_ident, quote_literal | ||
from influxdb.resultset import ResultSet | ||
from .exceptions import InfluxDBClientError | ||
from .exceptions import InfluxDBServerError | ||
|
||
|
||
class InfluxDBClient(object): | ||
|
@@ -69,6 +68,10 @@ class InfluxDBClient(object): | |
as a single file containing the private key and the certificate, or as | ||
a tuple of both files’ paths, defaults to None | ||
:type cert: str | ||
:param use_msgpack: A bool indicating to use msgpack to retrieve query | ||
results from InfluxDB. If False, the fallback will be JSON. This flag | ||
sets the Accept header of the request. Defaults to True | ||
:type use_msgpack: bool | ||
|
||
:raises ValueError: if cert is provided but ssl is disabled (set to False) | ||
""" | ||
|
@@ -89,6 +92,7 @@ def __init__(self, | |
pool_size=10, | ||
path='', | ||
cert=None, | ||
use_msgpack=True | ||
): | ||
"""Construct a new InfluxDBClient object.""" | ||
self.__host = host | ||
|
@@ -110,7 +114,9 @@ def __init__(self, | |
) | ||
|
||
if use_udp: | ||
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
else: | ||
self._udp_socket = None | ||
|
||
if not path: | ||
self.__path = '' | ||
|
@@ -145,10 +151,16 @@ def __init__(self, | |
self._port, | ||
self._path) | ||
|
||
self._headers = { | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/x-msgpack' | ||
} | ||
if use_msgpack: | ||
self._headers = { | ||
'Content-Type': 'application/json', | ||
'Accept': 'application/x-msgpack' | ||
} | ||
else: | ||
self._headers = { | ||
'Content-Type': 'application/json', | ||
'Accept': 'text/plain' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You would have to ask the original implementor - this is the way it was implemented before moving the default to msgpack. |
||
} | ||
|
||
@property | ||
def _baseurl(self): | ||
|
@@ -243,14 +255,14 @@ def request(self, url, method='GET', params=None, data=None, | |
:param method: the HTTP method for the request, defaults to GET | ||
:type method: str | ||
:param params: additional parameters for the request, defaults to None | ||
:type params: dict | ||
:type params: dict, optional | ||
:param data: the data of the request, defaults to None | ||
:type data: str | ||
:type data: str, optional | ||
:param expected_response_code: the expected response code of | ||
the request, defaults to 200 | ||
:type expected_response_code: int | ||
:param headers: headers to add to the request | ||
:type headers: dict | ||
:type headers: dict, optional | ||
:returns: the response from the request | ||
:rtype: :class:`requests.Response` | ||
:raises InfluxDBServerError: if the response code is any server error | ||
|
@@ -285,6 +297,7 @@ def request(self, url, method='GET', params=None, data=None, | |
verify=self._verify_ssl, | ||
timeout=self._timeout | ||
) | ||
response._msgpack = None | ||
break | ||
except (requests.exceptions.ConnectionError, | ||
requests.exceptions.HTTPError, | ||
|
@@ -297,30 +310,39 @@ def request(self, url, method='GET', params=None, data=None, | |
if not retry: | ||
raise | ||
|
||
type_header = response.headers and response.headers.get("Content-Type") | ||
if type_header == "application/x-msgpack" and response.content: | ||
response._msgpack = msgpack.unpackb( | ||
packed=response.content, | ||
ext_hook=_msgpack_parse_hook, | ||
raw=False) | ||
else: | ||
response._msgpack = None | ||
if self._is_msg_pack_response(response): | ||
if response.content: | ||
response._msgpack = msgpack.unpackb( | ||
packed=response.content, | ||
ext_hook=_msgpack_parse_hook, | ||
raw=False) | ||
|
||
def reformat_error(response): | ||
if response._msgpack: | ||
return json.dumps(response._msgpack, separators=(',', ':')) | ||
else: | ||
return response.content | ||
|
||
# if there's not an error, there must have been a successful response | ||
if 500 <= response.status_code < 600: | ||
raise InfluxDBServerError(reformat_error(response)) | ||
elif response.status_code == expected_response_code: | ||
if response.status_code == expected_response_code: | ||
return response | ||
else: | ||
err_msg = reformat_error(response) | ||
err_msg = self._reformat_msgpack_error(response) | ||
raise InfluxDBClientError(err_msg, response.status_code) | ||
|
||
@staticmethod | ||
def _is_msg_pack_response(response): | ||
if response is None: | ||
return False | ||
|
||
if response.headers is None: | ||
return False | ||
|
||
if "Content-Type" not in response.headers: | ||
return False | ||
|
||
content_type = response.headers["Content-Type"] | ||
return content_type == "application/x-msgpack" | ||
|
||
def _reformat_msgpack_error(self, _response): | ||
if _response._msgpack is not None: | ||
return json.dumps(_response._msgpack, separators=(',', ':')) | ||
else: | ||
return _response.content | ||
|
||
def write(self, data, params=None, expected_response_code=204, | ||
protocol='json'): | ||
"""Write data to InfluxDB. | ||
|
@@ -697,7 +719,7 @@ def create_retention_policy(self, name, duration, replication, | |
The minimum retention period is 1 hour. | ||
:type duration: str | ||
:param replication: the replication of the retention policy | ||
:type replication: str | ||
:type replication: int | ||
:param database: the database for which the retention policy is | ||
created. Defaults to current client's database | ||
:type database: str | ||
|
@@ -717,7 +739,7 @@ def create_retention_policy(self, name, duration, replication, | |
"CREATE RETENTION POLICY {0} ON {1} " \ | ||
"DURATION {2} REPLICATION {3} SHARD DURATION {4}".format( | ||
quote_ident(name), quote_ident(database or self._database), | ||
duration, replication, shard_duration) | ||
duration, int(replication), shard_duration) | ||
|
||
if default is True: | ||
query_string += " DEFAULT" | ||
|
@@ -1071,7 +1093,7 @@ def drop_continuous_query(self, name, database=None): | |
self.query(query_string) | ||
|
||
def send_packet(self, packet, protocol='json', time_precision=None): | ||
"""Send an UDP packet. | ||
"""Send an UDP packet. Only valid when use_udp is True. | ||
|
||
:param packet: the packet to be sent | ||
:type packet: (if protocol is 'json') dict | ||
|
@@ -1081,11 +1103,18 @@ def send_packet(self, packet, protocol='json', time_precision=None): | |
:param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None | ||
:type time_precision: str | ||
""" | ||
if not self._use_udp: | ||
raise RuntimeError("Unable to send packet : use_udp set to False") | ||
|
||
if protocol == 'json': | ||
data = make_lines(packet, time_precision).encode('utf-8') | ||
elif protocol == 'line': | ||
data = ('\n'.join(packet) + '\n').encode('utf-8') | ||
self.udp_socket.sendto(data, (self._host, self._udp_port)) | ||
else: | ||
raise InfluxDBClientError("Invalid protocol name : " | ||
"expected json or line") | ||
|
||
self._udp_socket.sendto(data, (self._host, self._udp_port)) | ||
|
||
def close(self): | ||
"""Close http session.""" | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.