Skip to content

Commit c87930b

Browse files
nik9000claude
andauthored
ESQL: Fix two CASE bugs (#149907)
* ESQL: Fix two CASE bugs Backports for * ESQL: Fixes wrong warning in expressions with unrolled multivalues #145968 * ESQL: Fix data type of partial folded CASE #149752 * ESQL: Fix backport compile error in PropagateEvalFoldables LimitBy and MMR are plan nodes that don't exist in 9.3; remove those references from the backported PropagateEvalFoldables. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * Missing from backport * ESQL: Skip constant_keyword CSV spec tests in CsvTests Add LOAD_CONSTANT_KEYWORD capability and mark the ten stats_count_distinct tests that depend on per-shard constant_keyword folding. CsvTests uses DisabledSearchStats which returns null for constantValue(), so those tests fail with "Cannot find column" errors. They run correctly in EsqlSpecIT and AggOnCaseFoldIT against a real cluster. --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 2e64bb7 commit c87930b

19 files changed

Lines changed: 835 additions & 61 deletions

File tree

docs/changelog/145968.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 145968
4+
summary: Fixes wrong warning in expressions with unrolled multivalues
5+
type: bug
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.qa.multi_node;
9+
10+
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
11+
12+
import org.elasticsearch.client.Request;
13+
import org.elasticsearch.client.RequestOptions;
14+
import org.elasticsearch.client.WarningsHandler;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.test.TestClustersThreadFilter;
17+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
18+
import org.elasticsearch.test.rest.ESRestTestCase;
19+
import org.elasticsearch.xpack.esql.CsvTestsDataLoader;
20+
import org.junit.Before;
21+
import org.junit.ClassRule;
22+
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.HashMap;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.TimeUnit;
29+
30+
import static org.elasticsearch.test.MapMatcher.assertMap;
31+
import static org.elasticsearch.test.MapMatcher.matchesMap;
32+
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
34+
35+
/**
36+
* Tests for {@snippet lang="esql" :
37+
* | STATS COUNT_DISTINCT(CASE(foldable, ...))
38+
* }
39+
* when foldable at different places.
40+
*/
41+
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
42+
public class AggOnCaseFoldIT extends ESRestTestCase {
43+
44+
@ClassRule
45+
public static ElasticsearchCluster cluster = Clusters.testCluster(ignored -> {});
46+
47+
@Override
48+
protected String getTestRestCluster() {
49+
return cluster.getHttpAddresses();
50+
}
51+
52+
@Before
53+
public void enableChangeLogging() throws IOException {
54+
Request request = new Request("PUT", "/_cluster/settings");
55+
request.setJsonEntity(
56+
"{\"transient\": {\"logger.org.elasticsearch.xpack.esql.optimizer.LocalLogicalPlanOptimizer.changes\": \"TRACE\"}}"
57+
);
58+
assertOK(client().performRequest(request));
59+
}
60+
61+
@Before
62+
public void loadAndPinIndices() throws Exception {
63+
assumeFalse("Cannot pin shards to specific nodes in serverless mode", isServerless());
64+
CsvTestsDataLoader.loadDatasetsIntoEs(client(), List.of("event_alerts", "event_logs"));
65+
66+
// Pin each index to a different node so they are optimized independently of one another.
67+
Request nodesRequest = new Request("GET", "/_nodes");
68+
nodesRequest.addParameter("filter_path", "nodes.*.name");
69+
Map<String, Object> nodesResponse = entityAsMap(client().performRequest(nodesRequest));
70+
@SuppressWarnings("unchecked")
71+
Map<String, Object> nodes = (Map<String, Object>) nodesResponse.get("nodes");
72+
List<String> nodeNames = new ArrayList<>();
73+
for (Object nodeInfo : nodes.values()) {
74+
@SuppressWarnings("unchecked")
75+
Map<String, Object> info = (Map<String, Object>) nodeInfo;
76+
nodeNames.add((String) info.get("name"));
77+
}
78+
assertThat("Need at least 2 nodes", nodeNames.size(), greaterThanOrEqualTo(2));
79+
80+
Request pinAlerts = new Request("PUT", "/event_alerts/_settings");
81+
pinAlerts.setJsonEntity(Strings.format("""
82+
{
83+
"index.routing.allocation.require._name": "%s",
84+
"index.number_of_replicas": 0
85+
}
86+
""", nodeNames.get(0)));
87+
assertOK(client().performRequest(pinAlerts));
88+
89+
Request pinLogs = new Request("PUT", "/event_logs/_settings");
90+
pinLogs.setJsonEntity(Strings.format("""
91+
{
92+
"index.routing.allocation.require._name": "%s",
93+
"index.number_of_replicas": 0
94+
}
95+
""", nodeNames.get(1)));
96+
assertOK(client().performRequest(pinLogs));
97+
98+
ensureGreen("event_alerts");
99+
ensureGreen("event_logs");
100+
101+
// Paranoidly wait until the shards have landed on the nodes we expect
102+
Request shardsRequest = new Request("GET", "/_cat/shards/event_alerts,event_logs");
103+
shardsRequest.addParameter("format", "json");
104+
assertBusy(() -> {
105+
List<Object> shards = entityAsList(client().performRequest(shardsRequest));
106+
Map<String, List<Map<String, Object>>> shardByIndex = new HashMap<>();
107+
for (Object entry : shards) {
108+
@SuppressWarnings("unchecked")
109+
Map<String, Object> shard = (Map<String, Object>) entry;
110+
shardByIndex.computeIfAbsent((String) shard.get("index"), k -> new ArrayList<>()).add(shard);
111+
}
112+
String ctx = "nodeNames=" + nodeNames + " shards=" + shards;
113+
List<Map<String, Object>> alertsShards = shardByIndex.get("event_alerts");
114+
assertNotNull("event_alerts shards not found in: " + ctx, alertsShards);
115+
for (Map<String, Object> shard : alertsShards) {
116+
assertThat(ctx + " event_alerts shard must be STARTED", shard.get("state"), equalTo("STARTED"));
117+
assertThat(ctx + " event_alerts shard on wrong node", shard.get("node"), equalTo(nodeNames.get(0)));
118+
}
119+
List<Map<String, Object>> logsShards = shardByIndex.get("event_logs");
120+
assertNotNull("event_logs shards not found in: " + ctx, logsShards);
121+
for (Map<String, Object> shard : logsShards) {
122+
assertThat(ctx + " event_logs shard must be STARTED", shard.get("state"), equalTo("STARTED"));
123+
assertThat(ctx + " event_logs shard on wrong node", shard.get("node"), equalTo(nodeNames.get(1)));
124+
}
125+
}, 30, TimeUnit.SECONDS);
126+
}
127+
128+
public void testCoordinatingNodeFoldInline() throws IOException {
129+
List<List<Object>> values = esql("""
130+
FROM event_alerts, event_logs
131+
| STATS COUNT_DISTINCT(CASE(false, username, null))
132+
""");
133+
assertThat(values, equalTo(List.of(List.of(0))));
134+
}
135+
136+
public void testDataNodeFoldInline() throws IOException {
137+
List<List<Object>> values = esql("""
138+
FROM event_alerts, event_logs
139+
| STATS COUNT_DISTINCT(CASE(event_type == "alert", username, null))
140+
""");
141+
assertThat(values, equalTo(List.of(List.of(0))));
142+
}
143+
144+
public void testDataNodeFoldEval() throws IOException {
145+
List<List<Object>> values = esql("""
146+
FROM event_alerts, event_logs
147+
| EVAL username = CASE(event_type == "alert", username, null)
148+
| STATS COUNT_DISTINCT(username)
149+
""");
150+
assertThat(values, equalTo(List.of(List.of(0))));
151+
}
152+
153+
public void testDataNodeFoldEvalAnd() throws IOException {
154+
List<List<Object>> values = esql("""
155+
FROM event_alerts, event_logs
156+
| EVAL foo = CASE(event_type == "alert" AND severity > 0, username, null)
157+
| STATS COUNT_DISTINCT(foo)
158+
""");
159+
assertThat(values, equalTo(List.of(List.of(0))));
160+
}
161+
162+
public void testDataNodeFoldEvalAndTwoBranches() throws IOException {
163+
List<List<Object>> values = esql("""
164+
FROM event_alerts, event_logs
165+
| EVAL foo = CASE(event_type == "alert" AND severity > 0, username,
166+
event_type == "alert" AND severity > 1, label,
167+
null)
168+
| STATS COUNT_DISTINCT(foo)
169+
""");
170+
assertThat(values, equalTo(List.of(List.of(0))));
171+
}
172+
173+
public void testDataNodeFoldInlineTwoBranches() throws IOException {
174+
List<List<Object>> values = esql("""
175+
FROM event_alerts, event_logs
176+
| STATS COUNT_DISTINCT(CASE(event_type == "alert" AND severity > 0, username,
177+
event_type == "alert" AND severity > 1, label,
178+
null))
179+
""");
180+
assertThat(values, equalTo(List.of(List.of(0))));
181+
}
182+
183+
public void testDataNodeFoldEvalOr() throws IOException {
184+
List<List<Object>> values = esql("""
185+
FROM event_alerts, event_logs
186+
| EVAL foo = CASE(event_type == "alert" OR severity > 0, username, null)
187+
| STATS COUNT_DISTINCT(foo)
188+
""");
189+
assertThat(values, equalTo(List.of(List.of(0))));
190+
}
191+
192+
public void testDataNodeNotIn() throws IOException {
193+
List<List<Object>> values = esql("""
194+
FROM event_alerts, event_logs
195+
| EVAL foo = CASE(NOT event_type IN ("login", "other"), username, null)
196+
| STATS COUNT_DISTINCT(foo)
197+
""");
198+
assertThat(values, equalTo(List.of(List.of(0))));
199+
}
200+
201+
public void testDataNodeMultipleCases() throws IOException {
202+
List<List<Object>> values = esql("""
203+
FROM event_alerts, event_logs
204+
| EVAL severity_case = CASE(event_type == "alert", severity, null),
205+
username_case = CASE(event_type == "login", username, null)
206+
| STATS COUNT_DISTINCT(severity_case), COUNT_DISTINCT(username_case)
207+
BY ts_month = DATE_TRUNC(1 month, @timestamp)
208+
| SORT ts_month
209+
""");
210+
assertThat(values, equalTo(List.of(List.of(3, 0, "2024-01-01T00:00:00.000Z"), List.of(0, 3, "2024-02-01T00:00:00.000Z"))));
211+
}
212+
213+
public void testDataNodeMultipleCasesAnd() throws IOException {
214+
List<List<Object>> values = esql("""
215+
FROM event_alerts, event_logs
216+
| EVAL case1 = CASE(event_type == "alert" AND severity > 0, username, null),
217+
case2 = CASE(NOT event_type IN ("alert", "other"), username, null)
218+
| STATS COUNT_DISTINCT(case1), COUNT_DISTINCT(case2)
219+
BY ts_month = DATE_TRUNC(1 month, @timestamp)
220+
| SORT ts_month
221+
""");
222+
assertThat(values, equalTo(List.of(List.of(0, 0, "2024-01-01T00:00:00.000Z"), List.of(0, 3, "2024-02-01T00:00:00.000Z"))));
223+
}
224+
225+
/**
226+
* Returns true when the cluster is running in serverless mode, where index settings like
227+
* {@code index.number_of_replicas} and shard allocation filters are not available.
228+
*/
229+
private boolean isServerless() throws IOException {
230+
for (Map<?, ?> nodeInfo : getNodesInfo(client()).values()) {
231+
@SuppressWarnings("unchecked")
232+
List<Map<?, ?>> modules = (List<Map<?, ?>>) nodeInfo.get("modules");
233+
for (Map<?, ?> module : modules) {
234+
if (module.get("name").toString().startsWith("serverless-")) {
235+
return true;
236+
}
237+
}
238+
}
239+
return false;
240+
}
241+
242+
/**
243+
* Runs an ES|QL query, asserts the result is not partial, and returns the values rows.
244+
* The query may be a text block with newlines between pipe stages.
245+
*/
246+
private List<List<Object>> esql(String query) throws IOException {
247+
Request request = new Request("POST", "/_query");
248+
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(WarningsHandler.PERMISSIVE).build());
249+
String escaped = query.replace("\"", "\\\"").replace("\n", "\\n");
250+
request.setJsonEntity("{\"query\": \"" + escaped + "\"}");
251+
Map<String, Object> result = entityAsMap(client().performRequest(request));
252+
assertMap("no partial failures", result, matchesMap().extraOk().entry("is_partial", false));
253+
@SuppressWarnings("unchecked")
254+
List<List<Object>> values = (List<List<Object>>) result.get("values");
255+
return values;
256+
}
257+
}

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestsDataLoader.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ public class CsvTestsDataLoader {
101101
private static final TestDataset UL_LOGS = new TestDataset("ul_logs");
102102
private static final TestDataset SAMPLE_DATA = new TestDataset("sample_data");
103103
private static final TestDataset MV_SAMPLE_DATA = new TestDataset("mv_sample_data");
104+
private static final TestDataset EVENT_ALERTS = new TestDataset("event_alerts");
105+
private static final TestDataset EVENT_LOGS = new TestDataset("event_logs");
106+
private static final TestDataset EVENT_EMPTY = new TestDataset("event_empty").noData();
104107
private static final TestDataset SAMPLE_DATA_STR = SAMPLE_DATA.withIndex("sample_data_str")
105108
.withTypeMapping(Map.of("client_ip", "keyword"));
106109
private static final TestDataset SAMPLE_DATA_TS_LONG = SAMPLE_DATA.withIndex("sample_data_ts_long")
@@ -219,6 +222,9 @@ public class CsvTestsDataLoader {
219222
Map.entry(SAMPLE_DATA_PARTIAL_MAPPING_NO_SOURCE.indexName, SAMPLE_DATA_PARTIAL_MAPPING_NO_SOURCE),
220223
Map.entry(SAMPLE_DATA_PARTIAL_MAPPING_EXCLUDED_SOURCE.indexName, SAMPLE_DATA_PARTIAL_MAPPING_EXCLUDED_SOURCE),
221224
Map.entry(MV_SAMPLE_DATA.indexName, MV_SAMPLE_DATA),
225+
Map.entry(EVENT_ALERTS.indexName, EVENT_ALERTS),
226+
Map.entry(EVENT_LOGS.indexName, EVENT_LOGS),
227+
Map.entry(EVENT_EMPTY.indexName, EVENT_EMPTY),
222228
Map.entry(ALERTS.indexName, ALERTS),
223229
Map.entry(SAMPLE_DATA_STR.indexName, SAMPLE_DATA_STR),
224230
Map.entry(SAMPLE_DATA_TS_LONG.indexName, SAMPLE_DATA_TS_LONG),
@@ -537,6 +543,24 @@ public static void loadDataSetIntoEs(
537543
);
538544
}
539545

546+
/**
547+
* Load only the named datasets into ES. The names must be keys in {@link #CSV_DATASET_MAP}.
548+
*/
549+
public static void loadDatasetsIntoEs(RestClient client, List<String> datasetNames) throws IOException {
550+
Set<String> loadedDatasets = new HashSet<>();
551+
for (String name : datasetNames) {
552+
TestDataset dataset = CSV_DATASET_MAP.get(name);
553+
if (dataset == null) {
554+
throw new IllegalArgumentException("Unknown dataset: " + name);
555+
}
556+
load(client, dataset, logger, (restClient, indexName, indexMapping, indexSettings) -> {
557+
ESRestTestCase.createIndex(restClient, indexName, indexSettings, indexMapping, null);
558+
});
559+
loadedDatasets.add(dataset.indexName);
560+
}
561+
forceMerge(client, loadedDatasets, logger);
562+
}
563+
540564
private static void loadDataSetIntoEs(
541565
RestClient client,
542566
boolean supportsIndexModeLookup,

x-pack/plugin/esql/qa/testFixtures/src/main/resources/conditional.csv-spec

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,3 +404,14 @@ ROW d = "2025-01-15T00:00:00.000Z"::DATETIME
404404
duration_true:datetime | duration_false:datetime | period_true:datetime | period_false:datetime
405405
2025-01-16T00:00:00.000Z | 2025-01-25T00:00:00.000Z | 2025-02-15T00:00:00.000Z | 2026-01-15T00:00:00.000Z
406406
;
407+
408+
caseOnUnrolledMultivalueNoWarnings
409+
required_capability: fix_unrolled_foldable_mv_warning
410+
ROW x=1, y=[1,2,3] | STATS CASE(y==2, 10, 1) * MIN(x) BY y | SORT y
411+
;
412+
413+
CASE(y==2, 10, 1) * MIN(x):integer | y:integer
414+
1 | 1
415+
10 | 2
416+
1 | 3
417+
;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@timestamp:date,severity:integer,label:keyword
2+
2024-01-01T00:00:00.000Z,1,low
3+
2024-01-02T00:00:00.000Z,2,medium
4+
2024-01-03T00:00:00.000Z,3,high
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@timestamp:date,username:keyword
2+
2024-02-01T00:00:00.000Z,alice
3+
2024-02-02T00:00:00.000Z,bob
4+
2024-02-03T00:00:00.000Z,carol
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"properties": {
3+
"@timestamp": {
4+
"type": "date"
5+
},
6+
"event_type": {
7+
"type": "constant_keyword",
8+
"value": "alert"
9+
},
10+
"severity": {
11+
"type": "integer"
12+
},
13+
"label": {
14+
"type": "keyword"
15+
}
16+
}
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"properties": {
3+
"@timestamp": {
4+
"type": "date"
5+
},
6+
"event_type": {
7+
"type": "constant_keyword"
8+
},
9+
"severity": {
10+
"type": "integer"
11+
}
12+
}
13+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"properties": {
3+
"@timestamp": {
4+
"type": "date"
5+
},
6+
"event_type": {
7+
"type": "constant_keyword",
8+
"value": "login"
9+
},
10+
"username": {
11+
"type": "keyword"
12+
}
13+
}
14+
}

0 commit comments

Comments
 (0)