最近在重构之前的量化策略系统,把主战场从A股扩到了港股。本以为只是换个Ticker代码的事,结果在“实时数据源”这一关卡了整整两周。
遇到的痛点:做过港股交易的朋友应该都有体会,港交所(HKEX)的数据源分发机制和A股、美股完全不同。在A股,我们有太多现成的库可以用,但在港股,如果不买昂贵的终端,你能拿到的数据大概率是延时15分钟的“快照”。对于做日内趋势或者高频策略的我来说,15分钟的延迟等于策略直接报废。
数据与逻辑的冲突:我起初尝试写爬虫去抓某些财经网站的Web端接口,结果发现两大问题:第一是反爬极其严格,IP封得怀疑人生;第二是Websocket断连频繁,丢包率高的时候能达到5%,这意味着K线合成出来是残缺的。
解决方案:既然“白嫖”不稳定,直连官方又太贵,我开始寻找中间件解决方案。我的需求很明确:必须要Websocket推送(HTTP轮询效率太低),必须包含Tick级数据,且SDK对Python友好。
在测试了几个第三方API后,我目前在用 AllTick 的接口做数据清洗,主要是看中它在连接保持这块做得比较稳,没有出现过断崖式的延时。
技术实现路径:搞定源头后,剩下的就是工程化落地。我采用的是 websocket-client 库来进行长连接管理。这里有个细节要注意:在 on_message 回调里,尽量不要做耗时的计算逻辑,否则会阻塞心跳包的发送,导致服务端主动断开连接。
我的建议是:接收到数据 -> 丢进消息队列(如RabbitMQ或Redis) -> 另起消费者进程处理策略。
核心代码实现:这里分享一下最基础的连接与鉴权逻辑,跑通了这个,数据流就稳了:
Python
import websocket
import json
def on_message(ws, message):
data = json.loads(message)
print(data)
def on_error(ws, error):
print(error)
def on_close(ws, close_status_code, close_msg):
print("Closed")
def on_open(ws):
print("Connected to the WebSocket")
ws_url = "wss://api.alltick.co/realtime/marketdata"
ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever()
def process_data(data):
symbol = data['symbol']
price = data['price']
change = data['change']
print(f"Stock: {symbol}, Price: {price}, Change: {change}%")
def on_message(ws, message):
data = json.loads(message)
process_data(data)效果验证:经过这套“第三方源 + 本地异步队列”的改造,目前系统在早盘9:30-10:00的高峰期,数据到达延迟控制在了毫秒级,策略触发的滑点也终于在可接受范围内了。