2
\$\begingroup\$

I connected to the websocket, and the data response in the callback function handle_message. I wish to insert all the data to influxdb using write_api.write(), since I need with InfluxClient() to wrap the whole CRUD function, I wrote like this:

from time import sleep
from pybit import usdt_perpetual
from dataclasses import dataclass
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
@dataclass
class Trade:
 tick_direction: str
 side: float
 is_block_trade: str
 price: float
 size: float
 timestamp: int
ws_linear = usdt_perpetual.WebSocket(
 test=False,
 ping_interval=30, 
 ping_timeout=10, 
 domain="bybit" 
)
with InfluxDBClient(
 url="http://localhost:8086",
 token="fake_token_id",
 org="organization") as client:
 write_api = client.write_api(write_options=SYNCHRONOUS)
 def handle_message(msg: dict) -> None:
 for data in msg["data"]:
 trade = Trade(data["tick_direction"], data["side"], data["is_block_trade"], float(data["price"]),
 float(data["size"]), data["timestamp"])
 write_api.write(bucket="test5",
 record=trade,
 record_measurement_name="tradeBTCUSDT",
 record_time_key="timestamp",
 record_tag_keys=["tick_direction", "side", "is_block_trade"],
 record_field_keys=["price", "size"])
ws_linear.trade_stream(
 handle_message, "BTCUSDT"
)
while True:
 sleep(1)

I think it is messy that I wrap handle_message by with InfluxClient, how can I improve it and the overall structure of the program?

asked Dec 20, 2022 at 2:35
\$\endgroup\$

1 Answer 1

4
\$\begingroup\$

I find it unlikely this program works as expected. You have written approximately:

  1. Open a database connection, & assign that to client
  2. Get write_api from client
  3. Define a function which (at some point in the future) uses write_api
  4. Call client.close() due to the exit of the with statement!
  5. Ask another function to handle messages by calling your defined function which uses an api on a "now closed" client!

As you can see, the with statement is not doing anything but giving you a false sense of security. You need to either:

  • open a new database connection inside handle_message, or
  • indent the subsequent code such that client is not closed until after your while True: loop exits (presumably by a KeyboardInterrupt)

Here is a toy demonstration of the issue:

class Client:
 def __init__(self):
 self.opened = False
 
 def __enter__(self):
 self.opened = True
 print("Client opened")
 return self
 def __exit__(self, *args):
 self.opened = False
 print("Client closed")
 def write_api(self):
 return WriteApi(self)
class WriteApi():
 def __init__(self, client):
 self.client = client
 def use_api(self):
 print("Using API. Client open =", self.client.opened)
def trade_stream(handler):
 handler(None)
print("About to enter with-statement")
with Client() as client:
 print("Entered with-statement")
 write_api = client.write_api()
 
 def handle_message(msg):
 write_api.use_api()
 print("Exiting with-statement")
print("Exited with-statement")
trade_stream(handle_message)

Output:

About to enter with-statement
Client opened
Entered with-statement
Exiting with-statement
Client closed
Exited with-statement
Using API. Client open = False

Note that using the API happens after the client has been closed.


So why do you appear to have working code?

From the documentation ...

from influxdb_client import InfluxDBClient
# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
 with client.write_api() as write_api:
 pass

... it appears client.write_api() returns an object which itself can behave as a context manager, and so should be used in a with-statement.

If the write_api is "open", and then the client is closed, whether or not the write_api is invalid will depend on how the library is written. It may internally hold the client open, until the write_api is explicitly closed (which you are not doing), or it may be Undefined Behaviour and might appear to work at the moment, and start failing in the future.


As stated earlier, the fix could be to indent the subsequent code, as in:

with InfluxDBClient(
 url="http://localhost:8086",
 token="fake_token_id",
 org="organization") as client:
 write_api = client.write_api(write_options=SYNCHRONOUS)
 def handle_message(msg: dict) -> None:
 ...
 # still indented, so still inside the `with client`
 ws_linear.trade_stream(
 handle_message, "BTCUSDT"
 )
 # still indented, so still inside the `with client`
 while True:
 sleep(1)
 # with statement ends here
# client will be closed here
answered Dec 20, 2022 at 5:27
\$\endgroup\$
0

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.