diff --git a/README.md b/README.md index 017dae9..b07216c 100644 --- a/README.md +++ b/README.md @@ -1,169 +1,3 @@ -# The Smart API Go client +# Migrated to New Github Repo https://github.com/angel-one/smartapigo +For latest updates and bug fixes please refer to the new git repo. -The official Go client for communicating with the Angel Broking Smart APIs. - -SmartAPI is a set of REST-like APIs that expose many capabilities required to build a complete investment and trading platform. Execute orders in real time, manage user portfolio, stream live market data (WebSockets), and more, with the simple HTTP API collection. - - -## Installation -``` -go get github.com/angel-one/smartapigo -``` -## API usage -```golang -package main - -import ( - "fmt" - SmartApi "github.com/angel-one/smartapigo" -) - -func main() { - - // Create New Angel Broking Client - ABClient := SmartApi.New("ClientCode", "Password","API Key") - - fmt.Println("Client :- ",ABClient) - - // User Login and Generate User Session - session, err := ABClient.GenerateSession("totp here") - - if err != nil { - fmt.Println(err.Error()) - return - } - - //Renew User Tokens using refresh token - session.UserSessionTokens, err = ABClient.RenewAccessToken(session.RefreshToken) - - if err != nil { - fmt.Println(err.Error()) - return - } - - fmt.Println("User Session Tokens :- ", session.UserSessionTokens) - - //Get User Profile - session.UserProfile, err = ABClient.GetUserProfile() - - if err != nil { - fmt.Println(err.Error()) - return - } - - fmt.Println("User Profile :- ", session.UserProfile) - fmt.Println("User Session Object :- ", session) - - //Place Order - order, err := ABClient.PlaceOrder(SmartApi.OrderParams{Variety: "NORMAL", TradingSymbol: "SBIN-EQ", SymbolToken: "3045", TransactionType: "BUY", Exchange: "NSE", OrderType: "LIMIT", ProductType: "INTRADAY", Duration: "DAY", Price: "19500", SquareOff: "0", StopLoss: "0", Quantity: "1"}) - - if err != nil { - fmt.Println(err.Error()) - return - } - - fmt.Println("Placed Order ID and Script :- ", order) -} -``` -## Websocket Data Streaming -```golang -package main - -import ( - "fmt" - SmartApi "github.com/angel-one/smartapigo" - "github.com/angel-one/smartapigo/websocket" - "time" -) - -var socketClient *websocket.SocketClient - -// Triggered when any error is raised -func onError(err error) { - fmt.Println("Error: ", err) -} - -// Triggered when websocket connection is closed -func onClose(code int, reason string) { - fmt.Println("Close: ", code, reason) -} - -// Triggered when connection is established and ready to send and accept data -func onConnect() { - fmt.Println("Connected") - err := socketClient.Subscribe() - if err != nil { - fmt.Println("err: ", err) - } -} - -// Triggered when a message is received -func onMessage(message []map[string]interface{}) { - fmt.Printf("Message Received :- %v\n",message) -} - -// Triggered when reconnection is attempted which is enabled by default -func onReconnect(attempt int, delay time.Duration) { - fmt.Printf("Reconnect attempt %d in %fs\n", attempt, delay.Seconds()) -} - -// Triggered when maximum number of reconnect attempt is made and the program is terminated -func onNoReconnect(attempt int) { - fmt.Printf("Maximum no of reconnect attempt reached: %d\n", attempt) -} - -func main() { - - // Create New Angel Broking Client - ABClient := SmartApi.New("ClientCode", "Password","API Key") - - // User Login and Generate User Session - session, err := ABClient.GenerateSession() - - if err != nil { - fmt.Println(err.Error()) - return - } - - //Get User Profile - session.UserProfile, err = ABClient.GetUserProfile() - - if err != nil { - fmt.Println(err.Error()) - return - } - - // New Websocket Client - socketClient = websocket.New(session.ClientCode,session.FeedToken,"nse_cm|17963&nse_cm|3499&nse_cm|11536&nse_cm|21808&nse_cm|317") - - // Assign callbacks - socketClient.OnError(onError) - socketClient.OnClose(onClose) - socketClient.OnMessage(onMessage) - socketClient.OnConnect(onConnect) - socketClient.OnReconnect(onReconnect) - socketClient.OnNoReconnect(onNoReconnect) - - // Start Consuming Data - socketClient.Serve() - -} -``` - -## Examples -Check example folder for more examples. - -You can run the following after updating the Credentials in the examples: -``` -go run example/example.go -``` -For websocket example -``` -go run example/websocket/example.go -``` - -## Run unit tests - -``` -go test -v -``` diff --git a/example/example.go b/example/example.go index 335d236..f69df3a 100644 --- a/example/example.go +++ b/example/example.go @@ -13,7 +13,7 @@ func main() { fmt.Println("Client :- ", ABClient) // User Login and Generate User Session - session, err := ABClient.GenerateSession("your totp here") + session, err := ABClient.GenerateSession() if err != nil { fmt.Println(err.Error()) diff --git a/go.mod b/go.mod index 967550e..87b2f51 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/angel-one/smartapigo -go 1.14 +go 1.17 require ( - github.com/gorilla/websocket v1.4.2 - github.com/jarcoal/httpmock v1.0.6 + github.com/gorilla/websocket v1.5.0 + github.com/jarcoal/httpmock v1.2.0 ) diff --git a/go.sum b/go.sum index de3c71d..5cee55c 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,8 @@ -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= -github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/jarcoal/httpmock v1.0.6 h1:e81vOSexXU3mJuJ4l//geOmKIt+Vkxerk1feQBC8D0g= -github.com/jarcoal/httpmock v1.0.6/go.mod h1:ATjnClrvW/3tijVmpL/va5Z3aAyGvqU3gCT8nX0Txik= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc= +github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk= +github.com/maxatome/go-testdeep v1.11.0 h1:Tgh5efyCYyJFGUYiT0qxBSIDeXw0F5zSoatlou685kk= +github.com/maxatome/go-testdeep v1.11.0/go.mod h1:011SgQ6efzZYAen6fDn4BqQ+lUR72ysdyKe7Dyogw70= diff --git a/market.go b/market.go index c41f092..2a4b330 100644 --- a/market.go +++ b/market.go @@ -1,6 +1,8 @@ package smartapigo -import "net/http" +import ( + "net/http" +) // LTPResponse represents LTP API Response. type LTPResponse struct { diff --git a/model/smartstream.go b/model/smartstream.go new file mode 100644 index 0000000..22d9b63 --- /dev/null +++ b/model/smartstream.go @@ -0,0 +1,104 @@ +package model + +type ExchangeType int +type SmartStreamAction int8 +type SmartStreamSubsMode int8 + +const BYTES int = 20 + +const ( + NSECM ExchangeType = 1 + NSEFO ExchangeType = 2 + BSECM ExchangeType = 3 + BSEFO ExchangeType = 4 + MCXFO ExchangeType = 5 + NCXFO ExchangeType = 7 + CDEFO ExchangeType = 13 +) + +const ( + SUBS SmartStreamAction = 1 + UNSUBS SmartStreamAction = 0 +) + +const ( + LTP SmartStreamSubsMode = 1 + QUOTE SmartStreamSubsMode = 2 + SNAPQUOTE SmartStreamSubsMode = 3 +) + +type TokenInfo struct { + ExchangeType ExchangeType + Token string +} + +type SmartApiBBSInfo struct { + Flag uint16 + Quantity uint64 + Price uint64 + NumberOfOrders uint16 +} + +type LTPInfo struct { + TokenInfo TokenInfo + SequenceNumber uint64 + ExchangeFeedTimeEpochMillis uint64 + LastTradedPrice uint64 +} + +type Quote struct { + TokenInfo TokenInfo + SequenceNumber uint64 + ExchangeFeedTimeEpochMillis uint64 + LastTradedPrice uint64 + LastTradedQty uint64 + AvgTradedPrice uint64 + VolumeTradedToday uint64 + TotalBuyQty float64 + TotalSellQty float64 + OpenPrice uint64 + HighPrice uint64 + LowPrice uint64 + ClosePrice uint64 +} + +type SnapQuote struct { + TokenInfo TokenInfo + SequenceNumber uint64 + ExchangeFeedTimeEpochMillis uint64 + LastTradedPrice uint64 + LastTradedQty uint64 + AvgTradedPrice uint64 + VolumeTradedToday uint64 + TotalBuyQty float64 + TotalSellQty float64 + OpenPrice uint64 + HighPrice uint64 + LowPrice uint64 + ClosePrice uint64 + LastTradedTimestamp uint64 + OpenInterest uint64 + OpenInterestChangePerc float64 + BestFiveBuy []SmartApiBBSInfo + BestFiveSell []SmartApiBBSInfo + UpperCircuit uint64 + LowerCircuit uint64 + YearlyHighPrice uint64 + YearlyLowPrice uint64 +} + +type SubscriptionRequest struct { + CorrelationID string `json:"correlationID"` + Action int8 `json:"action"` + Params SubscriptionParam `json:"params"` +} + +type SubscriptionParam struct { + Mode SmartStreamSubsMode `json:"mode"` + TokenList []SubscriptionTokens `json:"tokenList"` +} + +type SubscriptionTokens struct { + ExchangeType ExchangeType `json:"exchangeType"` + Tokens []string `json:"tokens"` +} diff --git a/smartstream/internal/parser/parser.go b/smartstream/internal/parser/parser.go new file mode 100644 index 0000000..b6b4f1a --- /dev/null +++ b/smartstream/internal/parser/parser.go @@ -0,0 +1,101 @@ +package parser + +import ( + "encoding/binary" + "github.com/angel-one/smartapigo/model" + "math" +) + +func ParseLTP(msg []byte) model.LTPInfo { + sequenceNumer := binary.LittleEndian.Uint64(msg[27:35]) + exchangeTimestamp := binary.LittleEndian.Uint64(msg[35:43]) + ltp := binary.LittleEndian.Uint64(msg[43:51]) + + ltpInfo := model.LTPInfo{TokenInfo: getToken(msg), SequenceNumber: sequenceNumer, + ExchangeFeedTimeEpochMillis: exchangeTimestamp, LastTradedPrice: ltp} + + return ltpInfo +} + +func ParseQuote(msg []byte) model.Quote { + quote := model.Quote{} + ltpInfo := ParseLTP(msg) + quote.SequenceNumber = ltpInfo.SequenceNumber + quote.TokenInfo = ltpInfo.TokenInfo + quote.ExchangeFeedTimeEpochMillis = ltpInfo.ExchangeFeedTimeEpochMillis + quote.LastTradedPrice = ltpInfo.LastTradedPrice + quote.LastTradedQty = binary.LittleEndian.Uint64(msg[51:59]) + quote.AvgTradedPrice = binary.LittleEndian.Uint64(msg[59:67]) + quote.VolumeTradedToday = binary.LittleEndian.Uint64(msg[67:75]) + quote.TotalBuyQty = math.Float64frombits(binary.LittleEndian.Uint64(msg[75:83])) + quote.TotalSellQty = math.Float64frombits(binary.LittleEndian.Uint64(msg[83:91])) + quote.OpenPrice = binary.LittleEndian.Uint64(msg[91:99]) + quote.HighPrice = binary.LittleEndian.Uint64(msg[99:107]) + quote.LowPrice = binary.LittleEndian.Uint64(msg[107:115]) + quote.ClosePrice = binary.LittleEndian.Uint64(msg[115:123]) + return quote +} + +func ParseSnapquote(msg []byte) model.SnapQuote { + snapquote := model.SnapQuote{} + quote := ParseQuote(msg) + snapquote.SequenceNumber = quote.SequenceNumber + snapquote.TokenInfo = quote.TokenInfo + snapquote.ExchangeFeedTimeEpochMillis = quote.ExchangeFeedTimeEpochMillis + snapquote.LastTradedPrice = quote.LastTradedPrice + snapquote.LastTradedQty = quote.LastTradedQty + snapquote.AvgTradedPrice = quote.AvgTradedPrice + snapquote.VolumeTradedToday = quote.VolumeTradedToday + snapquote.TotalBuyQty = quote.TotalBuyQty + snapquote.TotalSellQty = quote.TotalSellQty + snapquote.OpenPrice = quote.OpenPrice + snapquote.HighPrice = quote.HighPrice + snapquote.LowPrice = quote.LowPrice + snapquote.ClosePrice = quote.ClosePrice + + snapquote.LastTradedTimestamp = binary.LittleEndian.Uint64(msg[123:131]) + snapquote.OpenInterest = binary.LittleEndian.Uint64(msg[131:139]) + snapquote.OpenInterestChangePerc = math.Float64frombits(binary.LittleEndian.Uint64(msg[139:147])) + + snapquote.BestFiveBuy, snapquote.BestFiveSell = getBestBuySellData(msg[147:347]) + + snapquote.UpperCircuit = binary.LittleEndian.Uint64(msg[347:355]) + snapquote.LowerCircuit = binary.LittleEndian.Uint64(msg[355:363]) + snapquote.YearlyHighPrice = binary.LittleEndian.Uint64(msg[363:371]) + snapquote.YearlyLowPrice = binary.LittleEndian.Uint64(msg[371:379]) + + return snapquote +} + +func getBestBuySellData(msg []byte) (bestFiveBuy []model.SmartApiBBSInfo, bestFiveSell []model.SmartApiBBSInfo) { + bestFiveBuy = make([]model.SmartApiBBSInfo, 0) + bestFiveSell = make([]model.SmartApiBBSInfo, 0) + for i := 0; i < 200; i = i + 20 { + info := model.SmartApiBBSInfo{} + info.Flag = binary.LittleEndian.Uint16(msg[i : i+2]) + info.Quantity = binary.LittleEndian.Uint64(msg[i+2 : i+10]) + info.Price = binary.LittleEndian.Uint64(msg[i+10 : i+18]) + info.NumberOfOrders = binary.LittleEndian.Uint16(msg[i+18 : i+20]) + if info.Flag == 1 { + bestFiveBuy = append(bestFiveBuy, info) + } else { + bestFiveSell = append(bestFiveSell, info) + } + + } + + return +} + +func getToken(msg []byte) model.TokenInfo { + exchangeType := model.ExchangeType(msg[1]) + tokenEnd := 0 + + for i := 2; i < 27; i++ { + tokenEnd++ + if int(msg[i]) == 0 { + break + } + } + return model.TokenInfo{ExchangeType: exchangeType, Token: string(msg[2 : tokenEnd+1])} +} diff --git a/smartstream/socket.go b/smartstream/socket.go new file mode 100644 index 0000000..00cd4a9 --- /dev/null +++ b/smartstream/socket.go @@ -0,0 +1,462 @@ +package smartstream + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "github.com/angel-one/smartapigo/model" + "github.com/angel-one/smartapigo/smartstream/internal/parser" + "github.com/gorilla/websocket" + "log" + "math" + "net/http" + "net/url" + "sync" + "time" +) + +type WebSocket struct { + clientID string + feedToken string + callbacks callbacks + subsMap map[model.SmartStreamSubsMode][]model.TokenInfo + Conn *websocket.Conn + url url.URL + autoReconnect bool + reconnectMaxRetries int + reconnectMaxDelay time.Duration + connectTimeout time.Duration + reconnectAttempt int + cancel context.CancelFunc + lastPongTime time.Time + subroutineContext context.Context + subroutineCancel context.CancelFunc +} + +//MessageHandler Handler interface for handling messages received over smartstream websocket +type callbacks struct { + onLTP func(ltpInfo model.LTPInfo) + onQuote func(quote model.Quote) + onSnapquote func(quote model.SnapQuote) + onText func(text []byte) + onConnected func() + onReconnectFailed func(reconnectAttempt int) + onReconnect func(attempt int, nextDelay time.Duration) + onError func(err error) + onClose func(int, string) +} + +var ( + // Default ticker url. + substreamURL = url.URL{Scheme: "ws", Host: "smartapisocket.angelone.in", Path: "/smart-stream"} +) + +const ( + // Auto reconnect defaults + // Default maximum number of reconnect attempts + defaultReconnectMaxAttempts = 300 + // Auto reconnect min delay. Reconnect delay can't be less than this. + reconnectMinDelay time.Duration = 5000 * time.Millisecond + // Default auto reconnect delay to be used for auto reconnection. + defaultReconnectMaxDelay time.Duration = 60000 * time.Millisecond + // Connect timeout for initial server handshake. + defaultConnectTimeout time.Duration = 7000 * time.Millisecond + // Interval in which the connection check is performed periodically. + connectionCheckInterval time.Duration = 10000 * time.Millisecond + + writeWait = 5 * time.Second + pingPeriod = 10 * time.Second + idleTimeout = 15 * time.Second + sessionCheckPeriod = 5 * time.Second + + //Headers for connection + clientIDHeader = "x-client-code" + feedTokenHeader = "x-feed-token" + clientLibHeader = "x-client-lib" +) + +// New creates a new socket client instance. +func New(clientID string, feedToken string) *WebSocket { + ws := &WebSocket{ + clientID: clientID, + feedToken: feedToken, + url: substreamURL, + autoReconnect: true, + reconnectMaxDelay: defaultReconnectMaxDelay, + reconnectMaxRetries: defaultReconnectMaxAttempts, + connectTimeout: defaultConnectTimeout, + subsMap: make(map[model.SmartStreamSubsMode][]model.TokenInfo), + } + + return ws +} + +// SetRootURL sets ticker root url. +func (ws *WebSocket) SetRootURL(u url.URL) { + ws.url = u +} + +// SetAccessToken set access token. +func (ws *WebSocket) SetFeedToken(feedToken string) { + ws.feedToken = feedToken +} + +// SetConnectTimeout sets default timeout for initial connect handshake +func (ws *WebSocket) SetConnectTimeout(val time.Duration) { + ws.connectTimeout = val +} + +// SetAutoReconnect enable/disable auto reconnect. +func (ws *WebSocket) SetAutoReconnect(val bool) { + ws.autoReconnect = val +} + +// SetReconnectMaxDelay sets maximum auto reconnect delay. +func (ws *WebSocket) SetReconnectMaxDelay(val time.Duration) error { + if val > reconnectMinDelay { + return fmt.Errorf("ReconnectMaxDelay can't be less than %fms", reconnectMinDelay.Seconds()*1000) + } + + ws.reconnectMaxDelay = val + return nil +} + +// SetReconnectMaxRetries sets maximum reconnect attempts. +func (ws *WebSocket) SetReconnectMaxRetries(val int) { + ws.reconnectMaxRetries = val +} + +func (ws *WebSocket) SetOnConnected(fn func()) { + if fn != nil { + ws.callbacks.onConnected = fn + } +} + +func (ws *WebSocket) SetOnSnapquote(fn func(model.SnapQuote)) { + if fn != nil { + ws.callbacks.onSnapquote = fn + } +} + +func (ws *WebSocket) SetOnLTP(fn func(info model.LTPInfo)) { + if fn != nil { + ws.callbacks.onLTP = fn + } +} + +func (ws *WebSocket) SetOnQuote(fn func(quote model.Quote)) { + if fn != nil { + ws.callbacks.onQuote = fn + } +} + +func (ws *WebSocket) SetOnError(fn func(err error)) { + if fn != nil { + ws.callbacks.onError = fn + } +} + +func (ws *WebSocket) SetOnReconnect(fn func(attempt int, nextDelay time.Duration)) { + if fn != nil { + ws.callbacks.onReconnect = fn + } +} + +func (ws *WebSocket) SetOnReconnectFailed(fn func(attempt int)) { + if fn != nil { + ws.callbacks.onReconnectFailed = fn + } +} + +func (ws *WebSocket) SetOnClose(fn func(int, string)) { + if fn != nil { + ws.callbacks.onClose = fn + } +} + +func (ws *WebSocket) Connect() error { + return ws.ConnectWithContext(context.Background()) +} + +func (ws *WebSocket) ConnectWithContext(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + ws.cancel = cancel + defer func() { + ws.Stop() + }() + for { + select { + case <-ctx.Done(): + return nil + default: + if ws.reconnectAttempt > ws.reconnectMaxRetries { + ws.onReconnectFailed(ws.reconnectAttempt) + } + if ws.reconnectAttempt > 0 { + nextDelay := time.Duration(math.Pow(2, float64(ws.reconnectAttempt))) * time.Second + if nextDelay > ws.reconnectMaxDelay || nextDelay <= 0 { + nextDelay = ws.reconnectMaxDelay + } + + ws.onReconnect(ws.reconnectAttempt, nextDelay) + log.Printf("attempting reconnect in %f seconds", nextDelay.Seconds()) + time.Sleep(nextDelay) + + if ws.Conn != nil { // Closing previous connection + ws.Conn.Close() + } + } + + err := ws.createConnection() + + if err != nil { + ws.onError(err) + if ws.autoReconnect { + ws.reconnectAttempt++ + continue + } + return err + } + ws.onConnected() + ws.subroutineContext, ws.subroutineCancel = context.WithCancel(context.Background()) + go ws.startPing() + + var wg sync.WaitGroup + + // Receive stream data + wg.Add(1) + go ws.readMessage(&wg) + + wg.Add(1) + go ws.checkIdleConnection(&wg) + + wg.Wait() + + } + } +} + +func (ws *WebSocket) onReconnectFailed(reconnectAttempt int) { + if ws.callbacks.onReconnectFailed != nil { + ws.callbacks.onReconnectFailed(reconnectAttempt) + } + +} + +func (ws *WebSocket) onReconnect(attempt int, delay time.Duration) { + if ws.callbacks.onReconnect != nil { + ws.callbacks.onReconnect(attempt, delay) + } +} + +func (ws *WebSocket) onConnected() { + + if ws.reconnectAttempt > 0 { + err := ws.resubscribe() + if err != nil { + return + } + ws.reconnectAttempt = 0 + } else { + if ws.callbacks.onConnected != nil { + ws.callbacks.onConnected() + } + } +} + +func (ws *WebSocket) onError(err error) { + if ws.callbacks.onError != nil { + ws.callbacks.onError(err) + } +} + +func (ws *WebSocket) resubscribe() (err error) { + for k, v := range ws.subsMap { + err = ws.subscribeToTokens(k, v) + if err != nil { + return + } + } + return +} + +func (ws *WebSocket) Subscribe(mode model.SmartStreamSubsMode, tokenIds []model.TokenInfo) error { + err := ws.subscribeToTokens(mode, tokenIds) + if err == nil { + if _, ok := ws.subsMap[mode]; !ok { + ws.subsMap[mode] = make([]model.TokenInfo, 0) + } + ws.subsMap[mode] = append(ws.subsMap[mode], tokenIds...) + } + return err +} + +func (ws *WebSocket) subscribeToTokens(mode model.SmartStreamSubsMode, tokenIds []model.TokenInfo) error { + request, err := ws.createSubsRequest(mode, tokenIds) + if err != nil { + return err + } + err = ws.Conn.WriteMessage(websocket.TextMessage, request) + return err +} + +func (ws *WebSocket) onClose(code int, text string) error { + fmt.Printf("connection closed ") + if ws.callbacks.onClose != nil { + ws.callbacks.onClose(code, text) + } + return nil +} + +func (ws *WebSocket) Stop() { + ws.closeRoutines() + if ws.cancel != nil { + ws.cancel() + } +} + +func (ws *WebSocket) closeRoutines() { + if ws.subroutineCancel != nil { + ws.subroutineCancel() + if ws.Conn != nil { + ws.Conn.Close() + } + } +} + +func (ws *WebSocket) onPong(appData string) error { + ws.lastPongTime = time.Now() + return nil +} + +func (ws *WebSocket) onTextMessage(text []byte) { + if ws.callbacks.onText != nil { + ws.callbacks.onText(text) + } +} + +func (ws *WebSocket) createConnection() error { + dialer := websocket.DefaultDialer + dialer.HandshakeTimeout = ws.connectTimeout + dialer.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + headers := http.Header{} + headers.Add(clientIDHeader, ws.clientID) + headers.Add(feedTokenHeader, ws.feedToken) + headers.Add(clientLibHeader, "GOLANG") + + conn, _, err := dialer.Dial(ws.url.String(), headers) + if err != nil { + return err + } + conn.SetCloseHandler(ws.onClose) + conn.SetPongHandler(ws.onPong) + ws.Conn = conn + + return nil + +} + +func (ws *WebSocket) readMessage(wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case <-ws.subroutineContext.Done(): + return + default: + mType, msg, err := ws.Conn.ReadMessage() + if err != nil { + ws.onError(fmt.Errorf("Error reading data: %v", err)) + return + } + + //Parsing binary data + if mType == websocket.BinaryMessage { + mode := model.SmartStreamSubsMode(msg[0]) + + switch mode { + case model.LTP: + ltp := parser.ParseLTP(msg) + ws.callbacks.onLTP(ltp) + case model.QUOTE: + quote := parser.ParseQuote(msg) + ws.callbacks.onQuote(quote) + case model.SNAPQUOTE: + snapquote := parser.ParseSnapquote(msg) + ws.callbacks.onSnapquote(snapquote) + default: + log.Printf("Message mode not recognized") + } + + } else if mType == websocket.TextMessage { + ws.onTextMessage(msg) + } + } + } +} + +func (ws *WebSocket) startPing() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + }() + for { + select { + case <-ws.subroutineContext.Done(): + return + case <-ticker.C: + ws.Conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := ws.Conn.WriteMessage(websocket.PingMessage, []byte("ping")); err != nil { + return + } + } + } +} + +func (ws *WebSocket) checkIdleConnection(wg *sync.WaitGroup) { + defer wg.Done() + time.Sleep(pingPeriod * 2) + ticker := time.NewTicker(sessionCheckPeriod) + defer func() { + ticker.Stop() + }() + for { + select { + case <-ticker.C: + if time.Since(ws.lastPongTime).Seconds() > idleTimeout.Seconds() { + log.Printf("ping not received. Reconnecting ...") + ws.closeRoutines() + ws.reconnectAttempt++ + return + } + } + } +} + +func (ws *WebSocket) createSubsRequest(mode model.SmartStreamSubsMode, tokenIds []model.TokenInfo) ([]byte, error) { + + exchangeTokenMap := make(map[model.ExchangeType][]string) + for _, val := range tokenIds { + if _, ok := exchangeTokenMap[val.ExchangeType]; !ok { + exchangeTokenMap[val.ExchangeType] = make([]string, 0) + } + exchangeTokenMap[val.ExchangeType] = append(exchangeTokenMap[val.ExchangeType], val.Token) + } + + tokenList := make([]model.SubscriptionTokens, 0) + for k, v := range exchangeTokenMap { + subscriptionTokens := model.SubscriptionTokens{ExchangeType: k, Tokens: v} + tokenList = append(tokenList, subscriptionTokens) + } + params := model.SubscriptionParam{Mode: mode, TokenList: tokenList} + + subscriptionRequest := model.SubscriptionRequest{} + subscriptionRequest.Action = 1 + subscriptionRequest.Params = params + subscriptionRequest.CorrelationID = "abc" + + return json.Marshal(subscriptionRequest) + +} diff --git a/smartstream/socket_test.go b/smartstream/socket_test.go new file mode 100644 index 0000000..7fda5c9 --- /dev/null +++ b/smartstream/socket_test.go @@ -0,0 +1,28 @@ +package smartstream + +import ( + "github.com/angel-one/smartapigo/model" + "log" + "testing" +) + +var client *WebSocket + +func TestSmartStream(t *testing.T) { + client = New("A586457", "00998877") + client.callbacks.onConnected = onConnected + client.callbacks.onSnapquote = onSnapquote + client.Connect() +} + +func onConnected() { + log.Printf("connected") + err := client.Subscribe(model.SNAPQUOTE, []model.TokenInfo{model.TokenInfo{ExchangeType: model.NSECM, Token: "1594"}}) + if err != nil { + log.Printf("error while subscribing") + } +} + +func onSnapquote(snapquote model.SnapQuote) { + log.Printf("%d", snapquote.BestFiveSell[0]) +} diff --git a/user.go b/user.go index e38cf31..2098fad 100644 --- a/user.go +++ b/user.go @@ -14,9 +14,10 @@ type UserSession struct { type UserSessionTokens struct { AccessToken string `json:"jwtToken"` RefreshToken string `json:"refreshToken"` - FeedToken string `json:"feedToken"` + FeedToken string `json:"feedToken"` } + // UserProfile represents a user's personal and financial profile. type UserProfile struct { ClientCode string `json:"clientcode"` @@ -29,19 +30,18 @@ type UserProfile struct { Exchanges []string `json:"exchanges"` } + // GenerateSession gets a user session details in exchange of username and password. // Access token is automatically set if the session is retrieved successfully. // Do the token exchange with the `requestToken` obtained after the login flow, // and retrieve the `accessToken` required for all subsequent requests. The // response contains not just the `accessToken`, but metadata for the user who has authenticated. -//totp used is required for 2 factor authentication -func (c *Client) GenerateSession(totp string) (UserSession, error) { +func (c *Client) GenerateSession() (UserSession, error) { // construct url values params := make(map[string]interface{}) params["clientcode"] = c.clientCode - params["password"] = c.password - params["totp"] = totp + params["password"] = c.password var session UserSession err := c.doEnvelope(http.MethodPost, URILogin, params, nil, &session) @@ -52,6 +52,7 @@ func (c *Client) GenerateSession(totp string) (UserSession, error) { return session, err } + // RenewAccessToken renews expired access token using valid refresh token. func (c *Client) RenewAccessToken(refreshToken string) (UserSessionTokens, error) { @@ -59,7 +60,7 @@ func (c *Client) RenewAccessToken(refreshToken string) (UserSessionTokens, error params["refreshToken"] = refreshToken var session UserSessionTokens - err := c.doEnvelope(http.MethodPost, URIUserSessionRenew, params, nil, &session, true) + err := c.doEnvelope(http.MethodPost, URIUserSessionRenew, params, nil, &session,true) // Set accessToken on successful session retrieve if err == nil && session.AccessToken != "" { @@ -69,10 +70,11 @@ func (c *Client) RenewAccessToken(refreshToken string) (UserSessionTokens, error return session, err } + // GetUserProfile gets user profile. func (c *Client) GetUserProfile() (UserProfile, error) { var userProfile UserProfile - err := c.doEnvelope(http.MethodGet, URIUserProfile, nil, nil, &userProfile, true) + err := c.doEnvelope(http.MethodGet, URIUserProfile, nil, nil, &userProfile,true) return userProfile, err } @@ -81,9 +83,9 @@ func (c *Client) Logout() (bool, error) { var status bool params := map[string]interface{}{} params["clientcode"] = c.clientCode - err := c.doEnvelope(http.MethodPost, URILogout, params, nil, nil, true) - if err == nil { + err := c.doEnvelope(http.MethodPost, URILogout, params, nil, nil,true) + if err == nil{ status = true } - return status, err -} + return status,err +} \ No newline at end of file