diff --git a/README.md b/README.md index 7d6eaad..305e976 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # INTERX +For test INTERX is an interchain engine, proxy, load balancer & security gateway service for communication between backend and frontend. It will connect to the node using the GRPC endpoint as well as the RPC endpoint ([`Tendermint RPC`](https://docs.tendermint.com/master/rpc/)). @@ -509,4 +510,4 @@ Remember this settings when you set/update manually from `config.json`. ### How to update caching configurations All caching configurations are set in `config.json` file. -`config.json` file includes `rpc_methods` field and there you can set/update caching config of each endpoint. \ No newline at end of file +`config.json` file includes `rpc_methods` field and there you can set/update caching config of each endpoint. diff --git a/common/gateway.go b/common/gateway.go index 69c7155..0d7be12 100644 --- a/common/gateway.go +++ b/common/gateway.go @@ -3,6 +3,7 @@ package common import ( "encoding/base64" "encoding/json" + "fmt" "io/ioutil" "net/http" "net/http/httptest" @@ -344,3 +345,9 @@ func RosettaBuildError(code int, message string, description string, retriable b func RosettaServeError(code int, data string, message string, statusCode int) (interface{}, interface{}, int) { return nil, RosettaBuildError(code, message, data, true, nil), statusCode } + +// BuildTxSearchEndpoint creates a tx_search endpoint. +func BuildTxSearchEndpoint(rpcAddr string, query string, page int, limit int, orderBy string) string { + return fmt.Sprintf("%s/tx_search?query=\"%s\"&page=%d&per_page=%d&order_by=\"%s\"", + rpcAddr, query, page, limit, orderBy) +} diff --git a/config/constants.go b/config/constants.go index bcc7a8f..cf84c2f 100755 --- a/config/constants.go +++ b/config/constants.go @@ -1,6 +1,7 @@ package config const ( + InterxVersion = "v0.4.49" SekaiVersion = "v0.3.45" CosmosVersion = "v0.47.6" diff --git a/database/transactions.go b/database/transactions.go index 5b0dbf0..071e0dd 100644 --- a/database/transactions.go +++ b/database/transactions.go @@ -13,9 +13,18 @@ import ( // GetTransactions is a function to get user transactions from cache func GetTransactions(address string, isWithdraw bool) (*tmTypes.ResultTxSearch, error) { - filePath := fmt.Sprintf("%s/transactions/%s", config.GetDbCacheDir(), address) - if !isWithdraw { - filePath = filePath + "-inbound" + var filePath string + + basePath := fmt.Sprintf("%s/transactions", config.GetDbCacheDir()) + suffix := "-inbound" + if isWithdraw { + suffix = "" + } + + if address == "" { + filePath = fmt.Sprintf("%s/all-transactions%s", basePath, suffix) + } else { + filePath = fmt.Sprintf("%s/%s%s", basePath, address, suffix) } data := tmTypes.ResultTxSearch{} @@ -55,10 +64,12 @@ func GetLastBlockFetched(address string, isWithdraw bool) int64 { func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw bool) error { cachedData, _ := GetTransactions(address, isWithdraw) - // Append new txs to the cached txs array - if cachedData.TotalCount > 0 { - txsData.Txs = append(txsData.Txs, cachedData.Txs...) - txsData.TotalCount = txsData.TotalCount + cachedData.TotalCount + if address != "" { + // Append new txs to the cached txs array + if cachedData.TotalCount > 0 { + txsData.Txs = append(txsData.Txs, cachedData.Txs...) + txsData.TotalCount = txsData.TotalCount + cachedData.TotalCount + } } data, err := json.Marshal(txsData) @@ -67,10 +78,8 @@ func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw } folderPath := fmt.Sprintf("%s/transactions", config.GetDbCacheDir()) - filePath := fmt.Sprintf("%s/%s", folderPath, address) - if !isWithdraw { - filePath = filePath + "-inbound" - } + fileName := resolveFileName(address, isWithdraw) + filePath := fmt.Sprintf("%s/%s", folderPath, fileName) global.Mutex.Lock() err = os.MkdirAll(folderPath, os.ModePerm) @@ -85,8 +94,21 @@ func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw global.Mutex.Unlock() if err != nil { - fmt.Println("[cache] Unable to save response: ", filePath) + fmt.Println("[SaveTransactions][cache] Unable to save response: ", filePath) } return err } + +// Helper function to determine the file name +func resolveFileName(address string, isWithdraw bool) string { + if address == "" { + address = "all-transactions" + } + + if !isWithdraw { + return fmt.Sprintf("%s-inbound", address) + } + + return address +} diff --git a/gateway/interx/interx.tx.go b/gateway/interx/interx.tx.go index 7d72a49..5d41296 100644 --- a/gateway/interx/interx.tx.go +++ b/gateway/interx/interx.tx.go @@ -26,11 +26,6 @@ import ( "golang.org/x/exp/slices" ) -type TxsResponse struct { - Transactions []types.TransactionResponse `json:"transactions"` - TotalCount int `json:"total_count"` -} - // RegisterInterxTxRoutes registers tx query routers. func RegisterInterxTxRoutes(r *mux.Router, gwCosmosmux *runtime.ServeMux, rpcAddr string) { r.HandleFunc(config.QueryUnconfirmedTxs, QueryUnconfirmedTxs(rpcAddr)).Methods("GET") @@ -47,10 +42,6 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (* var limit = 100 var limitPages = 100 - if address == "" { - return &tmTypes.ResultTxSearch{}, nil - } - lastBlock := database.GetLastBlockFetched(address, isOutbound) totalResult := tmTypes.ResultTxSearch{ Txs: []*tmTypes.ResultTx{}, @@ -59,15 +50,23 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (* for page < limitPages { var events = make([]string, 0, 5) - if isOutbound { - events = append(events, fmt.Sprintf("message.sender='%s'", address)) + if address != "" { + if isOutbound { + events = append(events, fmt.Sprintf("message.sender='%s'", address)) + } else { + events = append(events, fmt.Sprintf("transfer.recipient='%s'", address)) + } + events = append(events, fmt.Sprintf("tx.height>%d", lastBlock)) + } + + var query string + if address == "" { + query = "tx.height>0" } else { - events = append(events, fmt.Sprintf("transfer.recipient='%s'", address)) + query = strings.Join(events, "%20AND%20") } - events = append(events, fmt.Sprintf("tx.height>%d", lastBlock)) - // search transactions - endpoint := fmt.Sprintf("%s/tx_search?query=\"%s\"&page=%d&per_page=%d&order_by=\"desc\"", rpcAddr, strings.Join(events, "%20AND%20"), page, limit) + endpoint := common.BuildTxSearchEndpoint(rpcAddr, query, page, limit, "desc") common.GetLogger().Info("[query-transaction] Entering transaction search: ", endpoint) resp, err := http.Get(endpoint) @@ -131,31 +130,23 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d Txs: []*tmTypes.ResultTx{}, TotalCount: 0, } + if len(directions) == 0 { directions = []string{"inbound", "outbound"} } - if slices.Contains(directions, "inbound") { - cachedTxs1, err := GetTransactionsWithSync(rpcAddr, address, false) - for _, cachedTx := range cachedTxs1.Txs { - hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "inbound") - } - if err != nil { - return nil, err - } - cachedTxs.TotalCount += cachedTxs1.TotalCount - cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs1.Txs...) - } - if slices.Contains(directions, "outbound") { - cachedTxs2, err := GetTransactionsWithSync(rpcAddr, address, true) - for _, cachedTx := range cachedTxs2.Txs { - hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "outbound") - } - if err != nil { - return nil, err + if address != "" { + for _, direction := range directions { + switch direction { + case "inbound": + cachedTxs, _ = processDirection(rpcAddr, address, false, "inbound", hashToDirectionMap, &cachedTxs) + + case "outbound": + cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs) + } } - cachedTxs.TotalCount += cachedTxs2.TotalCount - cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs2.Txs...) + } else { + cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs) } var res []types.TransactionResponse @@ -224,6 +215,7 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d Status: hashStatus, Direction: hashToDirectionMap[cachedTx.Hash.String()][0], Hash: fmt.Sprintf("0x%X", cachedTx.Hash), + Height: cachedTx.Height, Txs: txResponses, } if len(hashToDirectionMap[cachedTx.Hash.String()]) > 1 { @@ -306,39 +298,6 @@ func SearchTxHashHandle(rpcAddr string, sender string, recipient string, txType return result, nil } -// Get block height for tx hash from cache or tendermint -func getBlockHeight(rpcAddr string, hash string) (int64, error) { - endpoint := fmt.Sprintf("%s/tx?hash=%s", rpcAddr, hash) - common.GetLogger().Info("[query-block] Entering block query: ", endpoint) - - resp, err := http.Get(endpoint) - if err != nil { - common.GetLogger().Error("[query-block] Unable to connect to ", endpoint) - return 0, err - } - defer resp.Body.Close() - - respBody, _ := ioutil.ReadAll(resp.Body) - response := new(tmJsonRPCTypes.RPCResponse) - - if err := json.Unmarshal(respBody, response); err != nil { - common.GetLogger().Error("[query-block] Unable to decode response: ", err) - return 0, err - } - if response.Error != nil { - common.GetLogger().Error("[query-block] Error response:", response.Error.Message) - return 0, errors.New(response.Error.Message) - } - - result := new(tmTypes.ResultTx) - if err := tmjson.Unmarshal(response.Result, result); err != nil { - common.GetLogger().Error("[query-block] Failed to unmarshal result:", err) - return 0, fmt.Errorf("error unmarshalling result: %w", err) - } - - return result.Height, nil -} - func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}, interface{}, int) { err := r.ParseForm() if err != nil { @@ -371,10 +330,6 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{} //------------ Address ------------ account = r.FormValue("address") - if account == "" { - common.GetLogger().Error("[query-transactions] 'address' is not set") - return common.ServeError(0, "'address' is not set", "", http.StatusBadRequest) - } //------------ Direction ------------ directionsParam := r.FormValue("direction") @@ -509,7 +464,7 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{} } txResults = txResults[offset:int(math.Min(float64(offset+limit), float64(len(txResults))))] - res := TxsResponse{ + res := types.TxsResponse{ TotalCount: totalCount, Transactions: txResults, } @@ -655,3 +610,24 @@ func QueryUnconfirmedTxs(rpcAddr string) http.HandlerFunc { common.WrapResponse(w, request, *response, statusCode, false) } } + +func processDirection( + rpcAddr, address string, + isOutbound bool, + direction string, + hashToDirectionMap map[string][]string, + cachedTxs *tmTypes.ResultTxSearch, +) (tmTypes.ResultTxSearch, error) { + transactions, err := GetTransactionsWithSync(rpcAddr, address, isOutbound) + if err != nil { + return tmTypes.ResultTxSearch{}, err + } + + for _, tx := range transactions.Txs { + hashToDirectionMap[tx.Hash.String()] = append(hashToDirectionMap[tx.Hash.String()], direction) + } + + cachedTxs.TotalCount += transactions.TotalCount + cachedTxs.Txs = append(cachedTxs.Txs, transactions.Txs...) + return *cachedTxs, nil +} diff --git a/gateway/interx/interx.tx_test.go b/gateway/interx/interx.tx_test.go index b4f32ce..b9cf0b4 100644 --- a/gateway/interx/interx.tx_test.go +++ b/gateway/interx/interx.tx_test.go @@ -2,12 +2,16 @@ package interx import ( "encoding/json" + "errors" + "fmt" + "io/ioutil" "net/http" "net/http/httptest" "os" "testing" "time" + "github.com/KiraCore/interx/common" "github.com/KiraCore/interx/config" "github.com/KiraCore/interx/database" "github.com/KiraCore/interx/test" @@ -96,7 +100,7 @@ func (suite *InterxTxTestSuite) TestBlockTransactionsHandler() { suite.Assert() } - resultTxSearch := TxsResponse{} + resultTxSearch := types.TxsResponse{} err = json.Unmarshal(suite.blockTransactionsQueryResponse.Result, &resultTxSearch) suite.Require().NoError(err) suite.Require().EqualValues(result.TotalCount, resultTxSearch.TotalCount) @@ -140,7 +144,7 @@ func TestInterxTxTestSuite(t *testing.T) { txMsg := make(map[string]interface{}) txMsg["type"] = "send" - resBytes, err = json.Marshal(TxsResponse{ + resBytes, err = json.Marshal(types.TxsResponse{ TotalCount: 1, Transactions: []types.TransactionResponse{ { @@ -229,3 +233,36 @@ func TestInterxTxTestSuite(t *testing.T) { tendermintServer.Close() } + +// Get block height for tx hash from cache or tendermint +func getBlockHeight(rpcAddr string, hash string) (int64, error) { + endpoint := fmt.Sprintf("%s/tx?hash=%s", rpcAddr, hash) + common.GetLogger().Info("[query-block] Entering block query: ", endpoint) + + resp, err := http.Get(endpoint) + if err != nil { + common.GetLogger().Error("[query-block] Unable to connect to ", endpoint) + return 0, err + } + defer resp.Body.Close() + + respBody, _ := ioutil.ReadAll(resp.Body) + response := new(tmJsonRPCTypes.RPCResponse) + + if err := json.Unmarshal(respBody, response); err != nil { + common.GetLogger().Error("[query-block] Unable to decode response: ", err) + return 0, err + } + if response.Error != nil { + common.GetLogger().Error("[query-block] Error response:", response.Error.Message) + return 0, errors.New(response.Error.Message) + } + + result := new(tmRPCTypes.ResultTx) + if err := tmjson.Unmarshal(response.Result, result); err != nil { + common.GetLogger().Error("[query-block] Failed to unmarshal result:", err) + return 0, fmt.Errorf("error unmarshalling result: %w", err) + } + + return result.Height, nil +} diff --git a/types/main.go b/types/main.go index a7490fc..5656f54 100644 --- a/types/main.go +++ b/types/main.go @@ -76,6 +76,7 @@ type ResponseSign struct { type TransactionResponse struct { Time int64 `json:"time"` Hash string `json:"hash"` + Height int64 `json:"height"` Status string `json:"status"` Direction string `json:"direction"` Memo string `json:"memo"` @@ -337,3 +338,8 @@ const ( // POST is a constant to refer POST HTTP Method POST string = "POST" ) + +type TxsResponse struct { + Transactions []TransactionResponse `json:"transactions"` + TotalCount int `json:"total_count"` +}