Skip to content

Commit d28bb30

Browse files
authored
storage: add a ResourceVersion field for lists (#68)
this allows List/Watch to better synchronize on the latest changes fixes #67 Signed-off-by: Nick Santos <[email protected]>
1 parent a7c6fad commit d28bb30

File tree

6 files changed

+94
-33
lines changed

6 files changed

+94
-33
lines changed

pkg/server/builder/builder_resource.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
// Registers a request handler for the resource that stores it on the file system.
1212
func (a *Server) WithResourceFileStorage(obj resource.Object, path string) *Server {
13-
fs := filepath.RealFS{}
13+
fs := filepath.NewRealFS()
1414
ws := filepath.NewWatchSet()
1515
strategy := rest.DefaultStrategy{
1616
Object: obj,

pkg/storage/filepath/fs.go

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,54 @@ type FS interface {
2323
EnsureDir(dirname string) error
2424
Write(encoder runtime.Encoder, filepath string, obj runtime.Object, storageVersion uint64) error
2525
Read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error)
26-
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error
26+
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) (uint64, error)
2727
}
2828

2929
type RealFS struct {
30+
mu sync.Mutex
31+
rev uint64
3032
}
3133

32-
var _ FS = RealFS{}
34+
func NewRealFS() *RealFS {
35+
return &RealFS{rev: 1}
36+
}
3337

34-
func (fs RealFS) Remove(filepath string) error {
38+
var _ FS = &RealFS{}
39+
40+
func (fs *RealFS) Remove(filepath string) error {
41+
fs.mu.Lock()
42+
defer fs.mu.Unlock()
43+
_ = fs.incrementRev()
3544
return os.Remove(filepath)
3645
}
3746

38-
func (fs RealFS) Exists(filepath string) bool {
47+
func (fs *RealFS) Exists(filepath string) bool {
48+
fs.mu.Lock()
49+
defer fs.mu.Unlock()
3950
_, err := os.Stat(filepath)
4051
return err == nil
4152
}
4253

43-
func (fs RealFS) EnsureDir(dirname string) error {
44-
if !fs.Exists(dirname) {
54+
func (fs *RealFS) EnsureDir(dirname string) error {
55+
fs.mu.Lock()
56+
defer fs.mu.Unlock()
57+
_, err := os.Stat(dirname)
58+
if err != nil {
4559
return os.MkdirAll(dirname, 0700)
4660
}
4761
return nil
4862
}
4963

50-
func (fs RealFS) Write(encoder runtime.Encoder, filepath string, obj runtime.Object, storageVersion uint64) error {
51-
// TODO(milas): use storageVersion to ensure we don't perform stale writes
64+
func (fs *RealFS) Write(encoder runtime.Encoder, filepath string, obj runtime.Object, storageVersion uint64) error {
65+
fs.mu.Lock()
66+
defer fs.mu.Unlock()
67+
rev := fs.incrementRev()
68+
69+
// TODO(milas): Currently we don't have optimistic concurrency at all.
70+
// Each write has last-one-wins semantics.
5271
// (currently, this isn't a critical priority as our use cases that rely
5372
// on RealFS do not have simultaneous writers)
54-
if err := setResourceVersion(obj, storageVersion+1); err != nil {
73+
if err := setResourceVersion(obj, rev); err != nil {
5574
return err
5675
}
5776

@@ -62,11 +81,17 @@ func (fs RealFS) Write(encoder runtime.Encoder, filepath string, obj runtime.Obj
6281
return ioutil.WriteFile(filepath, buf.Bytes(), 0600)
6382
}
6483

65-
func (fs RealFS) Read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {
84+
func (fs *RealFS) Read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {
85+
fs.mu.Lock()
6686
content, err := ioutil.ReadFile(filepath.Clean(path))
87+
fs.mu.Unlock()
6788
if err != nil {
6889
return nil, err
6990
}
91+
return fs.decode(decoder, newFunc, content)
92+
}
93+
94+
func (fs *RealFS) decode(decoder runtime.Decoder, newFunc func() runtime.Object, content []byte) (runtime.Object, error) {
7095
newObj := newFunc()
7196
decodedObj, _, err := decoder.Decode(content, nil, newObj)
7297
if err != nil {
@@ -75,8 +100,10 @@ func (fs RealFS) Read(decoder runtime.Decoder, path string, newFunc func() runti
75100
return decodedObj, nil
76101
}
77102

78-
func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
79-
return filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
103+
func (fs *RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) (uint64, error) {
104+
fs.mu.Lock()
105+
defer fs.mu.Unlock()
106+
err := filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
80107
if err != nil {
81108
return err
82109
}
@@ -86,12 +113,28 @@ func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec r
86113
if !strings.HasSuffix(info.Name(), ".json") {
87114
return nil
88115
}
89-
newObj, err := fs.Read(codec, path, newFunc)
116+
content, err := ioutil.ReadFile(filepath.Clean(path))
117+
if err != nil {
118+
return err
119+
}
120+
newObj, err := fs.decode(codec, newFunc, content)
90121
if err != nil {
91122
return err
92123
}
93124
return visitFunc(path, newObj)
94125
})
126+
if err != nil {
127+
return 0, err
128+
}
129+
return fs.rev, nil
130+
}
131+
132+
// incrementRev increases the revision counter and returns the new value.
133+
//
134+
// mu must be held.
135+
func (fs *RealFS) incrementRev() uint64 {
136+
fs.rev++
137+
return fs.rev
95138
}
96139

97140
// An in-memory structure that pretends to be a filesystem,
@@ -105,6 +148,7 @@ type MemoryFS struct {
105148
func NewMemoryFS() *MemoryFS {
106149
return &MemoryFS{
107150
dir: make(map[string]interface{}),
151+
rev: 1,
108152
}
109153
}
110154

@@ -289,27 +333,29 @@ func (fs *MemoryFS) decodeBuffer(decoder runtime.Decoder, rawObj versionedData,
289333
}
290334

291335
// Walk the directory, reading all objects in it.
292-
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
336+
// Return the ResourceVersion of when we did the read.
337+
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) (uint64, error) {
293338
fs.mu.Lock()
294339
keyPaths, buffers, err := fs.readDir(dirname)
340+
version := fs.rev
295341
fs.mu.Unlock()
296342
if err != nil {
297-
return err
343+
return 0, err
298344
}
299345

300346
// Do decoding and visitation outside the lock.
301347
for i, keyPath := range keyPaths {
302348
buf := buffers[i]
303349
obj, err := fs.decodeBuffer(codec, buf, newFunc)
304350
if err != nil {
305-
return err
351+
return 0, err
306352
}
307353
err = visitFunc(keyPath, obj)
308354
if err != nil {
309-
return err
355+
return 0, err
310356
}
311357
}
312-
return nil
358+
return version, nil
313359
}
314360

315361
// Internal helper for reading the directory. Must hold the mutex.

pkg/storage/filepath/jsonfile_rest.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (f *filepathREST) List(
134134
}
135135

136136
dirname := f.objectDirName(ctx)
137-
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
137+
rev, err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
138138
ok, err := p.Matches(obj)
139139
if err != nil {
140140
return err
@@ -143,9 +143,16 @@ func (f *filepathREST) List(
143143
appendItem(v, obj)
144144
}
145145
return nil
146-
}); err != nil {
146+
})
147+
148+
if err != nil {
147149
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
148150
}
151+
152+
err = setResourceVersion(newListObj, rev)
153+
if err != nil {
154+
return nil, err
155+
}
149156
return newListObj, nil
150157
}
151158

@@ -415,7 +422,7 @@ func (f *filepathREST) DeleteCollection(
415422
return nil, err
416423
}
417424
dirname := f.objectDirName(ctx)
418-
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
425+
rev, err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
419426
ok, err := p.Matches(obj)
420427
if err != nil {
421428
return err
@@ -425,9 +432,15 @@ func (f *filepathREST) DeleteCollection(
425432
appendItem(v, obj)
426433
}
427434
return nil
428-
}); err != nil {
435+
})
436+
if err != nil {
429437
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
430438
}
439+
440+
err = setResourceVersion(newListObj, rev)
441+
if err != nil {
442+
return nil, err
443+
}
431444
return newListObj, nil
432445
}
433446

pkg/storage/filepath/jsonfile_rest_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestFilepathREST_Update_OptimisticConcurrency(t *testing.T) {
135135
m.Spec.Message = "updated"
136136
})
137137

138-
require.Equal(t, "2", f.mustMeta(obj).GetResourceVersion())
138+
require.Equal(t, "3", f.mustMeta(obj).GetResourceVersion())
139139
require.Equal(t, "updated", obj.(*v1alpha1.Manifest).Spec.Message)
140140

141141
obj, err := f.update("test-obj", func(obj runtime.Object) {
@@ -151,7 +151,7 @@ func TestFilepathREST_Update_OptimisticConcurrency(t *testing.T) {
151151
obj, err = f.get("test-obj")
152152
require.NoError(t, err, "Failed to fetch object")
153153
// object should not have changed
154-
require.Equal(t, "2", f.mustMeta(obj).GetResourceVersion())
154+
require.Equal(t, "3", f.mustMeta(obj).GetResourceVersion())
155155
require.Equal(t, "updated", obj.(*v1alpha1.Manifest).Spec.Message)
156156
}
157157

@@ -182,13 +182,13 @@ func TestFilepathREST_Update_OptimisticConcurrency_Subresource(t *testing.T) {
182182
m.Status.Message = "updated_status_message"
183183
})
184184

185-
assert.Equal(t, "2", f.mustMeta(obj).GetResourceVersion())
185+
assert.Equal(t, "3", f.mustMeta(obj).GetResourceVersion())
186186
assert.Equal(t, "spec_message", obj.(*v1alpha1.Manifest).Spec.Message)
187187
require.Equal(t, "updated_status_message", obj.(*v1alpha1.Manifest).Status.Message)
188188

189189
obj, err := f.update("test-obj", func(obj runtime.Object) {
190190
m := obj.(*v1alpha1.Manifest)
191-
m.SetResourceVersion("1")
191+
m.SetResourceVersion("2")
192192
m.Status.Message = "impossible"
193193
})
194194

@@ -200,7 +200,7 @@ func TestFilepathREST_Update_OptimisticConcurrency_Subresource(t *testing.T) {
200200
obj, err = f.get("test-obj")
201201
require.NoError(t, err, "Failed to fetch object")
202202
// object should not have changed
203-
assert.Equal(t, "2", f.mustMeta(obj).GetResourceVersion())
203+
assert.Equal(t, "3", f.mustMeta(obj).GetResourceVersion())
204204
assert.Equal(t, "updated_status_message", obj.(*v1alpha1.Manifest).Status.Message)
205205
}
206206

@@ -312,7 +312,7 @@ func TestFilepathREST_UpdateIdentical(t *testing.T) {
312312
// 1) the result of create doesn't have a populated TypeMeta, and the result of update does
313313
// 2) the result of update has a truncated CreationTimestamp
314314
actual := result.(*v1alpha1.Manifest)
315-
require.Equal(t, "1", actual.ResourceVersion)
315+
require.Equal(t, "2", actual.ResourceVersion)
316316
}
317317

318318
type restOptionsGetter struct {

pkg/storage/filepath/jsonfile_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Manifest = v1alpha1.Manifest
3030
type ManifestList = v1alpha1.ManifestList
3131

3232
func fileSystems() []filepath.FS {
33-
return []filepath.FS{filepath.RealFS{}, filepath.NewMemoryFS()}
33+
return []filepath.FS{filepath.NewRealFS(), filepath.NewMemoryFS()}
3434
}
3535

3636
func TestReadEmpty(t *testing.T) {
@@ -185,6 +185,8 @@ func (f *fixture) TestCreateThenList() {
185185

186186
manifestList = obj.(*ManifestList)
187187
assert.Equal(f.t, 1, len(manifestList.Items))
188+
assert.Equal(f.t, manifestList.Items[0].ResourceVersion, manifestList.ResourceVersion)
189+
assert.NotEqual(f.t, "", manifestList.ResourceVersion)
188190
}
189191

190192
func (f *fixture) TestCreateThenReadThenDelete() {

pkg/storage/filepath/version.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ func getResourceVersion(obj runtime.Object) (uint64, error) {
1212
if obj == nil {
1313
return 0, nil
1414
}
15-
objMeta, err := meta.Accessor(obj)
15+
objMeta, err := meta.CommonAccessor(obj)
1616
if err != nil {
1717
return 0, err
1818
}
@@ -24,7 +24,7 @@ func setResourceVersion(obj runtime.Object, v uint64) error {
2424
return fmt.Errorf("resourceVersion must be positive: %d", v)
2525
}
2626

27-
objMeta, err := meta.Accessor(obj)
27+
objMeta, err := meta.CommonAccessor(obj)
2828
if err != nil {
2929
return err
3030
}
@@ -33,7 +33,7 @@ func setResourceVersion(obj runtime.Object, v uint64) error {
3333
}
3434

3535
func clearResourceVersion(obj runtime.Object) error {
36-
objMeta, err := meta.Accessor(obj)
36+
objMeta, err := meta.CommonAccessor(obj)
3737
if err != nil {
3838
return err
3939
}

0 commit comments

Comments
 (0)