Skip to content

Commit

Permalink
[enhancement](maxcompute)support maxcompute timestamp column type. (#…
Browse files Browse the repository at this point in the history
…48768)

### What problem does this PR solve?

Problem Summary:
1. support maxcompute timestamp column type.
2. Add the parameter `mc.datetime_predicate_push_down` to disable
predicate pushdown for odps catalog datetime type, because the timestamp
precision of odps is 9, while the mapping precision of Doris is 6. If we
insert `2023-02-02 00:00:00.123456789` into odps, doris will read it as
`2023-02-02 00:00:00.123456`. Due to the lack of "789", we cannot push
it down correctly. If you don't need such a high precision (greater than
6) on odps, it will not affect your normal use.
  • Loading branch information
hubgeter authored Mar 9, 2025
1 parent 00ec587 commit 2988dc3
Show file tree
Hide file tree
Showing 7 changed files with 545 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.TimeStampNanoVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.ValueVector;
Expand Down Expand Up @@ -233,15 +234,17 @@ public LocalDateTime getDateTime() {
LocalDateTime result;

ArrowType.Timestamp timestampType = ( ArrowType.Timestamp) column.getField().getFieldType().getType();
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) {
if (timestampType.getUnit() == org.apache.arrow.vector.types.TimeUnit.MILLISECOND) { //DATETIME
result = convertToLocalDateTime((TimeStampMilliTZVector) column, idx);
} else {
} else if (timestampType.getTimezone() == null) { // TIMESTAMP_NTZ
NullableTimeStampNanoHolder valueHoder = new NullableTimeStampNanoHolder();
((TimeStampNanoVector) column).get(idx, valueHoder);
long timestampNanos = valueHoder.value;

result = LocalDateTime.ofEpochSecond(timestampNanos / 1_000_000_000,
(int) (timestampNanos % 1_000_000_000), java.time.ZoneOffset.UTC);
} else { // TIMESTAMP
result = convertToLocalDateTime((TimeStampNanoTZVector) column, idx);
}

/*
Expand Down Expand Up @@ -330,4 +333,9 @@ public LocalDateTime convertToLocalDateTime(TimeStampMilliTZVector milliTZVector
return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestampMillis), timeZone);
}

public LocalDateTime convertToLocalDateTime(TimeStampNanoTZVector nanoTZVector, int index) {
long timestampNano = nanoTZVector.get(index);
return Instant.ofEpochSecond(timestampNano / 1_000_000_000, timestampNano % 1_000_000_000)
.atZone(timeZone).toLocalDateTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private int readTimeout;
private int retryTimes;

public boolean dateTimePredicatePushDown;

private static final Map<String, ZoneId> REGION_ZONE_MAP;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.PROJECT,
Expand Down Expand Up @@ -201,7 +203,9 @@ protected void initLocalObjectsImpl() {
accessKey = credential.getAccessKey();
secretKey = credential.getSecretKey();


dateTimePredicatePushDown = Boolean.parseBoolean(
props.getOrDefault(MCProperties.DATETIME_PREDICATE_PUSH_DOWN,
MCProperties.DEFAULT_DATETIME_PREDICATE_PUSH_DOWN));

Account account = new AliyunAccount(accessKey, secretKey);
this.odps = new Odps(account);
Expand Down Expand Up @@ -338,6 +342,10 @@ public int getReadTimeout() {
return readTimeout;
}

public boolean getDateTimePredicatePushDown() {
return dateTimePredicatePushDown;
}

public ZoneId getProjectDateTimeZone() {
makeSureInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private Type mcTypeToDorisType(TypeInfo typeInfo) {
case DATETIME: {
return ScalarType.createDatetimeV2Type(3);
}
case TIMESTAMP:
case TIMESTAMP_NTZ: {
return ScalarType.createDatetimeV2Type(6);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
import java.util.stream.Collectors;

public class MaxComputeScanNode extends FileQueryScanNode {
static final DateTimeFormatter dateTime3Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
static final DateTimeFormatter dateTime6Formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

private final MaxComputeExternalTable table;
private Predicate filterPredicate;
Expand Down Expand Up @@ -492,16 +494,40 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
case DATETIME: {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(3);
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(3);

return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType),
((MaxComputeExternalCatalog) table.getCatalog()).getProjectDateTimeZone()) + "\" ";
return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime3Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
/**
* Disable the predicate pushdown to the odps API because the timestamp precision of odps is 9 and the
* mapping precision of Doris is 6. If we insert `2023-02-02 00:00:00.123456789` into odps, doris reads
* it as `2023-02-02 00:00:00.123456`. Since "789" is missing, we cannot push it down correctly.
*/
case TIMESTAMP: {
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);

return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType), dateTime6Formatter,
ZoneId.of("UTC")) + "\" ";
}
break;
}
case TIMESTAMP_NTZ: {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();
if (mcCatalog.getDateTimePredicatePushDown()) {
DateLiteral dateLiteral = (DateLiteral) literalExpr;
ScalarType dstType = ScalarType.createDatetimeV2Type(6);
return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
}
break;
}
default: {
break;
Expand All @@ -511,12 +537,11 @@ private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws A
}


public static String convertDateTimezone(String dateTimeStr, ZoneId toZone) {
public static String convertDateTimezone(String dateTimeStr, DateTimeFormatter formatter, ZoneId toZone) {
if (DateUtils.getTimeZone().equals(toZone)) {
return dateTimeStr;
}

DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter);

ZonedDateTime sourceZonedDateTime = localDateTime.atZone(DateUtils.getTimeZone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public class MCProperties extends BaseProperties {
public static final String SPLIT_CROSS_PARTITION = "mc.split_cross_partition";
public static final String DEFAULT_SPLIT_CROSS_PARTITION = "true";

public static final String DATETIME_PREDICATE_PUSH_DOWN =
"mc.datetime_predicate_push_down";
public static final String DEFAULT_DATETIME_PREDICATE_PUSH_DOWN = "true";

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
Expand Down
Loading

0 comments on commit 2988dc3

Please sign in to comment.