0

I am using the official binance API connector. It consisted mainly of two parts:

  1. Subscribe WS to receive data
  2. REST to send requests. all in sync version

I first send two orders using my modify async function to send two orders at the same time. Then I will receive two order_updates. Inside each one of them I want to trigger a new order( new order has a new update that trigger another order, on and on...).

But since my first two orders was sent at the same time using async, then I will receive the two updates at about the same time. But sending a order usually takes 10ms. I want send order instantly without blocking. For example, first update trigger an order, after 1ms, another update was received (but first order was not finished). I want send another order instantly without waiting for the first one to be finished.

The following is my code, but it did not work. First two orders was sent successfully, but when update trigger another order, the new one seems never send (stuck at the mark endpoint).

class Strategy(Utils):
 def __init__(self, async_client):
 super().__init__()
 self.async_client = async_client
 self.order_queue = asyncio.Queue()
 self.loop = asyncio.get_event_loop()
 def order_trigger_sync(self, msg):
 """Callback function for WebSocket to enqueue updates."""
 try:
 if self.loop.is_closed():
 print("Event loop is closed. Cannot enqueue message.")
 return
 self.order_queue.put_nowait(msg)
 except Exception as e:
 print(f"Error enqueuing message: {e}")
 
 async def order_worker(self, worker_id):
 """Asynchronous worker to process orders from the queue."""
 while True:
 msg = await self.order_queue.get()
 print(f'dd{msg}')
 try:
 if "e" in msg and msg['e'] == 'ORDER_TRADE_UPDATE':
 to_send = {}
 for token in self.tokens:
 mid = self.book_series[token].get_current_mid()
 to_send[token] = self.gen_signal(token, mid)
 orders = self.to_orders(to_send)
 print(f"Worker {worker_id}: Sending orders: {orders}")
 await self.async_client.process_requests(orders)
 except Exception as e:
 logging.error(f"Worker {worker_id}: Error processing order: {e}")
 finally:
 self.order_queue.task_done()
 async def start_workers(self, num_workers=3):
 """Start multiple workers to process the order queue."""
 for i in range(num_workers):
 asyncio.create_task(self.order_worker(worker_id=i))
 async def start(self):
 await self.start_workers()
 await asyncio.sleep(10)
 to_send = dict()
 for token in self.tokens:
 mid = self.book_series[token].get_current_mid()
 to_send[token] = self.gen_signal(token, mid)
 orders = self.to_orders(to_send)
 await self.async_client.process_requests(orders)
async def main():
 async with AsyncClient(show_header=True) as client:
 strategy = Strategy(async_client=client)
 # Start WebSocket connection and register callbacks
 connect = Connection(
 book_callback=strategy.book_callback, # Depth book updates
 trade_callback=strategy.order_trigger_sync, # Triggered on order updates
 )
 await strategy.start()
if __name__ == "__main__":
 asyncio.run(main())

My process_requests function is like this:

 async def process_requests(self, reqs):
 tasks = []
 for action, content in reqs.items():
 if len(content) != 0:
 for _ in content:
 tasks.append(self.action_mapper[action](**_))
 if len(tasks) != 0:
 print('mark')
 done, pending = await asyncio.wait(tasks)
marc_s
760k186 gold badges1.4k silver badges1.5k bronze badges
asked Jan 15, 2025 at 5:25
1
  • Some general comments: When you execute asyncio.create_task, you need to save the returned task instance to prevent the task from prematurely being garbage collected. See this description. Also, you should use asyncio.get_running_loop instead of asyncio.get_event_loop since the latter has been deprecated. Commented Jan 23, 2025 at 21:56

0

Know someone who can answer? Share a link to this question via email, Twitter, or Facebook.

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.