From aedb1220541011e354a96f3f5bd29956f2024a6e Mon Sep 17 00:00:00 2001 From: Thomas Stern Date: Fri, 5 Apr 2019 14:01:54 +0200 Subject: [PATCH] Fix error with simultaneous docker operations by synchronizing those operations see also: - https://github.com/eclipse-ee4j/jersey/issues/3772 - https://github.com/zalando/zalenium/issues/808 --- .../container/swarm/SwarmContainerClient.java | 151 +++++------------- .../container/swarm/SwarmUtilities.java | 101 +++++++++++- 2 files changed, 133 insertions(+), 119 deletions(-) diff --git a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java index 1c8658cf1d..d1a664d14c 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java +++ b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmContainerClient.java @@ -2,9 +2,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import com.spotify.docker.client.AnsiProgressHandler; -import com.spotify.docker.client.DefaultDockerClient; -import com.spotify.docker.client.DockerClient; import com.spotify.docker.client.exceptions.DockerException; import com.spotify.docker.client.messages.*; import com.spotify.docker.client.messages.mount.Mount; @@ -30,9 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusCreated; -import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusRunning; - @SuppressWarnings("ConstantConditions") public class SwarmContainerClient implements ContainerClient { @@ -46,7 +40,6 @@ public class SwarmContainerClient implements ContainerClient { private static Environment env = defaultEnvironment; private static String seleniumContainerCpuLimit; private static String seleniumContainerMemoryLimit; - private static String dockerHost; private static AtomicBoolean environmentInitialised = new AtomicBoolean(false); static { @@ -55,7 +48,6 @@ public class SwarmContainerClient implements ContainerClient { private final Logger logger = LoggerFactory.getLogger(SwarmContainerClient.class.getName()); private final GoogleAnalyticsApi ga = new GoogleAnalyticsApi(); - private DockerClient dockerClient = new DefaultDockerClient(dockerHost); private String nodeId; private Map seleniumContainerLabels = new HashMap<>(); private AtomicBoolean seleniumContainerLabelsChecked = new AtomicBoolean(false); @@ -67,9 +59,6 @@ private static void readConfigurationFromEnvVariables() { String memoryLimit = env.getEnvVariable(ZALENIUM_SELENIUM_CONTAINER_MEMORY_LIMIT); setSeleniumContainerMemoryLimit(memoryLimit); - - String dockerHost = env.getStringEnvVariable("DOCKER_HOST", "unix:///var/run/docker.sock"); - setDockerHost(dockerHost); } @VisibleForTesting @@ -77,11 +66,6 @@ protected static void setEnv(final Environment env) { SwarmContainerClient.env = env; } - private static void setDockerHost(String dockerHost) { - // https://github.com/spotify/docker-client/issues/946 - SwarmContainerClient.dockerHost = dockerHost.replace("tcp", "http"); - } - private static String getSeleniumContainerCpuLimit() { return seleniumContainerCpuLimit; } @@ -104,15 +88,9 @@ public void setNodeId(String nodeId) { private String getContainerId(URL remoteUrl) { try { - List tasks = dockerClient.listTasks(); - for (Task task : tasks) { - for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) { - for (String address : networkAttachment.addresses()) { - if (address.startsWith(remoteUrl.getHost())) { - return task.status().containerStatus().containerId(); - } - } - } + ContainerStatus containerStatus = SwarmUtilities.getContainerByRemoteUrl(remoteUrl); + if (containerStatus != null) { + return containerStatus.containerId(); } } catch (DockerException | InterruptedException e) { e.printStackTrace(); @@ -127,7 +105,7 @@ private String getContainerId(String containerName) { List containerList = null; try { - containerList = dockerClient.listContainers(withStatusRunning(), withStatusCreated()); + containerList = SwarmUtilities.getRunningAndCreatedContainers(); } catch (DockerException | InterruptedException e) { logger.debug(nodeId + " Error while getting containerId", e); ga.trackException(e); @@ -152,20 +130,7 @@ public InputStream copyFiles(String containerId, String folderName) { public void stopContainer(String containerId) { try { - List tasks = dockerClient.listTasks(); - for (Task task : tasks) { - ContainerStatus containerStatus = task.status().containerStatus(); - if (containerStatus != null && containerStatus.containerId().equals(containerId)) { - String serviceId = task.serviceId(); - Service.Criteria criteria = Service.Criteria.builder() - .serviceId(serviceId) - .build(); - List services = dockerClient.listServices(criteria); - if (!CollectionUtils.isEmpty(services)) { - dockerClient.removeService(serviceId); - } - } - } + SwarmUtilities.stopServiceByContainerId(containerId); } catch (DockerException | InterruptedException e) { logger.warn(nodeId + " Error while stopping the container", e); ga.trackException(e); @@ -174,32 +139,22 @@ public void stopContainer(String containerId) { public void executeCommand(String containerId, String[] command, boolean waitForExecution) { try { - List tasks = dockerClient.listTasks(); - - pullSwarmExecImage(); - - for (Task task : CollectionUtils.emptyIfNull(tasks)) { - ContainerStatus containerStatus = task.status().containerStatus(); - - if (containerStatus != null && containerStatus.containerId().equals(containerId)) { - startSwarmExecContainer(task, command, containerId); - return; - } + Task task = SwarmUtilities.getTaskByContainerId(containerId); + if (task != null) { + pullSwarmExecImage(); + startSwarmExecContainer(task, command, containerId); + } else { + logger.warn("Couldn't execute command on container {}", containerId); } } catch (DockerException | InterruptedException e) { logger.warn("Error while executing comman on container {}", containerId); ga.trackException(e); } - - logger.warn("Couldn't execute command on container {}", containerId); } private void pullSwarmExecImage() { try { - List images = dockerClient.listImages(DockerClient.ListImagesParam.byName(SWARM_EXEC_IMAGE)); - if (CollectionUtils.isEmpty(images)) { - dockerClient.pull(SWARM_EXEC_IMAGE, new AnsiProgressHandler()); - } + SwarmUtilities.pullImageIfNotPresent(SWARM_EXEC_IMAGE); } catch (DockerException | InterruptedException e) { logger.warn(nodeId + " Error while checking (and pulling) if the image is present", e); ga.trackException(e); @@ -231,9 +186,7 @@ private void startSwarmExecContainer(Task task, String[] command, String contain .cmd(command) .build(); - ContainerCreation containerCreation = dockerClient.createContainer(containerConfig); - - dockerClient.startContainer(containerCreation.id()); + SwarmUtilities.startContainer(containerConfig); } public String getLatestDownloadedImage(String imageName) { @@ -256,7 +209,7 @@ public ContainerCreationStatus createContainer(String zaleniumContainerName, Str final ServiceSpec serviceSpec = buildServiceSpec(taskSpec, nodePort, noVncPort); try { - ServiceCreateResponse service = dockerClient.createService(serviceSpec); + ServiceCreateResponse service = SwarmUtilities.createService(serviceSpec); TaskStatus taskStatus = waitForTaskStatus(service.id()); @@ -361,14 +314,7 @@ private TaskStatus waitForTaskStatus(String serviceId) throws DockerException, I private TaskStatus waitForTaskStatus(String serviceId, int attempts) throws DockerException, InterruptedException { int attemptsLimit = 100; Thread.sleep(100); - String serviceName = dockerClient.inspectService(serviceId).spec().name(); - Task.Criteria criteria = Task.Criteria.builder().serviceName(serviceName).build(); - List tasks = dockerClient.listTasks(criteria); - Task task = null; - - if (!CollectionUtils.isEmpty(tasks)) { - task = tasks.get(0); - } + Task task = SwarmUtilities.getTaskByServiceId(serviceId); if (task == null && attempts < attemptsLimit) { return waitForTaskStatus(serviceId, attempts + 1); @@ -385,19 +331,14 @@ private TaskStatus waitForTaskStatus(String serviceId, int attempts) throws Dock } private ContainerCreationStatus getContainerCreationStatus(String serviceId, String nodePort) throws DockerException, InterruptedException { - Service service = dockerClient.inspectService(serviceId); - Task.Criteria criteria = Task.Criteria.builder() - .serviceName(service.spec().name()) - .build(); - List tasks = dockerClient.listTasks(criteria); - for (Task task : tasks) { - if (task.serviceId().equals(serviceId)) { - ContainerStatus containerStatus = task.status().containerStatus(); - if (containerStatus != null) { - String containerId = containerStatus.containerId(); - String containerName = containerStatus.containerId(); - return new ContainerCreationStatus(true, containerName, containerId, nodePort); - } + Task task = SwarmUtilities.getTaskByServiceId(serviceId); + + if (task != null) { + ContainerStatus containerStatus = task.status().containerStatus(); + if (containerStatus != null) { + String containerId = containerStatus.containerId(); + String containerName = containerStatus.containerId(); + return new ContainerCreationStatus(true, containerName, containerId, nodePort); } } @@ -469,18 +410,13 @@ public String getContainerIp(String containerName) { } try { - List tasks = dockerClient.listTasks(); String swarmOverlayNetwork = ZaleniumConfiguration.getSwarmOverlayNetwork(); - for (Task task : tasks) { - ContainerStatus containerStatus = task.status().containerStatus(); - if (containerStatus != null) { - if (containerStatus.containerId().equals(containerId)) { - for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) { - if (networkAttachment.network().spec().name().equals(swarmOverlayNetwork)) { - String cidrSuffix = "/\\d+$"; - return networkAttachment.addresses().get(0).replaceAll(cidrSuffix, ""); - } - } + Task task = SwarmUtilities.getTaskByContainerId(containerId); + if (task != null) { + for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) { + if (networkAttachment.network().spec().name().equals(swarmOverlayNetwork)) { + String cidrSuffix = "/\\d+$"; + return networkAttachment.addresses().get(0).replaceAll(cidrSuffix, ""); } } } @@ -516,30 +452,21 @@ public boolean isTerminated(ContainerCreationStatus container) { try { List termStates = Arrays.asList("complete", "failed", "shutdown", "rejected", "orphaned", "removed"); String containerId = container.getContainerId(); - List tasks = dockerClient.listTasks(); - boolean containerExists = tasks.stream().anyMatch(task -> { - ContainerStatus containerStatus = task.status().containerStatus(); - return containerStatus != null && containerStatus.containerId().equals(containerId); - }); + Task task = SwarmUtilities.getTaskByContainerId(containerId); - if (!containerExists) { + if (task == null) { logger.info("Container {} has no corresponding task - flagging it as terminated", container); return true; } else { - return tasks.stream().anyMatch(task -> { - ContainerStatus containerStatus = task.status().containerStatus(); - boolean hasTerminalState = termStates.contains(task.status().state()); - boolean isContainer = containerStatus != null && containerStatus.containerId().equals(containerId); - boolean isTerminated = isContainer && hasTerminalState; - - if (isTerminated) { - logger.info("State of Container {} is {} - flagging it as terminated", - container.getContainerId(), - task.status().state()); - } + boolean isTerminated = termStates.contains(task.status().state()); + + if (isTerminated) { + logger.info("State of Container {} is {} - flagging it as terminated", + container.getContainerId(), + task.status().state()); + } - return isTerminated; - }); + return isTerminated; } } catch (DockerException | InterruptedException e) { logger.warn("Failed to fetch container status [" + container.getContainerId() + "].", e); diff --git a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmUtilities.java b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmUtilities.java index 5a497dbf74..4021aab6c7 100644 --- a/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmUtilities.java +++ b/src/main/java/de/zalando/ep/zalenium/container/swarm/SwarmUtilities.java @@ -2,13 +2,14 @@ import com.google.common.collect.ImmutableMap; +import com.spotify.docker.client.AnsiProgressHandler; import com.spotify.docker.client.DefaultDockerClient; import com.spotify.docker.client.DockerClient; import com.spotify.docker.client.exceptions.DockerException; -import com.spotify.docker.client.messages.AttachedNetwork; -import com.spotify.docker.client.messages.ContainerInfo; -import com.spotify.docker.client.messages.Network; +import com.spotify.docker.client.messages.*; +import com.spotify.docker.client.messages.Network; +import com.spotify.docker.client.messages.swarm.*; import de.zalando.ep.zalenium.util.Environment; import de.zalando.ep.zalenium.util.ZaleniumConfiguration; @@ -18,17 +19,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URL; import java.util.List; import java.util.Map; +import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusCreated; +import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusRunning; + public class SwarmUtilities { - private static final Environment defaultEnvironment = new Environment(); - private static final String dockerHost = defaultEnvironment.getStringEnvVariable("DOCKER_HOST", "unix:///var/run/docker.sock"); - private static final DockerClient dockerClient = new DefaultDockerClient(dockerHost); private static final String overlayNetwork = ZaleniumConfiguration.getSwarmOverlayNetwork(); private static final Logger logger = LoggerFactory.getLogger(SwarmUtilities.class.getName()); + private static final Environment defaultEnvironment = new Environment(); + private static final String dockerHost = defaultEnvironment + .getStringEnvVariable("DOCKER_HOST", "unix:///var/run/docker.sock") + // https://github.com/spotify/docker-client/issues/946 + .replace("tcp", "http"); + private static final DockerClient dockerClient = new DefaultDockerClient(dockerHost); - public static ContainerInfo getContainerByIp(String ipAddress) { + public static synchronized ContainerInfo getContainerByIp(String ipAddress) { try { List networks = dockerClient.listNetworks(); for (Network network : CollectionUtils.emptyIfNull(networks)) { @@ -50,6 +58,85 @@ public static ContainerInfo getContainerByIp(String ipAddress) { return null; } + static synchronized List getRunningAndCreatedContainers() throws DockerException, InterruptedException { + return dockerClient.listContainers(withStatusRunning(), withStatusCreated()); + } + + static synchronized ContainerStatus getContainerByRemoteUrl(URL remoteUrl) throws DockerException, InterruptedException { + List tasks = dockerClient.listTasks(); + for (Task task : tasks) { + for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) { + for (String address : networkAttachment.addresses()) { + if (address.startsWith(remoteUrl.getHost())) { + return task.status().containerStatus(); + } + } + } + } + + return null; + } + + static synchronized void stopServiceByContainerId(String containerId) throws DockerException, InterruptedException { + List tasks = dockerClient.listTasks(); + for (Task task : tasks) { + ContainerStatus containerStatus = task.status().containerStatus(); + if (containerStatus != null && containerId.equals(containerStatus.containerId())) { + String serviceId = task.serviceId(); + Service.Criteria criteria = Service.Criteria.builder() + .serviceId(serviceId) + .build(); + List services = dockerClient.listServices(criteria); + if (!CollectionUtils.isEmpty(services)) { + dockerClient.removeService(serviceId); + } + } + } + } + + static synchronized Task getTaskByContainerId(String containerId) throws DockerException, InterruptedException { + List tasks = dockerClient.listTasks(); + + for (Task task : CollectionUtils.emptyIfNull(tasks)) { + ContainerStatus containerStatus = task.status().containerStatus(); + + if (containerStatus != null && containerId.equals(containerStatus.containerId())) { + return task; + } + } + + return null; + } + + static synchronized Task getTaskByServiceId(String serviceId) throws DockerException, InterruptedException { + String serviceName = dockerClient.inspectService(serviceId).spec().name(); + Task.Criteria criteria = Task.Criteria.builder().serviceName(serviceName).build(); + List tasks = dockerClient.listTasks(criteria); + Task task = null; + + if (!CollectionUtils.isEmpty(tasks)) { + task = tasks.get(0); + } + + return task; + } + + static synchronized void pullImageIfNotPresent(String imageName) throws DockerException, InterruptedException { + List images = dockerClient.listImages(DockerClient.ListImagesParam.byName(imageName)); + if (CollectionUtils.isEmpty(images)) { + dockerClient.pull(imageName, new AnsiProgressHandler()); + } + } + + static synchronized void startContainer(ContainerConfig containerConfig) throws DockerException, InterruptedException { + ContainerCreation containerCreation = dockerClient.createContainer(containerConfig); + dockerClient.startContainer(containerCreation.id()); + } + + static synchronized ServiceCreateResponse createService(ServiceSpec serviceSpec) throws DockerException, InterruptedException { + return dockerClient.createService(serviceSpec); + } + public static String getSwarmIp(ContainerInfo containerInfo) { AttachedNetwork attachedNetwork = MapUtils.emptyIfNull(containerInfo.networkSettings().networks()) .get(overlayNetwork);