Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cwms-data-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ dependencies {
implementation(libs.bundles.overrides)

testImplementation(libs.bundles.java.parser)
implementation(libs.togglz.core)
implementation(libs.minio)
}

task extractWebJars(type: Copy) {
Expand Down Expand Up @@ -245,7 +247,7 @@ task run(type: JavaExec) {
}

task integrationTests(type: Test) {
dependsOn test
// dependsOn test
dependsOn generateConfig
dependsOn war

Expand Down
56 changes: 47 additions & 9 deletions cwms-data-api/src/main/java/cwms/cda/api/BlobController.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import com.codahale.metrics.Timer;
import cwms.cda.api.errors.CdaError;
import cwms.cda.data.dao.BlobDao;
import cwms.cda.data.dao.BlobAccess;
import cwms.cda.data.dao.ObjectStorageBlobDao;
import cwms.cda.data.dao.ObjectStorageConfig;
import cwms.cda.data.dao.JooqDao;
import cwms.cda.data.dto.Blob;
import cwms.cda.data.dto.Blobs;
Expand All @@ -33,6 +36,9 @@

import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;
import org.togglz.core.context.FeatureContext;
import cwms.cda.features.CdaFeatures;
import org.togglz.core.manager.FeatureManager;


/**
Expand Down Expand Up @@ -62,6 +68,31 @@ protected DSLContext getDslContext(Context ctx) {
return JooqDao.getDslContext(ctx);
}

private BlobAccess chooseBlobAccess(DSLContext dsl) {
boolean useObjectStore = isObjectStorageEnabled();
try {
// Prefer Togglz if available
FeatureManager featureManager = FeatureContext.getFeatureManager();
useObjectStore = featureManager.isActive(CdaFeatures.USE_OBJECT_STORAGE_BLOBS);
} catch (Throwable ignore) {
// fall back to system/env property check
}
if (useObjectStore) {
ObjectStorageConfig cfg = ObjectStorageConfig.fromSystem();
return new ObjectStorageBlobDao(cfg);
}
return new BlobDao(dsl);
}

private boolean isObjectStorageEnabled() {
// System properties first, then env. Accept FEATURE=true
String key = String.valueOf(CdaFeatures.USE_OBJECT_STORAGE_BLOBS);
String v = System.getProperty(key);
if (v == null) v = System.getProperty(key);
if (v == null) v = System.getenv(key);
return v != null && ("true".equalsIgnoreCase(v) || "1".equals(v));
}

@OpenApi(
queryParams = {
@OpenApiParam(name = OFFICE,
Expand Down Expand Up @@ -115,7 +146,7 @@ public void getAll(@NotNull Context ctx) {
String formatHeader = ctx.header(Header.ACCEPT);
ContentType contentType = Formats.parseHeader(formatHeader, Blobs.class);

BlobDao dao = new BlobDao(dsl);
BlobAccess dao = chooseBlobAccess(dsl);
Blobs blobs = dao.getBlobs(cursor, pageSize, office, like);

String result = Formats.format(contentType, blobs);
Expand Down Expand Up @@ -151,12 +182,13 @@ public void getAll(@NotNull Context ctx) {
public void getOne(@NotNull Context ctx, @NotNull String blobId) {

try (final Timer.Context ignored = markAndTime(GET_ONE)) {
String idQueryParam = ctx.queryParam(CLOB_ID);
String idQueryParam = ctx.queryParam(BLOB_ID);
if (idQueryParam != null) {
blobId = idQueryParam;
}
DSLContext dsl = getDslContext(ctx);
BlobDao dao = new BlobDao(dsl);

BlobAccess dao = chooseBlobAccess(dsl);
String officeQP = ctx.queryParam(OFFICE);
Optional<String> office = Optional.ofNullable(officeQP);

Expand Down Expand Up @@ -204,7 +236,7 @@ public void create(@NotNull Context ctx) {
boolean failIfExists = ctx.queryParamAsClass(FAIL_IF_EXISTS, Boolean.class).getOrDefault(true);
ContentType contentType = Formats.parseHeader(formatHeader, Blob.class);
Blob blob = Formats.parseContent(contentType, ctx.bodyAsInputStream(), Blob.class);
BlobDao dao = new BlobDao(dsl);
BlobAccess dao = chooseBlobAccess(dsl);
dao.create(blob, failIfExists, false);
ctx.status(HttpCode.CREATED);
}
Expand All @@ -213,7 +245,7 @@ public void create(@NotNull Context ctx) {
@OpenApi(
description = "Update an existing Blob",
pathParams = {
@OpenApiParam(name = BLOB_ID, description = "The blob identifier to be deleted"),
@OpenApiParam(name = BLOB_ID, description = "The blob identifier to be updated"),
},
requestBody = @OpenApiRequestBody(
content = {
Expand All @@ -235,7 +267,7 @@ public void create(@NotNull Context ctx) {
@Override
public void update(@NotNull Context ctx, @NotNull String blobId) {
try (final Timer.Context ignored = markAndTime(UPDATE)) {
String idQueryParam = ctx.queryParam(CLOB_ID);
String idQueryParam = ctx.queryParam(BLOB_ID);
if (idQueryParam != null) {
blobId = idQueryParam;
}
Expand All @@ -260,7 +292,13 @@ public void update(@NotNull Context ctx, @NotNull String blobId) {
+ "updating a blob");
}

BlobDao dao = new BlobDao(dsl);
if(!blob.getId().equals(blobId)) {
throw new FormattingException("The blob id parameter does not match the blob id in the body. " +
"The blob end-point does not support renaming blobs. " +
"Create a new blob with the new id and delete the old one.");
}

BlobAccess dao = chooseBlobAccess(dsl);
dao.update(blob, false);
ctx.status(HttpServletResponse.SC_OK);
}
Expand All @@ -287,13 +325,13 @@ public void update(@NotNull Context ctx, @NotNull String blobId) {
@Override
public void delete(@NotNull Context ctx, @NotNull String blobId) {
try (Timer.Context ignored = markAndTime(DELETE)) {
String idQueryParam = ctx.queryParam(CLOB_ID);
String idQueryParam = ctx.queryParam(BLOB_ID);
if (idQueryParam != null) {
blobId = idQueryParam;
}
DSLContext dsl = getDslContext(ctx);
String office = requiredParam(ctx, OFFICE);
BlobDao dao = new BlobDao(dsl);
BlobAccess dao = chooseBlobAccess(dsl);
dao.delete(office, blobId);
ctx.status(HttpServletResponse.SC_NO_CONTENT);
}
Expand Down
107 changes: 107 additions & 0 deletions cwms-data-api/src/main/java/cwms/cda/api/RangeParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package cwms.cda.api;

import org.jspecify.annotations.NonNull;

import java.util.*;
import java.util.regex.*;

/**
* Utility class for parsing HTTP Range headers.
* These typically look like: bytes=100-1234
* or: bytes=100- this is common to resume a download
* or: bytes=0- equivalent to a regular request for the whole file
* but by returning 206 we show that we support range requests
* Note that multiple ranges can be requested at once such
* as: bytes=500-600,700-999 Server responds identifies separator and then puts separator between chunks
* bytes=0-0,-1 also legal its just the first and the last byte
* or: bytes=500-600,601-999 legal but what is the point?
* or: bytes=500-700,601-999 legal, notice they overlap.
*
*
*/
public class RangeParser {

private static final Pattern RANGE_PATTERN = Pattern.compile("(\\d*)-(\\d*)");

/**
* Return a list of two element long[] containing byte ranges parsed from the HTTP Range header.
* If the end of a range is not specified ( e.g. bytes=100- ) then a -1 is returned in the second position
* If the range only includes a negative byte (e.g bytes=-50) then -1 is returned as the start of the range
* and -1*end is returned as the end of the range. bytes=-50 will result in [-1,50]
*
* @param header the HTTP Range header
* @return a list of byte ranges
*/
public static List<long[]> parse(String header) {
if (header == null || header.isEmpty() ) {
return Collections.emptyList();
} else if ( !header.startsWith("bytes=")){
throw new IllegalArgumentException("Invalid Range header: " + header);
}

String rangePart = header.substring(6);
List<long[]> retval = parseRanges(rangePart);
if( retval.isEmpty() ){
throw new IllegalArgumentException("Invalid Range header: " + header);
}
return retval;
}

public static @NonNull List<long[]> parseRanges(String rangePart) {
if( rangePart == null || rangePart.isEmpty() ){
throw new IllegalArgumentException("Invalid range specified: " + rangePart);
}
String[] parts = rangePart.split(",");
List<long[]> ranges = new ArrayList<>();

for (String part : parts) {
Matcher m = RANGE_PATTERN.matcher(part.trim());
if (m.matches()) {
String start = m.group(1);
String end = m.group(2);

long s = start.isEmpty() ? -1 : Long.parseLong(start);
long e = end.isEmpty() ? -1 : Long.parseLong(end);

ranges.add(new long[]{s, e});
}
}
return ranges;
}

/**
* The parse() method in this class can return -1 for unspecified values or when suffix ranges are supplied.
* This method interprets the negative values in regard to the totalSize and returns inclusive indices of the
* requested range.
* @param inputs the array of start and end byte positions
* @param totalBytes the total number of bytes in the file
* @return a long array with the start and end byte positions, these are inclusive. [0,0] means return the first byte
*/
public static long[] interpret(long[] inputs, long totalBytes){
if(inputs == null){
throw new IllegalArgumentException("null range array provided");
} else if( inputs.length != 2 ){
throw new IllegalArgumentException("Invalid number of inputs: " + Arrays.toString(inputs));
}

long start = inputs[0];
long end = inputs[1];

if(start == -1L){
// its a suffix request.
start = totalBytes - end;
end = totalBytes - 1;
} else {
if (start < 0 || end < start) {
throw new IllegalArgumentException("Invalid range specified: " + Arrays.toString(inputs));
}

start = Math.min(start, totalBytes - 1);
end = Math.min(end, totalBytes - 1);
}

return new long[]{start, end};
}


}
Loading
Loading