Skip to content

Commit

Permalink
Merge pull request #48 from absolute8511/fix-eng-closed
Browse files Browse the repository at this point in the history
Fix eng closed and restore tool support hash
  • Loading branch information
absolute8511 authored May 10, 2018
2 parents f140777 + b575b4c commit d27faac
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 93 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# ZanRedisDB

[![Codacy Badge](https://api.codacy.com/project/badge/Grade/3288ed77f27c4f8a998e35ef936edc6f)](https://www.codacy.com/app/cool8511/ZanRedisDB?utm_source=github.com&utm_medium=referral&utm_content=absolute8511/ZanRedisDB&utm_campaign=badger)
[![Build Status](https://travis-ci.org/absolute8511/ZanRedisDB.svg?branch=master)](https://travis-ci.org/absolute8511/ZanRedisDB) [![GitHub release](https://img.shields.io/github/release/absolute8511/ZanRedisDB.svg)](https://github.com/absolute8511/ZanRedisDB/releases/latest) [![codecov](https://codecov.io/gh/absolute8511/ZanRedisDB/branch/master/graph/badge.svg)](https://codecov.io/gh/absolute8511/ZanRedisDB) [![Go Report Card](https://goreportcard.com/badge/github.com/absolute8511/ZanRedisDB)](https://goreportcard.com/report/github.com/absolute8511/ZanRedisDB)
[![Build Status](https://travis-ci.org/absolute8511/ZanRedisDB.svg?branch=master)](https://travis-ci.org/absolute8511/ZanRedisDB) [![GitHub release](https://img.shields.io/github/release/absolute8511/ZanRedisDB.svg)](https://github.com/absolute8511/ZanRedisDB/releases/latest) [![codecov](https://codecov.io/gh/absolute8511/ZanRedisDB/branch/master/graph/badge.svg)](https://codecov.io/gh/absolute8511/ZanRedisDB) [![Go Report Card](https://goreportcard.com/badge/github.com/absolute8511/ZanRedisDB)](https://goreportcard.com/report/github.com/absolute8511/ZanRedisDB) [![Documentation Status](https://readthedocs.org/projects/zanredisdb/badge/?version=latest)](http://zanredisdb.readthedocs.io/en/latest/?badge=latest)


## What is ZanRedisDB
ZanRedisDB is a distributed redis cluster with strong consistency.
Expand Down
156 changes: 103 additions & 53 deletions apps/restore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/binary"
"errors"
"flag"
"fmt"
"io"
Expand All @@ -11,6 +12,7 @@ import (
"time"

sdk "github.com/absolute8511/go-zanredisdb"
"github.com/absolute8511/redigo/redis"
)

var (
Expand Down Expand Up @@ -51,71 +53,65 @@ func checkParameter() {
}
}

func readLen(file *os.File, lenBuf [4]byte) (int, error) {
n, err := file.Read(lenBuf[:])
if err != nil {
if err == io.EOF {
return 0, err
} else {
log.Printf("read key length error. [err=%v]\n", err)
return 0, err
}
}
if n != len(lenBuf) {
log.Printf("read key length, length not equal.[n=%d]\n", n)
return 0, errors.New("read length not match")
}
length := int(binary.BigEndian.Uint32(lenBuf[:]))
return length, nil
}

func readLenAndBody(file *os.File, lenBuf [4]byte, key []byte) ([]byte, error) {
length, err := readLen(file, lenBuf)
if err != nil {
return nil, err
}

if length <= len(key) {
key = key[:length]
} else {
key = make([]byte, length)
}
n, err := file.Read(key)
if err != nil {
log.Printf("read key error.[err=%v]\n", err)
return key, err
}

if n != length {
log.Printf("read key length not equal.[key=%s, n=%d, length=%d]\n", string(key), n, length)
return key, err
}
return key, nil
}

func kvrestore(file *os.File, client *sdk.ZanRedisClient) {
lenBuf := make([]byte, 4)
var lenBuf [4]byte
var key []byte
var value []byte
var length int
var n int
var err error
var total uint64
var existed uint64
for {
n, err = file.Read(lenBuf)
key, err = readLenAndBody(file, lenBuf, key)
if err != nil {
if err == io.EOF {
break
} else {
log.Printf("read key length error. [err=%v]\n", err)
break
}
}
if n != 4 {
log.Printf("read key length, length not equal.[n=%d]\n", n)
break
}

length = int(binary.BigEndian.Uint32(lenBuf))

if length <= len(key) {
key = key[:length]
} else {
key = make([]byte, length)
}
n, err = file.Read(key)
if err != nil {
log.Printf("read key error.[err=%v]\n", err)
log.Printf("read key error. [err=%v]\n", err)
break
}

if n != length {
log.Printf("read key length not equal.[key=%s, n=%d, length=%d]\n", string(key), n, length)
break
}

n, err = file.Read(lenBuf)
value, err = readLenAndBody(file, lenBuf, value)
if err != nil {
log.Printf("read key length error. [err=%v]\n", err)
break
}
if n != 4 {
log.Printf("read key length, length not equal.[n=%d]\n", n)
break
}

length = int(binary.BigEndian.Uint32(lenBuf))
if length <= len(value) {
value = value[:length]
} else {
value = make([]byte, length)
}
n, err = file.Read(value)
if err != nil {
log.Printf("read value error.[key=%s, err=%v]\n", string(key), err)
break
}
if n != length {
log.Printf("read value length not equal.[key=%s, value=%v, n=%d, length=%d]\n", string(key), value, n, length)
log.Printf("read key error. [err=%v]\n", err)
break
}

Expand Down Expand Up @@ -143,7 +139,61 @@ func kvrestore(file *os.File, client *sdk.ZanRedisClient) {
}

// hrestore replays length-prefixed hash records (key, field count, then
// field/value pairs) from file into the cluster via HSETNX, so records that
// already exist are counted in existed rather than overwritten.
func hrestore(file *os.File, client *sdk.ZanRedisClient) {
	var lenBuf [4]byte
	var key []byte
	var field []byte
	var value []byte
	var fieldNum int
	var err error
	var total uint64
	var existed uint64
	for {
		key, err = readLenAndBody(file, lenBuf, key)
		if err != nil {
			// io.EOF at a record boundary is the normal end of the dump
			if err != io.EOF {
				log.Printf("read key error. [err=%v]\n", err)
			}
			break
		}

		fieldNum, err = readLen(file, lenBuf)
		if err != nil {
			log.Printf("read error. [err=%v]\n", err)
			break
		}

		for i := 0; i < fieldNum; i++ {
			field, err = readLenAndBody(file, lenBuf, field)
			if err != nil {
				log.Printf("read error. [err=%v]\n", err)
				break
			}
			value, err = readLenAndBody(file, lenBuf, value)
			if err != nil {
				log.Printf("read error. [err=%v]\n", err)
				break
			}
			pk := sdk.NewPKey(oriNS, oriTable, key)
			// assign with = (not :=) so the outer loop sees the error;
			// the old shadowed err made the inner break resume the outer
			// loop on a desynced stream
			var val int
			val, err = redis.Int(client.DoRedis("hsetnx", pk.ShardingKey(), true, pk.RawKey, field, value))
			if err != nil {
				log.Printf("restore error. [key=%s, val=%v, err=%v]\n", key, val, err)
				break
			}
			if val == 0 {
				existed++
			}
		}
		if err != nil {
			// a failure mid-record leaves the stream position undefined;
			// stop instead of reading garbage as the next record
			break
		}

		total++
		if total%1000 == 0 {
			if tm > 0 {
				time.Sleep(tm * 100)
			}
			fmt.Print(".")
		}
		if total%10000 == 0 {
			fmt.Printf("%d(%d)", total, existed)
		}
	}
	log.Printf("restore finished. [total=%d, existed=%d]\n", total, existed)
}

func srestore(file *os.File, client *sdk.ZanRedisClient) {
Expand Down
75 changes: 36 additions & 39 deletions rockredis/rockredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,28 +377,6 @@ func OpenRockDB(cfg *RockConfig) (*RockDB, error) {
hasher64: murmur3.New64(),
}

hcache, err := newHLLCache(HLLCacheSize, db)
if err != nil {
return nil, err
}
db.hllCache = hcache

eng, err := gorocksdb.OpenDb(opts, db.GetDataDir())
if err != nil {
return nil, err
}
db.eng = eng
db.indexMgr = NewIndexMgr()
err = db.indexMgr.LoadIndexes(db)
if err != nil {
dbLog.Infof("rocksdb %v load index failed: %v", db.GetDataDir(), err)
eng.Close()
return nil, err
}
atomic.StoreInt32(&db.engOpened, 1)
os.MkdirAll(db.GetBackupDir(), common.DIR_PERM)
dbLog.Infof("rocksdb opened: %v", db.GetDataDir())

switch cfg.ExpirationPolicy {
case common.ConsistencyDeletion:
db.expiration = newConsistencyExpiration(db)
Expand All @@ -412,7 +390,13 @@ func OpenRockDB(cfg *RockConfig) (*RockDB, error) {
return nil, errors.New("unsupported ExpirationPolicy")
}

db.expiration.Start()
err := db.reOpenEng()
if err != nil {
return nil, err
}

os.MkdirAll(db.GetBackupDir(), common.DIR_PERM)
dbLog.Infof("rocksdb opened: %v", db.GetDataDir())

db.wg.Add(1)
go func() {
Expand Down Expand Up @@ -460,17 +444,20 @@ func (r *RockDB) reOpenEng() error {

r.eng, err = gorocksdb.OpenDb(r.dbOpts, r.GetDataDir())
r.indexMgr = NewIndexMgr()
if err == nil {
err = r.indexMgr.LoadIndexes(r)
if err != nil {
dbLog.Infof("rocksdb %v load index failed: %v", r.GetDataDir(), err)
return err
}
r.expiration.Start()
atomic.StoreInt32(&r.engOpened, 1)
dbLog.Infof("rocksdb reopened: %v", r.GetDataDir())
if err != nil {
return err
}
return err
err = r.indexMgr.LoadIndexes(r)
if err != nil {
dbLog.Infof("rocksdb %v load index failed: %v", r.GetDataDir(), err)
r.eng.Close()
return err
}

r.expiration.Start()
atomic.StoreInt32(&r.engOpened, 1)
dbLog.Infof("rocksdb reopened: %v", r.GetDataDir())
return nil
}

func (r *RockDB) getDBEng() *gorocksdb.DB {
Expand Down Expand Up @@ -516,10 +503,11 @@ func (r *RockDB) CompactTableRange(table string) {
func (r *RockDB) closeEng() {
if r.eng != nil {
if atomic.CompareAndSwapInt32(&r.engOpened, 1, 0) {
r.hllCache.Flush()
r.indexMgr.Close()
r.expiration.Stop()
r.eng.Close()
dbLog.Infof("rocksdb closed: %v", r.GetDataDir())
dbLog.Infof("rocksdb engine closed: %v", r.GetDataDir())
}
}
}
Expand All @@ -530,7 +518,6 @@ func (r *RockDB) Close() {
}
close(r.quit)
r.wg.Wait()
r.hllCache.Flush()
r.closeEng()
if r.expiration != nil {
r.expiration.Destroy()
Expand Down Expand Up @@ -834,6 +821,10 @@ func (r *RockDB) backupLoop() {
}

func() {
// Until rsp.done and rsp.started are closed, the raft loop blocks; once
// they are closed it continues. We therefore must ensure the db engine
// cannot be closed while the checkpoint is in progress, so we acquire the
// engine read lock before closing the channels.
defer close(rsp.done)
dbLog.Infof("begin backup to:%v \n", rsp.backupDir)
start := time.Now()
Expand All @@ -851,10 +842,16 @@ func (r *RockDB) backupLoop() {
os.RemoveAll(rsp.backupDir)
}
rsp.rsp = []byte(rsp.backupDir)
time.AfterFunc(time.Millisecond*10, func() {
close(rsp.started)
})
err = ck.Save(rsp.backupDir, math.MaxUint64)
r.eng.RLock()
if r.eng.IsOpened() {
time.AfterFunc(time.Millisecond*10, func() {
close(rsp.started)
})
err = ck.Save(rsp.backupDir, math.MaxUint64)
} else {
err = errors.New("db engine closed")
}
r.eng.RUnlock()
r.checkpointDirLock.Unlock()
if err != nil {
dbLog.Infof("save checkpoint failed: %v", err)
Expand Down

0 comments on commit d27faac

Please sign in to comment.