Skip to content

Commit

Permalink
[Hotfix] Fix Master integration test case has been skipped in ci
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Sep 21, 2024
1 parent f697665 commit 25eeef3
Show file tree
Hide file tree
Showing 74 changed files with 513 additions and 467 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static void main(String[] args) {

@PostConstruct
public void run() {
ServerLifeCycleManager.toRunning();
log.info("AlertServer is staring ...");
alertBootstrapService.start();
log.info("AlertServer is started ...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.dao.PluginDao;
Expand Down Expand Up @@ -59,6 +60,7 @@ public static void main(String[] args) {

@EventListener
public void run(ApplicationReadyEvent readyEvent) {
ServerLifeCycleManager.toRunning();
log.info("Received spring application context ready event will load taskPlugin and write to DB");
DataSourceProcessorProvider.initialize();
TaskPluginManager.loadTaskPlugin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public static long getServerStartupTime() {
return serverStartupTime;
}

public static void toRunning() {
serverStatus = ServerStatus.RUNNING;
}

public static boolean isRunning() {
return serverStatus == ServerStatus.RUNNING;
}
Expand Down Expand Up @@ -81,6 +85,7 @@ public static synchronized boolean toStopped() {
if (serverStatus == ServerStatus.STOPPED) {
return false;
}
log.info("The current server status changed from {} to {}", serverStatus, ServerStatus.STOPPED);
serverStatus = ServerStatus.STOPPED;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.common.model;

import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -54,11 +53,6 @@ public synchronized void start() {
public void run() {
while (runningFlag) {
try {
if (!ServerLifeCycleManager.isRunning()) {
log.info("The current server status is {}, will not write heartBeatInfo into registry",
ServerLifeCycleManager.getServerStatus());
continue;
}
T heartBeat = getHeartBeat();
// if first time or heartBeat status changed, write heartBeatInfo into registry
if (System.currentTimeMillis() - lastWriteTime >= heartBeatInterval
Expand Down
23 changes: 23 additions & 0 deletions dolphinscheduler-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<exclusions>
<exclusion>
<!-- avoid change the version of annotations-->
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down Expand Up @@ -312,6 +324,17 @@
</dependencies>

<build>
<testResources>
<testResource>
<directory>${project.basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes>
<include>sql/**</include>
</includes>
</testResource>
<testResource>
<directory>${project.basedir}/src/test/resources</directory>
</testResource>
</testResources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Date;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -106,6 +107,7 @@ public static void main(String[] args) {
*/
@PostConstruct
public void initialized() {
ServerLifeCycleManager.toRunning();
final long startupTime = System.currentTimeMillis();

// init rpc server
Expand Down Expand Up @@ -150,11 +152,11 @@ public void initialized() {
log.info("MasterServer initialized successfully in {} ms", System.currentTimeMillis() - startupTime);
}

/**
* gracefully close
*
* @param cause close cause
*/
@PreDestroy
public void shutdown() {
close("MasterServer shutdown");
}

public void close(String cause) {
// set stop signal is true
// execute only once
Expand All @@ -165,6 +167,7 @@ public void close(String cause) {
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
try (
SystemEventBusFireWorker systemEventBusFireWorker1 = systemEventBusFireWorker;
WorkflowEngine workflowEngine1 = workflowEngine;
SchedulerApi closedSchedulerApi = schedulerApi;
MasterRpcServer closedRpcServer = masterRPCServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
Expand Down Expand Up @@ -80,7 +79,7 @@
*/
@Slf4j
@Component
public class TaskGroupCoordinator extends BaseDaemonThread {
public class TaskGroupCoordinator extends BaseDaemonThread implements AutoCloseable {

@Autowired
private RegistryClient registryClient;
Expand All @@ -97,6 +96,8 @@ public class TaskGroupCoordinator extends BaseDaemonThread {
@Autowired
private WorkflowInstanceDao workflowInstanceDao;

private boolean flag = true;

private static int DEFAULT_LIMIT = 1000;

public TaskGroupCoordinator() {
Expand All @@ -106,17 +107,15 @@ public TaskGroupCoordinator() {
@Override
public synchronized void start() {
log.info("TaskGroupCoordinator starting...");
flag = true;
super.start();
log.info("TaskGroupCoordinator started...");
}

@Override
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
while (flag) {
try {
if (!ServerLifeCycleManager.isRunning()) {
continue;
}
registryClient.getLock(RegistryNodeType.MASTER_TASK_GROUP_COORDINATOR_LOCK.getRegistryPath());
try {
StopWatch taskGroupCoordinatorRoundCost = StopWatch.createStarted();
Expand Down Expand Up @@ -488,4 +487,9 @@ private void deleteTaskGroupQueueSlot(TaskGroupQueue taskGroupQueue) {
log.info("Success release TaskGroupQueue: {}", taskGroupQueue);
}

@Override
public void close() throws Exception {
flag = false;
log.info("TaskGroupCoordinator closed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public void close() throws Exception {
try (
final CommandEngine commandEngine1 = commandEngine;
final WorkflowEventBusCoordinator workflowEventBusCoordinator1 = workflowEventBusCoordinator;
final MasterTaskExecutorBootstrap masterTaskExecutorBootstrap1 = masterTaskExecutorBootstrap;) {
final MasterTaskExecutorBootstrap masterTaskExecutorBootstrap1 = masterTaskExecutorBootstrap;
final TaskGroupCoordinator taskGroupCoordinator1 = taskGroupCoordinator) {
// closed the resource
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
Expand Down Expand Up @@ -83,6 +82,8 @@ public class CommandEngine extends BaseDaemonThread implements AutoCloseable {

private ExecutorService commandHandleThreadPool;

private boolean flag = false;

protected CommandEngine() {
super("MasterCommandLoopThread");
}
Expand All @@ -92,27 +93,23 @@ public synchronized void start() {
log.info("MasterSchedulerBootstrap starting..");
this.commandHandleThreadPool = ThreadUtils.newDaemonFixedThreadExecutor("MasterCommandHandleThreadPool",
Runtime.getRuntime().availableProcessors());
flag = true;
super.start();
log.info("MasterSchedulerBootstrap started...");
}

@Override
public void close() throws Exception {
log.info("MasterSchedulerBootstrap stopping...");

flag = false;
log.info("MasterSchedulerBootstrap stopped...");
}

@Override
public void run() {
MasterServerLoadProtection serverLoadProtection = masterConfig.getServerLoadProtection();
while (!ServerLifeCycleManager.isStopped()) {
while (flag) {
try {
if (!ServerLifeCycleManager.isRunning()) {
// the current server is not at running status, cannot consume command.
log.warn("The current server is not at running status, cannot consumes commands.");
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
// todo: if the workflow event queue is much, we need to handle the back pressure
SystemMetrics systemMetrics = metricsProvider.getSystemMetrics();
if (serverLoadProtection.isOverload(systemMetrics)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ protected void assembleWorkflowEventBus(

protected void assembleWorkflowInstanceLifecycleListeners(
final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
workflowExecuteContextBuilder.setWorkflowInstanceLifecycleListeners(
workflowExecuteContextBuilder.getWorkflowInstanceLifecycleListeners());
workflowExecuteContextBuilder.setWorkflowInstanceLifecycleListeners(workflowLifecycleListeners);
}

protected void assembleWorkflowDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Slf4j
@Component
@SuppressWarnings({"unchecked", "rawtypes"})
public class SystemEventBusFireWorker extends BaseDaemonThread {
public class SystemEventBusFireWorker extends BaseDaemonThread implements AutoCloseable {

@Autowired
private SystemEventBus systemEventBus;
Expand All @@ -49,19 +49,22 @@ public class SystemEventBusFireWorker extends BaseDaemonThread {
@Autowired
private List<ISystemEventHandler> systemEventHandlers;

private static boolean flag = false;

public SystemEventBusFireWorker() {
super("SystemEventBusFireWorker");
}

@Override
public void start() {
flag = true;
super.start();
log.info("SystemEventBusFireWorker started");
}

@Override
public void run() {
while (!ServerLifeCycleManager.isStopped()) {
while (flag) {
final AbstractSystemEvent systemEvent;
try {
systemEvent = systemEventBus.take();
Expand Down Expand Up @@ -99,4 +102,10 @@ private void fireSystemEvent(final AbstractSystemEvent systemEvent) {
stopWatch.stop();
log.info("Fire SystemEvent: {} cost: {} ms", systemEvent, stopWatch.getTime());
}

@Override
public void close() {
flag = false;
log.info("SystemEventBusFireWorker closed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void failover() {
.withTaskInstance(taskInstance)
.build();
initializeTaskExecutionContext();

getWorkflowEventBus().publish(TaskStartLifecycleEvent.of(this));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void notifyWorkflowLifecycleEvent(final IWorkflowExecutionRunnable workfl
final ICommandParam commandParam =
JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class);
if (commandParam == null) {
log.warn("Command param: {} is invalid for workflow{}", workflowInstance.getCommandParam(),
log.warn("Command param: {} is invalid for workflow: {}", workflowInstance.getCommandParam(),
workflowInstance.getName());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ public void setRegistryStoppable(IStoppable stoppable) {
@Override
public void close() {
// TODO unsubscribe MasterRegistryDataListener
deregister();
if (masterHeartBeatTask != null) {
masterHeartBeatTask.shutdown();
}
if (registryClient.isConnected()) {
deregister();
}
log.info("Closed MasterRegistryClient");
}

/**
Expand Down
Loading

0 comments on commit 25eeef3

Please sign in to comment.