Skip to content

Commit a046c81

Browse files
authored
Add connection singleton and productFileCache (#619)
1 parent f218c24 commit a046c81

3 files changed

Lines changed: 105 additions & 9 deletions

File tree

api/handlers/productfile_cog.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,24 +59,21 @@ func ListProductfilesCOG(db *pgxpool.Pool) echo.HandlerFunc {
5959
// Content-Range/Accept-Ranges so a GDAL /vsicurl/ client can read tiles directly
6060
// (no full download). HEAD returns size + range support for /vsicurl/ probing.
6161
// Every request is authenticated (private route) and logged for metering.
62-
func StreamProductfileCOG(db *pgxpool.Pool, awsCfg *aws.Config, forcePathStyle bool) echo.HandlerFunc {
62+
func StreamProductfileCOG(db *pgxpool.Pool, client *s3.Client) echo.HandlerFunc {
6363
return func(c echo.Context) error {
6464
pfID, err := uuid.Parse(c.Param("productfile_id"))
6565
if err != nil {
6666
return c.String(http.StatusBadRequest, "Malformed productfile ID")
6767
}
6868

69-
obj, err := models.GetProductfileObject(db, pfID)
69+
// Memoized lookup — the productfile -> bucket/key mapping is immutable, so this avoids a
70+
// Postgres query on every Range request (one client import fires thousands).
71+
obj, err := models.GetProductfileObjectCached(db, pfID)
7072
if err != nil {
7173
return c.String(http.StatusNotFound, "Productfile not found")
7274
}
7375

7476
ctx := c.Request().Context()
75-
// Endpoint/scheme come from the environment (AWS_ENDPOINT_URL_S3) via
76-
// awsCfg; only path-style addressing still needs to be set per-client.
77-
client := s3.NewFromConfig(*awsCfg, func(o *s3.Options) {
78-
o.UsePathStyle = forcePathStyle
79-
})
8077

8178
// HEAD: metadata only (size + range support) for /vsicurl/ probing.
8279
if c.Request().Method == http.MethodHead {

api/main.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"strings"
99
"time"
1010

11+
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
1112
"github.com/aws/aws-sdk-go-v2/config"
13+
"github.com/aws/aws-sdk-go-v2/service/s3"
1214
"github.com/labstack/echo/v4"
1315
"golang.org/x/net/http2"
1416

@@ -64,6 +66,20 @@ func main() {
6466
return nil
6567
})
6668

69+
// One shared S3 client for the COG proxy. SDK v2 already shares the credential provider + HTTP
70+
// client via the config, but building the client once (no per-request allocation) and widening
71+
// the idle-connection pool lets the high request concurrency of a COG import reuse keep-alive
72+
// connections to S3 instead of re-handshaking. Safe for concurrent use; credentials auto-refresh.
73+
cogHTTPClient := awshttp.NewBuildableClient().WithTransportOptions(func(t *http.Transport) {
74+
t.MaxIdleConns = 200
75+
t.MaxIdleConnsPerHost = 100
76+
t.IdleConnTimeout = 90 * time.Second
77+
})
78+
cogS3Client := s3.NewFromConfig(cfg.AwsConfig, func(o *s3.Options) {
79+
o.UsePathStyle = cfg.AWSS3ForcePathStyle
80+
o.HTTPClient = cogHTTPClient
81+
})
82+
6783
// Database
6884
db := Connection(cfg)
6985

@@ -162,8 +178,8 @@ func main() {
162178
public.GET("/products/:product_id/files", handlers.ListProductfiles(db))
163179
// Direct, Range-capable COG access (authenticated + metered) for desktop clients
164180
private.GET("/products/:product_id/cog-files", handlers.ListProductfilesCOG(db))
165-
private.GET("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, &cfg.AwsConfig, cfg.AWSS3ForcePathStyle))
166-
private.HEAD("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, &cfg.AwsConfig, cfg.AWSS3ForcePathStyle))
181+
private.GET("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, cogS3Client))
182+
private.HEAD("/products/:product_id/cog/:productfile_id", handlers.StreamProductfileCOG(db, cogS3Client))
167183

168184
// Productfiles
169185
private.POST("/productfiles", handlers.CreateProductfiles(db),

api/models/productfiles.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package models
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
// Postgres Database Driver
@@ -59,6 +60,88 @@ func GetProductfileObject(db *pgxpool.Pool, ID uuid.UUID) (*ProductfileObject, e
5960
return &obj, nil
6061
}
6162

63+
// --- Cached productfile -> {bucket, key} lookup ------------------------------------------------
64+
//
65+
// The COG proxy (StreamProductfileCOG) resolves the S3 bucket+key on EVERY Range request, and a
66+
// single client import fires thousands of them. The mapping is immutable for a given productfile id
67+
// (its file column never changes, and write_to_bucket is a process-constant config value), so it is
68+
// safe to memoize — keeping the 15-connection Postgres pool out of the per-read hot path.
69+
70+
// productfileCacheMax bounds the in-memory key cache so a long-running process can't grow without
71+
// limit. Entries are ~an id + a short key string; the cap is ~tens of MB.
72+
const productfileCacheMax = 200000
73+
74+
var (
75+
pfCacheMu sync.RWMutex
76+
pfCacheBucket string
77+
pfCacheKeys = make(map[uuid.UUID]string)
78+
)
79+
80+
// GetProductfileObjectCached returns the S3 bucket+key for a productfile id, memoized. On a miss it
81+
// falls back to the database (one cheap scalar query); the write_to_bucket bucket is read once and
82+
// reused. Safe for concurrent use.
83+
func GetProductfileObjectCached(db *pgxpool.Pool, ID uuid.UUID) (*ProductfileObject, error) {
84+
pfCacheMu.RLock()
85+
key, keyOK := pfCacheKeys[ID]
86+
bucket := pfCacheBucket
87+
pfCacheMu.RUnlock()
88+
89+
if keyOK && bucket != "" {
90+
return &ProductfileObject{Bucket: bucket, Key: key}, nil
91+
}
92+
93+
// Resolve the bucket once (immutable config value).
94+
if bucket == "" {
95+
b, err := getWriteToBucket(db)
96+
if err != nil {
97+
return nil, err
98+
}
99+
bucket = b
100+
pfCacheMu.Lock()
101+
pfCacheBucket = b
102+
pfCacheMu.Unlock()
103+
}
104+
105+
// Resolve + cache the key (immutable per id). A rare concurrent double-miss just queries twice
106+
// and stores the same value — harmless.
107+
if !keyOK {
108+
k, err := getProductfileKey(db, ID)
109+
if err != nil {
110+
return nil, err
111+
}
112+
key = k
113+
pfCacheMu.Lock()
114+
// Bound memory over a long uptime. Entries are immutable and cheap to repopulate, so a crude
115+
// drop-all on overflow is safe (subsequent reads simply re-query).
116+
if len(pfCacheKeys) >= productfileCacheMax {
117+
pfCacheKeys = make(map[uuid.UUID]string)
118+
}
119+
pfCacheKeys[ID] = k
120+
pfCacheMu.Unlock()
121+
}
122+
123+
return &ProductfileObject{Bucket: bucket, Key: key}, nil
124+
}
125+
126+
func getWriteToBucket(db *pgxpool.Pool) (string, error) {
127+
var bucket string
128+
err := db.QueryRow(
129+
context.Background(),
130+
`SELECT config_value FROM config WHERE config_name::text = 'write_to_bucket'::text`,
131+
).Scan(&bucket)
132+
return bucket, err
133+
}
134+
135+
func getProductfileKey(db *pgxpool.Pool, ID uuid.UUID) (string, error) {
136+
var key string
137+
err := db.QueryRow(
138+
context.Background(),
139+
`SELECT file FROM productfile WHERE id = $1`,
140+
ID,
141+
).Scan(&key)
142+
return key, err
143+
}
144+
62145
// ListProductfiles returns array of productfiles
63146
func ListProductfiles(db *pgxpool.Pool, ID uuid.UUID, after string, before string) ([]Productfile, error) {
64147
ff := make([]Productfile, 0)

0 commit comments

Comments
 (0)