diff --git a/commander/src/main/java/com/iluwatar/commander/Retry.java b/commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java similarity index 77% rename from commander/src/main/java/com/iluwatar/commander/Retry.java rename to commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java index 71614668254b..c06f6edfc376 100644 --- a/commander/src/main/java/com/iluwatar/commander/Retry.java +++ b/commander/src/main/java/com/iluwatar/commander/HandleErrorIssue.java @@ -23,50 +23,81 @@ * THE SOFTWARE. */ package com.iluwatar.commander; - import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -59,6 +62,7 @@ public interface HandleErrorIssue { + /** * Retry pattern. * * @param is the type of object passed into HandleErrorIssue as a parameter. */ - public class Retry { - /** * Operation Interface will define method to be implemented. */ - public interface Operation { void operation(List list) throws Exception; } - /** * HandleErrorIssue defines how to handle errors. * * @param is the type of object to be passed into the method as parameter. */ - public interface HandleErrorIssue { void handleIssue(T obj, Exception e); } private static final SecureRandom RANDOM = new SecureRandom(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final Operation op; private final HandleErrorIssue handleError; private final int maxAttempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -86,26 +90,25 @@ public interface HandleErrorIssue { + private final long maxDelay; private final AtomicInteger attempts; private final Predicate test; private final List errors; - Retry(Operation op, HandleErrorIssue handleError, int maxAttempts, long maxDelay, Predicate... ignoreTests) { this.op = op; @@ -77,7 +108,6 @@ public interface HandleErrorIssue { this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); } - /** * Performing the operation with retries. * @@ -86,26 +116,25 @@ public interface HandleErrorIssue { */ public void perform(List list, T obj) { - do { + scheduler.schedule(() -> { try { op.operation(list); - return; - } catch (Exception e) { + }catch (Exception e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { this.handleError.handleIssue(obj, e); + scheduler.shutdown(); return; //return here... don't go further } - try { - long testDelay = - (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); - long delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } + perform(list, obj); } - } while (true); + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay(){ + long testDelay = + (long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); } -} +} \ No newline at end of file diff --git a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java index 37417e21267d..7ec33c2750fd 100644 --- a/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java +++ b/microservices-log-aggregation/src/main/java/com/iluwatar/logaggregation/LogAggregator.java @@ -25,12 +25,22 @@ package com.iluwatar.logaggregation; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; + + + + + + + Expand All + + @@ -45,7 +45,7 @@ public class LogAggregator { + /** * Responsible for collecting and buffering logs from different services. * Once the logs reach a certain threshold or after a certain time interval, @@ -40,15 +50,31 @@ */ @Slf4j public class LogAggregator { - private static final int BUFFER_THRESHOLD = 3; private final CentralLogStore centralLogStore; private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); private final LogLevel minLogLevel; - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger logCount = new AtomicInteger(0); /** + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -90,8 +90,8 @@ public void collectLog(LogEntry logEntry) { + * constructor of LogAggregator. * * @param centralLogStore central log store implement @@ -59,7 +85,6 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) { this.minLogLevel = minLogLevel; startBufferFlusher(); } - /** * Collects a given log entry, and filters it by the defined log level. * @@ -70,19 +95,15 @@ public void collectLog(LogEntry logEntry) { LOGGER.warn("Log level or threshold level is null. Skipping."); return; } - if (logEntry.getLevel().compareTo(minLogLevel) < 0) { LOGGER.debug("Log level below threshold. Skipping."); return; } - buffer.offer(logEntry); - if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) { flushBuffer(); } } - /** * Stops the log aggregator service and flushes any remaining logs to * the central log store. @@ -90,13 +111,23 @@ public void collectLog(LogEntry logEntry) { * @throws InterruptedException If any thread has interrupted the current thread. */ public void stop() throws InterruptedException { - executorService.shutdownNow(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) { LOGGER.error("Log aggregator did not terminate."); } flushBuffer(); - } + + + + + + + Expand All + + @@ -106,15 +106,7 @@ private void flushBuffer() { + + } private void flushBuffer() { LogEntry logEntry; while ((logEntry = buffer.poll()) != null) { @@ -106,15 +137,7 @@ private void flushBuffer() { } private void startBufferFlusher() { - executorService.execute(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(5000); // Flush every 5 seconds. - flushBuffer(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - }); + //flush every 5 seconds + scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS); } -} +} \ No newline at end of file diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java index 7042ff7b79a2..2d233ced73e9 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/App.java @@ -26,10 +26,26 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -104,12 +103,7 @@ public static void main(String[] args) { + * Many solutions in the cloud involve running tasks that invoke services. In this environment, if a * service is subjected to intermittent heavy loads, it can cause performance or reliability * issues. @@ -60,58 +76,54 @@ */ @Slf4j public class App { - //Executor shut down time limit. private static final int SHUTDOWN_TIME = 15; - /** * Program entry point. * * @param args command line args */ public static void main(String[] args) { - // An Executor that provides methods to manage termination and methods that can // produce a Future for tracking progress of one or more asynchronous tasks. ExecutorService executor = null; - try { // Create a MessageQueue object. var msgQueue = new MessageQueue(); - LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads."); - // Create three TaskGenerator threads. Each of them will submit different number of jobs. final var taskRunnable1 = new TaskGenerator(msgQueue, 5); final var taskRunnable2 = new TaskGenerator(msgQueue, 1); final var taskRunnable3 = new TaskGenerator(msgQueue, 2); - // Create e service which should process the submitted jobs. final var srvRunnable = new ServiceExecutor(msgQueue); - // Create a ThreadPool of 2 threads and // submit all Runnable task for execution to executor executor = Executors.newFixedThreadPool(2); executor.submit(taskRunnable1); executor.submit(taskRunnable2); executor.submit(taskRunnable3); - // submitting serviceExecutor thread to the Executor service. executor.submit(srvRunnable); - // Initiates an orderly shutdown. LOGGER.info("Initiating shutdown." + " Executor will shutdown only after all the Threads are completed."); executor.shutdown(); - // Wait for SHUTDOWN_TIME seconds for all the threads to complete - // their tasks and then shut down the executor and then exit. - if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) { - LOGGER.info("Executor was shut down and Exiting."); - executor.shutdownNow(); - } + srvRunnable.shutdown(SHUTDOWN_TIME); } catch (Exception e) { LOGGER.error(e.getMessage()); } + + + + + + + + Expand Down + + + } } \ No newline at end of file diff --git a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java index 02530042b370..d056902462f3 100644 --- a/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java +++ b/queue-based-load-leveling/src/main/java/com/iluwatar/queue/load/leveling/ServiceExecutor.java @@ -25,6 +25,9 @@ package com.iluwatar.queue.load.leveling; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * ServiceExecuotr class. This class will pick up Messages one by one from the Blocking Queue and @@ -32,31 +35,59 @@ */ @Slf4j public class ServiceExecutor implements Runnable { - private final MessageQueue msgQueue; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; - } + + + + + + + Expand All + + @@ -43,19 +46,26 @@ public ServiceExecutor(MessageQueue msgQueue) { + + } /** * The ServiceExecutor thread will retrieve each message and process it. */ public void run() { - try { - while (!Thread.currentThread().isInterrupted()) { - var msg = msgQueue.retrieveMsg(); + scheduler.scheduleWithFixedDelay(() -> { + var msg = msgQueue.retrieveMsg(); - if (null != msg) { - LOGGER.info(msg + " is served."); - } else { - LOGGER.info("Service Executor: Waiting for Messages to serve .. "); - } + if (null != msg) { + LOGGER.info(msg + " is served."); + } else { + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); + } + }, 0, 1, TimeUnit.SECONDS); + } - Thread.sleep(1000); + public void shutdown(int shutdownTime) { + // Wait for SHUTDOWN_TIME seconds for all the threads to complete + // their tasks and then shut down the executor and then exit. + try { + if (!scheduler.awaitTermination(shutdownTime, TimeUnit.SECONDS)) { + LOGGER.info("Executor was shut down and Exiting."); + scheduler.shutdownNow(); } - } catch (Exception e) { + } catch (InterruptedException e) { LOGGER.error(e.getMessage()); } } -} + + + + + + + + Expand Down + + + +} \ No newline at end of file diff --git a/retry/src/main/java/com/iluwatar/retry/Retry.java b/retry/src/main/java/com/iluwatar/retry/Retry.java index ad9580454993..5829e1eb666d 100644 --- a/retry/src/main/java/com/iluwatar/retry/Retry.java +++ b/retry/src/main/java/com/iluwatar/retry/Retry.java @@ -23,14 +23,29 @@ * THE SOFTWARE. */ package com.iluwatar.retry; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; + + + + + + + + Expand All + + @@ -38,6 +43,7 @@ + /** * Decorates {@link BusinessOperation business operation} with "retry" capabilities. * @@ -38,12 +53,29 @@ */ public final class Retry implements BusinessOperation { private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final int maxAttempts; private final long delay; private final AtomicInteger attempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -88,22 +94,36 @@ public int attempts() { + private final Predicate test; private final List errors; - /** * Ctor. * @@ -67,7 +99,6 @@ public Retry( this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); this.errors = new ArrayList<>(); } - /** * The errors encountered while retrying, in the encounter order. * @@ -76,7 +107,6 @@ public Retry( public List errors() { return Collections.unmodifiableList(this.errors); } - /** * The number of retries performed. * @@ -88,22 +118,36 @@ public int attempts() { @Override public T perform() throws BusinessException { - do { + final CompletableFuture future = new CompletableFuture<>(); + + performRetry(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be){ + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performRetry(CompletableFuture future){ + scheduler.schedule(() -> { try { - return this.op.perform(); - } catch (BusinessException e) { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e){ this.errors.add(e); if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; + future.completeExceptionally(e); + return; } - try { - Thread.sleep(this.delay); - } catch (InterruptedException f) { - //ignore - } + performRetry(future); } - } while (true); + }, this.delay, TimeUnit.MILLISECONDS); } -} +} \ No newline at end of file diff --git a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java index 1661095b7298..d43603b1eb1a 100644 --- a/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java +++ b/retry/src/main/java/com/iluwatar/retry/RetryExponentialBackoff.java @@ -22,91 +22,139 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.iluwatar.retry; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; + package com.iluwatar.retry; -/** - * Decorates {@link BusinessOperation business operation} with "retry" capabilities. - * - * @param the remote op's return type - */ -public final class RetryExponentialBackoff implements BusinessOperation { - private static final Random RANDOM = new Random(); - private final BusinessOperation op; - private final int maxAttempts; - private final long maxDelay; - private final AtomicInteger attempts; - private final Predicate test; - private final List errors; - - /** - * Ctor. - * - * @param op the {@link BusinessOperation} to retry - * @param maxAttempts number of times to retry - * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions - * will be ignored if no tests are given - */ - @SafeVarargs - public RetryExponentialBackoff( - BusinessOperation op, - int maxAttempts, - long maxDelay, - Predicate... ignoreTests - ) { - this.op = op; - this.maxAttempts = maxAttempts; - this.maxDelay = maxDelay; - this.attempts = new AtomicInteger(); - this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); - this.errors = new ArrayList<>(); - } - - /** - * The errors encountered while retrying, in the encounter order. - * - * @return the errors encountered while retrying - */ - public List errors() { - return Collections.unmodifiableList(this.errors); - } - - /** - * The number of retries performed. - * - * @return the number of retries performed - */ - public int attempts() { - return this.attempts.intValue(); - } - - @Override - public T perform() throws BusinessException { - do { - try { - return this.op.perform(); - } catch (BusinessException e) { - this.errors.add(e); - - if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { - throw e; - } - - try { - var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); - var delay = Math.min(testDelay, this.maxDelay); - Thread.sleep(delay); - } catch (InterruptedException f) { - //ignore - } - } - } while (true); - } -} + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collections; + import java.util.List; + import java.util.Random; + import java.util.concurrent.CompletableFuture; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.function.Predicate; + + + + + + + + + Expand All + + @@ -40,6 +46,7 @@ + + /** + * Decorates {@link BusinessOperation business operation} with "retry" capabilities. + * + * @param the remote op's return type + */ + public final class RetryExponentialBackoff implements BusinessOperation { + private static final Random RANDOM = new Random(); + private final BusinessOperation op; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final int maxAttempts; + private final long maxDelay; + private final AtomicInteger attempts; + + + + + + + + Expand Down + + + + + + Expand Up + + @@ -89,24 +96,41 @@ public int attempts() { + + private final Predicate test; + private final List errors; + /** + * Ctor. + * + * @param op the {@link BusinessOperation} to retry + * @param maxAttempts number of times to retry + * @param ignoreTests tests to check whether the remote exception can be ignored. No exceptions + * will be ignored if no tests are given + */ + @SafeVarargs + public RetryExponentialBackoff( + BusinessOperation op, + int maxAttempts, + long maxDelay, + Predicate... ignoreTests + ) { + this.op = op; + this.maxAttempts = maxAttempts; + this.maxDelay = maxDelay; + this.attempts = new AtomicInteger(); + this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false); + this.errors = new ArrayList<>(); + } + /** + * The errors encountered while retrying, in the encounter order. + * + * @return the errors encountered while retrying + */ + public List errors() { + return Collections.unmodifiableList(this.errors); + } + /** + * The number of retries performed. + * + * @return the number of retries performed + */ + public int attempts() { + return this.attempts.intValue(); + } + + @Override + public T perform() throws BusinessException { + final CompletableFuture future = new CompletableFuture<>(); + + performWithRetries(future); + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof BusinessException be) { + throw be; + } + throw new BusinessException("Unexpected exception occurred " + e.getMessage()); + } finally { + scheduler.shutdown(); + } + } + + private void performWithRetries(CompletableFuture future) { + scheduler.schedule(() -> { + try { + T result = this.op.perform(); + future.complete(result); + } catch (BusinessException e) { + this.errors.add(e); + + if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) { + future.completeExceptionally(e); + return; + } + + performWithRetries(future); + } + }, calculateDelay(), TimeUnit.MILLISECONDS); + } + + private long calculateDelay() { + var testDelay = (long) Math.pow(2, this.attempts()) * 1000 + RANDOM.nextInt(1000); + return Math.min(testDelay, this.maxDelay); + } \ No newline at end of file diff --git a/server-session/src/main/java/com/iluwatar/sessionserver/App.java b/server-session/src/main/java/com/iluwatar/sessionserver/App.java index a3c66d3ff634..ee655e7e7878 100644 --- a/server-session/src/main/java/com/iluwatar/sessionserver/App.java +++ b/server-session/src/main/java/com/iluwatar/sessionserver/App.java @@ -23,17 +23,30 @@ * THE SOFTWARE. */ package com.iluwatar.sessionserver; - import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Instant; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; /** + + + + + + + + Expand All + + @@ -54,8 +57,9 @@ + * The server session pattern is a behavioral design pattern concerned with assigning the responsibility * of storing session data on the server side. Within the context of stateless protocols like HTTP all * requests are isolated events independent of previous requests. In order to create sessions during @@ -49,16 +62,27 @@ * requests in a list. When a user logs out the session identifier is deleted from the list along with * the appropriate user session data, which is handle by the ({@link LogoutHandler}) class. */ - @Slf4j public class App { // Map to store session data (simulated using a HashMap) - private static Map sessions = new HashMap<>(); - private static Map sessionCreationTimes = new HashMap<>(); + private static final Map sessions = new ConcurrentHashMap<>(); + private static final Map sessionCreationTimes = new ConcurrentHashMap<>(); + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private static final long SESSION_EXPIRATION_TIME = 10000; /** + + + + + + + + Expand All + + @@ -81,31 +85,25 @@ public static void main(String[] args) throws IOException { + * Main entry point. * @param args arguments * @throws IOException ex @@ -66,46 +90,36 @@ public class App { public static void main(String[] args) throws IOException { // Create HTTP server listening on port 8000 HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); - // Set up session management endpoints server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes)); server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes)); - // Start the server server.start(); - // Start background task to check for expired sessions sessionExpirationTask(); - LOGGER.info("Server started. Listening on port 8080..."); } private static void sessionExpirationTask() { - new Thread(() -> { - while (true) { - try { - LOGGER.info("Session expiration checker started..."); - Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time - Instant currentTime = Instant.now(); - synchronized (sessions) { - synchronized (sessionCreationTimes) { - Iterator> iterator = - sessionCreationTimes.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { - sessions.remove(entry.getKey()); - iterator.remove(); - } - } - } + scheduler.scheduleWithFixedDelay(() -> { + try { + LOGGER.info("Session expiration checker started..."); + + Instant currentTime = Instant.now(); + Iterator> iterator = sessionCreationTimes.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) { + sessions.remove(entry.getKey()); + iterator.remove(); } - LOGGER.info("Session expiration checker finished!"); - } catch (InterruptedException e) { - LOGGER.error("An error occurred: ", e); - Thread.currentThread().interrupt(); } + LOGGER.info("Session expiration checker finished!"); + + } catch (Exception e) { + LOGGER.error("An error occured: ", e); } - }).start(); + }, SESSION_EXPIRATION_TIME, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS); } } \ No newline at end of file diff --git a/twin/src/main/java/com/iluwatar/twin/BallThread.java b/twin/src/main/java/com/iluwatar/twin/BallThread.java index 9d4d9cf71a76..2e5ed21417c0 100644 --- a/twin/src/main/java/com/iluwatar/twin/BallThread.java +++ b/twin/src/main/java/com/iluwatar/twin/BallThread.java @@ -24,55 +24,76 @@ */ package com.iluwatar.twin; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.Setter; import lombok.extern.slf4j.Slf4j; + + + + + + + + Expand All + + @@ -40,24 +43,19 @@ public class BallThread extends Thread { + /** * This class is a UI thread for drawing the {@link BallItem}, and provide the method for suspend * and resume. It holds the reference of {@link BallItem} to delegate the draw task. */ - @Slf4j public class BallThread extends Thread { - @Setter private BallItem twin; private volatile boolean isSuspended; - private volatile boolean isRunning = true; + private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + /** * Run the thread. */ public void run() { - - while (isRunning) { + scheduler.scheduleWithFixedDelay(() -> { if (!isSuspended) { twin.draw(); twin.move(); } - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } + }, 0, 250, TimeUnit.MILLISECONDS); } public void suspendMe() { + + + + + + + + Expand All + + @@ -70,9 +68,14 @@ public void resumeMe() { + isSuspended = true; LOGGER.info("Begin to suspend BallThread"); } - public void resumeMe() { isSuspended = false; LOGGER.info("Begin to resume BallThread"); } + /** + * Stop the scheduled task. + */ public void stopMe() { - this.isRunning = false; this.isSuspended = true; + if (scheduler != null) { + scheduler.shutdown(); + } } -} - +} \ No newline at end of file diff --git a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java index 26cf78509dcf..c0f1cdbf2bea 100644 --- a/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java +++ b/twin/src/test/java/com/iluwatar/twin/BallThreadTest.java @@ -21,100 +21,149 @@ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. - */ -package com.iluwatar.twin; - -import static java.lang.Thread.UncaughtExceptionHandler; -import static java.lang.Thread.sleep; -import static java.time.Duration.ofMillis; -import static org.junit.jupiter.api.Assertions.assertTimeout; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; - -import org.junit.jupiter.api.Test; -/** - * BallThreadTest - * */ -class BallThreadTest { - - /** - * Verify if the {@link BallThread} can be resumed - */ - @Test - void testSuspend() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - - final var ballItem = mock(BallItem.class); - ballThread.setTwin(ballItem); - - ballThread.start(); - sleep(200); - verify(ballItem, atLeastOnce()).draw(); - verify(ballItem, atLeastOnce()).move(); - ballThread.suspendMe(); - - sleep(1000); - - ballThread.stopMe(); - ballThread.join(); - - verifyNoMoreInteractions(ballItem); - }); - } - - /** - * Verify if the {@link BallThread} can be resumed - */ - @Test - void testResume() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - - final var ballItem = mock(BallItem.class); - ballThread.setTwin(ballItem); - - ballThread.suspendMe(); - ballThread.start(); - - sleep(1000); - - verifyNoMoreInteractions(ballItem); - - ballThread.resumeMe(); - sleep(300); - verify(ballItem, atLeastOnce()).draw(); - verify(ballItem, atLeastOnce()).move(); - - ballThread.stopMe(); - ballThread.join(); - - verifyNoMoreInteractions(ballItem); - }); - } - - /** - * Verify if the {@link BallThread} is interruptible - */ - @Test - void testInterrupt() { - assertTimeout(ofMillis(5000), () -> { - final var ballThread = new BallThread(); - final var exceptionHandler = mock(UncaughtExceptionHandler.class); - ballThread.setUncaughtExceptionHandler(exceptionHandler); - ballThread.setTwin(mock(BallItem.class)); - ballThread.start(); - ballThread.interrupt(); - ballThread.join(); - verify(exceptionHandler).uncaughtException(eq(ballThread), any(RuntimeException.class)); - verifyNoMoreInteractions(exceptionHandler); - }); - } -} \ No newline at end of file + package com.iluwatar.twin; + + + + + + import static java.lang.Thread.sleep; + + import static java.time.Duration.ofMillis; + + import static org.junit.jupiter.api.Assertions.assertTimeout; + + + + import static org.mockito.Mockito.atLeastOnce; + + import static org.mockito.Mockito.mock; + + import static org.mockito.Mockito.verify; + import static org.mockito.Mockito.verifyNoMoreInteractions; + + import org.junit.jupiter.api.Test; + + /** + * BallThreadTest + * + */ + class BallThreadTest { + + /** + * Verify if the {@link BallThread} can be resumed + */ + @Test + void testSuspend() { + assertTimeout(ofMillis(5000), () -> { + final var ballThread = new BallThread(); + + final var ballItem = mock(BallItem.class); + ballThread.setTwin(ballItem); + + ballThread.start(); + sleep(200); + verify(ballItem, atLeastOnce()).draw(); + verify(ballItem, atLeastOnce()).move(); + ballThread.suspendMe(); + + sleep(1000); + + ballThread.stopMe(); + ballThread.join(); + + verifyNoMoreInteractions(ballItem); + }); + } + + /** + * Verify if the {@link BallThread} can be resumed + */ + @Test + void testResume() { + assertTimeout(ofMillis(5000), () -> { + final var ballThread = new BallThread(); + + final var ballItem = mock(BallItem.class); + ballThread.setTwin(ballItem); + + ballThread.suspendMe(); + ballThread.start(); + + sleep(1000); + + verifyNoMoreInteractions(ballItem); + + ballThread.resumeMe(); + sleep(300); + verify(ballItem, atLeastOnce()).draw(); + verify(ballItem, atLeastOnce()).move(); + + ballThread.stopMe(); + ballThread.join(); + + verifyNoMoreInteractions(ballItem); + }); + + } + + + + + /** + + * Verify if the {@link BallThread} can be stopped + + */ + + @Test + + void testStopped() { + + assertTimeout(ofMillis(5000), () -> { + + final var ballThread = new BallThread(); + + final var twin = mock(BallItem.class); + + ballThread.setTwin(twin); + + + ballThread.start(); + + + + + + + + sleep(300); + + verify(twin, atLeastOnce()).draw(); + + verify(twin, atLeastOnce()).move(); + + + + + // Stop the thread + + ballThread.stopMe(); + + ballThread.join(); + + + + + // Ensure that the thread has stopped and no more interactions occur + + verifyNoMoreInteractions(twin); + + }); + + } + + } \ No newline at end of file