Skip to content

Commit

Permalink
[FLINK-34097] Remove JobMasterGateway#requestJobDetails
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 17, 2024
1 parent 4559b85 commit 1ffb481
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
Expand Down Expand Up @@ -861,11 +860,6 @@ public CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID res
return resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJobDetails());
}

@Override
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
return CompletableFuture.completedFuture(schedulerNG.requestJobStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.registration.RegistrationResponse;
Expand Down Expand Up @@ -183,14 +182,6 @@ CompletableFuture<Void> heartbeatFromTaskManager(
*/
CompletableFuture<Void> heartbeatFromResourceManager(final ResourceID resourceID);

/**
* Request the details of the executed job.
*
* @param timeout for the rpc call
* @return Future details of the executed job
*/
CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);

/**
* Requests the current job status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
Expand Down Expand Up @@ -819,12 +818,6 @@ public JobStatus requestJobStatus() {
return executionGraph.getState();
}

@Override
public JobDetails requestJobDetails() {
mainThreadExecutor.assertRunningInMainThread();
return JobDetails.createDetailsForJob(executionGraph);
}

@Override
public KvStateLocation requestKvStateLocation(final JobID jobId, final String registrationName)
throws UnknownKvStateLocation, FlinkJobNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
Expand Down Expand Up @@ -106,8 +105,6 @@ ExecutionState requestPartitionState(

JobStatus requestJobStatus();

JobDetails requestJobDetails();

// ------------------------------------------------------------------------------------
// Methods below do not belong to Scheduler but are included due to historical reasons
// ------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
Expand Down Expand Up @@ -613,11 +612,6 @@ public JobStatus requestJobStatus() {
return state.getJobStatus();
}

@Override
public JobDetails requestJobDetails() {
return JobDetails.createDetailsForJob(state.getJob());
}

@Override
public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
throws UnknownKvStateLocation, FlinkJobNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
Expand Down Expand Up @@ -129,7 +128,7 @@ public class TestingJobMasterGateway implements JobMasterGateway {
@Nonnull
private final Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction;

@Nonnull private final Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier;
@Nonnull private final Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier;

@Nonnull private final Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier;

Expand Down Expand Up @@ -244,7 +243,7 @@ public TestingJobMasterGateway(
CompletableFuture<Void>>
taskManagerHeartbeatFunction,
@Nonnull Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction,
@Nonnull Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier,
@Nonnull Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier,
@Nonnull Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier,
@Nonnull
Supplier<CompletableFuture<CheckpointStatsSnapshot>>
Expand Down Expand Up @@ -327,7 +326,7 @@ public TestingJobMasterGateway(
this.registerTaskManagerFunction = registerTaskManagerFunction;
this.taskManagerHeartbeatFunction = taskManagerHeartbeatFunction;
this.resourceManagerHeartbeatFunction = resourceManagerHeartbeatFunction;
this.requestJobDetailsSupplier = requestJobDetailsSupplier;
this.requestJobStatusSupplier = requestJobStatusSupplier;
this.requestJobSupplier = requestJobSupplier;
this.checkpointStatsSnapshotSupplier = checkpointStatsSnapshotSupplier;
this.triggerSavepointFunction = triggerSavepointFunction;
Expand Down Expand Up @@ -412,14 +411,9 @@ public CompletableFuture<Void> heartbeatFromResourceManager(ResourceID resourceI
return resourceManagerHeartbeatFunction.apply(resourceID);
}

@Override
public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return requestJobDetailsSupplier.get();
}

@Override
public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
return requestJobDetailsSupplier.get().thenApply(JobDetails::getStatus);
return requestJobStatusSupplier.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmaster.utils;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
Expand All @@ -45,7 +46,6 @@
import org.apache.flink.runtime.jobmaster.TaskManagerRegistrationInformation;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
Expand Down Expand Up @@ -118,7 +118,7 @@ public class TestingJobMasterGatewayBuilder {
(ignoredA, ignoredB) -> FutureUtils.completedVoidFuture();
private Function<ResourceID, CompletableFuture<Void>> resourceManagerHeartbeatFunction =
ignored -> FutureUtils.completedVoidFuture();
private Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier =
private Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier =
() -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
private Supplier<CompletableFuture<ExecutionGraphInfo>> requestJobSupplier =
() -> FutureUtils.completedExceptionally(new UnsupportedOperationException());
Expand Down Expand Up @@ -280,9 +280,9 @@ public TestingJobMasterGatewayBuilder setResourceManagerHeartbeatFunction(
return this;
}

public TestingJobMasterGatewayBuilder setRequestJobDetailsSupplier(
Supplier<CompletableFuture<JobDetails>> requestJobDetailsSupplier) {
this.requestJobDetailsSupplier = requestJobDetailsSupplier;
public TestingJobMasterGatewayBuilder setRequestJobStatusSupplier(
Supplier<CompletableFuture<JobStatus>> requestJobStatusSupplier) {
this.requestJobStatusSupplier = requestJobStatusSupplier;
return this;
}

Expand Down Expand Up @@ -446,7 +446,7 @@ public TestingJobMasterGateway build() {
registerTaskManagerFunction,
taskManagerHeartbeatFunction,
resourceManagerHeartbeatFunction,
requestJobDetailsSupplier,
requestJobStatusSupplier,
requestJobSupplier,
checkpointStatsSnapshotSupplier,
triggerSavepointFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
Expand Down Expand Up @@ -157,12 +156,6 @@ public JobStatus requestJobStatus() {
return JobStatus.CREATED;
}

@Override
public JobDetails requestJobDetails() {
failOperation();
return null;
}

@Override
public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) {
failOperation();
Expand Down

0 comments on commit 1ffb481

Please sign in to comment.