@@ -22,6 +22,7 @@ import (
2222 "path"
2323 "strconv"
2424 "strings"
25+ "sync"
2526 "time"
2627
2728 "github.com/pkg/errors"
@@ -41,12 +42,14 @@ type fileStore struct {
4142 sessionFname string
4243 senderSeqNumsFname string
4344 targetSeqNumsFname string
44- bodyFile * os.File
45- headerFile * os.File
46- sessionFile * os.File
47- senderSeqNumsFile * os.File
48- targetSeqNumsFile * os.File
49- fileSync bool
45+
46+ fileMu sync.Mutex
47+ bodyFile * os.File
48+ headerFile * os.File
49+ sessionFile * os.File
50+ senderSeqNumsFile * os.File
51+ targetSeqNumsFile * os.File
52+ fileSync bool
5053}
5154
5255// NewStoreFactory returns a file-based implementation of MessageStoreFactory.
@@ -218,6 +221,9 @@ func (store *fileStore) populateCache() (creationTimePopulated bool, err error)
218221}
219222
220223func (store * fileStore ) setSession () error {
224+ store .fileMu .Lock ()
225+ defer store .fileMu .Unlock ()
226+
221227 if _ , err := store .sessionFile .Seek (0 , io .SeekStart ); err != nil {
222228 return fmt .Errorf ("unable to rewind file: %s: %s" , store .sessionFname , err .Error ())
223229 }
@@ -238,6 +244,8 @@ func (store *fileStore) setSession() error {
238244}
239245
240246func (store * fileStore ) setSeqNum (f * os.File , seqNum int ) error {
247+ store .fileMu .Lock ()
248+ defer store .fileMu .Unlock ()
241249 if _ , err := f .Seek (0 , io .SeekStart ); err != nil {
242250 return fmt .Errorf ("unable to rewind file: %s: %s" , f .Name (), err .Error ())
243251 }
@@ -304,6 +312,8 @@ func (store *fileStore) SetCreationTime(_ time.Time) {
304312}
305313
306314func (store * fileStore ) SaveMessage (seqNum int , msg []byte ) error {
315+ store .fileMu .Lock ()
316+ defer store .fileMu .Unlock ()
307317 offset , err := store .bodyFile .Seek (0 , io .SeekEnd )
308318 if err != nil {
309319 return fmt .Errorf ("unable to seek to end of file: %s: %s" , store .bodyFname , err .Error ())
@@ -339,6 +349,9 @@ func (store *fileStore) SaveMessageAndIncrNextSenderMsgSeqNum(seqNum int, msg []
339349}
340350
341351func (store * fileStore ) IterateMessages (beginSeqNum , endSeqNum int , cb func ([]byte ) error ) error {
352+ store .fileMu .Lock ()
353+ defer store .fileMu .Unlock ()
354+
342355 // Sync files and seek to start of header file
343356 if err := store .bodyFile .Sync (); err != nil {
344357 return fmt .Errorf ("unable to flush file: %s: %s" , store .bodyFname , err .Error ())
@@ -386,19 +399,19 @@ func (store *fileStore) GetMessages(beginSeqNum, endSeqNum int) ([][]byte, error
386399
387400// Close closes the store's files.
388401func (store * fileStore ) Close () error {
389- if err := closeFile (store .bodyFile ); err != nil {
402+ if err := closeSyncFile (store .bodyFile ); err != nil {
390403 return err
391404 }
392- if err := closeFile (store .headerFile ); err != nil {
405+ if err := closeSyncFile (store .headerFile ); err != nil {
393406 return err
394407 }
395- if err := closeFile (store .sessionFile ); err != nil {
408+ if err := closeSyncFile (store .sessionFile ); err != nil {
396409 return err
397410 }
398- if err := closeFile (store .senderSeqNumsFile ); err != nil {
411+ if err := closeSyncFile (store .senderSeqNumsFile ); err != nil {
399412 return err
400413 }
401- if err := closeFile (store .targetSeqNumsFile ); err != nil {
414+ if err := closeSyncFile (store .targetSeqNumsFile ); err != nil {
402415 return err
403416 }
404417
0 commit comments