I connected to the websocket, and the data arrives in the callback function handle_message. I want to insert all the data into InfluxDB using write_api.write(). Since I need with InfluxDBClient() to wrap the whole CRUD function, I wrote it like this:
from time import sleep
from pybit import usdt_perpetual
from dataclasses import dataclass
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

@dataclass
class Trade:
    tick_direction: str
    side: float
    is_block_trade: str
    price: float
    size: float
    timestamp: int

ws_linear = usdt_perpetual.WebSocket(
    test=False,
    ping_interval=30,
    ping_timeout=10,
    domain="bybit"
)

with InfluxDBClient(
        url="http://localhost:8086",
        token="fake_token_id",
        org="organization") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

    def handle_message(msg: dict) -> None:
        for data in msg["data"]:
            trade = Trade(data["tick_direction"], data["side"], data["is_block_trade"], float(data["price"]),
                          float(data["size"]), data["timestamp"])
            write_api.write(bucket="test5",
                            record=trade,
                            record_measurement_name="tradeBTCUSDT",
                            record_time_key="timestamp",
                            record_tag_keys=["tick_direction", "side", "is_block_trade"],
                            record_field_keys=["price", "size"])

ws_linear.trade_stream(
    handle_message, "BTCUSDT"
)

while True:
    sleep(1)
I think it is messy that handle_message is wrapped by with InfluxDBClient. How can I improve it and the overall structure of the program?
1 Answer
I find it unlikely this program works as expected. You have written approximately:
- Open a database connection, and assign that to client
- Get write_api from client
- Define a function which (at some point in the future) uses write_api
- Call client.close() due to the exit of the with statement!
- Ask another function to handle messages by calling your defined function, which uses an API on a "now closed" client!
As you can see, the with statement is not doing anything but giving you a false sense of security. You need to either:
- open a new database connection inside handle_message, or
- indent the subsequent code such that client is not closed until after your while True: loop exits (presumably by a KeyboardInterrupt)
Here is a toy demonstration of the issue:
class Client:
    def __init__(self):
        self.opened = False

    def __enter__(self):
        self.opened = True
        print("Client opened")
        return self

    def __exit__(self, *args):
        self.opened = False
        print("Client closed")

    def write_api(self):
        return WriteApi(self)

class WriteApi():
    def __init__(self, client):
        self.client = client

    def use_api(self):
        print("Using API. Client open =", self.client.opened)

def trade_stream(handler):
    handler(None)

print("About to enter with-statement")
with Client() as client:
    print("Entered with-statement")
    write_api = client.write_api()

    def handle_message(msg):
        write_api.use_api()

    print("Exiting with-statement")
print("Exited with-statement")
trade_stream(handle_message)
Output:
About to enter with-statement
Client opened
Entered with-statement
Exiting with-statement
Client closed
Exited with-statement
Using API. Client open = False
Note that using the API happens after the client has been closed.
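For contrast, here is a minimal sketch of the same toy with the stream call moved inside the with-statement. ToyClient, trade_stream and the log list are hypothetical stand-ins, not part of influxdb_client or pybit, and this toy calls the handler immediately, whereas a real websocket would call it later from another thread.

```python
log = []  # records events in order so the sequence can be inspected


class ToyClient:
    """Hypothetical stand-in for a database client; not the influxdb_client API."""

    def __init__(self):
        self.opened = False

    def __enter__(self):
        self.opened = True
        log.append("opened")
        return self

    def __exit__(self, *args):
        self.opened = False
        log.append("closed")


def trade_stream(handler):
    # Stand-in for the websocket subscription: invokes the handler once.
    handler(None)


with ToyClient() as client:
    def handle_message(msg):
        log.append(f"used while open={client.opened}")

    # Still inside the with-statement, so the client is open when the handler runs.
    trade_stream(handle_message)

print(log)  # ['opened', 'used while open=True', 'closed']
```

Here the handler runs before the client is closed, which is the ordering the original program needs.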
So why do you appear to have working code?
From the documentation ...
from influxdb_client import InfluxDBClient

# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    with client.write_api() as write_api:
        pass
... it appears client.write_api() returns an object which itself can behave as a context manager, and so should be used in a with-statement.
If the write_api is "open" and the client is then closed, whether or not the write_api is invalid will depend on how the library is written. It may internally hold the client open until the write_api is explicitly closed (which you are not doing), or it may be undefined behaviour that appears to work at the moment and starts failing in the future.
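To illustrate why nesting the two with-statements avoids that ambiguity, here is a small sketch using a hypothetical ToyResource class (not the real library objects). It only shows the general Python rule that nested context managers are closed in reverse order, so the inner write_api is always closed before the outer client.

```python
events = []  # records the order in which resources are opened and closed


class ToyResource:
    """Hypothetical context manager that logs when it is entered and exited."""

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        events.append(f"open {self.name}")
        return self

    def __exit__(self, *args):
        events.append(f"close {self.name}")


with ToyResource("client") as client:
    with ToyResource("write_api") as write_api:
        events.append("write")

print(events)
# ['open client', 'open write_api', 'write', 'close write_api', 'close client']
```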
As stated earlier, the fix could be to indent the subsequent code, as in:
with InfluxDBClient(
        url="http://localhost:8086",
        token="fake_token_id",
        org="organization") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

    def handle_message(msg: dict) -> None:
        ...

    # still indented, so still inside the `with client`
    ws_linear.trade_stream(
        handle_message, "BTCUSDT"
    )

    # still indented, so still inside the `with client`
    while True:
        sleep(1)

# with statement ends here
# client will be closed here
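As a rough sketch of what happens when that loop is interrupted: a with-statement still runs its exit handler when a KeyboardInterrupt propagates out of it, so the client gets closed on Ctrl+C. ToyClient and run are hypothetical names, and the interrupt is simulated after a few iterations rather than coming from the keyboard.

```python
import itertools


class ToyClient:
    """Hypothetical stand-in for a client that must be closed; not a real library class."""

    def __init__(self):
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.closed = True
        return False  # do not swallow the exception


def run(max_ticks):
    """Loop like `while True: sleep(1)`, but simulate Ctrl+C after max_ticks iterations."""
    client = ToyClient()
    try:
        with client:
            for tick in itertools.count():
                if tick == max_ticks:
                    raise KeyboardInterrupt  # simulated; normally raised by Ctrl+C
    except KeyboardInterrupt:
        pass  # clean shutdown; the with-statement has already closed the client
    return client.closed


print(run(3))  # True
```

Catching KeyboardInterrupt outside the with-statement is optional; it only suppresses the traceback on exit.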