Skip to content

Commit

Permalink
[Feature] Consume just filter key or value, not both. 消费消息支持单独过滤key或者…
Browse files Browse the repository at this point in the history
…value. (#1157)

close #1155

Consume just filter key or value, not both.
消费消息支持单独过滤key或者value。

---------

Co-authored-by: weidong_chang <[email protected]>
  • Loading branch information
chang-wd and weidong_chang authored Jun 30, 2024
1 parent 2390ae8 commit 260cbb9
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ const ConsumeClientTest = () => {
...configInfo,
needFilterKeyValue: changeValue === 1 || changeValue === 2,
needFilterSize: changeValue === 3 || changeValue === 4 || changeValue === 5,
needFilterKey: changeValue === 6,
needFilterValue: changeValue === 7,
});
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ export const cardList = [

export const filterList = [
{
label: 'none',
label: 'None',
value: 0,
},
{
label: 'contains',
label: 'Contains',
value: 1,
},
{
label: 'does not contains',
label: 'Does Not Contains',
value: 2,
},
{
label: 'equals',
label: 'Equals',
value: 3,
},
{
Expand All @@ -39,6 +39,14 @@ export const filterList = [
label: 'Under Size',
value: 5,
},
{
label: 'Key Contains',
value: 6,
},
{
label: 'Value Contains',
value: 7,
}
];

export const untilList = [
Expand Down Expand Up @@ -324,10 +332,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterKey',
label: 'Key',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterKey,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterKey,
message: '请输入Key',
},
],
Expand All @@ -336,10 +344,10 @@ export const getFormConfig = (topicMetaData: any, info = {} as any, partitionLis
key: 'filterValue',
label: 'Value',
type: FormItemType.input,
invisible: !info?.needFilterKeyValue,
invisible: !info?.needFilterKeyValue && !info?.needFilterValue,
rules: [
{
required: info?.needFilterKeyValue,
required: info?.needFilterKeyValue || info?.needFilterValue,
message: '请输入Value',
},
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,18 @@ private Result<Void> checkStartFromAndFilterLegal(KafkaConsumerStartFromDTO star
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "包含的方式过滤,必须有过滤的key或value");
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "key包含的方式过滤,必须有过滤的key");
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareValue())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "value包含的方式过滤,必须有过滤的value");
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& ValidateUtils.isBlank(filter.getFilterCompareKey()) && ValidateUtils.isBlank(filter.getFilterCompareValue())) {
Expand Down Expand Up @@ -550,6 +562,18 @@ private boolean checkMatchFilter(ConsumerRecord consumerRecord, KafkaConsumerFil
return true;
}

// key包含过滤
if (KafkaConsumerFilterEnum.KEY_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && consumerRecord.key() != null && consumerRecord.key().toString().contains(filter.getFilterCompareKey()))) {
return true;
}

// value包含过滤
if (KafkaConsumerFilterEnum.VALUE_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareValue()) && consumerRecord.value() != null && consumerRecord.value().toString().contains(filter.getFilterCompareValue()))) {
return true;
}

// 不包含过滤
if (KafkaConsumerFilterEnum.NOT_CONTAINS.getCode().equals(filter.getFilterType())
&& (!ValidateUtils.isBlank(filter.getFilterCompareKey()) && (consumerRecord.key() == null || !consumerRecord.key().toString().contains(filter.getFilterCompareKey())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class KafkaConsumerFilterDTO extends BaseDTO {
/**
* @see KafkaConsumerFilterEnum
*/
@Range(min = 0, max = 5, message = "filterType最大和最小值必须在[0, 5]之间")
@Range(min = 0, max = 7, message = "filterType最大和最小值必须在[0, 7]之间")
@ApiModelProperty(value = "开始消费位置的类型", example = "2")
private Integer filterType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ public enum KafkaConsumerFilterEnum {

UNDER_SIZE(5, "size小于"),

KEY_CONTAINS(6, "key包含"),

VALUE_CONTAINS(7, "value包含"),

;

private final Integer code;
Expand Down

0 comments on commit 260cbb9

Please sign in to comment.