Skip to content

Commit

Permalink
🎨 Improve data synchronization stability siyuan-note/siyuan#12991
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Nov 1, 2024
1 parent df68252 commit 5c8aefd
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 26 deletions.
8 changes: 8 additions & 0 deletions cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Cloud interface {
// UploadObject 用于上传对象,overwrite 参数用于指示是否覆盖已有对象。
UploadObject(filePath string, overwrite bool) (length int64, err error)

// UploadBytes 用于上传对象数据 data,overwrite 参数用于指示是否覆盖已有对象。
UploadBytes(filePath string, data []byte, overwrite bool) (length int64, err error)

// DownloadObject 用于下载对象数据 data。
DownloadObject(filePath string) (data []byte, err error)

Expand Down Expand Up @@ -207,6 +210,11 @@ func (baseCloud *BaseCloud) UploadObject(filePath string, overwrite bool) (err e
return
}

func (baseCloud *BaseCloud) UploadBytes(filePath string, data []byte, overwrite bool) (err error) {
err = ErrUnsupported
return
}

func (baseCloud *BaseCloud) DownloadObject(filePath string) (data []byte, err error) {
err = ErrUnsupported
return
Expand Down
22 changes: 22 additions & 0 deletions cloud/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cloud

import (
"bytes"
"context"
"io"
"math"
Expand Down Expand Up @@ -95,6 +96,27 @@ func (s3 *S3) UploadObject(filePath string, overwrite bool) (length int64, err e
return
}

func (s3 *S3) UploadBytes(filePath string, data []byte, overwrite bool) (length int64, err error) {
length = int64(len(data))
svc := s3.getService()
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(s3.S3.Timeout)*time.Second)
defer cancelFn()

key := path.Join("repo", filePath)
_, err = svc.PutObjectWithContext(ctx, &as3.PutObjectInput{
Bucket: aws.String(s3.Conf.S3.Bucket),
Key: aws.String(key),
CacheControl: aws.String("no-cache"),
Body: bytes.NewReader(data),
})
if nil != err {
return
}

logging.LogInfof("uploaded object [%s]", key)
return
}

func (s3 *S3) DownloadObject(filePath string) (data []byte, err error) {
svc := s3.getService()
ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(s3.S3.Timeout)*time.Second)
Expand Down
113 changes: 111 additions & 2 deletions cloud/siyuan.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cloud

import (
"bytes"
"context"
"fmt"
"os"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (siyuan *SiYuan) UploadObject(filePath string, overwrite bool) (length int6
return
}

logging.LogErrorf("upload object [%s] failed: %s", absFilePath, err)
logging.LogErrorf("upload object [%s] failed: %s", key, err)
if e, ok := err.(*client.ErrorInfo); ok {
if 614 == e.Code || strings.Contains(e.Err, "file exists") {
err = nil
Expand All @@ -96,7 +97,64 @@ func (siyuan *SiYuan) UploadObject(filePath string, overwrite bool) (length int6
return
}

logging.LogErrorf("upload object [%s] failed: %s", absFilePath, err)
logging.LogErrorf("upload object [%s] failed: %s", key, err)
if e, ok := err.(*client.ErrorInfo); ok {
if 614 == e.Code || strings.Contains(e.Err, "file exists") {
err = nil
return
}

logging.LogErrorf("error detail: %s", e.ErrorDetail())
}
}
return
}

logging.LogInfof("uploaded object [%s]", key)
return
}

func (siyuan *SiYuan) UploadBytes(filePath string, data []byte, overwrite bool) (length int64, err error) {
length = int64(len(data))

key := path.Join("siyuan", siyuan.Conf.UserID, "repo", siyuan.Conf.Dir, filePath)
keyUploadToken, scopeUploadToken, err := siyuan.requestScopeKeyUploadToken(key, overwrite)
if nil != err {
return
}

uploadToken := keyUploadToken
if !overwrite {
uploadToken = scopeUploadToken
}

formUploader := storage.NewFormUploader(&storage.Config{UseHTTPS: true})
ret := storage.PutRet{}
err = formUploader.Put(context.Background(), &ret, uploadToken, key, bytes.NewReader(data), length, &storage.PutExtra{})
if nil != err {
if msg := fmt.Sprintf("%s", err); strings.Contains(msg, "file exists") {
err = nil
return
}

logging.LogErrorf("upload object [%s] failed: %s", key, err)
if e, ok := err.(*client.ErrorInfo); ok {
if 614 == e.Code || strings.Contains(e.Err, "file exists") {
err = nil
return
}
logging.LogErrorf("error detail: %s", e.ErrorDetail())
}

time.Sleep(1 * time.Second)
err = formUploader.Put(context.Background(), &ret, uploadToken, key, bytes.NewReader(data), length, &storage.PutExtra{})
if nil != err {
if msg := fmt.Sprintf("%s", err); strings.Contains(msg, "file exists") {
err = nil
return
}

logging.LogErrorf("upload object [%s] failed: %s", key, err)
if e, ok := err.(*client.ErrorInfo); ok {
if 614 == e.Code || strings.Contains(e.Err, "file exists") {
err = nil
Expand Down Expand Up @@ -174,6 +232,57 @@ func (siyuan *SiYuan) RemoveObject(filePath string) (err error) {
return
}

func (siyuan *SiYuan) ListObjects(pathPrefix string) (objInfos map[string]*entity.ObjectInfo, err error) {
objInfos = map[string]*entity.ObjectInfo{}

token := siyuan.Conf.Token
dir := siyuan.Conf.Dir
userId := siyuan.Conf.UserID
server := siyuan.Conf.Server

result := gulu.Ret.NewResult()
request := httpclient.NewCloudRequest30s()
resp, err := request.
SetSuccessResult(&result).
SetBody(map[string]string{"repo": dir, "token": token, "pathPrefix": pathPrefix}).
Post(server + "/apis/siyuan/dejavu/listRepoObjects?uid=" + userId)
if nil != err {
err = fmt.Errorf("list cloud repo objects failed: %s", err)
return
}

if 200 != resp.StatusCode {
if 401 == resp.StatusCode {
err = ErrCloudAuthFailed
return
}
err = fmt.Errorf("list cloud repo objects failed [%d]", resp.StatusCode)
return
}

if 0 != result.Code {
err = fmt.Errorf("list cloud repo objects failed: %s", result.Msg)
return
}

retData := result.Data.(map[string]interface{})
retObjects := retData["objects"].([]interface{})
for _, retObj := range retObjects {
data, marshalErr := gulu.JSON.MarshalJSON(retObj)
if nil != marshalErr {
logging.LogErrorf("marshal obj failed: %s", marshalErr)
continue
}
obj := &entity.ObjectInfo{}
if unmarshalErr := gulu.JSON.UnmarshalJSON(data, obj); nil != unmarshalErr {
logging.LogErrorf("unmarshal obj failed: %s", unmarshalErr)
continue
}
objInfos[obj.Path] = obj
}
return
}

func (siyuan *SiYuan) GetTags() (tags []*Ref, err error) {
token := siyuan.Conf.Token
dir := siyuan.Conf.Dir
Expand Down
20 changes: 16 additions & 4 deletions cloud/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,30 @@ func (webdav *WebDAV) GetRepos() (repos []*Repo, size int64, err error) {

func (webdav *WebDAV) UploadObject(filePath string, overwrite bool) (length int64, err error) {
absFilePath := filepath.Join(webdav.Conf.RepoPath, filePath)
info, err := os.Stat(absFilePath)
data, err := os.ReadFile(absFilePath)
if nil != err {
logging.LogErrorf("stat failed: %s", err)
return
}
length = info.Size()

data, err := os.ReadFile(absFilePath)
key := path.Join(webdav.Dir, "siyuan", "repo", filePath)
folder := path.Dir(key)
err = webdav.mkdirAll(folder)
if nil != err {
return
}

err = webdav.Client.Write(key, data, 0644)
err = webdav.parseErr(err)
if nil != err {
logging.LogErrorf("upload object [%s] failed: %s", key, err)
return
}
logging.LogInfof("uploaded object [%s]", key)
return
}

func (webdav *WebDAV) UploadBytes(filePath string, data []byte, overwrite bool) (length int64, err error) {
length = int64(len(data))
key := path.Join(webdav.Dir, "siyuan", "repo", filePath)
folder := path.Dir(key)
err = webdav.mkdirAll(folder)
Expand Down
Loading

0 comments on commit 5c8aefd

Please sign in to comment.