feat: add depth websocket from coinank (#1362)

This commit is contained in:
wqqqqqq
2026-01-27 22:07:38 +08:00
committed by GitHub
parent fcc0267a46
commit 9dbc861cdf
3 changed files with 150 additions and 0 deletions
+108
View File
@@ -0,0 +1,108 @@
package coinank_api
import (
"context"
"encoding/json"
"nofx/provider/coinank/coinank_enum"
"golang.org/x/net/websocket"
)
const MainDepthWsUrl = "wss://ws.coinank.com/wsDepth/wsKline"
type DepthWs struct {
conn *websocket.Conn
DepthV3Ch <-chan *WsResult[DepthV3]
}
// DepthWsConn connect ws , read data from DepthV3Ch
func DepthWsConn(ctx context.Context) (*DepthWs, error) {
conn, ch, err := depth_ws(ctx)
if err != nil {
return nil, err
}
ws := &DepthWs{
conn: conn,
DepthV3Ch: ch,
}
return ws, nil
}
// Subscribe subscribe depth
func (ws *DepthWs) Subscribe(symbol string, exchange coinank_enum.Exchange, step string) error {
var args = "depthV3@" + symbol + "@" + string(exchange) + "@SWAP@" + step
info := SubscribeInfo{
Op: "subscribe",
Args: args,
}
json, err := json.Marshal(info)
if err != nil {
return err
}
err = websocket.Message.Send(ws.conn, json)
if err != nil {
return err
}
return nil
}
// UnSubscribe unsubscribe depth
func (ws *DepthWs) UnSubscribe(symbol string, exchange coinank_enum.Exchange, step string) error {
var args = "depthV3@" + symbol + "@" + string(exchange) + "@SWAP@" + step
info := SubscribeInfo{
Op: "unsubscribe",
Args: args,
}
json, err := json.Marshal(info)
if err != nil {
return err
}
err = websocket.Message.Send(ws.conn, json)
if err != nil {
return err
}
return nil
}
// Close websocket
func (ws *DepthWs) Close() error {
return ws.conn.Close()
}
func depth_ws(ctx context.Context) (*websocket.Conn, <-chan *WsResult[DepthV3], error) {
config, err := websocket.NewConfig(MainDepthWsUrl, "http://localhost")
if err != nil {
return nil, nil, err
}
conn, err := config.DialContext(ctx)
if err != nil {
return nil, nil, err
}
ch := make(chan *WsResult[DepthV3], 1024)
go depth_read(conn, ch)
return conn, ch, nil
}
func depth_read(conn *websocket.Conn, ch chan *WsResult[DepthV3]) {
defer conn.Close()
defer close(ch)
var msg string
for {
err := websocket.Message.Receive(conn, &msg)
if err != nil {
return
}
var depth WsResult[DepthV3]
err = json.Unmarshal([]byte(msg), &depth)
if err == nil {
ch <- &depth
}
}
}
type DepthV3 struct {
Type string `json:"type"`
Ts uint64 `json:"ts"`
Asks [][]string `json:"asks"`
Bids [][]string `json:"bids"`
}
@@ -0,0 +1,42 @@
package coinank_api
import (
"context"
"encoding/json"
"fmt"
"nofx/provider/coinank/coinank_enum"
"testing"
"time"
)
func TestDepthWs(t *testing.T) {
ctx := context.TODO()
ws, err := DepthWsConn(ctx)
if err != nil {
t.Fatal(err)
}
go func() {
for tickers := range ws.DepthV3Ch {
msg, err := json.Marshal(tickers)
if err != nil {
fmt.Println("json err:", err)
}
fmt.Println(string(msg))
}
fmt.Println("DepthV3Ch closed")
}()
err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, "0.1")
if err != nil {
t.Fatal(err)
}
fmt.Println("sub success")
time.Sleep(10 * time.Second)
err = ws.UnSubscribe("BTCUSDT", coinank_enum.Binance, "0.1")
if err != nil {
t.Fatal(err)
}
fmt.Println("unsub success")
time.Sleep(10 * time.Second)
ws.Close()
fmt.Println("cancel success")
}