Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit b4d4422

Browse files
Abstracted out websocket interactions into an abstract base class that
OrderBook now inherits from. This allows easy creation of new types of listeners for the GDAX websocket API. ***************************** *** WebSocketFeedListener *** ***************************** Factored the web socket code away from the actual orderbook implementation. The new WebSocketFeedListener is an abstract base class that handles the websocket itself including: - Connect - Disconnect - Send / Receive messages - Subscribing to channels - Leveraging the trade log file - Async Context Manager calls WebSocketFeedListener inherits from ABC and has a single @AbstractMethod handle_message(self), which must be implemented by subclasses. ****************************** ********* OrderBook ********** ****************************** Removed all WebSocket manipulation from OrderBook and left just the OrderBook implementation. OrderBook now inherits from WebSocketFeedListener and implements handle_message.
1 parent eb4d93b commit b4d4422

File tree

3 files changed

+132
-75
lines changed

3 files changed

+132
-75
lines changed

‎gdax/__init__.py‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
import gdax.orderbook
22
import gdax.trader
33
import gdax.utils
4+
import gdax.websocket_feed_listener

‎gdax/orderbook.py‎

Lines changed: 13 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -9,62 +9,42 @@
99
import json
1010
import logging
1111
from operator import itemgetter
12-
import time
1312

1413
from bintrees import FastRBTree
15-
import aiofiles
1614
import aiohttp
1715

1816
import gdax.trader
1917
import gdax.utils
18+
from gdax.websocket_feed_listener import WebSocketFeedListener
2019

2120

2221
class OrderBookError(Exception):
2322
pass
2423

2524

26-
class OrderBook(object):
25+
class OrderBook(WebSocketFeedListener):
2726
def __init__(self, product_ids='ETH-USD', api_key=None, api_secret=None,
2827
passphrase=None, use_heartbeat=False,
2928
trade_log_file_path=None):
30-
ifapi_keyisnotNone:
31-
self._authenticated=True
32-
self.api_key=api_key
33-
self.api_secret=api_secret
34-
self.passphrase=passphrase
35-
else:
36-
self._authenticated=False
29+
30+
super().__init__(product_ids=product_ids,
31+
api_key=api_key,
32+
api_secret=api_secret,
33+
passphrase=passphrase,
34+
use_heartbeat=use_heartbeat,
35+
trade_log_file_path=trade_log_file_path)
3736

3837
if not isinstance(product_ids, list):
3938
product_ids = [product_ids]
40-
self.product_ids = product_ids
41-
self.use_heartbeat = use_heartbeat
42-
self.trade_log_file_path = trade_log_file_path
43-
self._trade_file = None
4439

4540
self.traders = {product_id: gdax.trader.Trader(product_id=product_id)
4641
for product_id in product_ids}
4742
self._asks = {product_id: FastRBTree() for product_id in product_ids}
4843
self._bids = {product_id: FastRBTree() for product_id in product_ids}
4944
self._sequences = {product_id: None for product_id in product_ids}
50-
self._ws_session = None
51-
self._ws_connect = None
52-
self._ws = None
53-
54-
async def _init(self):
55-
self._ws_session = aiohttp.ClientSession()
56-
self._ws_connect = self._ws_session.ws_connect(
57-
'wss://ws-feed.gdax.com')
58-
self._ws = await self._ws_connect.__aenter__()
59-
60-
# subscribe
61-
await self._subscribe()
62-
63-
if self.use_heartbeat:
64-
await self._send(type="heartbeat", on=True)
6545

6646
async def __aenter__(self):
67-
await asyncio.gather(self._init(), self._open_log_file())
47+
await super().__aenter__()
6848

