diff --git a/provider/coinank/coinank_api/depth_ws.go b/provider/coinank/coinank_api/depth_ws.go new file mode 100644 index 00000000..679a38b6 --- /dev/null +++ b/provider/coinank/coinank_api/depth_ws.go @@ -0,0 +1,108 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "nofx/provider/coinank/coinank_enum" + + "golang.org/x/net/websocket" +) + +const MainDepthWsUrl = "wss://ws.coinank.com/wsDepth/wsKline" + +type DepthWs struct { + conn *websocket.Conn + DepthV3Ch <-chan *WsResult[DepthV3] +} + +// DepthWsConn connect ws , read data from DepthV3Ch +func DepthWsConn(ctx context.Context) (*DepthWs, error) { + conn, ch, err := depth_ws(ctx) + if err != nil { + return nil, err + } + ws := &DepthWs{ + conn: conn, + DepthV3Ch: ch, + } + return ws, nil +} + +// Subscribe subscribe depth +func (ws *DepthWs) Subscribe(symbol string, exchange coinank_enum.Exchange, step string) error { + var args = "depthV3@" + symbol + "@" + string(exchange) + "@SWAP@" + step + info := SubscribeInfo{ + Op: "subscribe", + Args: args, + } + json, err := json.Marshal(info) + if err != nil { + return err + } + err = websocket.Message.Send(ws.conn, json) + if err != nil { + return err + } + return nil +} + +// UnSubscribe unsubscribe depth +func (ws *DepthWs) UnSubscribe(symbol string, exchange coinank_enum.Exchange, step string) error { + var args = "depthV3@" + symbol + "@" + string(exchange) + "@SWAP@" + step + info := SubscribeInfo{ + Op: "unsubscribe", + Args: args, + } + json, err := json.Marshal(info) + if err != nil { + return err + } + err = websocket.Message.Send(ws.conn, json) + if err != nil { + return err + } + return nil +} + +// Close websocket +func (ws *DepthWs) Close() error { + return ws.conn.Close() +} + +func depth_ws(ctx context.Context) (*websocket.Conn, <-chan *WsResult[DepthV3], error) { + config, err := websocket.NewConfig(MainDepthWsUrl, "http://localhost") + if err != nil { + return nil, nil, err + } + conn, err := config.DialContext(ctx) + if err != nil { + return nil, nil, err + } + ch := make(chan *WsResult[DepthV3], 1024) + go depth_read(conn, ch) + return conn, ch, nil +} + +func depth_read(conn *websocket.Conn, ch chan *WsResult[DepthV3]) { + defer conn.Close() + defer close(ch) + var msg string + for { + err := websocket.Message.Receive(conn, &msg) + if err != nil { + return + } + var depth WsResult[DepthV3] + err = json.Unmarshal([]byte(msg), &depth) + if err == nil { + ch <- &depth + } + } +} + +type DepthV3 struct { + Type string `json:"type"` + Ts uint64 `json:"ts"` + Asks [][]string `json:"asks"` + Bids [][]string `json:"bids"` +} diff --git a/provider/coinank/coinank_api/depth_ws_test.go b/provider/coinank/coinank_api/depth_ws_test.go new file mode 100644 index 00000000..3e55b814 --- /dev/null +++ b/provider/coinank/coinank_api/depth_ws_test.go @@ -0,0 +1,42 @@ +package coinank_api + +import ( + "context" + "encoding/json" + "fmt" + "nofx/provider/coinank/coinank_enum" + "testing" + "time" +) + +func TestDepthWs(t *testing.T) { + ctx := context.TODO() + ws, err := DepthWsConn(ctx) + if err != nil { + t.Fatal(err) + } + go func() { + for tickers := range ws.DepthV3Ch { + msg, err := json.Marshal(tickers) + if err != nil { + fmt.Println("json err:", err) + } + fmt.Println(string(msg)) + } + fmt.Println("DepthV3Ch closed") + }() + err = ws.Subscribe("BTCUSDT", coinank_enum.Binance, "0.1") + if err != nil { + t.Fatal(err) + } + fmt.Println("sub success") + time.Sleep(10 * time.Second) + err = ws.UnSubscribe("BTCUSDT", coinank_enum.Binance, "0.1") + if err != nil { + t.Fatal(err) + } + fmt.Println("unsub success") + time.Sleep(10 * time.Second) + ws.Close() + fmt.Println("cancel success") +} diff --git a/provider/coinank/coinank_api/klinw_ws_test.go b/provider/coinank/coinank_api/kline_ws_test.go similarity index 100% rename from provider/coinank/coinank_api/klinw_ws_test.go rename to provider/coinank/coinank_api/kline_ws_test.go