What happened?
Work status reporting starts before the main work execution, within BatchDataflowWorker#executeWork. However, the DataflowWorkExecutor is created via DataflowMapTaskExecutorFactory#create in BatchDataflowWorker#doWork, before work status reporting starts.
During this DataflowWorkExecutor creation, DoFn instances are instantiated and their Setup methods are invoked. See the stack trace below.
... (skipped) ...
at baeminbo.LongSetupPipeline$LongSetupDoFn.setup(LongSetupPipeline.java:46)
at baeminbo.LongSetupPipeline$LongSetupDoFn$DoFnInvoker.invokeSetup(Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor(DoFnInvokers.java:53) <--
at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:86)
at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:68)
at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:248)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:80)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:167)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:149)
at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:67)
at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:54)
at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:91)
at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:109)
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:252) <--
at org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
at org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Therefore, a long-running Setup method delays the initial work status report. If no work status is reported within 3 minutes, the work item fails with lease expiration, and four failures of the same work item cause the entire job to fail.
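The ordering described above can be illustrated with a self-contained simulation (plain Java, no Beam dependencies; the class name, constants, and timings are hypothetical stand-ins scaled down from the real 3-minute lease): because the executor is built, and every Setup invoked, before the first status report, a Setup slower than the lease timeout means no report is sent in time.

```java
import java.util.concurrent.TimeUnit;

// Simplified model of the BatchDataflowWorker ordering described above:
// doWork() creates the work executor (running every DoFn Setup) BEFORE
// executeWork() starts reporting progress. Timings are scaled down for
// illustration; in the real worker the lease timeout is 3 minutes.
public class SetupBeforeReporting {
    static final long LEASE_TIMEOUT_MS = 100; // stands in for the 3-minute lease
    static final long SETUP_DELAY_MS = 150;   // a Setup slower than the lease

    // Returns the elapsed time (ms) until the first status report is sent.
    static long timeToFirstReport() throws InterruptedException {
        long start = System.nanoTime();
        // Step 1: doWork() builds the DataflowWorkExecutor, invoking Setup.
        Thread.sleep(SETUP_DELAY_MS); // simulates a long-running DoFn Setup
        // Step 2: only now does executeWork() send the first status report.
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        long elapsed = timeToFirstReport();
        System.out.println("first report after " + elapsed
                + " ms, lease expired: " + (elapsed > LEASE_TIMEOUT_MS));
    }
}
```

With these numbers the first report always arrives after the lease deadline, matching the failure mode reported here.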
This issue can be fixed by starting work status reporting before the DataflowWorkExecutor is created.
b/401034290
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
Component: Python SDK
Component: Java SDK
Component: Go SDK
Component: Typescript SDK
Component: IO connector
Component: Beam YAML
Component: Beam examples
Component: Beam playground
Component: Beam katas
Component: Website
Component: Infrastructure
Component: Spark Runner
Component: Flink Runner
Component: Samza Runner
Component: Twister2 Runner
Component: Hazelcast Jet Runner
Component: Google Cloud Dataflow Runner
Thanks for reporting this. The description is clear.
starting the work status report before the DataflowWorkExecutor creation
It's a bit tricky to do so. Currently, progressUpdater.startReportingProgress is called at the beginning of BatchDataflowWorker.executeWork, and progressUpdater is constructed using the worker. To move the call before worker creation, one would need to refactor the worker creation and invoke callSetup later.
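The effect of the reordering under discussion can be sketched with another small simulation (again hypothetical stand-in code, not actual Beam worker internals): if reporting starts on a background thread before the executor is created, reports keep flowing even while a slow Setup runs, so no gap approaches the lease timeout.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the proposed ordering: start status reporting first, then
// create the executor / run Setup. Names and timings are illustrative.
public class ReportingBeforeSetup {
    static final long REPORT_INTERVAL_MS = 20; // periodic status reports
    static final long SETUP_DELAY_MS = 150;    // a long-running Setup

    // Runs a simulated Setup while a reporter thread records report
    // timestamps; returns the longest gap (ms) between consecutive reports.
    static long maxReportGapMs() throws Exception {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        ConcurrentLinkedQueue<Long> reports = new ConcurrentLinkedQueue<>();
        long start = System.nanoTime();
        reports.add(0L);
        // Step 1: start reporting BEFORE executor creation.
        reporter.scheduleAtFixedRate(
                () -> reports.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)),
                REPORT_INTERVAL_MS, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);
        // Step 2: the long Setup no longer starves the lease.
        Thread.sleep(SETUP_DELAY_MS);
        reporter.shutdown();
        reporter.awaitTermination(1, TimeUnit.SECONDS);
        long prev = 0, maxGap = 0;
        for (long t : reports) {
            maxGap = Math.max(maxGap, t - prev);
            prev = t;
        }
        return maxGap;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("max gap between reports: " + maxReportGapMs() + " ms");
    }
}
```

The sketch only shows the desired timing behavior; as the comment notes, achieving it in the real worker requires refactoring how progressUpdater and the worker are constructed.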