108 changes: 98 additions & 10 deletions cmd/bucket-listobjects-handlers.go
@@ -19,9 +19,12 @@ package cmd

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/gorilla/mux"
	"github.com/minio/minio/internal/logger"
@@ -46,12 +49,60 @@ func concurrentDecryptETag(ctx context.Context, objects []ObjectInfo) {
	g.Wait()
}

func mergeListObjects(l1, l2 []ObjectInfo) []ObjectInfo {
	mergedMap := make(map[string]ObjectInfo)

	// Helper function to add/update map entries; for duplicate keys the
	// entry with the newest ModTime wins.
	addOrUpdate := func(obj ObjectInfo) {
		if existingObj, found := mergedMap[obj.Name]; !found || obj.ModTime.After(existingObj.ModTime) {
			mergedMap[obj.Name] = obj
		}
	}
	for _, obj := range l1 {
		addOrUpdate(obj)
	}
	for _, obj := range l2 {
		addOrUpdate(obj)
	}

	mergedList := make([]ObjectInfo, 0, len(mergedMap))
	for _, obj := range mergedMap {
		mergedList = append(mergedList, obj)
	}

	return mergedList
}
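
A quick worked example of the merge rule above: when both listings contain the same key, the entry with the newer ModTime wins. The object names and timestamps below are invented for illustration:

	backend := []ObjectInfo{
		{Name: "photos/a.jpg", ModTime: time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)},
		{Name: "photos/b.jpg", ModTime: time.Date(2023, 1, 2, 0, 0, 0, 0, time.UTC)},
	}
	cached := []ObjectInfo{
		// Newer than the backend copy, e.g. a writeback still pending.
		{Name: "photos/a.jpg", ModTime: time.Date(2023, 1, 3, 0, 0, 0, 0, time.UTC)},
	}
	merged := mergeListObjects(backend, cached)
	// merged holds two entries: the cached "photos/a.jpg" (newer) and the
	// backend-only "photos/b.jpg". The slice order is not deterministic,
	// since the entries come back out of a map.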

func mergePrefixes(l1, l2 []string) []string {
	mergedMap := make(map[string]bool)

	// Helper function to add map entries, skipping duplicates.
	addOrUpdate := func(pre string) {
		if _, found := mergedMap[pre]; !found {
			mergedMap[pre] = true
		}
	}
	for _, pre := range l1 {
		addOrUpdate(pre)
	}
	for _, pre := range l2 {
		addOrUpdate(pre)
	}

	mergedList := make([]string, 0, len(mergedMap))
	for pre := range mergedMap {
		mergedList = append(mergedList, pre)
	}

	return mergedList
}
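
One property worth flagging: both merge helpers return map iteration order, which Go deliberately randomizes, while S3 list responses are expected in ascending lexical key order. A small sketch of restoring that ordering after the merge; sortListResults is not part of this diff and assumes the standard library sort package is imported:

// sortListResults restores the lexical key ordering S3 clients expect
// after a map-based merge. Requires: import "sort".
func sortListResults(objects []ObjectInfo, prefixes []string) {
	sort.Slice(objects, func(i, j int) bool {
		return objects[i].Name < objects[j].Name
	})
	sort.Strings(prefixes)
}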

// Validate all the ListObjects query arguments, returns an APIErrorCode
// if one of the args does not meet the required conditions.
// Special conditions required by MinIO server are as below
//   - delimiter if set should be equal to '/', otherwise the request is rejected.
//   - marker if set should have a common prefix with 'prefix' param, otherwise
//     the request is rejected.
func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int) APIErrorCode {
	// Max keys cannot be negative.
	if maxKeys < 0 {
@@ -200,6 +251,12 @@ func (api objectAPIHandlers) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request) {
// NOTE: It is recommended that this API be used for application development.
// MinIO continues to support ListObjectsV1 for supporting legacy tools.
func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
fmt.Println("ListObjectsV2Handler")
st := time.Now()
defer func() {
elapsed := time.Since(st).Milliseconds()
fmt.Printf("ListObjectsV2Handler took %d ms\n", elapsed)
}()
ctx := newContext(r, w, "ListObjectsV2")

defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
@@ -235,9 +292,15 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
	}

	var (
		listObjectsV2Info      ListObjectsV2Info
		listObjectsV2InfoCache ListObjectsV2Info
		err                    error
		errC                   error
	)
	listObjectsV2Cache := objectAPI.ListObjectsV2
	if api.CacheAPI() != nil {
		listObjectsV2Cache = api.CacheAPI().ListObjectsV2
	}
	if r.Header.Get(xMinIOExtract) == "true" && strings.Contains(prefix, archivePattern) {
		// Initiate a list objects operation inside a zip file based on the input params
@@ -247,11 +310,18 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
		// On success would return back ListObjectsInfo object to be
		// marshaled into S3 compatible XML header.
		listObjectsV2Info, err = objectAPI.ListObjectsV2(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
		listObjectsV2InfoCache, errC = listObjectsV2Cache(ctx, bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
	}
	if err != nil || errC != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	// fmt.Printf("Harsh listObjectsV2Infooo %+v\n", listObjectsV2Info)
	// fmt.Printf("Harsh listObjectsV2InfoCache %+v\n", listObjectsV2InfoCache)
	mergedObjects := mergeListObjects(listObjectsV2Info.Objects, listObjectsV2InfoCache.Objects)
	mergedPrefixes := mergePrefixes(listObjectsV2Info.Prefixes, listObjectsV2InfoCache.Prefixes)
	listObjectsV2Info.Objects = mergedObjects
	listObjectsV2Info.Prefixes = mergedPrefixes
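
One caveat in the combined error check above: when only the cache listing fails, err is nil, so toAPIError(ctx, err) receives nil and the response will not describe the actual failure. A minimal sketch of surfacing whichever error occurred, reusing this handler's variable names:

	if err != nil || errC != nil {
		// Prefer the backend error; fall back to the cache error so the
		// API response always carries a real error.
		if err == nil {
			err = errC
		}
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}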

	concurrentDecryptETag(ctx, listObjectsV2Info.Objects)

@@ -306,8 +376,13 @@ func proxyRequestByNodeIndex(ctx context.Context, w http.ResponseWriter, r *http
// This implementation of the GET operation returns some or all (up to 1000)
// of the objects in a bucket. You can use the request parameters as selection
// criteria to return a subset of the objects in a bucket.
func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
fmt.Println("ListObjectsV1Handler")
st := time.Now()
defer func() {
elapsed := time.Since(st).Milliseconds()
fmt.Printf("ListObjectsV1Handler took %d ms\n", elapsed)
}()
ctx := newContext(r, w, "ListObjectsV1")

defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r))
@@ -340,20 +415,33 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
	}

	listObjects := objectAPI.ListObjects

	listObjectsCache := objectAPI.ListObjects
	if api.CacheAPI() != nil {
		listObjectsCache = api.CacheAPI().ListObjects
	}
	// Initiate a list objects operation based on the input params.
	// On success would return back ListObjectsInfo object to be
	// marshaled into S3 compatible XML header.
	now := time.Now()
	listObjectsInfo, err := listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
	elapsedBackendList := time.Since(now)
	listObjectsInfoCache, errC := listObjectsCache(ctx, bucket, prefix, marker, delimiter, maxKeys)
	if err != nil || errC != nil {
		writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL)
		return
	}
	elapsedCacheList := time.Since(now) - elapsedBackendList
	mergedObjects := mergeListObjects(listObjectsInfo.Objects, listObjectsInfoCache.Objects)
	mergedPrefixes := mergePrefixes(listObjectsInfo.Prefixes, listObjectsInfoCache.Prefixes)
	elapsedMerge := time.Since(now) - elapsedCacheList - elapsedBackendList

	listObjectsInfo.Objects = mergedObjects
	listObjectsInfo.Prefixes = mergedPrefixes

	concurrentDecryptETag(ctx, listObjectsInfo.Objects)

	response := generateListObjectsV1Response(bucket, prefix, marker, delimiter, encodingType, maxKeys, listObjectsInfo)

	log.Println("listObjectsV1 ", "elapsedBackendList", elapsedBackendList.Milliseconds(), " elapsedCacheList", elapsedCacheList.Milliseconds(), " elapsedMerge", elapsedMerge.Milliseconds())
	// Write success response.
	writeSuccessResponseXML(w, encodeResponse(response))
}
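
The ad-hoc timing in both handlers (a fmt.Println plus a deferred fmt.Printf) could be collapsed into a single helper. A sketch, under the assumption that no helper named timeOp already exists in the codebase:

// timeOp logs how long an operation took; use as:
//   defer timeOp("ListObjectsV1Handler")()
func timeOp(name string) func() {
	st := time.Now()
	return func() {
		log.Printf("%s took %d ms", name, time.Since(st).Milliseconds())
	}
}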
24 changes: 22 additions & 2 deletions cmd/disk-cache-backend.go
@@ -218,7 +218,22 @@ func newDiskCache(ctx context.Context, dir string, config cache.Config) (*diskCache, error) {
	go cache.purgeWait(ctx)
	go cache.cleanupStaleUploads(ctx)
	if cache.commitWriteback {
		go func() {
			tickInterval := time.Duration(config.WriteBackInterval) * time.Second
			fmt.Println("write back time interval", tickInterval)
			ticker := time.NewTicker(tickInterval)
			defer ticker.Stop()
			defer close(cache.retryWritebackCh)
			for {
				select {
				case <-ticker.C:
					cache.scanCacheWritebackFailures(ctx)
				case <-ctx.Done():
					return
				}
			}
		}()
		// go cache.scanCacheWritebackFailures(ctx) // replaced by the periodic ticker above
	}
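
Note that time.NewTicker panics when handed a non-positive duration, so an unset or zero WriteBackInterval would crash this goroutine. A minimal guard, where defaultWriteBackInterval is a hypothetical fallback constant (e.g. 30 * time.Second):

		tickInterval := time.Duration(config.WriteBackInterval) * time.Second
		if tickInterval <= 0 {
			// Hypothetical fallback; pick whatever default suits the deployment.
			tickInterval = defaultWriteBackInterval
		}
		ticker := time.NewTicker(tickInterval)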
	cache.diskSpaceAvailable(0) // update if cache usage is already high.
	cache.NewNSLockFn = func(cachePath string) RWLocker {
@@ -1068,6 +1083,7 @@ func (c *diskCache) bitrotReadFromCache(ctx context.Context, filePath string, of
// Get returns ObjectInfo and reader for object from disk cache
func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
	cacheObjPath := getCacheSHADir(c.dir, bucket, object)
	fmt.Println("cacheObjPath getCacheSHADir", cacheObjPath)
	cLock := c.NewNSLockFn(cacheObjPath)
	lkctx, err := cLock.GetRLock(ctx, globalOperationTimeout)
	if err != nil {
@@ -1168,6 +1184,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, opts ObjectOptions) (gr *GetObjectReader, numHits int, err error) {
	go func() {
		if writebackInProgress(objInfo.UserDefined) {
			cacheObjPath = getCacheWriteBackSHADir(c.dir, bucket, object)
			fmt.Println("cacheObjPath getCacheWriteBackSHADir", cacheObjPath)
		}
		filePath := pathJoin(cacheObjPath, cacheFile)
		err := c.bitrotReadFromCache(ctx, filePath, startOffset, length, pw)
@@ -1225,20 +1242,23 @@ func (c *diskCache) Exists(ctx context.Context, bucket, object string) bool {

// scanCacheWritebackFailures queues objects whose writeback to the backend
// previously failed; it now runs periodically from the ticker goroutine in
// newDiskCache rather than only on server startup.
func (c *diskCache) scanCacheWritebackFailures(ctx context.Context) {
	fmt.Println("scan cache write back failures")
	// Don't close c.retryWritebackCh here: the ticker goroutine in
	// newDiskCache owns the channel and closes it on shutdown; closing it
	// after the first scan would break every later rescan.
	// defer close(c.retryWritebackCh)
	filterFn := func(name string, typ os.FileMode) error {
		if name == minioMetaBucket {
			// Proceed to next file.
			return nil
		}
		cacheDir := pathJoin(c.dir, name)
		fmt.Println("cachedir to be uploaded in backend", cacheDir)
		meta, _, _, err := c.statCachedMeta(ctx, cacheDir)
		if err != nil {
			return nil
		}

		objInfo := meta.ToObjectInfo()
		status, ok := objInfo.UserDefined[writeBackStatusHeader]
		fmt.Println("status of file", status)
		if !ok || status == CommitComplete.String() {
			return nil
		}