[GOBBLIN-1910] Refactor dag manager to reduce bulkiness for adhoc REST calls for launch, resume and kill #3776
base: master
nice work. this is a massive change and it's complicated.
high-level thoughts:
- in the future, if there's a way to break functionality into multiple PRs, that makes it much easier to review smaller ones. here you might have created the structure of the classes and their declarations w/o putting all the logic within them... just get them to type check, but otherwise throw UnsupportedOperationException
- as described in the comments, logic needs reworking between the various methods ITO where it should live. I'll wait for that to settle, before I go over specific logic more exhaustively
- because there are so many files, look for a way to put subsets into subpackages rather than into the flat orchestration package. maybe orchestration.proc (should also contain DagProcFactory)?
- still, contrary to that advice, I don't believe our convention is to put exceptions in an exception sub-package. that said, those too could go into orchestration.proc (or whatever name you choose)
88e1596 to cffd894
for now, first half of second review iteration
for now, finishing part 2 of second review iteration
LaunchDagProc host(DagTaskVisitor visitor) throws IOException, InstantiationException, IllegalAccessException {
  return (LaunchDagProc) visitor.meet(this);
}

where does the need arise for an impl of this method to return the specific type of the DagProc? the cast should not be necessary. and do keep in mind that DagTaskVisitor<T> is a generic, hence:
@Override
<T> T host(DagTaskVisitor<T> visitor) {
  return visitor.meet(this);
}
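
The generic-method suggestion can be sketched with simplified stand-ins. DagTask, LaunchDagTask, KillDagTask and DagTaskVisitor mirror names from the PR, but their bodies here are assumptions, and LabelVisitor is hypothetical, added only to show that the visitor (not the task) fixes the result type:

```java
// Simplified stand-ins for the PR's classes; only the visitor wiring is shown.
interface DagTaskVisitor<T> {
  T meet(LaunchDagTask task);
  T meet(KillDagTask task);
}

abstract class DagTask {
  // Generic method: each call site picks T through the visitor it passes in.
  abstract <T> T host(DagTaskVisitor<T> visitor);
}

final class LaunchDagTask extends DagTask {
  @Override
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this); // overload chosen at compile time; no cast needed
  }
}

final class KillDagTask extends DagTask {
  @Override
  <T> T host(DagTaskVisitor<T> visitor) {
    return visitor.meet(this);
  }
}

// Hypothetical visitor that yields a label instead of a DagProc.
final class LabelVisitor implements DagTaskVisitor<String> {
  @Override
  public String meet(LaunchDagTask task) {
    return "launch";
  }

  @Override
  public String meet(KillDagTask task) {
    return "kill";
  }
}
```

With this shape, a caller holding only a DagTask reference can write task.host(new LabelVisitor()) and get a String back without casting.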
@@ -94,6 +103,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
// instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
this.dagTaskStream = new DagTaskStream();
this.isRefactoredDagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);

I'd rather you rename this variable as isMultiLeaderDagManagerEnabled, otherwise the question becomes why didn't we just modify the base classes.
}
if (dag == null) {
  log.error("Dag " + dagIdToResume + " was found in memory but not found in failed dag state store");

Wouldn't this be in the above check rather than here for this scenario? If Dag is null then whatever object was sent is invalid.
* @param flowName
* @param triggerTimeStamp
*/
void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);

Leaving it as a note for you in case you want to document behavior in the java doc: for adhoc flows the triggerTimeStamp will be "0" (see code here), while "-1" indicates some error. All valid values should be >= 0 essentially.
void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
/**
* Currently, it is handling just the resume of a {@link Dag} request via REST client for adhoc flows

For this and other java docs other than LAUNCH, the "for adhoc flows" is not accurate. Rather it's a resume call from REST client, but they could be resuming a scheduled flow.
* @param flowGroup
* @param flowName
* @param flowExecutionId
* @param triggerTimeStamp

add the throws exception to be consistent with above
public void addJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);
public void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode);

when are these actions called? do you ever want to updateJobState?

these will be called when we complete a job in a Dag and based on the job status, we would update it and decide on what should happen using the onJobFinish() method. The impl. for the same will be provided as part of another PR where I add the logic for AdvanceDagProc
* It is invoked after {@link DagProc#process(DagManagementStateStore)} is completed successfully.
* @param multiActiveLeaseArbiter
* @throws IOException
*/

This is different from our approach to launch in that we finish the action before completing lease. Can you document the pros and cons of this method? For example, a pro: we don't have to do lease arbitration twice for attempting the action and doing it. However now the lease validity time needs to be much larger and include the time to contact executor and carry out the action. Let's note these details here or somewhere else appropriate.

I think there's some confusion here. The lease acquired for a DagTask happens after we have received or processed a message from the CDC stream based on what action needs to be taken on the Dag. Now, we acquire the lease... do our processing on the Dag and upon completion release the lease... marking it as successful. I agree with the part that lease validity time needs to be increased

Ah you're right I am getting this multi-active part confused with phase 2. Let's add some comments or javadoc to denote potential failures and/or steps needed to complete this event, ie: contact executor over network and response time

wow, MASSIVE PR... which is looking better and better--great job, meeth!
public Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> getDagToJobs();
public Map<String, Dag<JobExecutionPlan>> getDagIdToDags();
public Map<String, Long> getDagToSLA();

for perf, I doubt we'd want to return such massive collections. is it truly necessary to have all the mappings at once? couldn't we instead return only the one related to a particular dag ID?
e.g.:
Dag<JobExecutionPlan> getDags(String dagId);
Long getSLA(String dagId);
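
A minimal sketch of the per-dag accessor idea, assuming an in-memory map keyed by dag ID. DagDeadlineStore and its method names are hypothetical, and returning Optional is a choice made here, not something the PR specifies:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store: callers ask about one dag at a time, so the map itself never escapes.
final class DagDeadlineStore {
  private final Map<String, Long> dagIdToDeadlineMillis = new ConcurrentHashMap<>();

  void addDeadline(String dagId, long deadlineMillis) {
    dagIdToDeadlineMillis.put(dagId, deadlineMillis);
  }

  // Empty when the dag ID is unknown, which avoids handing back a null Long.
  Optional<Long> getDeadline(String dagId) {
    return Optional.ofNullable(dagIdToDeadlineMillis.get(dagId));
  }
}
```

Keeping the map private also means the store can later swap its backing structure without touching callers.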

I agree and it totally makes sense. I have added these additional methods to our DagManagementStateStore interface.
@@ -108,7 +108,7 @@ static DagManager.DagId generateDagId(String flowGroup, String flowName, long fl
 return generateDagId(flowGroup, flowName, String.valueOf(flowExecutionId));
 }
-static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {
+public static DagManager.DagId generateDagId(String flowGroup, String flowName, String flowExecutionId) {

NBD, but I don't find a compelling reason for this, as:
new DagManager.DagId(x, y, z)
is not much different than:
generateDagId(x, y, z);

agreed... only difference being one takes String and the other takes long value for flowExecutionId. These are existing methods defined inside DagManagerUtils... not sure if you want me to define just one and handle the case

I see... if already widely used, no need to rip out a method you did not add. but if only once or twice, I suggest getting rid of it.
contrary to your reply, I do not observe this impl converting between String and long
@Slf4j
public class DagProcessingEngine {
  public static final String DAG_PROCESSING_ENGINE_PREFIX = "gobblin.service.dagProcessingEngine.";

doesn't this need to align w/ what's in ServiceConfigKeys? if so, let's define the prefix over there and bring it in here.
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private DagManagementStateStore dagManagementStateStore;
private ScheduledExecutorService scheduledExecutorPool;
private Config config;
private Integer numThreads;
private Integer pollingInterval;
private DagProcessingEngine.Thread [] threads;
private MultiActiveLeaseArbiter multiActiveLeaseArbiter;

many of these seem good candidates for final. what's your take?
*/
@Alpha
public abstract class DagTask<T> {

let's make host a generic method rather than DagTask a generic class
@Override
public DagProc meet(ResumeDagTask resumeDagTask) {
  throw new UnsupportedOperationException("Currently cannot provide resume proc");

although ResumeDagProc might still throw UOE when process() is invoked... isn't enough scaffolding in place yet to write this as:
return new ResumeDagProc(resumeDagTask);
?
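
A sketch of what that scaffolding might look like, under the assumption that the factory can always construct the proc and only process() is unfinished. The classes below are stripped-down stand-ins that reuse the PR's names; none of the bodies come from the PR:

```java
// Stand-in task; the real ResumeDagTask carries a DagAction and lease status.
final class ResumeDagTask {
}

abstract class DagProc {
  abstract void process();
}

final class ResumeDagProc extends DagProc {
  private final ResumeDagTask resumeDagTask;

  ResumeDagProc(ResumeDagTask resumeDagTask) {
    this.resumeDagTask = resumeDagTask;
  }

  ResumeDagTask getResumeDagTask() {
    return resumeDagTask;
  }

  @Override
  void process() {
    // The unfinished part stays here, so the factory itself never throws.
    throw new UnsupportedOperationException("Resume is not yet supported");
  }
}

final class DagProcFactory {
  DagProc meet(ResumeDagTask resumeDagTask) {
    return new ResumeDagProc(resumeDagTask);
  }
}
```

This moves the UnsupportedOperationException from proc creation to proc execution, which lets the task-to-proc wiring be reviewed and tested before the resume logic exists.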
public final class KillDagProc extends DagProc {
  private KillDagTask killDagTask;
  private DagManagementStateStore dagManagementStateStore;

the process method already takes this as a param... do we need to store one in addition to what will be passed to us?
private MetricContext metricContext;
private Optional<EventSubmitter> eventSubmitter;

perhaps should these two also be args to process?
*/
@Slf4j
@Alpha
public final class KillDagProc extends DagProc {

recall that it's DagProc<S, R>, decide what those types are for kill and then use them in the various method signatures (that are currently degraded to Object)
…for launch, resume and kill
…deadline enforcement
f53e6b3 to e675e50
finishing for now... part 1 (of 2)
* Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
* and flow completion deadlines

rather than "responsible for defining the behavior..." seems more the means by which such tasks are triggered.
that said, may be too much impl. detail to mention DagTask in the javadoc, since I don't see it actually used in the code here
* The eventTimestamp for adhoc flows will always be 0 (zero), while -1 would indicate failures.
* Essentially for a valid launch flow, the eventTimestamp needs to be >= 0.
* Future implementations will cover launch of flows through the scheduler too!

rather than documenting this to be merely informative, could we instead rephrase as an interface req/policy?
(overall, I'm uncertain on your overall plan for the impl's validation, so this is based solely on reading the javadoc in isolation...)
e.g. throws IllegalArgumentException when eventTimestamp < 0
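
Stated as a policy, the check could look like the following sketch. It assumes the rule quoted in the javadoc (0 for adhoc flows, negative values invalid); LaunchValidation and requireValidEventTimestamp are hypothetical names, not part of the DagManagement interface:

```java
// Hypothetical guard for the launch path; not part of the PR.
final class LaunchValidation {
  private LaunchValidation() {
  }

  /**
   * Returns the timestamp unchanged when it is valid.
   *
   * @throws IllegalArgumentException when eventTimestamp is negative
   */
  static long requireValidEventTimestamp(long eventTimestamp) {
    if (eventTimestamp < 0) {
      throw new IllegalArgumentException("eventTimestamp must be >= 0 but was " + eventTimestamp);
    }
    return eventTimestamp; // 0 is accepted: the javadoc reserves it for adhoc flows
  }
}
```

An implementation of launchFlow could call this first, so the javadoc's "@throws" line and the runtime behavior stay in step.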

/**
* An interface to provide abstractions for managing {@link Dag} and {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} states

nit: strike "An interface to provide"

add more description like for a Dag, allows one to update or extract its job state or ... giving some high level descriptions
public void addDagSLA(String dagId, Long flowSla);
public Long getDagSLA(String dagId);

don't we have two kinds? also, since this has never been an SLA, but rather a deadline, would now be an opportunity to adopt the accurate nomenclature?
public boolean addFailedDagId(String dagId);
public boolean checkFailedDagId(String dagId);

unclear: what is meant by "check" - for membership/existence?
//marks lease success and releases it
dagTaskStream.complete(dagTask);

first off, I absolutely LOVE how simple this impl is--even as it segregates responsibility elsewhere for deciding how to process each specific task type.
that said, I wonder whether we might go farther and encapsulate the complete/commit.
e.g. the stream could construct each task w/ a:
@FunctionalInterface
interface Committer {
  void commit();
}
that would internally store the lease. the DagProcFactory would then take that from the task and preserve it in the proc, so the last step of the proc's process could call it
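
A minimal sketch of that hand-off, assuming the lease release can be captured in a lambda when the task is created. Committer is the interface proposed above; CommittableTask and CommittingProc are hypothetical stand-ins for DagTask and DagProc:

```java
@FunctionalInterface
interface Committer {
  void commit();
}

// Hypothetical task: the stream would build it with a committer that closes over the lease.
final class CommittableTask {
  private final Committer committer;

  CommittableTask(Committer committer) {
    this.committer = committer;
  }

  Committer getCommitter() {
    return committer;
  }
}

// Hypothetical proc: keeps the committer taken from the task and calls it last.
final class CommittingProc {
  private final Committer committer;

  CommittingProc(CommittableTask task) {
    this.committer = task.getCommitter();
  }

  void process(Runnable work) {
    work.run();
    committer.commit(); // reached only if the work did not throw
  }
}
```

The processing engine would then no longer need to call complete on the stream itself, and a proc whose work throws leaves the lease uncommitted.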
* @param <DagProc>
*/
@Alpha
public interface DagTaskVisitor<DagProc> {

nit: convention would be T (or at the most DagProcType). naming it after an actual type (even though not imported) is quite confusing
abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;
public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {

eventually, retries may not be governed by a static count, but rather tracking to how much time remains in the lease
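
One way to read this is a retry loop bounded by a deadline rather than by a count. A minimal sketch, assuming the lease expiry is available as epoch milliseconds; LeaseBoundRetry, callUntil and leaseExpiryMillis are hypothetical names, and nothing here reflects the actual MultiActiveLeaseArbiter API:

```java
import java.util.concurrent.Callable;

// Hypothetical helper: keeps retrying only while the lease still has time left.
final class LeaseBoundRetry {
  private LeaseBoundRetry() {
  }

  static <V> V callUntil(Callable<V> call, long leaseExpiryMillis, long delayRetryMillis) throws Exception {
    while (true) {
      try {
        return call.call();
      } catch (Exception e) {
        // Give up when sleeping for the retry delay would run past the lease expiry.
        if (System.currentTimeMillis() + delayRetryMillis >= leaseExpiryMillis) {
          throw e;
        }
        Thread.sleep(delayRetryMillis);
      }
    }
  }
}
```

This sketch treats every exception as retryable; a real version would presumably retry only on MaybeRetryableException.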
this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis);
log.info("Successfully processed Dag Request");
} catch (Exception ex) {
  throw new RuntimeException("Cannot process Dag Request: ", ex);

I use checked exceptions sparingly, but here, one may actually make sense (rather than RuntimeException)
public KillDagTask(DagActionStore.DagAction killAction, MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
  this.killAction = killAction;
  this.leaseObtainedStatusStatus = leaseObtainedStatus;

should this be initialized by a call to super()?
pausing again... part 2
  this.resumesInvoked.mark();
} else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
  log.info("Received insert dag action and about to send kill flow request");
  dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
  if(isMultiLeaderDagManagerEnabled) {
    DagActionStore.DagAction killAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL);

is killFlow called in any other place? if we don't actually have a DagAction already on hand in any of the callsites, not sure it makes sense to use that in the API. perhaps it should be:
killFlow(flowGroup, flowName, flowExecutionId, produceTimestamp)
@@ -80,6 +83,8 @@ public String load(String key) throws Exception {
protected Orchestrator orchestrator;
protected boolean isMultiActiveSchedulerEnabled;
protected FlowCatalog flowCatalog;
private DagTaskStream dagTaskStream;

this change monitor shouldn't use a task stream as such (that's exclusively for the DagProcessingEngine.) instead let it write to the DagManagement API
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
// instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
// this.dagTaskStream = new DagTaskStream();

so this isn't initialized anywhere? won't we get an NPE when using it?

yea we should use guice to bring it in, see the DagActionStoreChangeMonitorFactory
/**
* Defines an individual task or job in a Dag.

"job" already has other connotations as what we run on executors. maybe "defines a singular task in the lifecycle of a managed Dag"?
@Alpha
public abstract class DagTask {
  protected MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatusStatus;

why not private final?
private KillDagTask killDagTask;
public KillDagProc(KillDagTask killDagTask) {
  this.killDagTask = killDagTask;
}

tip: mark this final and implement via @RequiredArgsConstructor
@Override
protected List<Dag.DagNode<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore) throws IOException {
  String dagToCancel = this.killDagTask.getKillDagId().toString();

nit: since it's a String, not a dag, I'd name it dagId or dagIdToCancel
anyway, for such a short impl, I'd just make a one-liner w/o an intermediate var (unless you're planning to log it)
* and cancel the job on the executor. The return type is kept as {@link Object} since we might want to refactor
* or add more responsibility as part of the actions taken. Hence, after completing all possible scenarios,
* it will make sense to update the method signature with its appropriate type.

out of date
String dagToCancel = this.killDagTask.getKillDagId().toString();
log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
for (Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
  killDagNode(dagNodeToCancel);
}
dagManagementStateStore.getDag(dagToCancel).setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
dagManagementStateStore.getDag(dagToCancel).setMessage("Flow killed by request");
dagManagementStateStore.removeDagActionFromStore(this.killDagTask.getKillDagId(), DagActionStore.FlowActionType.KILL);

nits:
- move dagToCancel init after the for loop (and as observed on the name already, it's a dag ID, not a DAG)
- call dagMgmtStateStore.getDag(dagToCancel) only once
- decide whether there's still the need for an intermediate dagToCancel var
@Override
protected void sendNotification(Dag<JobExecutionPlan> dag, EventSubmitter eventSubmitter) throws MaybeRetryableException {
  for(Dag.DagNode<JobExecutionPlan> dagNodeToCancel : dag.getNodes()) {

please update your IDE to catch when you forget the space in "for ("... this happens all over this PR

Is this meant to be a part of your change? was this done or included by mistake?
binder.bind(DagManagementStateStore.class).to(InMemoryDagManagementStateStore.class);
binder.bind(DagProcFactory.class).in(Singleton.class);
binder.bind(DagProcessingEngine.class).in(Singleton.class);
binder.bind(DagTaskStream.class).in(Singleton.class);

I believe you need to create an OptionalBinder like I've done above:
OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
as each of these classes is instantiated only in certain cases.
public void addDagSLA(String dagId, Long flowSla);
public Long getDagSLA(String dagId);

are these dagStart or dagKill Deadlines? We have both configurable so may want to create separate functions for each.
/**
* An implementation of {@link DagProc} that is responsible for cleaning up {@link Dag} that has reached an end state
* likewise: FAILED, COMPLETE or CANCELED
nit: "ie:" or "including:"
@Override
protected Object initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException {
  throw new UnsupportedOperationException("Not supported");

extra space
  throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag");
}

protected final R actWithRetries(S state, DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) {
Should we have a generic method that deals with retries and takes a function as a parameter? It seems you are repeating code between these two functions and want to handle them the same way.
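A minimal sketch of such a generic retry helper follows, assuming retries should happen only for a dedicated retryable exception type. The names RetrySketch, withRetries, and RetryableException are hypothetical stand-ins (the PR's own type is MaybeRetryableException, whose definition is not shown here), and the fixed delay between attempts is also an assumption.

```java
import java.util.concurrent.Callable;

final class RetrySketch {
  private RetrySketch() {}

  // Hypothetical stand-in for the PR's retryable exception type.
  static class RetryableException extends Exception {
    RetryableException(String message) {
      super(message);
    }
  }

  /**
   * Calls the operation up to maxAttempts times, sleeping delayRetryMillis
   * between attempts. Only RetryableException triggers another attempt;
   * any other exception propagates to the caller immediately.
   */
  static <T> T withRetries(Callable<T> operation, int maxAttempts, long delayRetryMillis)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    RetryableException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return operation.call();
      } catch (RetryableException e) {
        last = e;
        if (attempt < maxAttempts) {
          Thread.sleep(delayRetryMillis); // fixed delay; no backoff in this sketch
        }
      }
    }
    throw new RuntimeException("Max retry attempts reached", last);
  }
}
```

With a helper like this, both the initialize and act paths could pass their step as a lambda, for example withRetries(() -> initialize(store), maxRetryCount, delayRetryMillis), rather than duplicating the loop.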
@@ -94,6 +99,9 @@ public DagActionStoreChangeMonitor(String topic, Config config, DagActionStore d
this.flowCatalog = flowCatalog;
this.orchestrator = orchestrator;
this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
// instantiating using default ctor; subsequent PR will handle instantiating with multi-args ctor
// this.dagTaskStream = new DagTaskStream();
Yeah, we should use Guice to bring it in; see the DagActionStoreChangeMonitorFactory.
if (operation.equals("INSERT")) {
  if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
    log.info("Received insert dag action and about to send resume flow request");
    dagManager.handleResumeFlowRequest(flowGroup, flowName,Long.parseLong(flowExecutionId));
    //TODO: add a flag for if condition only if multi-active is enabled
    this.resumesInvoked.mark();
  } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
    log.info("Received insert dag action and about to send kill flow request");
    dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
The conditional above should be checked first, and this should be the else, right?
Or are we trying a "dummy kill in the new refactor"? If so, let's add a comment that the change below doesn't carry out the action and is still in testing.
whew... all done w/ this review cycle
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

import org.jetbrains.annotations.NotNull;
looks like the wrong one... are you certain?
@Getter
private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new LinkedBlockingQueue<>();
a. why a public getter? could it not be solved via a more-targeted method/capability?
b. why always initialize w/ the empty queue, when there's also @AllArgsConstructor?
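To illustrate both points, here is a minimal sketch of a class that keeps its queue private, exposes only targeted operations, and lets the caller supply the queue through a constructor. The class name ActionStreamSketch and the method names addAction and pollAction are hypothetical. The generic parameter A stands in for DagActionStore.DagAction so the example stays self-contained.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class ActionStreamSketch<A> {
  // The queue itself is never exposed; callers only get targeted operations.
  private final BlockingQueue<A> queue;

  // Convenience constructor: defaults to an unbounded LinkedBlockingQueue.
  ActionStreamSketch() {
    this(new LinkedBlockingQueue<>());
  }

  // The caller may supply its own queue (e.g. a bounded one, or a test double).
  ActionStreamSketch(BlockingQueue<A> queue) {
    this.queue = Objects.requireNonNull(queue, "queue");
  }

  // Enqueues an action; returns false if a bounded queue is full.
  boolean addAction(A action) {
    return queue.offer(action);
  }

  // Non-blocking removal; returns null when the queue is empty.
  A pollAction() {
    return queue.poll();
  }
}
```

Because the field is initialized only in the constructors, there is no conflict between a default empty queue and a queue passed in by the caller.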
    if(leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
      dagTask = createDagTask(dagAction,
          (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
    }
    if (dagTask != null) {
      break; // Exit the loop when dagTask is non-null
    }
  } catch (IOException e) {
    //TODO: need to handle exceptions gracefully
    throw new RuntimeException(e);
  }
}
return dagTask;
return createDagTask(...) would be more natural control flow than the combination of DagTask dagTask = null;, the break, and finally return dagTask
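The direct-return shape can be sketched as follows. Everything here is a simplified, hypothetical stand-in: DirectReturnSketch and nextTask are invented names, the queue-draining loop is an assumption about the surrounding code (the real loop condition is not visible in the snippet), and an Optional replaces the instanceof check on the lease status.

```java
import java.util.Optional;
import java.util.Queue;
import java.util.function.BiFunction;
import java.util.function.Function;

final class DirectReturnSketch {
  private DirectReturnSketch() {}

  /**
   * Polls actions until one obtains a lease, then returns the task built
   * from it right away. No nullable local, no break, no trailing return
   * of that local.
   *
   * A = action type, L = obtained-lease type, T = task type (all hypothetical).
   */
  static <A, L, T> T nextTask(
      Queue<A> actions,
      Function<A, Optional<L>> leaseAttempt,
      BiFunction<A, L, T> createTask) {
    A action;
    while ((action = actions.poll()) != null) {
      Optional<L> lease = leaseAttempt.apply(action);
      if (lease.isPresent()) {
        return createTask.apply(action, lease.get());
      }
      // Lease not obtained: fall through and try the next action.
    }
    return null; // queue exhausted without obtaining any lease
  }
}
```

The single exit inside the loop makes it obvious that a task is returned only when a lease was obtained, which the null-then-break version leaves implicit.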
private boolean add(DagActionStore.DagAction dagAction) {
  return this.dagActionQueue.offer(dagAction);
}

private DagActionStore.DagAction take() {
  return this.dagActionQueue.poll();
}
rearrange so public methods come prior to these and other private ones
*/

@Override
public boolean enforceFlowCompletionDeadline(Dag.DagNode<JobExecutionPlan> node) throws ExecutionException, InterruptedException {
still wondering here... do we presume a timer/reminder has been set elsewhere and just triggered?
it seems this simply evaluates whether or not the deadline is exceeded, but doesn't actually take action to enforce. is that true? if so, whose responsibility is the enforcement?
* Retrieve the {@link JobStatus} from the {@link JobExecutionPlan}.
*/

protected JobStatus retrieveJobStatus(Dag.DagNode<JobExecutionPlan> dagNode) {
ping here.... still not clear the extent of the "state" management/retrieval going on (and whether appropriate to this class), since getStatus throws UnsupportedOperationException
* Going forward, each of these in-memory references will be read/write from MySQL store.
* Thus, the {@link DagManager} would then be stateless and operate independently.
*/
@Getter(onMethod_={@Synchronized})
wondering, since I usually use it at field level, not class level: will it create an accessor for each of the individual constituent collections? I didn't perceive the DagManagementStateStore interface as meant to be quite so low-level... we don't want that, do we?
final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
final Map<String, Long> dagToSLA = new HashMap<>();
private final Set<String> dagIdstoClean = new HashSet<>();
private Optional<DagActionStore> dagActionStore;
why/when would this be empty?
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
I am creating this PR as part of refactoring the DagManager to make it less bulky and eventually stateless (after getting rid of the in-memory references). This PR only handles adhoc REST client calls to launch, resume, and kill/cancel flows. Currently, it only provides a template for the end-to-end functionality and implements some of the existing DagManager's responsibilities. Subsequent PRs will implement the functionality that is still missing, such as scheduler triggers via the orchestrator, advancing to the next set of nodes in a Dag, and cleaning up Dags.
The method signatures, instantiations, and return types use widely accepted defaults for now, and will be restricted once the responsibility/functionality of each class is determined.
Tests
Commits