diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bfea710db66f7..59455b787a6ab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -71,7 +71,6 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -861,11 +860,6 @@ public CompletableFuture heartbeatFromResourceManager(final ResourceID res return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } - @Override - public CompletableFuture requestJobDetails(Time timeout) { - return CompletableFuture.completedFuture(schedulerNG.requestJobDetails()); - } - @Override public CompletableFuture requestJobStatus(Time timeout) { return CompletableFuture.completedFuture(schedulerNG.requestJobStatus()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 6c1b79568a8d5..02c3c7d501a0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -183,14 +182,6 @@ CompletableFuture heartbeatFromTaskManager( */ CompletableFuture heartbeatFromResourceManager(final ResourceID resourceID); - /** - * Request the details of the executed job. - * - * @param timeout for the rpc call - * @return Future details of the executed job - */ - CompletableFuture requestJobDetails(@RpcTimeout Time timeout); - /** * Requests the current job status. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index c11d7b2ca86d0..7f4ba383e437c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -78,7 +78,6 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; @@ -819,12 +818,6 @@ public JobStatus requestJobStatus() { return executionGraph.getState(); } - @Override - public JobDetails requestJobDetails() { - mainThreadExecutor.assertRunningInMainThread(); - return JobDetails.createDetailsForJob(executionGraph); - } - @Override public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java index 1643dbee28285..b22e6204789eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java @@ -44,7 +44,6 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; @@ -106,8 +105,6 @@ ExecutionState requestPartitionState( JobStatus requestJobStatus(); - JobDetails requestJobDetails(); - // ------------------------------------------------------------------------------------ // Methods below do not belong to Scheduler but are included due to historical reasons // ------------------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 760f9e969b155..101766a97805b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -83,7 +83,6 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -613,11 +612,6 @@ public JobStatus requestJobStatus() { return state.getJobStatus(); } - @Override - public JobDetails requestJobDetails() { - return JobDetails.createDetailsForJob(state.getJob()); - } - @Override public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index c519c21f1c73f..ecdc78f45a937 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -48,7 +48,6 @@ import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -129,7 +128,7 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull private final Function> resourceManagerHeartbeatFunction; - @Nonnull private final Supplier> requestJobDetailsSupplier; + @Nonnull private final Supplier> requestJobStatusSupplier; @Nonnull private final Supplier> requestJobSupplier; @@ -244,7 +243,7 @@ public TestingJobMasterGateway( CompletableFuture> taskManagerHeartbeatFunction, @Nonnull Function> resourceManagerHeartbeatFunction, - @Nonnull Supplier> requestJobDetailsSupplier, + @Nonnull Supplier> requestJobStatusSupplier, @Nonnull Supplier> requestJobSupplier, @Nonnull Supplier> @@ -327,7 +326,7 @@ public TestingJobMasterGateway( this.registerTaskManagerFunction = registerTaskManagerFunction; this.taskManagerHeartbeatFunction = taskManagerHeartbeatFunction; this.resourceManagerHeartbeatFunction = resourceManagerHeartbeatFunction; - this.requestJobDetailsSupplier = requestJobDetailsSupplier; + this.requestJobStatusSupplier = requestJobStatusSupplier; this.requestJobSupplier = requestJobSupplier; this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier; this.triggerSavepointFunction = triggerSavepointFunction; @@ -412,14 +411,9 @@ public CompletableFuture heartbeatFromResourceManager(ResourceID resourceI return resourceManagerHeartbeatFunction.apply(resourceID); } - @Override - public CompletableFuture requestJobDetails(Time timeout) { - return requestJobDetailsSupplier.get(); - } - @Override public CompletableFuture requestJobStatus(Time timeout) { - return requestJobDetailsSupplier.get().thenApply(JobDetails::getStatus); + return requestJobStatusSupplier.get(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index e462a0d23c501..6fbc367f800cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.utils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.tuple.Tuple6; @@ -45,7 +46,6 @@ import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -118,7 +118,7 @@ public class TestingJobMasterGatewayBuilder { (ignoredA, ignoredB) -> FutureUtils.completedVoidFuture(); private Function> resourceManagerHeartbeatFunction = ignored -> FutureUtils.completedVoidFuture(); - private Supplier> requestJobDetailsSupplier = + private Supplier> requestJobStatusSupplier = () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); private Supplier> requestJobSupplier = () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); @@ -280,9 +280,9 @@ public TestingJobMasterGatewayBuilder setResourceManagerHeartbeatFunction( return this; } - public TestingJobMasterGatewayBuilder setRequestJobDetailsSupplier( - Supplier> requestJobDetailsSupplier) { - this.requestJobDetailsSupplier = requestJobDetailsSupplier; + public TestingJobMasterGatewayBuilder setRequestJobStatusSupplier( + Supplier> requestJobStatusSupplier) { + this.requestJobStatusSupplier = requestJobStatusSupplier; return this; } @@ -446,7 +446,7 @@ public TestingJobMasterGateway build() { registerTaskManagerFunction, taskManagerHeartbeatFunction, resourceManagerHeartbeatFunction, - requestJobDetailsSupplier, + requestJobStatusSupplier, requestJobSupplier, checkpointStatsSnapshotSupplier, triggerSavepointFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java index 9820f94db8136..ad4d33edf96ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -157,12 +156,6 @@ public JobStatus requestJobStatus() { return JobStatus.CREATED; } - @Override - public JobDetails requestJobDetails() { - failOperation(); - return null; - } - @Override public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) { failOperation();