Skip to content

Commit

Permalink
🎨 WebDAV/S3 data sync and backup support configurable concurrent requ…
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Oct 16, 2024
1 parent 7d61fd4 commit 5fc0357
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 20 deletions.
35 changes: 22 additions & 13 deletions cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,25 @@ type Conf struct {

// ConfS3 用于描述 S3 对象存储协议所需配置。
type ConfS3 struct {
Endpoint string // 服务端点
AccessKey string // Access Key
SecretKey string // Secret Key
Region string // 存储区域
Bucket string // 存储空间
PathStyle bool // 是否使用路径风格寻址
SkipTlsVerify bool // 是否跳过 TLS 验证
Timeout int // 超时时间,单位:秒
Endpoint string // 服务端点
AccessKey string // Access Key
SecretKey string // Secret Key
Region string // 存储区域
Bucket string // 存储空间
PathStyle bool // 是否使用路径风格寻址
SkipTlsVerify bool // 是否跳过 TLS 验证
Timeout int // 超时时间,单位:秒
ConcurrentReqs int // 并发请求数
}

// ConfWebDAV 用于描述 WebDAV 协议所需配置。
type ConfWebDAV struct {
Endpoint string // 服务端点
Username string // 用户名
Password string // 密码
SkipTlsVerify bool // 是否跳过 TLS 验证
Timeout int // 超时时间,单位:秒
Endpoint string // 服务端点
Username string // 用户名
Password string // 密码
SkipTlsVerify bool // 是否跳过 TLS 验证
Timeout int // 超时时间,单位:秒
ConcurrentReqs int // 并发请求数
}

// Cloud 描述了云端存储服务,接入云端存储服务时需要实现该接口。
Expand Down Expand Up @@ -116,6 +118,9 @@ type Cloud interface {

// GetIndex 用于获取索引。
GetIndex(id string) (index *entity.Index, err error)

// GetConcurrentReqs 用于获取配置的并发请求数。
GetConcurrentReqs() int
}

// Traffic 描述了流量信息。
Expand Down Expand Up @@ -250,6 +255,10 @@ func (baseCloud *BaseCloud) GetIndex(id string) (index *entity.Index, err error)
return
}

func (baseCloud *BaseCloud) GetConcurrentReqs() int {
return 8
}

func (baseCloud *BaseCloud) GetConf() *Conf {
return baseCloud.Conf
}
Expand Down
19 changes: 18 additions & 1 deletion cloud/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,17 @@ func (s3 *S3) GetIndex(id string) (index *entity.Index, err error) {
return
}

func (s3 *S3) GetConcurrentReqs() (ret int) {
ret = s3.S3.ConcurrentReqs
if 1 > ret {
ret = 8
}
if 16 < ret {
ret = 16
}
return
}

func (s3 *S3) ListObjects(pathPrefix string) (ret map[string]*entity.ObjectInfo, err error) {
ret = map[string]*entity.ObjectInfo{}
svc := s3.getService()
Expand Down Expand Up @@ -417,8 +428,14 @@ func (s3 *S3) getNotFound(keys []string) (ret []string, err error) {
if 1 > len(keys) {
return
}

poolSize := s3.GetConcurrentReqs()
if poolSize > len(keys) {
poolSize = len(keys)
}

waitGroup := &sync.WaitGroup{}
p, _ := ants.NewPoolWithFunc(8, func(arg interface{}) {
p, _ := ants.NewPoolWithFunc(poolSize, func(arg interface{}) {
defer waitGroup.Done()
key := arg.(string)
info, statErr := s3.statFile(key)
Expand Down
11 changes: 11 additions & 0 deletions cloud/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,17 @@ func (webdav *WebDAV) GetIndex(id string) (index *entity.Index, err error) {
return
}

func (webdav *WebDAV) GetConcurrentReqs() (ret int) {
ret = webdav.Conf.WebDAV.ConcurrentReqs
if 1 > ret {
ret = 1
}
if 16 < ret {
ret = 16
}
return
}

func (webdav *WebDAV) ListObjects(pathPrefix string) (ret map[string]*entity.ObjectInfo, err error) {
ret = map[string]*entity.ObjectInfo{}

Expand Down
2 changes: 1 addition & 1 deletion repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (repo *Repo) OpenFile(file *entity.File) (ret []byte, err error) {
func (repo *Repo) removeCloudObjects(objects []string) (err error) {
waitGroup := &sync.WaitGroup{}
var removeErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(objects) {
poolSize = len(objects)
}
Expand Down
10 changes: 5 additions & 5 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (repo *Repo) downloadCloudChunksPut(chunkIDs []string, context map[string]i

waitGroup := &sync.WaitGroup{}
var downloadErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(chunkIDs) {
poolSize = len(chunkIDs)
}
Expand Down Expand Up @@ -814,7 +814,7 @@ func (repo *Repo) downloadCloudFilesPut(fileIDs []string, context map[string]int
lock := &sync.Mutex{}
waitGroup := &sync.WaitGroup{}
var downloadErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(fileIDs) {
poolSize = len(fileIDs)
}
Expand Down Expand Up @@ -977,7 +977,7 @@ func (repo *Repo) uploadCloudMissingObjects(trafficStat *TrafficStat, context ma

waitGroup := &sync.WaitGroup{}
var uploadErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(missingObjects) {
poolSize = len(missingObjects)
}
Expand Down Expand Up @@ -1180,7 +1180,7 @@ func (repo *Repo) uploadFiles(upsertFiles []*entity.File, context map[string]int

waitGroup := &sync.WaitGroup{}
var uploadErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(upsertFiles) {
poolSize = len(upsertFiles)
}
Expand Down Expand Up @@ -1234,7 +1234,7 @@ func (repo *Repo) uploadChunks(upsertChunkIDs []string, context map[string]inter

waitGroup := &sync.WaitGroup{}
var uploadErr error
poolSize := 8
poolSize := repo.cloud.GetConcurrentReqs()
if poolSize > len(upsertChunkIDs) {
poolSize = len(upsertChunkIDs)
}
Expand Down

0 comments on commit 5fc0357

Please sign in to comment.