diff --git a/disk_store.go b/disk_store.go
--- a/disk_store.go
+++ b/disk_store.go
@@ -7,6 +7,9 @@ import (
 	"io/fs"
 	"log"
 	"os"
+	"path/filepath"
+	"regexp"
+	"strconv"
 	"time"
 )
 
@@ -22,6 +25,10 @@ import (
 // https://pkg.go.dev/os#File.Seek
 const defaultWhence = 0
 
+// MaxFileSize is the size limit, in bytes, of a single data file. When a write
+// would push the active file past this limit, a new data file is created.
+var MaxFileSize int64 = 1 << 30
+
 // DiskStore is a Log-Structured Hash Table as described in the BitCask paper. We
 // keep appending the data to a file, like a log. DiskStorage maintains an in-memory
 // hash table called KeyDir, which keeps the row's location on the disk.
@@ -63,6 +70,8 @@ const defaultWhence = 0
 //	store.Set("othello", "shakespeare")
 //	author := store.Get("othello")
 type DiskStore struct {
+	// directory name that contains all data files
+	dir string
 	// file object pointing the file_name
 	file *os.File
 	// current cursor position in the file where the data can be written
@@ -81,24 +90,26 @@ func isFileExists(fileName string) bool {
 	return false
 }
 
-func NewDiskStore(fileName string) (*DiskStore, error) {
-	ds := &DiskStore{keyDir: make(map[string]KeyEntry)}
-	// if the file exists already, then we will load the key_dir
-	if isFileExists(fileName) {
-		err := ds.initKeyDir(fileName)
+func NewDiskStore(directoryName string) (*DiskStore, error) {
+	if !isFileExists(directoryName) {
+		err := os.MkdirAll(directoryName, os.ModePerm)
+
 		if err != nil {
-			log.Fatalf("error while loading the keys from disk: %v", err)
+			return nil, err
 		}
 	}
-	// we open the file in following modes:
-	//	os.O_APPEND - says that the writes are append only.
-	//	os.O_RDWR - says we can read and write to the file
-	//	os.O_CREATE - creates the file if it does not exist
-	file, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0666)
+
+	ds := &DiskStore{
+		dir:    directoryName,
+		keyDir: make(map[string]KeyEntry),
+	}
+
+	err := ds.initKeyDir(directoryName)
+
 	if err != nil {
-		return nil, err
+		log.Fatalf("error while loading the keys from disk: %v", err)
 	}
-	ds.file = file
+
 	return ds, nil
 }
 
@@ -119,13 +130,13 @@ func (d *DiskStore) Get(key string) (string, error) {
 	}
 
 	// move the current pointer to the right offset
-	_, err := d.file.Seek(int64(kEntry.position), defaultWhence)
+	_, err := kEntry.file.Seek(int64(kEntry.position), defaultWhence)
 	if err != nil {
 		return "", ErrSeekFailed
 	}
 
 	data := make([]byte, kEntry.totalSize)
-	_, err = io.ReadFull(d.file, data)
+	_, err = io.ReadFull(kEntry.file, data)
 	if err != nil {
 		return "", ErrReadFailed
 	}
@@ -169,13 +180,63 @@ func (d *DiskStore) Set(key string, value string) error {
 	}
 	d.write(buf.Bytes())
 
-	d.keyDir[key] = NewKeyEntry(timestamp, uint32(d.writePosition), r.Size())
+	d.keyDir[key] = NewKeyEntry(timestamp, d.file, uint32(d.writePosition), r.Size())
 	// update last write position, so that next record can be written from this point
 	d.writePosition += int(r.Size())
 
 	return nil
 }
 
+// checkMaxFileSizeReached rotates to a new data file when appending size more
+// bytes to the active file would exceed MaxFileSize.
+func (d *DiskStore) checkMaxFileSizeReached(size int) error {
+	if d.file == nil {
+		return d.createNewDataFile()
+	}
+
+	stat, err := d.file.Stat()
+	if err != nil {
+		return err
+	}
+	if stat.Size()+int64(size) > MaxFileSize {
+		return d.createNewDataFile()
+	}
+
+	return nil
+}
+
+// createNewDataFile makes the data file that follows the active one the new
+// active file. The previous file is not closed here; Get still reads through it.
+func (d *DiskStore) createNewDataFile() error {
+	// d.file is nil when no data file has been opened yet; the empty name makes
+	// createFilenameId hand out the first id instead of dereferencing nil
+	current := ""
+	if d.file != nil {
+		current = d.file.Name()
+	}
+	return d.openDataFile(current)
+}
+
+// openDataFile creates the data file whose id follows previousName and makes it
+// the active file.
+func (d *DiskStore) openDataFile(previousName string) error {
+	id, err := createFilenameId(previousName)
+	if err != nil {
+		return err
+	}
+	// os.O_APPEND keeps every write at the end of the file even after Get has moved
+	// this handle's offset with Seek; os.O_EXCL refuses to reuse an existing file
+	path := filepath.Join(d.dir, id+".bitcask.data")
+	file, err := os.OpenFile(path, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_EXCL, 0666)
+	if err != nil {
+		return err
+	}
+	d.file = file
+	d.writePosition = 0
+
+	return nil
+}
+
 func (d *DiskStore) Delete(key string) error {
 	timestamp := uint32(time.Now().Unix())
 	value := ""
@@ -203,15 +264,24 @@ func (d *DiskStore) Close() bool {
 	// to the disk. Check documentation of DiskStore.write() to understand
 	// following the operations
 	// TODO: handle errors
-	d.file.Sync()
-	if err := d.file.Close(); err != nil {
-		// TODO: log the error
-		return false
+	if d.file != nil {
+		d.file.Sync()
+		if err := d.file.Close(); err != nil {
+			// TODO: log the error
+			return false
+		}
+	}
+	for _, v := range d.keyDir {
+		v.file.Close()
 	}
 	return true
 }
 
 func (d *DiskStore) write(data []byte) {
+	if err := d.checkMaxFileSizeReached(len(data)); err != nil {
+		panic(err)
+	}
+
 	// saving stuff to a file reliably is hard!
 	// if you would like to explore and learn more, then
 	// start from here: https://danluu.com/file-consistency/
@@ -226,7 +296,56 @@ func (d *DiskStore) write(data []byte) {
 	}
 }
 
-func (d *DiskStore) initKeyDir(existingFile string) error {
+// initKeyDir loads the keyDir from every data file in directoryName and then
+// opens a fresh data file for new writes.
+func (d *DiskStore) initKeyDir(directoryName string) error {
+	dirEntries, err := os.ReadDir(directoryName)
+	if err != nil {
+		return err
+	}
+
+	// os.ReadDir returns entries sorted by filename and ids start at 1000000000,
+	// so the last data file seen is the one with the highest id
+	lastDataFile := ""
+	for _, entry := range dirEntries {
+		// skip sub-directories and anything that is not one of our data files
+		if entry.IsDir() || !dataFilePattern.MatchString(entry.Name()) {
+			continue
+		}
+
+		err = initKeyDirInternal(d.keyDir, filepath.Join(d.dir, entry.Name()))
+		if err != nil {
+			return err
+		}
+		lastDataFile = entry.Name()
+	}
+
+	return d.openDataFile(lastDataFile)
+}
+
+// dataFilePattern matches data file names such as 1000000000.bitcask.data and
+// captures the numeric id.
+var dataFilePattern = regexp.MustCompile(`^(\d+)\.bitcask\.data$`)
+
+// createFilenameId returns the id that follows the one embedded in filename,
+// or the first id when filename is empty.
+func createFilenameId(filename string) (string, error) {
+	if filename == "" {
+		return "1000000000", nil
+	}
+	matches := dataFilePattern.FindStringSubmatch(filepath.Base(filename))
+	if matches == nil {
+		return "", &fs.PathError{Op: "next data file id", Path: filename, Err: fs.ErrInvalid}
+	}
+	filenameId, err := strconv.Atoi(matches[1])
+	if err != nil {
+		return "", err
+	}
+
+	return strconv.Itoa(filenameId + 1), nil
+}
+
+func initKeyDirInternal(keyDir map[string]KeyEntry, existingFile string) error {
 	// we will initialise the keyDir by reading the contents of the file, record by
 	// record. As we read each record, we will also update our keyDir with the
 	// corresponding KeyEntry
@@ -234,7 +353,13 @@ func (d *DiskStore) initKeyDir(existingFile string) error {
 	// NOTE: this method is a blocking one, if the DB size is yuge then it will take
 	// a lot of time to startup
-	file, _ := os.Open(existingFile)
-	defer file.Close()
+	// the file is not closed here: the KeyEntry values created below hold on to
+	// it so that Get can read from it later
+	file, err := os.Open(existingFile)
+	if err != nil {
+		return err
+	}
+	writePosition := 0
+
 	for {
 		header := make([]byte, headerSize)
 		_, err := io.ReadFull(file, header)
@@ -265,8 +390,8 @@ func (d *DiskStore) initKeyDir(existingFile string) error {
 		}
 
 		totalSize := headerSize + h.KeySize + h.ValueSize
-		d.keyDir[string(key)] = NewKeyEntry(h.TimeStamp, uint32(d.writePosition), totalSize)
-		d.writePosition += int(totalSize)
+		keyDir[string(key)] = NewKeyEntry(h.TimeStamp, file, uint32(writePosition), totalSize)
+		writePosition += int(totalSize)
 	}
 	return nil
 }
diff --git a/disk_store_test.go b/disk_store_test.go
--- a/disk_store_test.go
+++ b/disk_store_test.go
@@ -10,11 +10,12 @@ import (
 )
 
 func TestDiskStore_Get(t *testing.T) {
-	store, err := NewDiskStore("test.db")
+	store, err := NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
-	defer os.Remove("test.db")
+	defer os.RemoveAll("test_db")
+	defer store.Close()
 	store.Set("name", "jojo")
 	val, _ := store.Get("name")
 	if val != "jojo" {
@@ -23,11 +24,12 @@ func TestDiskStore_Get(t *testing.T) {
 }
 
 func TestDiskStore_GetInvalid(t *testing.T) {
-	store, err := NewDiskStore("test.db")
+	store, err := NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
-	defer os.Remove("test.db")
+	defer os.RemoveAll("test_db")
+	defer store.Close()
 	val, _ := store.Get("some key")
 	if val != "" {
 		t.Errorf("Get() = %v, want %v", val, "")
@@ -35,11 +37,12 @@ func TestDiskStore_GetInvalid(t *testing.T) {
 }
 
 func TestDiskStore_SetWithPersistence(t *testing.T) {
-	store, err := NewDiskStore("test.db")
+	store, err := NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
-	defer os.Remove("test.db")
+	defer os.RemoveAll("test_db")
+	defer store.Close()
 
 	tests := map[string]string{
 		"crime and punishment": "dostoevsky",
@@ -59,7 +62,8 @@ func TestDiskStore_SetWithPersistence(t *testing.T) {
 		}
 	}
 	store.Close()
-	store, err = NewDiskStore("test.db")
+
+	store, err = NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
@@ -73,11 +77,12 @@ func TestDiskStore_SetWithPersistence(t *testing.T) {
 }
 
 func TestDiskStore_Delete(t *testing.T) {
-	store, err := NewDiskStore("test.db")
+	store, err := NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
-	defer os.Remove("test.db")
+	defer os.RemoveAll("test_db")
+	defer store.Close()
 
 	tests := map[string]string{
 		"crime and punishment": "dostoevsky",
@@ -100,7 +105,7 @@ func TestDiskStore_Delete(t *testing.T) {
 	}
 	store.Close()
 
-	store, err = NewDiskStore("test.db")
+	store, err = NewDiskStore("test_db")
 	if err != nil {
 		t.Fatalf("failed to create disk store: %v", err)
 	}
@@ -121,9 +126,9 @@ func TestDiskStore_Delete(t *testing.T) {
 }
 
 func TestDiskStore_InValidCheckSum(t *testing.T) {
-	store, _ := NewDiskStore("test.db")
+	store, _ := NewDiskStore("test_db")
+	defer os.RemoveAll("test_db")
 	defer store.Close()
-	defer os.Remove("test.db")
 
 	k1, v1 := "👋", "world"
 	h1 := Header{TimeStamp: uint32(time.Now().Unix()), KeySize: uint32(len(k1)), ValueSize: uint32(len(v1)), Meta: 0}
@@ -148,7 +153,7 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
 		tt.EncodeKV(buf)
 
 		// store the data
-		store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp, uint32(store.writePosition), tt.Size())
+		store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp, store.file, uint32(store.writePosition), tt.Size())
 		store.writePosition += int(tt.Size())
 		store.write(buf.Bytes())
 
@@ -172,7 +177,7 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
 		}
 
 		// write the corrupted bytes and update the hash table
-		store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp+uint32(time.Now().Unix()), uint32(store.writePosition), tt.Size())
+		store.keyDir[tt.Key] = NewKeyEntry(tt.Header.TimeStamp+uint32(time.Now().Unix()), store.file, uint32(store.writePosition), tt.Size())
 		store.writePosition += int(tt.Size())
 		store.write(kvRecord)
 
@@ -200,3 +205,65 @@ func TestDiskStore_InValidCheckSum(t *testing.T) {
 		}
 	}
 }
+
+func TestDiskStore_NewFileCreatedAfterMaxFileSizeReached(t *testing.T) {
+	store, err := NewDiskStore("test_db")
+	if err != nil {
+		t.Fatalf("failed to create disk store: %v", err)
+	}
+	defer os.RemoveAll("test_db")
+	defer store.Close()
+
+	// MaxFileSize is a package-level variable; restore it so other tests are unaffected
+	defer func(old int64) { MaxFileSize = old }(MaxFileSize)
+	MaxFileSize = 50
+
+	store.Set("crime and punishment", "dostoevsky")
+	store.Set("anna karenina", "tolstoy")
+
+	dirEntry, _ := os.ReadDir("test_db")
+
+	if len(dirEntry) < 2 {
+		t.Errorf("directory must have more than 1 file")
+	}
+}
+
+func TestDiskStore_ReadFromMultipleFiles(t *testing.T) {
+	store, err := NewDiskStore("test_db")
+	if err != nil {
+		t.Fatalf("failed to create disk store: %v", err)
+	}
+	defer os.RemoveAll("test_db")
+
+	// lower the limit before writing so the records really span several data files
+	defer func(old int64) { MaxFileSize = old }(MaxFileSize)
+	MaxFileSize = 50
+
+	tests := map[string]string{
+		"crime and punishment": "dostoevsky",
+		"anna karenina":        "tolstoy",
+		"war and peace":        "tolstoy",
+		"hamlet":               "shakespeare",
+		"othello":              "shakespeare",
+		"brave new world":      "huxley",
+		"dune":                 "frank herbert",
+	}
+	for key, val := range tests {
+		store.Set(key, val)
+	}
+
+	store.Close()
+
+	store, err = NewDiskStore("test_db")
+	if err != nil {
+		t.Fatalf("failed to create disk store: %v", err)
+	}
+	for key, val := range tests {
+		actualVal, _ := store.Get(key)
+		if actualVal != val {
+			t.Errorf("Get() = %v, want %v", actualVal, val)
+		}
+	}
+
+	store.Close()
+}
diff --git a/format.go b/format.go
--- a/format.go
+++ b/format.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"hash/crc32"
+	"os"
 )
 
 // format file provides encode/decode functions for serialisation and deserialisation
@@ -80,6 +81,8 @@ type KeyEntry struct {
 	// Timestamp at which we wrote the KV pair to the disk. The value
 	// is current time in seconds since the epoch.
 	timestamp uint32
+	// The file in which the data exists
+	file *os.File
 	// The position is the byte offset in the file where the data
 	// exists
 	position uint32
@@ -103,8 +106,8 @@ type Record struct {
 	RecordSize uint32
 }
 
-func NewKeyEntry(timestamp uint32, position uint32, totalSize uint32) KeyEntry {
-	return KeyEntry{timestamp, position, totalSize}
+func NewKeyEntry(timestamp uint32, file *os.File, position uint32, totalSize uint32) KeyEntry {
+	return KeyEntry{timestamp, file, position, totalSize}
 }
 
 func (h *Header) EncodeHeader(buf *bytes.Buffer) error {