Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 119 additions & 24 deletions disk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"io/fs"
"log"
"os"
"path/filepath"
"regexp"
"strconv"
"time"
)

Expand All @@ -22,6 +25,10 @@ import (
// https://pkg.go.dev/os#File.Seek
const defaultWhence = 0

// MaxFileSize
// while writing to file if max file size reached, new file will be created
var MaxFileSize int64 = 1 << 30

// DiskStore is a Log-Structured Hash Table as described in the BitCask paper. We
// keep appending the data to a file, like a log. DiskStorage maintains an in-memory
// hash table called KeyDir, which keeps the row's location on the disk.
Expand Down Expand Up @@ -63,6 +70,8 @@ const defaultWhence = 0
// store.Set("othello", "shakespeare")
// author := store.Get("othello")
type DiskStore struct {
// directory name that contains all data files
dir string
// file object pointing the file_name
file *os.File
// current cursor position in the file where the data can be written
Expand All @@ -81,24 +90,26 @@ func isFileExists(fileName string) bool {
return false
}

func NewDiskStore(fileName string) (*DiskStore, error) {
ds := &DiskStore{keyDir: make(map[string]KeyEntry)}
// if the file exists already, then we will load the key_dir
if isFileExists(fileName) {
err := ds.initKeyDir(fileName)
func NewDiskStore(directoryName string) (*DiskStore, error) {
if !isFileExists(directoryName) {
err := os.MkdirAll(directoryName, os.ModePerm)

if err != nil {
log.Fatalf("error while loading the keys from disk: %v", err)
return nil, err
}
}
// we open the file in following modes:
// os.O_APPEND - says that the writes are append only.
// os.O_RDWR - says we can read and write to the file
// os.O_CREATE - creates the file if it does not exist
file, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0666)

ds := &DiskStore{
dir: directoryName,
keyDir: make(map[string]KeyEntry),
}

err := ds.initKeyDir(directoryName)

if err != nil {
return nil, err
log.Fatalf("error while loading the keys from disk: %v", err)
}
ds.file = file

return ds, nil
}

Expand All @@ -119,13 +130,13 @@ func (d *DiskStore) Get(key string) (string, error) {
}

// move the current pointer to the right offset
_, err := d.file.Seek(int64(kEntry.position), defaultWhence)
_, err := kEntry.file.Seek(int64(kEntry.position), defaultWhence)
if err != nil {
return "", ErrSeekFailed
}

data := make([]byte, kEntry.totalSize)
_, err = io.ReadFull(d.file, data)
_, err = io.ReadFull(kEntry.file, data)
if err != nil {
return "", ErrReadFailed
}
Expand Down Expand Up @@ -169,13 +180,42 @@ func (d *DiskStore) Set(key string, value string) error {
}
d.write(buf.Bytes())

d.keyDir[key] = NewKeyEntry(timestamp, uint32(d.writePosition), r.Size())
d.keyDir[key] = NewKeyEntry(timestamp, d.file, uint32(d.writePosition), r.Size())
// update last write position, so that next record can be written from this point
d.writePosition += int(r.Size())

return nil
}

func (d *DiskStore) checkMaxFileSizeReached(size int) error {
if d.file == nil {
err := d.createNewDataFile()
return err
}

stat, _ := d.file.Stat()
nextSize := stat.Size() + int64(size)
if nextSize > MaxFileSize {
err := d.createNewDataFile()

return err
}

return nil
}

func (d *DiskStore) createNewDataFile() error {
activeFile := createFilenameId(d.file.Name()) + ".bitcask.data"
file, err := os.Create(filepath.Join(d.dir, activeFile))
if err != nil {
return err
}
d.file = file
d.writePosition = 0

return nil
}

func (d *DiskStore) Delete(key string) error {
timestamp := uint32(time.Now().Unix())
value := ""
Expand Down Expand Up @@ -203,15 +243,24 @@ func (d *DiskStore) Close() bool {
// to the disk. Check documentation of DiskStore.write() to understand
// following the operations
// TODO: handle errors
d.file.Sync()
if err := d.file.Close(); err != nil {
// TODO: log the error
return false
if d.file != nil {
d.file.Sync()
if err := d.file.Close(); err != nil {
// TODO: log the error
return false
}
}
for _, v := range d.keyDir {
v.file.Close()
}
return true
}

func (d *DiskStore) write(data []byte) {
if err := d.checkMaxFileSizeReached(len(data)); err != nil {
panic(err)
}

// saving stuff to a file reliably is hard!
// if you would like to explore and learn more, then
// start from here: https://danluu.com/file-consistency/
Expand All @@ -226,15 +275,61 @@ func (d *DiskStore) write(data []byte) {
}
}

func (d *DiskStore) initKeyDir(existingFile string) error {
func (d *DiskStore) initKeyDir(directoryName string) error {
dirEntries, err := os.ReadDir(directoryName)
if err != nil {
return err
}

for _, entry := range dirEntries {
if entry.IsDir() {
continue
}

err = initKeyDirInternal(d.keyDir, filepath.Join(d.dir, entry.Name()))
if err != nil {
return err
}
}

fileName := createFilenameId("") + ".bitcask.data"

if len(dirEntries) > 0 {
fileName = createFilenameId(dirEntries[len(dirEntries)-1].Name()) + ".bitcask.data"
}

file, err := os.Create(filepath.Join(d.dir, fileName))
if err != nil {
return err
}
d.file = file
d.writePosition = 0

return nil
}

func createFilenameId(filename string) string {
if filename == "" {
return "1000000000"
}
pattern := regexp.MustCompile(`(\d+)\.bitcask`)
matches := pattern.FindStringSubmatch(filename)

filenameId, _ := strconv.Atoi(matches[1])

return strconv.Itoa(filenameId + 1)
}

func initKeyDirInternal(keyDir map[string]KeyEntry, existingFile string) error {
// we will initialise the keyDir by reading the contents of the file, record by
// record. As we read each record, we will also update our keyDir with the
// corresponding KeyEntry
//
// NOTE: this method is a blocking one, if the DB size is yuge then it will take
// a lot of time to startup
file, _ := os.Open(existingFile)
defer file.Close()
writePosition := 0

for {
header := make([]byte, headerSize)
_, err := io.ReadFull(file, header)
Expand Down Expand Up @@ -265,8 +360,8 @@ func (d *DiskStore) initKeyDir(existingFile string) error {
}

totalSize := headerSize + h.KeySize + h.ValueSize
d.keyDir[string(key)] = NewKeyEntry(h.TimeStamp, uint32(d.writePosition), totalSize)
d.writePosition += int(totalSize)
keyDir[string(key)] = NewKeyEntry(h.TimeStamp, file, uint32(writePosition), totalSize)
writePosition += int(totalSize)
}
return nil
}
Expand Down
Loading