Skip to content

feat(QTDI-1305): improve error in record #1041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c924108
Add withError in Record.Builder
yyin-talend Apr 21, 2025
ae4cad7
Test for AvroRecord
yyin-talend Apr 21, 2025
9205765
fix isValid
yyin-talend Apr 22, 2025
cfa68e8
Merge branch 'master' into yyin/QTDI-1305-ImproveErrorInRecord
yyin-talend Apr 22, 2025
cf0a3c6
Add sample-feature\supporterror
yyin-talend Apr 30, 2025
7d049d1
worked
yyin-talend Apr 30, 2025
441faae
fix bug
yyin-talend May 6, 2025
0d8f532
Merge branch 'master' into yyin/QTDI-1305-ImproveErrorInRecord
yyin-talend May 6, 2025
de71297
Add new method to white list.
yyin-talend May 6, 2025
d5ab539
Add "valid":true to serialization
yyin-talend May 6, 2025
86ad7ee
Add "valid":true to serialization
yyin-talend May 6, 2025
884420f
Add "valid":true to serialization
yyin-talend May 7, 2025
a9450e3
rename
yyin-talend May 7, 2025
727c512
revert junit which no need to change
yyin-talend May 7, 2025
e2dbb27
fix one problem
yyin-talend May 7, 2025
21cccfc
Merge branch 'master' into yyin/QTDI-1305-ImproveErrorInRecord
yyin-talend May 7, 2025
d2cf309
fix import order
yyin-talend May 7, 2025
02a5ebd
fix import order
yyin-talend May 7, 2025
92df0c0
change command jar
yyin-talend May 8, 2025
fb6a891
fix sonar
yyin-talend May 8, 2025
7314bfb
remove comment
yyin-talend May 8, 2025
01389f5
add readme
yyin-talend May 9, 2025
7884086
remove withError in record API
yyin-talend May 12, 2025
e9ea736
add getErrorMessage,getErrorFallbackValue in Entry API
yyin-talend May 12, 2025
8633ea1
feat(QTDI-1305): Fix RecordBuilder builder pattern +some improvements…
ypiel-talend May 13, 2025
9e8120a
Merge branch 'master' into yyin/QTDI-1305-ImproveErrorInRecord-2
yyin-talend May 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