6949
# get order book snapshot
7050
books = await asyncio.gather(
@@ -95,51 +75,6 @@ async def __aenter__(self):
9575
self._sequences[product_id] = book['sequence']
9676
return self
9777

98-
async def __aexit__(self, exc_type, exc, traceback):
99-
res = await asyncio.gather(
100-
self._ws_session.__aexit__(exc_type, exc, traceback),
101-
self._close_log_file(),
102-
)
103-
return res[0]
104-
105-
async def _open_log_file(self):
106-
if self.trade_log_file_path is not None:
107-
self._trade_file = await aiofiles.open(self.trade_log_file_path,
108-
mode='a').__aenter__()
109-
110-
async def _close_log_file(self):
111-
if self._trade_file is not None:
112-
await self._trade_file.__aexit__(None, None, None)
113-
114-
async def _send(self, **kwargs):
115-
await self._ws.send_json(kwargs)
116-
117-
async def _recv(self):
118-
json_data = await self._ws.receive_str()
119-
if self._trade_file:
120-
await self._trade_file.write(f'W {json_data}\n')
121-
return json.loads(json_data)
122-
123-
async def _subscribe(self):
124-
message = {
125-
'type': 'subscribe',
126-
'product_ids': self.product_ids
127-
}
128-
129-
if self._authenticated:
130-
path = '/users/self'
131-
method = 'GET'
132-
body = ''
133-
timestamp = str(time.time())
134-
135-
message['signature'] = gdax.utils.get_signature(
136-
path, method, body, timestamp, self.api_secret)
137-
message['timestamp'] = timestamp
138-
message['key'] = self.api_key
139-
message['passphrase'] = self.passphrase
140-
141-
return await self._send(**message)
142-
14378
async def handle_message(self):
14479
try:
14580
message = await self._recv()
@@ -155,6 +90,9 @@ async def handle_message(self):
15590
if msg_type == 'error':
15691
raise OrderBookError(f'Error: {message["message"]}')
15792

93+
if msg_type == 'subscriptions':
94+
return # must filter out here because the subscriptions message does not have a product_id key
95+
15896
product_id = message['product_id']
15997
assert self._sequences[product_id] is not None
16098
sequence = message['sequence']

‎gdax/websocket_feed_listener.py‎

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""
2+
Base class for listening to web socket feed messages based on GDAX's Websocket endpoint.
3+
4+
See: https://docs.gdax.com/#websocket-feed.
5+
6+
"""
7+
8+
import asyncio
9+
import json
10+
11+
import time
12+
13+
import aiofiles
14+
import aiohttp
15+
16+
import gdax.utils
17+
18+
from abc import ABC, abstractmethod
19+
20+
21+
class WebSocketFeedListener(ABC):
22+
def __init__(self, product_ids='ETH-USD', channels=None, api_key=None, api_secret=None,
23+
passphrase=None, use_heartbeat=False,
24+
trade_log_file_path=None):
25+
if api_key is not None:
26+
self._authenticated = True
27+
self.api_key = api_key
28+
self.api_secret = api_secret
29+
self.passphrase = passphrase
30+
else:
31+
self._authenticated = False
32+
33+
if not isinstance(product_ids, list):
34+
product_ids = [product_ids]
35+
self.product_ids = product_ids
36+
37+
self.channels = None
38+
if channels is not None:
39+
if not isinstance(channels, list):
40+
channels = [channels]
41+
self.channels = channels
42+
43+
self.use_heartbeat = use_heartbeat
44+
self.trade_log_file_path = trade_log_file_path
45+
self._trade_file = None
46+
47+
self._ws_session = None
48+
self._ws_connect = None
49+
self._ws = None
50+
51+
async def _init(self):
52+
self._ws_session = aiohttp.ClientSession()
53+
self._ws_connect = self._ws_session.ws_connect(
54+
'wss://ws-feed.gdax.com')
55+
self._ws = await self._ws_connect.__aenter__()
56+
57+
# subscribe
58+
await self._subscribe()
59+
60+
if self.use_heartbeat:
61+
await self._send(type="heartbeat", on=True)
62+
63+
async def __aenter__(self):
64+
await asyncio.gather(self._init(), self._open_log_file())
65+
66+
return self
67+
68+
async def __aexit__(self, exc_type, exc, traceback):
69+
res = await asyncio.gather(
70+
self._ws_session.__aexit__(exc_type, exc, traceback),
71+
self._close_log_file(),
72+
)
73+
return res[0]
74+
75+
async def _open_log_file(self):
76+
if self.trade_log_file_path is not None:
77+
self._trade_file = await aiofiles.open(self.trade_log_file_path,
78+
mode='a').__aenter__()
79+
80+
async def _close_log_file(self):
81+
if self._trade_file is not None:
82+
await self._trade_file.__aexit__(None, None, None)
83+
84+
async def _send(self, **kwargs):
85+
await self._ws.send_json(kwargs)
86+
87+
async def _recv(self):
88+
json_data = await self._ws.receive_str()
89+
if self._trade_file:
90+
await self._trade_file.write(f'W {json_data}\n')
91+
return json.loads(json_data)
92+
93+
async def _subscribe(self):
94+
message = {
95+
'type': 'subscribe',
96+
'product_ids': self.product_ids,
97+
}
98+
99+
if self.channels is not None:
100+
message['channels'] = self.channels
101+
102+
if self._authenticated:
103+
path = '/users/self'
104+
method = 'GET'
105+
body = ''
106+
timestamp = str(time.time())
107+
108+
message['signature'] = gdax.utils.get_signature(
109+
path, method, body, timestamp, self.api_secret)
110+
message['timestamp'] = timestamp
111+
message['key'] = self.api_key
112+
message['passphrase'] = self.passphrase
113+
114+
return await self._send(**message)
115+
116+
@abstractmethod
117+
async def handle_message(self):
118+
pass

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /