Skip to content

Commit

Permalink
Merge pull request #7 from everFinance/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
zyjblockchain authored Feb 22, 2022
2 parents 4c0f3bc + db3faf6 commit ec49a91
Show file tree
Hide file tree
Showing 15 changed files with 375 additions and 210 deletions.
6 changes: 3 additions & 3 deletions api-job.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (s *Server) broadcast(c *gin.Context) {
c.JSON(http.StatusBadRequest, "arId incorrect")
return
}
if err := s.jobManager.RegisterJob(arid, jobTypeBroadcast, len(s.peers)); err != nil {
if err := s.jobManager.RegisterJob(arid, jobTypeBroadcast); err != nil {
c.JSON(http.StatusBadGateway, err.Error())
return
}
Expand Down Expand Up @@ -44,7 +44,7 @@ func (s *Server) sync(c *gin.Context) {
return
}

if err := s.jobManager.RegisterJob(arid, jobTypeSync, len(s.peers)); err != nil {
if err := s.jobManager.RegisterJob(arid, jobTypeSync); err != nil {
c.JSON(http.StatusBadGateway, err.Error())
return
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (s *Server) killJob(c *gin.Context) {
func (s *Server) getJob(c *gin.Context) {
arid := c.Param("arid")
jobType := c.Param("jobType")
if !strings.Contains(jobTypeSync+jobTypeBroadcast, strings.ToLower(jobType)) {
if !strings.Contains(jobTypeSync+jobTypeBroadcast+jobTypeSubmitTxBroadcast, strings.ToLower(jobType)) {
c.JSON(http.StatusBadRequest, "jobType not exist")
return
}
Expand Down
26 changes: 11 additions & 15 deletions api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package arseeding

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -10,7 +9,6 @@ import (
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -81,14 +79,11 @@ func (s *Server) submitTx(c *gin.Context) {
c.JSON(http.StatusBadRequest, err.Error())
return
}
if err := s.processSubmitTx(arTx); err != nil {

if err := s.broadcastSubmitTx(arTx); err != nil {
c.JSON(http.StatusBadRequest, err.Error())
return
}

// proxy to arweave
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer([]byte(by)))
proxyArweaveGateway(c)
}

func (s *Server) submitChunk(c *gin.Context) {
Expand All @@ -114,10 +109,6 @@ func (s *Server) submitChunk(c *gin.Context) {
c.JSON(http.StatusBadRequest, err.Error())
return
}

// proxy to arweave
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer([]byte(by)))
proxyArweaveGateway(c)
}

func (s *Server) getTxOffset(c *gin.Context) {
Expand Down Expand Up @@ -271,6 +262,9 @@ func getData(dataRoot, dataSize string, db *Store) ([]byte, error) {
if err != nil {
return nil, err
}
if size == 0 {
return []byte{}, nil
}

data := make([]byte, 0, size)
txDataEndOffset, err := db.LoadTxDataEndOffSet(dataRoot, dataSize)
Expand All @@ -292,11 +286,13 @@ func getData(dataRoot, dataSize string, db *Store) ([]byte, error) {
}

func proxyArweaveGateway(c *gin.Context) {
var proxyUrl = new(url.URL)
proxyUrl.Scheme = "https"
proxyUrl.Host = "arweave.net"
directer := func(req *http.Request) {
req.URL.Scheme = "https"
req.URL.Host = "arweave.net"
req.Host = "arweave.net"
}
proxy := &httputil.ReverseProxy{Director: directer}

proxy := httputil.NewSingleHostReverseProxy(proxyUrl)
proxy.ServeHTTP(c.Writer, c.Request)
c.Abort()
}
4 changes: 2 additions & 2 deletions example/arweave-pool-broadcast/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (b *BcPool) updatePendingTxIds() {
}
}

func (b *BcPool) checkBroadcastTxStatus(broadcastNodeNum int64) {
func (b *BcPool) checkBroadcastTxStatus() {
needCheckBroadcastTxIds := make([]string, 0)
b.mapLock.RLock()
for txId, finished := range b.broadcastMap {
Expand All @@ -112,7 +112,7 @@ func (b *BcPool) checkBroadcastTxStatus(broadcastNodeNum int64) {
log.Error("example.GetJob(arId,\"broadcast\",b.seedCli)", "err", err, "arId", arId)
continue
}
if jobStatus.CountSuccessed >= broadcastNodeNum {
if jobStatus.CountSuccessed >= 10 {
// close job
if err := example.KillJob(arId, "broadcast", b.seedCli); err != nil {
log.Error("example.KillJob(arId,\"broadcast\",b.seedCli)", "err", err, "arId", arId)
Expand Down
3 changes: 2 additions & 1 deletion example/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"errors"
"fmt"
"github.com/everFinance/arseeding"
"github.com/labstack/gommon/log"
"github.com/panjf2000/ants/v2"
"gopkg.in/h2non/gentleman.v2"
"sync"
"time"
)

var log = arseeding.NewLog("example")

func MustBatchSyncTxIds(txIds []string, seedCli *gentleman.Client) (successTxIds []string) {
var wg sync.WaitGroup
successTxIds = make([]string, 0, len(txIds))
Expand Down
6 changes: 4 additions & 2 deletions example/everpay-sync/everpay_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package everpay_sync

import "testing"
import (
"testing"
)

func Test_EverPaySync(t *testing.T) {
dsn := "root@tcp(127.0.0.1:3306)/sandy_test?charset=utf8mb4&parseTime=True&loc=Local"
seedUrl := "http://127.0.0.1:8080" // your deployed arseeding services
seedUrl := "https://seed-dev.everpay.io" // your deployed arseeding services
epSync := New(dsn, seedUrl)
epSync.Run()

Expand Down
2 changes: 1 addition & 1 deletion example/everpay-sync/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (e *EverPaySync) runJobs() {
}

func (e *EverPaySync) FetchArIds() {
processedArTx, err := e.wdb.GetLastPostedTx()
processedArTx, err := e.wdb.GetLastPostTx()
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions example/everpay-sync/wdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func (w *Wdb) GetArIds(fromId int) ([]RollupArId, error) {
return rollupTxs, err
}

func (w *Wdb) GetLastPostedTx() (RollupArId, error) {
func (w *Wdb) GetLastPostTx() (RollupArId, error) {
tx := RollupArId{}
err := w.Db.Model(&RollupArId{}).Where("post = ?", true).Order("id desc").Limit(1).Scan(&tx).Error
err := w.Db.Model(&RollupArId{}).Order("id desc").Limit(1).Scan(&tx).Error
if err == gorm.ErrRecordNotFound {
return tx, nil
}
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ module github.com/everFinance/arseeding
go 1.15

require (
github.com/everFinance/goar v1.3.8
github.com/everFinance/goar v1.4.0
github.com/getsentry/sentry-go v0.11.0
github.com/gin-gonic/gin v1.7.4
github.com/go-co-op/gocron v1.11.0
github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac
github.com/jinzhu/now v1.1.4 // indirect
github.com/labstack/gommon v0.3.0
github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 // indirect
github.com/panjf2000/ants/v2 v2.4.7
github.com/stretchr/testify v1.7.0
Expand Down
Loading

0 comments on commit ec49a91

Please sign in to comment.