public interface Record {

String RECORD_ERROR_SUPPORT = "talend.component.record.error.support";

/**
* @return the schema of this record.
*/
Expand Down Expand Up @@ -311,6 +313,11 @@ default Optional<Record> getOptionalRecord(final String name) {
return ofNullable(get(Record.class, name));
}

default boolean isValid() {
return !getSchema().getAllEntries()
.anyMatch(entry -> !entry.isValid());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See sonar comment, more easy to read that double negation.

}

/**
* Allows to create a record with a fluent API. This is the unique recommended way to create a record.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ interface Entry {
*/
boolean isMetadata();

/**
* @return Is this entry can be in error.
*/
boolean isErrorCapable();

/**
* @return true if the value of this entry is valid; false for invalid value.
*/
boolean isValid();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can also add a default behavior as returning true...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not using schema, but using Entry.


/**
* @param <T> the default value type.
*
Expand Down Expand Up @@ -296,6 +306,14 @@ default Entry.Builder toBuilder() {
throw new UnsupportedOperationException("#toBuilder is not implemented");
}

default String getErrorMessage() {
return getProp(SchemaProperty.ENTRY_ERROR_MESSAGE);
}

default String getErrorFallbackValue() {
return getProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE);
}

/**
* Plain builder matching {@link Entry} structure.
*/
Expand All @@ -317,6 +335,8 @@ default Builder withLogicalType(String logicalType) {

Builder withNullable(boolean nullable);

Builder withErrorCapable(boolean errorCapable);

Builder withMetadata(boolean metadata);

<T> Builder withDefaultValue(T value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public interface SchemaProperty {

String ALLOW_SPECIAL_NAME = "field.special.name";

String ENTRY_IS_ON_ERROR = "entry.on.error";

String ENTRY_ERROR_MESSAGE = "entry.error.message";

String ENTRY_ERROR_FALLBACK_VALUE = "entry.error.fallback.value";

String ERROR_EXCEPTION = "entry.error.exception";
Comment on lines +40 to +46
Copy link
Member

@undx undx May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my previous comment. Maybe smtg like record.value.


enum LogicalType {

DATE("date"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ public boolean isNullable() {
return true;
}

@Override
public boolean isErrorCapable() {
return false;
}

@Override
public boolean isMetadata() {
return false;
Expand Down Expand Up @@ -163,6 +168,12 @@ public String getProp(final String property) {
public Builder toBuilder() {
throw new UnsupportedOperationException("#toBuilder()");
}

@Override
public boolean isValid() {
return true;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public boolean isNullable() {
return false;
}

@Override
public boolean isErrorCapable() {
return false;
}

@Override
public boolean isMetadata() {
return false;
Expand Down Expand Up @@ -135,6 +140,11 @@ public Map<String, String> getProps() {
public String getProp(String property) {
return null;
}

@Override
public boolean isValid() {
return true;
}
};
Assertions.assertEquals("value", record.get(String.class, e1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ public boolean isNullable() {
throw new UnsupportedOperationException("#isNullable()");
}

@Override
public boolean isErrorCapable() {
throw new UnsupportedOperationException("#isErrorCapable()");
}

@Override
public boolean isMetadata() {
throw new UnsupportedOperationException("#isMetadata()");
Expand Down Expand Up @@ -326,6 +331,11 @@ public JsonValue getJsonProp(final String name) {
return Entry.super.getJsonProp(name);
}

@Override
public boolean isValid() {
return true;
}

}

class SchemaBuilder implements Schema.Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ static class MockEntry implements Schema.Entry {

private final boolean nullable;

private final boolean errorCapable;

private final boolean metadata;

private final Object defaultVal;
Expand All @@ -102,6 +104,11 @@ public String getProp(final String property) {
public Builder toBuilder() {
throw new UnsupportedOperationException("#toBuilder()");
}

@Override
public boolean isValid() {
return true;
}
}

@RequiredArgsConstructor
Expand Down Expand Up @@ -135,6 +142,12 @@ public Entry.Builder withNullable(boolean nullable) {
return this;
}

@Override
public Entry.Builder withErrorCapable(boolean errorCapable) {
this.builder.withErrorCapable(errorCapable);
return this;
}

@Override
public Entry.Builder withMetadata(boolean metadata) {
this.builder.withMetadata(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,53 @@ void mixedRecordTest() {
Assertions.assertNotNull(arrayType);
}

@Test
void testWithError() {
final String val = System.getProperty(Record.RECORD_ERROR_SUPPORT);
System.setProperty(Record.RECORD_ERROR_SUPPORT, "true");
final String val2 = System.getProperty(Record.RECORD_ERROR_SUPPORT);
Comment on lines +214 to +216
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the before and after annotations to handle this...


org.talend.sdk.component.api.record.Schema.Builder schemaBuilder = factory.newSchemaBuilder(Schema.Type.RECORD);
Schema.Entry nameEntry = factory
.newEntryBuilder()
.withName("name")
.withNullable(false)
.withType(Schema.Type.STRING)
.build();
Schema.Entry nmEntry = factory
.newEntryBuilder()
.withName("normal")
.withNullable(true)
.withType(Schema.Type.STRING)
.build();
Schema.Entry ageEntry = factory
.newEntryBuilder()
.withName("age")
.withNullable(false)
.withType(Schema.Type.INT)
.build();
Schema customerSchema = schemaBuilder.withEntry(nameEntry).withEntry(nmEntry).withEntry(ageEntry).build();
// record 1
Record.Builder recordBuilder = factory.newRecordBuilder(customerSchema);
Record record1 = recordBuilder.with(nameEntry, null)
.with(nmEntry, "normal")
.with(ageEntry, "is not an int")
.build();
assertFalse(record1.isValid());

final Schema.Entry entry =
record1.getSchema().getEntries().stream().filter(e -> "name".equals(e.getName())).findAny().get();
assertNotNull(entry);
Assertions.assertFalse(entry.isValid());

final Schema.Entry entry2 =
record1.getSchema().getEntries().stream().filter(e -> "age".equals(e.getName())).findAny().get();
assertNotNull(entry2);
Assertions.assertFalse(entry2.isValid());

System.setProperty(Record.RECORD_ERROR_SUPPORT, "false");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the before and after annotations to handle this...

}

@Test
void recordWithNewSchema() {
final Schema schema0 = new AvroSchemaBuilder()//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void toJson() throws Exception {
.create(new JsonbConfig().withPropertyOrderStrategy(PropertyOrderStrategy.LEXICOGRAPHICAL))) {
final String json = jsonb.toJson(schema);
assertEquals(
"{\"entries\":[{\"elementSchema\":{\"entries\":[],\"metadata\":[],\"props\":{},\"type\":\"STRING\"},\"metadata\":false,\"name\":\"array\",\"nullable\":true,\"props\":{\"talend.component.label\":\"array\"},\"rawName\":\"array\",\"type\":\"ARRAY\"}],\"metadata\":[],\"props\":{\"talend.fields.order\":\"array\"},\"type\":\"RECORD\"}",
"{\"entries\":[{\"elementSchema\":{\"entries\":[],\"metadata\":[],\"props\":{},\"type\":\"STRING\"},\"metadata\":false,\"name\":\"array\",\"nullable\":true,\"props\":{\"talend.component.label\":\"array\"},\"rawName\":\"array\",\"type\":\"ARRAY\",\"valid\":true}],\"metadata\":[],\"props\":{\"talend.fields.order\":\"array\"},\"type\":\"RECORD\"}",
json);
}
}
Expand All @@ -71,7 +71,7 @@ void toJsonWithMeta() throws Exception {
try (final Jsonb jsonb = JsonbBuilder
.create(new JsonbConfig().withPropertyOrderStrategy(PropertyOrderStrategy.LEXICOGRAPHICAL))) {
assertEquals(
"{\"entries\":[],\"metadata\":[{\"elementSchema\":{\"entries\":[],\"metadata\":[],\"props\":{},\"type\":\"STRING\"},\"metadata\":true,\"name\":\"array\",\"nullable\":true,\"props\":{\"talend.component.label\":\"array\"},\"rawName\":\"array\",\"type\":\"ARRAY\"}],\"props\":{\"talend.fields.order\":\"array\"},\"type\":\"RECORD\"}",
"{\"entries\":[],\"metadata\":[{\"elementSchema\":{\"entries\":[],\"metadata\":[],\"props\":{},\"type\":\"STRING\"},\"metadata\":true,\"name\":\"array\",\"nullable\":true,\"props\":{\"talend.component.label\":\"array\"},\"rawName\":\"array\",\"type\":\"ARRAY\",\"valid\":true}],\"props\":{\"talend.fields.order\":\"array\"},\"type\":\"RECORD\"}",
jsonb.toJson(schema));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.talend.sdk.component.api.record.Schema;
import org.talend.sdk.component.api.record.Schema.EntriesOrder;
import org.talend.sdk.component.api.record.Schema.Entry;
import org.talend.sdk.component.api.record.SchemaProperty;

import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -129,6 +130,8 @@ public static class BuilderImpl implements Builder {

private OrderState orderState;

private Map<String, Entry> entriesInError = new HashMap<>();

public BuilderImpl() {
this(null);
}
Expand All @@ -150,7 +153,7 @@ private void initOrderState() {
} else {
final List<Entry> fields = this.providedSchema.naturalOrder()
.getFieldsOrder()
.map(this.providedSchema::getEntry)
.map(n -> this.getEntryWithErrorIfAny(this.providedSchema.getEntry(n)))
.collect(Collectors.toList());
this.orderState = new OrderState(fields);
}
Expand All @@ -171,11 +174,16 @@ public Object getValue(final String name) {

@Override
public Builder with(final Entry entry, final Object value) {
validateTypeAgainstProvidedSchema(entry.getName(), entry.getType(), value);
try {
validateTypeAgainstProvidedSchema(entry.getName(), entry.getType(), value);
} catch (Exception e) {
return withError(entry, value, e.getMessage());
}
if (!entry.getType().isCompatible(value)) {
throw new IllegalArgumentException(String
.format("Entry '%s' of type %s is not compatible with value of type '%s'", entry.getName(),
entry.getType(), value.getClass().getName()));
return withError(entry, value, String
.format("Entry '%s' of type %s is not compatible with given value of type '%s': '%s'.",
entry.getName(),
entry.getType(), value.getClass().getName(), value));
}

if (entry.getType() == Schema.Type.DATETIME) {
Expand Down Expand Up @@ -323,25 +331,53 @@ private Schema.Entry validateTypeAgainstProvidedSchema(final String name, final
return entry;
}

/**
* This method return the updated entry with error information if any.
*
* @param e The entry to check.
* @return The entry updated with error information or the given one.
*/
private Entry getEntryWithErrorIfAny(final Entry e) {
if (!e.isErrorCapable()) {
// The entry doesn't support error management
return e;
}

return entriesInError.getOrDefault(e.getOriginalFieldName(), e);
}

public Record build() {
final Schema currentSchema;
if (this.providedSchema != null) {
final String missing = this.providedSchema
.getAllEntries()
.map(this::getEntryWithErrorIfAny)
.filter(it -> !it.isNullable() && !values.containsKey(it.getName()))
.map(Schema.Entry::getName)
.collect(joining(", "));
if (!missing.isEmpty()) {
throw new IllegalArgumentException("Missing entries: " + missing);
}

Schema schemaWithErrors = this.providedSchema;
if (!this.entriesInError.isEmpty()) {
Schema.Builder schemaBuilder = new SchemaImpl.BuilderImpl()
.withType(this.providedSchema.getType());
this.providedSchema.getEntries()
.stream()
.map(this::getEntryWithErrorIfAny)
.forEach(schemaBuilder::withEntry);
schemaWithErrors = schemaBuilder.build();
}

if (orderState != null && orderState.isOverride()) {
currentSchema = this.providedSchema.toBuilder().build(this.orderState.buildComparator());
currentSchema = schemaWithErrors.toBuilder().build(this.orderState.buildComparator());
} else {
currentSchema = this.providedSchema;
currentSchema = schemaWithErrors;
}
} else {
final Schema.Builder builder = new SchemaImpl.BuilderImpl().withType(RECORD);
this.entries.forEachValue(builder::withEntry);
this.entries.streams().map(this::getEntryWithErrorIfAny).forEach(builder::withEntry);
initOrderState();
currentSchema = builder.build(orderState.buildComparator());
}
Expand Down Expand Up @@ -513,6 +549,25 @@ public <T> Builder withArray(final Schema.Entry entry, final Collection<T> value
return append(entry, values);
}

private Builder withError(final Entry entry, final Object value, final String errorMessage) {
final boolean supportError = Boolean.parseBoolean(System.getProperty(RECORD_ERROR_SUPPORT, "false"));
if (!supportError || !entry.isErrorCapable()) {
throw new IllegalArgumentException(errorMessage);
} else {
// duplicate the schema instance with a modified Entry
final Entry updatedEntry = entry.toBuilder()
.withName(entry.getName())
.withNullable(true)
.withType(entry.getType())
.withProp(SchemaProperty.ENTRY_IS_ON_ERROR, "true")
.withProp(SchemaProperty.ENTRY_ERROR_MESSAGE, errorMessage)
.withProp(SchemaProperty.ENTRY_ERROR_FALLBACK_VALUE, String.valueOf(value))
.build();
this.entriesInError.put(updatedEntry.getOriginalFieldName(), updatedEntry);
return this;
}
}

private void assertType(final Schema.Type actual, final Schema.Type expected) {
if (actual != expected) {
throw new IllegalArgumentException("Expected entry type: " + expected + ", got: " + actual);
Expand Down
Loading
Loading