Skip to content

Commit 1ebddea

Browse files
committed
[ZEPPELIN-6408] Improve fallback UX and per-paragraph cancel safety
- Hide the Service Account JSON form on subsequent runs once a value has been supplied. The form is only re-rendered when the value is missing or when the supplied key fails to parse. - Replace the single volatile currentJobId field with a per-paragraph ConcurrentMap of running queries (client + job id). In shared interpreter mode, concurrent paragraphs no longer clobber each other's job state, so cancel always targets the correct job.
1 parent 48b32e4 commit 1ebddea

1 file changed

Lines changed: 49 additions & 26 deletions

File tree

bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import java.util.List;
4242
import java.util.Properties;
4343
import java.util.UUID;
44+
import java.util.concurrent.ConcurrentHashMap;
45+
import java.util.concurrent.ConcurrentMap;
4446

4547
import org.apache.zeppelin.interpreter.Interpreter;
4648
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -86,9 +88,23 @@ public class BigQueryInterpreter extends Interpreter {
8688
static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect";
8789
static final String REGION = "zeppelin.bigquery.region";
8890

89-
private volatile JobId currentJobId = null;
91+
private static final String SA_JSON_FORM_KEY = "GCP Service Account JSON";
92+
93+
// Tracks running queries per paragraph so concurrent paragraphs in shared
94+
// interpreter mode don't clobber each other's job/cancel state.
95+
private final ConcurrentMap<String, RunningQuery> runningQueries = new ConcurrentHashMap<>();
9096
private Exception exceptionOnConnect;
9197

98+
private static final class RunningQuery {
99+
private final BigQuery client;
100+
private final JobId jobId;
101+
102+
RunningQuery(BigQuery client, JobId jobId) {
103+
this.client = client;
104+
this.jobId = jobId;
105+
}
106+
}
107+
92108
private static final List<InterpreterCompletion> NO_COMPLETION = new ArrayList<>();
93109

94110
private static final List<String> BQ_SCOPES = Collections.singletonList(
@@ -159,15 +175,17 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) {
159175
try {
160176
bqClient = getClientForUser(context);
161177
} catch (IOException e) {
162-
// Fallback: prompt for Service Account JSON via a masked password form
163-
// to avoid rendering the key in plaintext in the note UI.
164-
LOGGER.error("Authentication failed. Requesting service account JSON via GUI", e);
165-
Object raw = context.getGui().password("GCP Service Account JSON");
166-
String saJson = raw == null ? "" : raw.toString();
178+
// Fallback: read a previously-supplied Service Account JSON from paragraph
179+
// form params, or render a masked password form to collect it.
180+
LOGGER.error("Authentication failed. Falling back to user-supplied SA JSON via GUI", e);
181+
Object existing = context.getGui().getParams().get(SA_JSON_FORM_KEY);
182+
String saJson = existing == null ? "" : existing.toString();
167183
if (StringUtils.isBlank(saJson)) {
184+
// No value yet: render the masked form so the user can enter the key.
185+
context.getGui().password(SA_JSON_FORM_KEY);
168186
return new InterpreterResult(Code.ERROR, "%html ⚠️ <b>Authentication Required</b><br/>" +
169187
"Could not find Application Default Credentials. Please input your " +
170-
"Service Account JSON key in the form below and run again.");
188+
"Service Account JSON key in the form and run again.");
171189
}
172190
try {
173191
GoogleCredentials credentials = ServiceAccountCredentials.fromStream(
@@ -188,6 +206,8 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) {
188206
// Do not cache this client in a shared field to avoid leaking user credentials
189207
exceptionOnConnect = null;
190208
} catch (IOException ex) {
209+
// Re-render the masked form so the user can correct an invalid key.
210+
context.getGui().password(SA_JSON_FORM_KEY);
191211
return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " +
192212
ex.getMessage());
193213
}
@@ -216,16 +236,19 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) {
216236
QueryJobConfiguration queryConfig = queryConfigBuilder.build();
217237

218238
String jobIdStr = UUID.randomUUID().toString();
239+
JobId jobId;
219240
if (StringUtils.isNotBlank(region)) {
220-
currentJobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build();
241+
jobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build();
221242
} else {
222-
currentJobId = JobId.of(jobIdStr);
243+
jobId = JobId.of(jobIdStr);
223244
}
224245

246+
String paragraphId = context.getParagraphId();
247+
runningQueries.put(paragraphId, new RunningQuery(bqClient, jobId));
225248
try {
226249
LOGGER.info("Executing query: {}", sql);
227250
Job queryJob = bqClient.create(
228-
JobInfo.newBuilder(queryConfig).setJobId(currentJobId).build());
251+
JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
229252

230253
// Wait for the query to complete
231254
queryJob = queryJob.waitFor();
@@ -267,7 +290,7 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) {
267290
LOGGER.error("Query execution failed", ex);
268291
return new InterpreterResult(Code.ERROR, ex.getMessage());
269292
} finally {
270-
currentJobId = null;
293+
runningQueries.remove(paragraphId);
271294
}
272295
}
273296

@@ -301,22 +324,22 @@ public int getProgress(InterpreterContext context) {
301324

302325
@Override
303326
public void cancel(InterpreterContext context) {
304-
LOGGER.info("Trying to Cancel current query statement.");
305-
if (service != null && currentJobId != null) {
306-
try {
307-
boolean cancelled = service.cancel(currentJobId);
308-
if (cancelled) {
309-
LOGGER.info("Query Execution cancelled");
310-
} else {
311-
LOGGER.warn("Query Execution cancellation returned false");
312-
}
313-
} catch (RuntimeException e) {
314-
LOGGER.warn("Failed to cancel BigQuery job {}", currentJobId, e);
315-
} finally {
316-
currentJobId = null;
317-
}
318-
} else {
327+
String paragraphId = context.getParagraphId();
328+
LOGGER.info("Trying to cancel query for paragraph {}", paragraphId);
329+
RunningQuery running = runningQueries.remove(paragraphId);
330+
if (running == null) {
319331
LOGGER.info("Query Execution was already cancelled or not started");
332+
return;
333+
}
334+
try {
335+
boolean cancelled = running.client.cancel(running.jobId);
336+
if (cancelled) {
337+
LOGGER.info("Query Execution cancelled");
338+
} else {
339+
LOGGER.warn("Query Execution cancellation returned false");
340+
}
341+
} catch (RuntimeException e) {
342+
LOGGER.warn("Failed to cancel BigQuery job {}", running.jobId, e);
320343
}
321344
}
322345

0 commit comments

Comments
 (0)