订单簿本地维护
如何使用增量更新维护本地订单簿的完整指南。
核心同步流程
1. 订阅增量更新
{
"action": "subscribe",
"streams": ["{symbol}@order_book_update"]
}
2. 缓存增量消息
在获取快照前,缓存所有接收到的增量更新消息。
3. 获取初始快照
调用 REST API 获取订单簿快照:
GET /api/v1/stock/open-api/depth?symbol=XSM&limit=100
响应中的 lastUpdateId 用于后续同步。
4. 处理缓存消息
丢弃所有 u < lastUpdateId + 1 的增量消息。
5. 持续应用增量
按顺序应用增量更新:
- 如果价格档位已存在,更新数量
- 如果数量为
"0",删除该价格档位 - 如果价格档位不存在且数量不为
"0",添加新档位
示例代码
class OrderBookManager {
constructor(symbol) {
this.symbol = symbol;
this.bids = new Map(); // price -> quantity
this.asks = new Map();
this.lastUpdateId = 0;
this.buffer = []; // 缓存增量消息
}
// 1. 订阅增量
subscribe() {
this.ws.send(JSON.stringify({
action: 'subscribe',
streams: [`${this.symbol}@order_book_update`]
}));
}
// 2. 接收增量,先缓存
onMessage(data) {
if (data.action === 'order_book_update') {
if (!this.lastUpdateId) {
this.buffer.push(data.result);
} else {
this.processUpdate(data.result);
}
}
}
// 3. 获取快照并初始化
async initSnapshot() {
const snapshot = await fetch(
`/api/v1/stock/open-api/depth?symbol=${this.symbol}&limit=100`
).then(r => r.json());
this.lastUpdateId = snapshot.data.lastUpdateId;
// 初始化订单簿
snapshot.data.bids.forEach(([price, qty]) => {
this.bids.set(price, qty);
});
snapshot.data.asks.forEach(([price, qty]) => {
this.asks.set(price, qty);
});
// 4. 处理缓存
this.buffer = this.buffer.filter(msg => msg.u >= this.lastUpdateId + 1);
this.buffer.forEach(msg => this.processUpdate(msg));
this.buffer = [];
}
// 5. 应用增量更新
processUpdate(update) {
// 验证连续性
if (update.U !== this.lastUpdateId + 1) {
console.error('序列断裂,需要重新同步');
return;
}
// 更新买盘
update.b.forEach(([price, qty]) => {
if (qty === '0') {
this.bids.delete(price);
} else {
this.bids.set(price, qty);
}
});
// 更新卖盘
update.a.forEach(([price, qty]) => {
if (qty === '0') {
this.asks.delete(price);
} else {
this.asks.set(price, qty);
}
});
this.lastUpdateId = update.u;
}
}
注意事项
- 序列连续性: 确保
U = lastUpdateId + 1,否则需要重新同步 - 数量为0: 表示删除该价格档位
- 并发控制: 使用队列确保增量消息按顺序处理
- 断线重连: 重连后需要重新执行完整同步流程