diff --git a/package.json b/package.json index d43a3a3f6..2ca2efac6 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,6 @@ "cronstrue": "1.119.0", "cryptr": "4.0.2", "file-saver": "2.0.5", - "fluture": "14.0.0", "font-awesome": "4.7.0", "json-stringify-deterministic": "1.0.1", "jszip": "3.8.0", @@ -51,6 +50,7 @@ "react-router-dom": "5.3.0", "react-virtualized-auto-sizer": "1.0.6", "react-window": "1.8.6", + "real-cancellable-promise": "^1.2.1", "rehype-raw": "^6.1.1", "rehype-sanitize": "^5.0.1", "rxjs": "5.5.7", diff --git a/src/data/common/utils/api-futures.ts b/src/data/common/utils/api-futures.ts new file mode 100644 index 000000000..81b7c917d --- /dev/null +++ b/src/data/common/utils/api-futures.ts @@ -0,0 +1,21 @@ +import { Future, FutureData } from "../../../domain/common/entities/Future"; +import { CancelableResponse } from "../../../types/d2-api"; + +/** + * @description This file is refactored + */ +export function apiToFuture(res: CancelableResponse): FutureData { + return Future.fromComputation((resolve, reject) => { + res.getData() + .then(resolve) + .catch((err: unknown) => { + if (err instanceof Error) { + reject(err); + } else { + console.error("apiToFuture:uncatched", err); + reject(new Error("Unknown error")); + } + }); + return res.cancel; + }); +} diff --git a/src/data/common/utils/futures.ts b/src/data/common/utils/futures.ts deleted file mode 100644 index d82503dd0..000000000 --- a/src/data/common/utils/futures.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { CancelableResponse } from "@eyeseetea/d2-api/repositories/CancelableResponse"; -import { Future, FutureData } from "../../../domain/common/entities/Future"; - -export function apiToFuture(res: CancelableResponse): FutureData { - return Future.fromComputation((resolve, reject) => { - res.getData() - .then(resolve) - .catch(err => reject(err ? err.message : "Unknown error")); - return res.cancel; - }); -} diff --git a/src/data/scheduler/SchedulerD2ApiRepository.ts b/src/data/scheduler/SchedulerD2ApiRepository.ts deleted file mode 100644 index 8c6077d5d..000000000 --- a/src/data/scheduler/SchedulerD2ApiRepository.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { ConfigRepository } from "../../domain/config/repositories/ConfigRepository"; -import { SchedulerExecution } from "../../domain/scheduler/entities/SchedulerExecution"; -import { SchedulerRepository } from "../../domain/scheduler/repositories/SchedulerRepository"; -import { Namespace } from "../storage/Namespaces"; -import { SchedulerExecutionModel } from "./models/SchedulerExecutionModel"; - -export class SchedulerD2ApiRepository implements SchedulerRepository { - constructor(private configRepository: ConfigRepository) {} - - public async updateLastExecution(execution: SchedulerExecution): Promise { - const data = SchedulerExecutionModel.encode(execution); - const storage = await this.configRepository.getStorageClient(); - return storage.saveObject(Namespace.SCHEDULER_EXECUTIONS, data); - } - - public async getLastExecution(): Promise { - const storage = await this.configRepository.getStorageClient(); - const data = await storage.getOrCreateObject(Namespace.SCHEDULER_EXECUTIONS, {}); - return SchedulerExecutionModel.unsafeDecode(data); - } -} diff --git a/src/data/scheduler/SchedulerExecutionInfoD2ApiRepository.ts b/src/data/scheduler/SchedulerExecutionInfoD2ApiRepository.ts new file mode 100644 index 000000000..7469674ae --- /dev/null +++ b/src/data/scheduler/SchedulerExecutionInfoD2ApiRepository.ts @@ -0,0 +1,31 @@ +import { Future, FutureData } from "../../domain/common/entities/Future"; +import { SchedulerExecutionInfo } from "../../domain/scheduler/entities/SchedulerExecutionInfo"; +import { SchedulerExecutionInfoRepository } from "../../domain/scheduler/repositories/SchedulerExecutionInfoRepositoryConstructor"; +import { Namespace } from "../storage/Namespaces"; +import { StorageDataStoreClient } from "../storage/StorageDataStoreClient"; +import { SchedulerExecutionInfoModel } from "./models/SchedulerExecutionInfoModel"; + +/** + * @description This file is refactored + */ +export class SchedulerExecutionInfoD2ApiRepository implements SchedulerExecutionInfoRepository { + constructor(private dataStoreClient: StorageDataStoreClient) {} + + public updateExecutionInfo(execution: SchedulerExecutionInfo): FutureData { + const data = SchedulerExecutionInfoModel.encode(execution); + return Future.fromPromise( + this.dataStoreClient.saveObject(Namespace.SCHEDULER_EXECUTIONS, data) + ).flatMap(() => { + return Future.success(undefined); + }); + } + + public getLastExecutionInfo(): FutureData { + return Future.fromPromise( + this.dataStoreClient.getOrCreateObject(Namespace.SCHEDULER_EXECUTIONS, {}) + ).flatMap(data => { + const schedulerExecutionInfo: SchedulerExecutionInfo = SchedulerExecutionInfoModel.unsafeDecode(data); + return Future.success(schedulerExecutionInfo); + }); + } +} diff --git a/src/data/scheduler/models/SchedulerExecutionModel.ts b/src/data/scheduler/models/SchedulerExecutionInfoModel.ts similarity index 53% rename from src/data/scheduler/models/SchedulerExecutionModel.ts rename to src/data/scheduler/models/SchedulerExecutionInfoModel.ts index 8c0eec0dc..cd9beef9d 100644 --- a/src/data/scheduler/models/SchedulerExecutionModel.ts +++ b/src/data/scheduler/models/SchedulerExecutionInfoModel.ts @@ -1,7 +1,7 @@ -import { SchedulerExecution } from "../../../domain/scheduler/entities/SchedulerExecution"; +import { SchedulerExecutionInfo } from "../../../domain/scheduler/entities/SchedulerExecutionInfo"; import { Codec, Schema } from "../../../utils/codec"; -export const SchedulerExecutionModel: Codec = Schema.object({ +export const SchedulerExecutionInfoModel: Codec = Schema.object({ lastExecutionDuration: Schema.optional(Schema.number), lastExecution: Schema.optional(Schema.date), nextExecution: Schema.optional(Schema.date), diff --git a/src/domain/common/entities/Future.ts b/src/domain/common/entities/Future.ts index ac835e48b..c6ab56668 100644 --- a/src/domain/common/entities/Future.ts +++ b/src/domain/common/entities/Future.ts @@ -1,139 +1,205 @@ -import * as fluture from "fluture"; -import _ from "lodash"; -import { Either } from "purify-ts"; - +import * as rcpromise from "real-cancellable-promise"; +import { Cancellation } from "real-cancellable-promise"; + +/** + * Futures are async values similar to promises, with some differences: + * - Futures are only executed when their method `run` is called. + * - Futures are cancellable (thus, they can be easily used in a `React.useEffect`, for example). + * - Futures have fully typed errors. Subclass Error if you need full stack traces. + * - You may still use async/await monad-style blocks (check Future.block). + * + * More info: https://github.com/EyeSeeTea/know-how/wiki/Async-futures + */ + +/** + * @description Future is refactored + */ export class Future { - private constructor(private instance: fluture.FutureInstance) {} + private constructor(private _promise: () => rcpromise.CancellablePromise) {} - run(onSuccess: Fn, onError: Fn): Cancel { - return fluture.fork(onError)(onSuccess)(this.instance); + static success(data: D): Future { + return new Future(() => rcpromise.CancellablePromise.resolve(data)); } - map(mapper: (data: D) => D2): Future { - const instance2 = fluture.map(mapper)(this.instance) as fluture.FutureInstance; - return new Future(instance2); + static error(error: E): Future { + return new Future(() => rcpromise.CancellablePromise.reject(error)); } - mapError(mapper: (data: E) => E2): Future { - const instance2 = fluture.mapRej(mapper)(this.instance) as fluture.FutureInstance; - return new Future(instance2); + static fromComputation( + computation: (resolve: (value: D) => void, reject: (error: E) => void) => Cancel + ): Future { + let cancel: Cancel = () => {}; + + return new Future(() => { + const promise = new Promise((resolve, reject) => { + cancel = computation(resolve, error => reject(error)); + }); + + return new rcpromise.CancellablePromise(promise, cancel || (() => {})); + }); } - bimap(dataMapper: (data: D) => D2, errorMapper: (error: E) => E2): Future { - const instance2 = fluture.bimap(errorMapper)(dataMapper)(this.instance); - return new Future(instance2); + static fromPromise(promise: Promise): FutureData { + return Future.fromComputation((resolve, reject) => { + promise.then(resolve).catch(err => reject(err ? err.message : "Unknown error")); + return () => {}; + }); } - flatMap(mapper: (data: D) => Future): Future { - const chainMapper = fluture.chain(data => mapper(data).instance); - return new Future(chainMapper(this.instance)); + run(onSuccess: (data: D) => void, onError: (error: E) => void): Cancel { + return this._promise().then(onSuccess, err => { + if (err instanceof rcpromise.Cancellation) { + // no-op + } else { + onError(err); + } + }).cancel; } - flatMapError(mapper: (error: E) => Future): Future { - const chainRejMapper = fluture.chainRej(error => mapper(error).instance); - return new Future(chainRejMapper(this.instance)); + map(fn: (data: D) => U): Future { + return new Future(() => this._promise().then(fn)); } - toPromise(onError?: (error: E) => void): Promise { - return new Promise((resolve, reject) => { - this.run(resolve, onError ?? reject); - }); + mapError(fn: (error: E) => E2): Future { + return new Future(() => + this._promise().catch((error: E) => { + throw fn(error); + }) + ); } - runAsync(): Promise<{ data?: D; error?: E }> { - return new Promise(resolve => { - this.run( - data => resolve({ data }), - error => resolve({ error }) - ); + flatMap(fn: (data: D) => Future): Future { + return new Future(() => this._promise().then(data => fn(data)._promise())); + } + + chain(fn: (data: D) => Future): Future { + return this.flatMap(fn); + } + + toPromise(): Promise { + return this._promise(); + } + + static join2(async1: Future, async2: Future): Future { + return new Future(() => { + return rcpromise.CancellablePromise.all([async1._promise(), async2._promise()]); }); } - /* Static methods */ - static noCancel: Cancel = () => {}; + static joinObj>>( + obj: Obj, + options: ParallelOptions = { concurrency: 1 } + ): Future< + Obj[keyof Obj] extends Future ? E : never, + { [K in keyof Obj]: Obj[K] extends Future ? U : never } + > { + const asyncs = Object.values(obj); - static fromPromise(computation: Promise): Future { - return new Future( - fluture.Future((reject, resolve) => { - computation.then(data => resolve(data)).catch(error => reject(error)); - return Future.noCancel; - }) - ); + return Future.parallel(asyncs, options).map(values => { + const keys = Object.keys(obj); + const pairs = keys.map((key, idx) => [key, values[idx]]); + return Object.fromEntries(pairs); + }); } - static fromComputation(computation: Computation): Future { - return new Future(fluture.Future((reject, resolve) => computation(resolve, reject))); + static sequential(asyncs: Future[]): Future { + return Future.block(async $ => { + const output: D[] = []; + for (const async of asyncs) { + const res = await $(async); + output.push(res); + } + return output; + }); } - static fromPurifyEither(input: Either): Future { - return new Future( - fluture.Future((reject, resolve) => { - if (input.isRight()) resolve(input.extract()); - else if (input.isLeft()) reject(input.extract()); - return () => {}; + static parallel(asyncs: Future[], options: ParallelOptions): Future { + return new Future(() => + rcpromise.buildCancellablePromise(async $ => { + const queue: rcpromise.CancellablePromise[] = []; + const output: D[] = new Array(asyncs.length); + + for (const [idx, async] of asyncs.entries()) { + const queueItem$ = async._promise().then(res => { + queue.splice(queue.indexOf(queueItem$), 1); + output[idx] = res; + }); + + queue.push(queueItem$); + + if (queue.length >= options.concurrency) await $(rcpromise.CancellablePromise.race(queue)); + } + + await $(rcpromise.CancellablePromise.all(queue)); + return output; }) ); } - static success(data: D): Future { - return new Future(fluture.resolve(data)); + static sleep(ms: number): Future { + return new Future(() => rcpromise.CancellablePromise.delay(ms)).map(() => ms); } - static error(error: E): Future { - return new Future(fluture.reject(error)); + static void(): Future { + return Future.success(undefined); } - static join2(future1: Future, future2: Future): Future { - const instance = fluture.both(future1.instance)(future2.instance); - return new Future(instance); - } + static block(blockFn: (capture: CaptureAsync) => Promise): Future { + return new Future((): rcpromise.CancellablePromise => { + return rcpromise.buildCancellablePromise(capturePromise => { + const captureAsync: CaptureAsync = async => { + return capturePromise(async._promise()); + }; - static parallel(futures: Array>, options: ParallelOptions = {}): Future> { - const { maxConcurrency = 10, catchErrors } = options; - const parallel = fluture.parallel(maxConcurrency); - const coalesce = catchErrors ? fluture.coalesce(error => catchErrors(error))((d: D) => d) : undefined; - const instance = parallel(futures.map(future => (coalesce ? coalesce(future.instance) : future.instance))); - return new Future(instance); - } + captureAsync.throw = function (error: E) { + throw error; + }; - static joinObj>>( - futuresObj: FuturesObj, - options: ParallelOptions = {} - ): JoinObj { - const { maxConcurrency = 10 } = options; - const parallel = fluture.parallel(maxConcurrency); - const keys = _.keys(futuresObj); - const futures = _.values(futuresObj); - const flutures = parallel(futures.map(future => future.instance)); - const futureObj = new Future(flutures).map(values => _.zipObject(keys, values)); - return futureObj as JoinObj; + return blockFn(captureAsync); + }); + }); } - static futureMap( - inputValues: T[], - mapper: (value: T, index: number) => Future, - options?: ParallelOptions - ): Future { - return this.parallel( - inputValues.map((value, index) => mapper(value, index)), - options - ); + static block_() { + return function (blockFn: (capture: CaptureAsync) => Promise): Future { + return Future.block(blockFn); + }; } } -type ParallelOptions = { maxConcurrency?: number; catchErrors?: (error: E) => D }; - -type JoinObj>> = Future< - ExtractFutureError, - { [K in keyof Futures]: ExtractFutureData } ->; +export type Cancel = (() => void) | undefined; -export type ExtractFutureData = F extends Future ? D : never; -export type ExtractFutureError = F extends Future ? E : never; +interface CaptureAsync { + (async: Future): Promise; + throw: (error: E) => never; +} -type Fn = (value: T) => void; +type ParallelOptions = { concurrency: number }; + +export function getJSON(url: string): Future { + const abortController = new AbortController(); + + return Future.fromComputation((resolve, reject) => { + // exceptions: TypeError | DOMException[name=AbortError] + fetch(url, { method: "get", signal: abortController.signal }) + .then(res => res.json() as unknown as U) // exceptions: SyntaxError + .then(data => resolve(data)) + .catch((error: unknown) => { + if (isNamedError(error) && error.name === "AbortError") { + throw new Cancellation(); + } else if (error instanceof TypeError || error instanceof SyntaxError) { + reject(error); + } else { + reject(new TypeError("Unknown error")); + } + }); + + return () => abortController.abort(); + }); +} -export type Cancel = () => void; +function isNamedError(error: unknown): error is { name: string } { + return Boolean(error && typeof error === "object" && "name" in error); +} -export type Computation = (resolve: Fn, reject: Fn) => fluture.Cancel; -export type FutureData = Future; +export type FutureData = Future; diff --git a/src/domain/common/factories/RepositoryFactory.ts b/src/domain/common/factories/RepositoryFactory.ts index 396ad7c13..c01eea683 100644 --- a/src/domain/common/factories/RepositoryFactory.ts +++ b/src/domain/common/factories/RepositoryFactory.ts @@ -20,7 +20,6 @@ import { GitHubRepositoryConstructor } from "../../packages/repositories/GitHubR import { ReportsRepositoryConstructor } from "../../reports/repositories/ReportsRepository"; import { FileRulesRepositoryConstructor } from "../../rules/repositories/FileRulesRepository"; import { RulesRepositoryConstructor } from "../../rules/repositories/RulesRepository"; -import { SchedulerRepositoryConstructor } from "../../scheduler/repositories/SchedulerRepository"; import { SettingsRepositoryConstructor } from "../../settings/SettingsRepository"; import { DownloadRepositoryConstructor } from "../../storage/repositories/DownloadRepository"; import { StoreRepositoryConstructor } from "../../stores/repositories/StoreRepository"; @@ -37,6 +36,9 @@ import { UserRepositoryConstructor } from "../../user/repositories/UserRepositor type ClassType = new (...args: any[]) => any; +/** + * @todo We need to think how to refactor RepositoryFactory concept + */ export class RepositoryFactory { constructor(private encryptionKey: string) {} @@ -189,12 +191,6 @@ export class RepositoryFactory { return this.get(Repositories.MappingRepository, [config]); } - @cache() - public schedulerRepository(instance: Instance) { - const config = this.configRepository(instance); - return this.get(Repositories.SchedulerRepository, [config]); - } - @cache() public settingsRepository(instance: Instance) { const config = this.configRepository(instance); @@ -231,7 +227,6 @@ export const Repositories = { UserRepository: "userRepository", MappingRepository: "mappingRepository", SettingsRepository: "settingsRepository", - SchedulerRepository: "schedulerRepository", DataStoreMetadataRepository: "dataStoreMetadataRepository", DhisReleasesRepository: "dhisReleasesRepository", TableColumnsRepository: "tableColumnsRepository", diff --git a/src/domain/rules/usecases/ListSyncRuleUseCase.ts b/src/domain/rules/usecases/ListSyncRuleUseCase.ts index a6615fabc..88c0fd456 100644 --- a/src/domain/rules/usecases/ListSyncRuleUseCase.ts +++ b/src/domain/rules/usecases/ListSyncRuleUseCase.ts @@ -15,7 +15,7 @@ export interface ListSyncRuleUseCaseParams { sorting?: { field: keyof SynchronizationRule; order: "asc" | "desc" }; filters?: { targetInstanceFilter?: string; - enabledFilter?: string; + schedulerEnabledFilter?: string; lastExecutedFilter?: Date | null; types?: SynchronizationType[]; search?: string; @@ -39,7 +39,13 @@ export class ListSyncRuleUseCase implements UseCase { }: ListSyncRuleUseCaseParams): Promise { const rawData = await this.repositoryFactory.rulesRepository(this.localInstance).list(); - const { targetInstanceFilter = null, enabledFilter = null, lastExecutedFilter = null, types, search } = filters; + const { + targetInstanceFilter = null, + schedulerEnabledFilter = null, + lastExecutedFilter = null, + types, + search, + } = filters; const filteredData = search ? _.filter(rawData, item => @@ -67,8 +73,11 @@ export class ListSyncRuleUseCase implements UseCase { }) .filter(rule => (targetInstanceFilter ? rule.targetInstances.includes(targetInstanceFilter) : true)) .filter(rule => { - if (!enabledFilter) return true; - return (rule.enabled && enabledFilter === "enabled") || (!rule.enabled && enabledFilter === "disabled"); + if (!schedulerEnabledFilter) return true; + return ( + (rule.enabled && schedulerEnabledFilter === "enabled") || + (!rule.enabled && schedulerEnabledFilter === "disabled") + ); }) .filter(rule => lastExecutedFilter && rule.lastExecuted diff --git a/src/domain/scheduler/entities/ScheduledJob.ts b/src/domain/scheduler/entities/ScheduledJob.ts new file mode 100644 index 000000000..f745c0570 --- /dev/null +++ b/src/domain/scheduler/entities/ScheduledJob.ts @@ -0,0 +1,8 @@ +// NOTICE: This is refactored + +export type ScheduledJob = { + id: string; + nextExecution: Date; +}; + +export const DEFAULT_SCHEDULED_JOB_ID = "__default__"; diff --git a/src/scheduler/entities/SchedulerConfig.ts b/src/domain/scheduler/entities/SchedulerConfig.ts similarity index 79% rename from src/scheduler/entities/SchedulerConfig.ts rename to src/domain/scheduler/entities/SchedulerConfig.ts index a4c179841..4cea18500 100644 --- a/src/scheduler/entities/SchedulerConfig.ts +++ b/src/domain/scheduler/entities/SchedulerConfig.ts @@ -1,5 +1,6 @@ -import { Codec, Schema } from "../../utils/codec"; +import { Codec, Schema } from "../../../utils/codec"; +// NOTICE: This is refactored export const ConfigModel: Codec = Schema.object({ baseUrl: Schema.string, username: Schema.string, diff --git a/src/domain/scheduler/entities/SchedulerExecution.ts b/src/domain/scheduler/entities/SchedulerExecutionInfo.ts similarity index 55% rename from src/domain/scheduler/entities/SchedulerExecution.ts rename to src/domain/scheduler/entities/SchedulerExecutionInfo.ts index efea69c87..1c352e81a 100644 --- a/src/domain/scheduler/entities/SchedulerExecution.ts +++ b/src/domain/scheduler/entities/SchedulerExecutionInfo.ts @@ -1,4 +1,6 @@ -export interface SchedulerExecution { +// NOTICE: This is refactored + +export interface SchedulerExecutionInfo { lastExecutionDuration?: number; lastExecution?: Date; nextExecution?: Date; diff --git a/src/domain/scheduler/repositories/SchedulerExecutionInfoRepositoryConstructor.ts b/src/domain/scheduler/repositories/SchedulerExecutionInfoRepositoryConstructor.ts new file mode 100644 index 000000000..aa92a4a57 --- /dev/null +++ b/src/domain/scheduler/repositories/SchedulerExecutionInfoRepositoryConstructor.ts @@ -0,0 +1,10 @@ +import { SchedulerExecutionInfo } from "../entities/SchedulerExecutionInfo"; +import { FutureData } from "../../common/entities/Future"; + +/** + * @description This file is refactored + */ +export interface SchedulerExecutionInfoRepository { + updateExecutionInfo(executionInfo: SchedulerExecutionInfo): FutureData; + getLastExecutionInfo(): FutureData; +} diff --git a/src/domain/scheduler/repositories/SchedulerRepository.ts b/src/domain/scheduler/repositories/SchedulerRepository.ts deleted file mode 100644 index 6e5a2e1cd..000000000 --- a/src/domain/scheduler/repositories/SchedulerRepository.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { ConfigRepository } from "../../config/repositories/ConfigRepository"; -import { SchedulerExecution } from "../entities/SchedulerExecution"; - -export interface SchedulerRepositoryConstructor { - new (configRepository: ConfigRepository): SchedulerRepository; -} - -export interface SchedulerRepository { - updateLastExecution(execution: SchedulerExecution): Promise; - getLastExecution(): Promise; -} diff --git a/src/domain/scheduler/usecases/GetLastSchedulerExecutionInfoUseCase.ts b/src/domain/scheduler/usecases/GetLastSchedulerExecutionInfoUseCase.ts new file mode 100644 index 000000000..1cda40e46 --- /dev/null +++ b/src/domain/scheduler/usecases/GetLastSchedulerExecutionInfoUseCase.ts @@ -0,0 +1,15 @@ +import { FutureData } from "../../common/entities/Future"; +import { UseCase } from "../../common/entities/UseCase"; +import { SchedulerExecutionInfo } from "../entities/SchedulerExecutionInfo"; +import { SchedulerExecutionInfoRepository } from "../repositories/SchedulerExecutionInfoRepositoryConstructor"; + +/** + * @description This file is refactored + */ +export class GetLastSchedulerExecutionInfoUseCase implements UseCase { + constructor(private schedulerExecutionInfoRepository: SchedulerExecutionInfoRepository) {} + + public execute(): FutureData { + return this.schedulerExecutionInfoRepository.getLastExecutionInfo(); + } +} diff --git a/src/domain/scheduler/usecases/GetLastSchedulerExecutionUseCase.ts b/src/domain/scheduler/usecases/GetLastSchedulerExecutionUseCase.ts deleted file mode 100644 index 43eed370b..000000000 --- a/src/domain/scheduler/usecases/GetLastSchedulerExecutionUseCase.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { UseCase } from "../../common/entities/UseCase"; -import { RepositoryFactory } from "../../common/factories/RepositoryFactory"; -import { Instance } from "../../instance/entities/Instance"; -import { SchedulerExecution } from "../entities/SchedulerExecution"; - -export class GetLastSchedulerExecutionUseCase implements UseCase { - constructor(private repositoryFactory: RepositoryFactory, private localInstance: Instance) {} - - public execute(): Promise { - return this.repositoryFactory.schedulerRepository(this.localInstance).getLastExecution(); - } -} diff --git a/src/domain/scheduler/usecases/UpdateLastSchedulerExecutionUseCase.ts b/src/domain/scheduler/usecases/UpdateLastSchedulerExecutionUseCase.ts deleted file mode 100644 index 417714ed2..000000000 --- a/src/domain/scheduler/usecases/UpdateLastSchedulerExecutionUseCase.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { UseCase } from "../../common/entities/UseCase"; -import { RepositoryFactory } from "../../common/factories/RepositoryFactory"; -import { Instance } from "../../instance/entities/Instance"; -import { SchedulerExecution } from "../entities/SchedulerExecution"; - -export class UpdateLastSchedulerExecutionUseCase implements UseCase { - constructor(private repositoryFactory: RepositoryFactory, private localInstance: Instance) {} - - public execute(execution: SchedulerExecution): Promise { - return this.repositoryFactory.schedulerRepository(this.localInstance).updateLastExecution(execution); - } -} diff --git a/src/domain/scheduler/usecases/UpdateSchedulerExecutionInfoUseCase.ts b/src/domain/scheduler/usecases/UpdateSchedulerExecutionInfoUseCase.ts new file mode 100644 index 000000000..f5a1977c4 --- /dev/null +++ b/src/domain/scheduler/usecases/UpdateSchedulerExecutionInfoUseCase.ts @@ -0,0 +1,15 @@ +import { FutureData } from "../../common/entities/Future"; +import { UseCase } from "../../common/entities/UseCase"; +import { SchedulerExecutionInfo } from "../entities/SchedulerExecutionInfo"; +import { SchedulerExecutionInfoRepository } from "../repositories/SchedulerExecutionInfoRepositoryConstructor"; + +/** + * @description This file is refactored + */ +export class UpdateSchedulerExecutionInfoUseCase implements UseCase { + constructor(private schedulerExecutionInfoRepository: SchedulerExecutionInfoRepository) {} + + public execute(executionInfo: SchedulerExecutionInfo): FutureData { + return this.schedulerExecutionInfoRepository.updateExecutionInfo(executionInfo); + } +} diff --git a/src/presentation/CompositionRoot.ts b/src/presentation/CompositionRoot.ts index 0572a8653..a51b4d705 100644 --- a/src/presentation/CompositionRoot.ts +++ b/src/presentation/CompositionRoot.ts @@ -13,7 +13,7 @@ import { GitHubOctokitRepository } from "../data/packages/GitHubOctokitRepositor import { ReportsD2ApiRepository } from "../data/reports/ReportsD2ApiRepository"; import { FileRulesDefaultRepository } from "../data/rules/FileRulesDefaultRepository"; import { RulesD2ApiRepository } from "../data/rules/RulesD2ApiRepository"; -import { SchedulerD2ApiRepository } from "../data/scheduler/SchedulerD2ApiRepository"; +import { SchedulerExecutionInfoD2ApiRepository } from "../data/scheduler/SchedulerExecutionInfoD2ApiRepository"; import { SettingsD2ApiRepository } from "../data/settings/SettingsD2ApiRepository"; import { DownloadWebRepository } from "../data/storage/DownloadWebRepository"; import { StoreD2ApiRepository } from "../data/stores/StoreD2ApiRepository"; @@ -100,8 +100,8 @@ import { GetSyncRuleUseCase } from "../domain/rules/usecases/GetSyncRuleUseCase" import { ListSyncRuleUseCase } from "../domain/rules/usecases/ListSyncRuleUseCase"; import { ReadSyncRuleFilesUseCase } from "../domain/rules/usecases/ReadSyncRuleFilesUseCase"; import { SaveSyncRuleUseCase } from "../domain/rules/usecases/SaveSyncRuleUseCase"; -import { GetLastSchedulerExecutionUseCase } from "../domain/scheduler/usecases/GetLastSchedulerExecutionUseCase"; -import { UpdateLastSchedulerExecutionUseCase } from "../domain/scheduler/usecases/UpdateLastSchedulerExecutionUseCase"; +import { GetLastSchedulerExecutionInfoUseCase } from "../domain/scheduler/usecases/GetLastSchedulerExecutionInfoUseCase"; +import { UpdateSchedulerExecutionInfoUseCase } from "../domain/scheduler/usecases/UpdateSchedulerExecutionInfoUseCase"; import { GetSettingsUseCase } from "../domain/settings/GetSettingsUseCase"; import { SaveSettingsUseCase } from "../domain/settings/SaveSettingsUseCase"; import { DownloadFileUseCase } from "../domain/storage/usecases/DownloadFileUseCase"; @@ -128,7 +128,11 @@ import { TableColumnsDataStoreRepository } from "../data/table-columns/TableColu import { getD2APiFromInstance } from "../utils/d2-utils"; import { RoleD2ApiRepository } from "../data/role/RoleD2ApiRepository"; import { ValidateRolesUseCase } from "../domain/role/ValidateRolesUseCase"; +import { StorageDataStoreClient } from "../data/storage/StorageDataStoreClient"; +/** + * @todo needs refactoring + */ export class CompositionRoot { private repositoryFactory: RepositoryFactory; @@ -155,7 +159,6 @@ export class CompositionRoot { this.repositoryFactory.bind(Repositories.MetadataRepository, MetadataJSONRepository, "json"); this.repositoryFactory.bind(Repositories.TransformationRepository, TransformationD2ApiRepository); this.repositoryFactory.bind(Repositories.MappingRepository, MappingD2ApiRepository); - this.repositoryFactory.bind(Repositories.SchedulerRepository, SchedulerD2ApiRepository); this.repositoryFactory.bind(Repositories.SettingsRepository, SettingsD2ApiRepository); this.repositoryFactory.bind(Repositories.DataStoreMetadataRepository, DataStoreMetadataD2Repository); this.repositoryFactory.bind(Repositories.DhisReleasesRepository, DhisReleasesLocalRepository); @@ -404,9 +407,15 @@ export class CompositionRoot { @cache() public get scheduler() { + const dataStoreClient = new StorageDataStoreClient(this.localInstance); + return getExecute({ - getLastExecution: new GetLastSchedulerExecutionUseCase(this.repositoryFactory, this.localInstance), - updateLastExecution: new UpdateLastSchedulerExecutionUseCase(this.repositoryFactory, this.localInstance), + getLastExecutionInfo: new GetLastSchedulerExecutionInfoUseCase( + new SchedulerExecutionInfoD2ApiRepository(dataStoreClient) + ), + updateExecutionInfo: new UpdateSchedulerExecutionInfoUseCase( + new SchedulerExecutionInfoD2ApiRepository(dataStoreClient) + ), }); } diff --git a/src/presentation/react/core/components/scheduler-info/SchedulerInfo.tsx b/src/presentation/react/core/components/scheduler-info/SchedulerInfo.tsx index fc1567859..320b3016d 100644 --- a/src/presentation/react/core/components/scheduler-info/SchedulerInfo.tsx +++ b/src/presentation/react/core/components/scheduler-info/SchedulerInfo.tsx @@ -1,32 +1,26 @@ import FiberManualRecordIcon from "@material-ui/icons/FiberManualRecord"; -import React, { useCallback, useEffect, useState } from "react"; +import React, { useEffect } from "react"; import styled from "styled-components"; -import { SchedulerExecution } from "../../../../../domain/scheduler/entities/SchedulerExecution"; import i18n from "../../../../../locales"; -import { useAppContext } from "../../contexts/AppContext"; +import { useSnackbar } from "@eyeseetea/d2-ui-components"; +import { useSchedulerInfo } from "./useSchedulerInfo"; + +// NOTICE: This is refactored export interface SchedulerInfoProps { onSchedulerRun?: (timestamp: string) => void; } export const SchedulerInfo: React.FC = React.memo(props => { + const snackbar = useSnackbar(); const { onSchedulerRun } = props; - const { compositionRoot } = useAppContext(); - const [status, setStatus] = useState(false); - - const getSchedulerInfo = useCallback(() => { - compositionRoot.scheduler.getLastExecution().then(execution => { - const timestamp = execution?.lastExecution?.toISOString() ?? ""; - if (onSchedulerRun) onSchedulerRun(timestamp); - return setStatus(isRunning(execution)); - }); - }, [compositionRoot, onSchedulerRun]); + const { status, errorMessage } = useSchedulerInfo(onSchedulerRun); useEffect(() => { - getSchedulerInfo(); - const intervalId = setInterval(() => getSchedulerInfo(), 60 * 1000); - return () => clearInterval(intervalId); - }, [getSchedulerInfo, setStatus]); + if (errorMessage) { + snackbar.error(errorMessage); + } + }, [errorMessage, snackbar]); return ( @@ -48,7 +42,3 @@ const SchedulerContainer = styled.div` border: none; gap: 10px; `; - -function isRunning(info?: SchedulerExecution): boolean { - return !!info?.nextExecution && info.nextExecution >= new Date(); -} diff --git a/src/presentation/react/core/components/scheduler-info/useSchedulerInfo.ts b/src/presentation/react/core/components/scheduler-info/useSchedulerInfo.ts new file mode 100644 index 000000000..16f36e977 --- /dev/null +++ b/src/presentation/react/core/components/scheduler-info/useSchedulerInfo.ts @@ -0,0 +1,46 @@ +import { useCallback, useEffect, useState } from "react"; +import { useAppContext } from "../../contexts/AppContext"; +import { SchedulerExecutionInfo } from "../../../../../domain/scheduler/entities/SchedulerExecutionInfo"; + +// NOTICE: This is refactored + +export type SchedulerInfoState = { + status: boolean; + errorMessage: string; +}; + +export function useSchedulerInfo(onSchedulerRun?: (timestamp: string) => void): SchedulerInfoState { + const { compositionRoot } = useAppContext(); + + const [status, setStatus] = useState(false); + const [errorMessage, setErrorMessage] = useState(""); + + const isRunning = useCallback((info?: SchedulerExecutionInfo) => { + return !!info?.nextExecution && info.nextExecution >= new Date(); + }, []); + + const getSchedulerInfo = useCallback(() => { + compositionRoot.scheduler.getLastExecutionInfo().run( + lastExecutionInfo => { + const timestamp = lastExecutionInfo?.lastExecution?.toISOString() ?? ""; + if (onSchedulerRun) onSchedulerRun(timestamp); + return setStatus(isRunning(lastExecutionInfo)); + }, + err => { + console.debug(err); + setErrorMessage(err.message); + } + ); + }, [compositionRoot.scheduler, isRunning, onSchedulerRun]); + + useEffect(() => { + getSchedulerInfo(); + const intervalId = setInterval(() => getSchedulerInfo(), 60 * 1000); + return () => clearInterval(intervalId); + }, [getSchedulerInfo, setStatus]); + + return { + status: status, + errorMessage: errorMessage, + }; +} diff --git a/src/presentation/webapp/WebApp.tsx b/src/presentation/webapp/WebApp.tsx index cff3884e0..e363eff95 100644 --- a/src/presentation/webapp/WebApp.tsx +++ b/src/presentation/webapp/WebApp.tsx @@ -101,7 +101,7 @@ const App = () => { - {appConfig && } + {appConfig?.feedback && } diff --git a/src/presentation/webapp/core/pages/sync-rules-list/SyncRulesListPage.tsx b/src/presentation/webapp/core/pages/sync-rules-list/SyncRulesListPage.tsx index 2d48093be..09d925212 100644 --- a/src/presentation/webapp/core/pages/sync-rules-list/SyncRulesListPage.tsx +++ b/src/presentation/webapp/core/pages/sync-rules-list/SyncRulesListPage.tsx @@ -65,7 +65,12 @@ const config: { }, }; -const enabledFilterData = [ +type SchedulerFilter = { + id: "enabled" | "disabled"; + name: string; +}; + +const schedulerEnabledFilterData: SchedulerFilter[] = [ { id: "enabled", name: i18n.t("Enabled") }, { id: "disabled", name: i18n.t("Disabled") }, ]; @@ -86,7 +91,7 @@ export const SyncRulesListPage: React.FC = () => { const [toDelete, setToDelete] = useState([]); const [search, setSearchFilter] = useState(""); const [targetInstanceFilter, setTargetInstanceFilter] = useState(""); - const [enabledFilter, setEnabledFilter] = useState(""); + const [schedulerEnabledFilter, setSchedulerEnabledFilter] = useState(""); const [lastExecutedFilter, setLastExecutedFilter] = useState(null); const [syncReport, setSyncReport] = useState(null); const [sharingSettingsObject, setSharingSettingsObject] = useState(null); @@ -99,7 +104,7 @@ export const SyncRulesListPage: React.FC = () => { filters: { types: [type], targetInstanceFilter, - enabledFilter, + schedulerEnabledFilter, lastExecutedFilter, search, }, @@ -112,7 +117,7 @@ export const SyncRulesListPage: React.FC = () => { type, search, targetInstanceFilter, - enabledFilter, + schedulerEnabledFilter, lastExecutedFilter, sharingSettingsObject, ]); @@ -631,9 +636,9 @@ export const SyncRulesListPage: React.FC = () => { /> diff --git a/src/scheduler/LoggerLog4js.ts b/src/scheduler/LoggerLog4js.ts new file mode 100644 index 000000000..282b7b554 --- /dev/null +++ b/src/scheduler/LoggerLog4js.ts @@ -0,0 +1,45 @@ +import { configure, getLogger } from "log4js"; +import { Logger } from "./SchedulerCLI/Logger"; + +/** + * @description This file is refactored. + */ +export default class LoggerLog4js implements Logger { + constructor(isDevelopment: boolean) { + configure({ + appenders: { + out: { type: "stdout" }, + file: { type: "file", filename: "debug.log" }, + }, + categories: { default: { appenders: ["file", "out"], level: isDevelopment ? "all" : "debug" } }, + }); + } + + trace(name: string, message: string): void { + getLogger(name).trace(message); + } + + debug(name: string, message: string): void { + getLogger(name).debug(message); + } + + info(name: string, message: string): void { + getLogger(name).info(message); + } + + warn(name: string, message: string): void { + getLogger(name).warn(message); + } + + error(name: string, message: string): void { + getLogger(name).error(message); + } + + fatal(name: string, message: string): void { + getLogger(name).fatal(message); + } + + mark(name: string, message: string): void { + getLogger(name).mark(message); + } +} diff --git a/src/scheduler/Scheduler.ts b/src/scheduler/Scheduler.ts new file mode 100644 index 000000000..219c9d068 --- /dev/null +++ b/src/scheduler/Scheduler.ts @@ -0,0 +1,39 @@ +import schedule from "node-schedule"; +import { SchedulerContract } from "./SchedulerCLI/SchedulerContract"; +import { DEFAULT_SCHEDULED_JOB_ID, ScheduledJob } from "../domain/scheduler/entities/ScheduledJob"; +import { EVERY_MINUTE_FREQUENCY } from "./SchedulerCLI/SchedulerCLI"; + +/** + * @description This file is refactored. + */ +export default class Scheduler implements SchedulerContract { + public getScheduledJobs(): ScheduledJob[] { + const jobs = schedule.scheduledJobs; + const jobIds = Object.keys(jobs); + + return jobIds.map((jobId: keyof typeof jobs) => this.buildScheduledJob(jobs[jobId])); + } + + public scheduleJob(options: { + jobId?: string; + frequency?: string; + jobCallback: () => Promise; + }): ScheduledJob { + const { jobId = DEFAULT_SCHEDULED_JOB_ID, frequency = EVERY_MINUTE_FREQUENCY, jobCallback } = options; + + const job = schedule.scheduleJob(jobId, frequency, jobCallback); + + return this.buildScheduledJob(job); + } + + public cancelJob(id: string): void { + schedule.scheduledJobs[id].cancel(); + } + + private buildScheduledJob(job: schedule.Job): ScheduledJob { + return { + id: job.name, + nextExecution: job.nextInvocation(), + }; + } +} diff --git a/src/scheduler/SchedulerCLI/Logger.ts b/src/scheduler/SchedulerCLI/Logger.ts new file mode 100644 index 000000000..f47459009 --- /dev/null +++ b/src/scheduler/SchedulerCLI/Logger.ts @@ -0,0 +1,12 @@ +/** + * @description This is refactored + */ +export interface Logger { + trace(name: string, message: string): void; + debug(name: string, message: string): void; + info(name: string, message: string): void; + warn(name: string, message: string): void; + error(name: string, message: string): void; + fatal(name: string, message: string): void; + mark(name: string, message: string): void; +} diff --git a/src/scheduler/SchedulerCLI/SchedulerCLI.ts b/src/scheduler/SchedulerCLI/SchedulerCLI.ts new file mode 100644 index 000000000..3181a4162 --- /dev/null +++ b/src/scheduler/SchedulerCLI/SchedulerCLI.ts @@ -0,0 +1,212 @@ +import moment from "moment"; +import cronstrue from "cronstrue"; +import { CompositionRoot } from "../../presentation/CompositionRoot"; +import { SchedulerContract } from "./SchedulerContract"; +import { DEFAULT_SCHEDULED_JOB_ID, ScheduledJob } from "../../domain/scheduler/entities/ScheduledJob"; +import { SchedulerExecutionInfo } from "../../domain/scheduler/entities/SchedulerExecutionInfo"; +import { Logger } from "./Logger"; +import { SynchronizationRule } from "../../domain/rules/entities/SynchronizationRule"; + +// NOTICE: This is refactored + +export type SyncRuleJobConfig = { + id: string; + name: string; + frequency: string; +}; + +export const EVERY_MINUTE_FREQUENCY = "0 * * * * *"; + +/** + * @description This file is refactored + */ +export class SchedulerCLI { + constructor( + private options: { + scheduler: SchedulerContract; + compositionRoot: CompositionRoot; + logger: Logger; + } + ) {} + + public initialize(apiPath: string): void { + this.fetchTask(apiPath); + const { scheduler, logger } = this.options; + + scheduler.scheduleJob({ jobCallback: (): Promise => this.fetchTask(apiPath) }); + + logger.info("main", `Loading synchronization rules from remote server`); + } + + private async fetchTask(apiPath: string): Promise { + const { scheduler, logger } = this.options; + try { + const syncRuleJobConfigs = await this.getSyncRuleJobConfigs(); + const jobIdsToBeScheduled = syncRuleJobConfigs.map(({ id }) => id); + + const scheduledJobs = scheduler.getScheduledJobs(); + const currentJobIdsScheduled = scheduledJobs.map(({ id }) => id); + + const jobIdsToCancel = currentJobIdsScheduled.filter( + id => !jobIdsToBeScheduled.includes(id) && id !== DEFAULT_SCHEDULED_JOB_ID + ); + if (jobIdsToCancel.length > 0) { + this.cancelScheduledJobs(jobIdsToCancel, scheduledJobs); + } + + const newSyncRuleJobConfigs = syncRuleJobConfigs.filter(({ id }) => !currentJobIdsScheduled.includes(id)); + if (newSyncRuleJobConfigs.length > 0) { + this.createNewScheduledJobs(newSyncRuleJobConfigs, apiPath); + } + + // TODO: update currentJobIdsScheduled if frequency has changed: first cancel job and then schedule it again + + this.updateNextExecutionOfScheduler(scheduledJobs); + } catch (error) { + const errorMessage = typeof error === "string" ? error : JSON.stringify(error, null, 2); + logger.error("scheduler", `${errorMessage}`); + } + } + + private async synchronizationTask(ruleId: string, apiPath: string): Promise { + const { logger, compositionRoot } = this.options; + + const rule = await compositionRoot.rules.get(ruleId); + + if (!rule) return; + + const { name, frequency, builder, id: syncRule, type = "metadata" } = rule; + + try { + const readableFrequency = cronstrue.toString(frequency || ""); + logger.debug(name, `Start ${type} rule with frequency: ${readableFrequency}`); + const result = await compositionRoot.sync.prepare(type, builder); + const sync = compositionRoot.sync[type]({ ...builder, syncRule }); + + const synchronize = async () => { + for await (const { message, syncReport, done } of sync.execute()) { + if (message) logger.debug(name, message); + if (syncReport) await compositionRoot.reports.save(syncReport); + if (done && syncReport && syncReport.id) { + const reportUrl = this.buildUrl(apiPath, type, syncReport.id); + logger.debug(name, `Finished. Report available at ${reportUrl}`); + } else if (done) logger.warn(name, `Finished with errors`); + } + }; + + await result.match({ + success: async () => { + await synchronize(); + }, + error: async code => { + switch (code) { + case "PULL_REQUEST": + case "PULL_REQUEST_RESPONSIBLE": + logger.error(name, "Metadata has a custodian, unable to proceed with sync"); + break; + case "INSTANCE_NOT_FOUND": + logger.error(name, "Couldn't connect with instance"); + break; + case "NOT_AUTHORIZED": + logger.error(name, "User is not authorized to one or more instances"); + break; + default: + logger.error(name, "Unknown synchronization error"); + } + }, + }); + } catch (error) { + const errorMessage = typeof error === "string" ? error : JSON.stringify(error, null, 2); + logger.error(name, `Failed executing rule: ${errorMessage}`); + } + } + + // use same use case list rules schedulerEnabledFilter = true + private async getSyncRuleJobConfigs(): Promise { + const { logger, compositionRoot } = this.options; + + const { rows: rulesWithSchedulerEnabled } = await compositionRoot.rules.list({ + paging: false, + filters: { schedulerEnabledFilter: "enabled" }, + }); + + const syncRuleJobConfigsToBeScheduled = + this.mapSynchronizationRulesToSyncRuleJobConfigs(rulesWithSchedulerEnabled); + + logger.trace( + "scheduler", + `There are ${syncRuleJobConfigsToBeScheduled.length} valid sync rules marked to be scheduled` + ); + + return syncRuleJobConfigsToBeScheduled; + } + + private mapSynchronizationRulesToSyncRuleJobConfigs(syncRule: SynchronizationRule[]): SyncRuleJobConfig[] { + return syncRule.reduce((acc: SyncRuleJobConfig[], syncRule: SynchronizationRule): SyncRuleJobConfig[] => { + if (syncRule.frequency) { + return [ + ...acc, + { + id: syncRule.id, + name: syncRule.name, + frequency: syncRule.frequency, + }, + ]; + } else { + return acc; + } + }, []); + } + + private cancelScheduledJobs(jobIdsToCancel: string[], scheduledJobs: ScheduledJob[]): void { + const { scheduler, logger } = this.options; + + jobIdsToCancel.forEach((id: string) => { + logger.info("scheduler", `Cancelling disabled rule with id ${id}`); + const scheduledJobToCancel = scheduledJobs.find(scheduledJob => scheduledJob.id === id); + if (scheduledJobToCancel) { + scheduler.cancelJob(scheduledJobToCancel.id); + } + }); + } + + private createNewScheduledJobs(syncRuleJobConfig: SyncRuleJobConfig[], apiPath: string): void { + syncRuleJobConfig.forEach((syncRuleJobConfig: SyncRuleJobConfig): void => { + const { scheduler, logger } = this.options; + + const { id, name, frequency } = syncRuleJobConfig; + + if (id && frequency) { + const job = scheduler.scheduleJob({ + jobId: id, + frequency: frequency, + jobCallback: (): Promise => this.synchronizationTask(id, apiPath), + }); + + // Format date to keep timezone offset + const nextDate = moment(job.nextExecution.toISOString()).toISOString(true); + logger.info("scheduler", `Scheduling new sync rule ${name} (${id}) at ${nextDate}`); + } + }); + } + + private async updateNextExecutionOfScheduler(scheduledJobs: ScheduledJob[]): Promise { + const { compositionRoot } = this.options; + + const defaultScheduledJob = scheduledJobs.find(scheduledJob => scheduledJob.id === DEFAULT_SCHEDULED_JOB_ID); + const nextExecution = defaultScheduledJob?.nextExecution; + const schedulerExecutionInfo = await compositionRoot.scheduler.getLastExecutionInfo().toPromise(); + + const newSchedulerExecutionInfo: SchedulerExecutionInfo = { + ...schedulerExecutionInfo, + lastExecution: schedulerExecutionInfo.nextExecution, + nextExecution: nextExecution, + }; + + await compositionRoot.scheduler.updateExecutionInfo(newSchedulerExecutionInfo).toPromise(); + } + + private buildUrl(apiPath: string, type: string, id: string): string { + return `${apiPath}/apps/MetaData-Synchronization/index.html#/history/${type}/${id}`; + } +} diff --git a/src/scheduler/SchedulerCLI/SchedulerContract.ts b/src/scheduler/SchedulerCLI/SchedulerContract.ts new file mode 100644 index 000000000..da296e7c1 --- /dev/null +++ b/src/scheduler/SchedulerCLI/SchedulerContract.ts @@ -0,0 +1,10 @@ +import { ScheduledJob } from "../../domain/scheduler/entities/ScheduledJob"; + +/** + * @description This is refactored + */ +export interface SchedulerContract { + getScheduledJobs(): ScheduledJob[]; + scheduleJob(options: { jobId?: string; frequency?: string; jobCallback: () => Promise }): ScheduledJob; + cancelJob(id: string): void; +} diff --git a/src/scheduler/SchedulerCLI/__tests__/SchedulerCLI.spec.ts b/src/scheduler/SchedulerCLI/__tests__/SchedulerCLI.spec.ts new file mode 100644 index 000000000..68f3348d0 --- /dev/null +++ b/src/scheduler/SchedulerCLI/__tests__/SchedulerCLI.spec.ts @@ -0,0 +1,121 @@ +import { Either } from "../../../domain/common/entities/Either"; +import { FutureData, Future } from "../../../domain/common/entities/Future"; +import { SynchronizationRule } from "../../../domain/rules/entities/SynchronizationRule"; +import { ListSyncRuleUseCaseParams } from "../../../domain/rules/usecases/ListSyncRuleUseCase"; +import { SchedulerExecutionInfo } from "../../../domain/scheduler/entities/SchedulerExecutionInfo"; +import { PrepareSyncError } from "../../../domain/synchronization/usecases/PrepareSyncUseCase"; +import { CompositionRoot } from "../../../presentation/CompositionRoot"; +import { Logger } from "../Logger"; +import { SchedulerCLI, SyncRuleJobConfig } from "../SchedulerCLI"; +import { SchedulerContract } from "../SchedulerContract"; +import { getSynchronizationRule } from "./data/getSynchronizationRule"; +import { MockLogger } from "./mocks/MockLogger"; +import { MockScheduler } from "./mocks/MockScheduler"; + +// TODO: This is the first version of tests, needs a refactor to use ts-mockito instead + +const API_PATH = "baseUrl/api/"; + +describe("SchedulerCLI", () => { + let schedulerCLI: SchedulerCLI; + let mockScheduler: SchedulerContract; + let mockCompositionRoot: CompositionRoot; + let mockLogger: Logger; + + beforeEach(() => { + jest.clearAllMocks(); + mockScheduler = new MockScheduler(); + mockCompositionRoot = new MockCompositionRoot() as unknown as CompositionRoot; + mockLogger = new MockLogger(); + + schedulerCLI = new SchedulerCLI({ + scheduler: mockScheduler, + compositionRoot: mockCompositionRoot as unknown as CompositionRoot, + logger: mockLogger, + }); + }); + + describe("initialize", () => { + it("should fetch tasks, schedule a job and log a message", async () => { + const spyGetSyncRuleJobConfigs = jest.spyOn(mockCompositionRoot.rules, "list"); + const spyScheduleJob = jest.spyOn(mockScheduler, "scheduleJob"); + const spyLoggerInfo = jest.spyOn(mockLogger, "info"); + + schedulerCLI.initialize(API_PATH); + + expect(spyGetSyncRuleJobConfigs).toHaveBeenCalled(); + expect(spyScheduleJob).toHaveBeenCalled(); + expect(spyLoggerInfo).toHaveBeenCalledWith("main", "Loading synchronization rules from remote server"); + }); + + it("should call compositionRoot.rules.list with correct params and return correct SyncRuleJobConfig[]", async () => { + const spyListRules = jest.spyOn(mockCompositionRoot.rules, "list"); + + const expectedConfigs: SyncRuleJobConfig[] = [getSyncRuleJobConfig()]; + + const result = await schedulerCLI["getSyncRuleJobConfigs"](); + + expect(spyListRules).toHaveBeenCalledWith({ + paging: false, + filters: { schedulerEnabledFilter: "enabled" }, + }); + expect(result).toEqual(expectedConfigs); + }); + }); +}); + +function getSyncRuleJobConfig(): SyncRuleJobConfig { + return { + id: "sync-rule-id", + name: "Sync Rule", + frequency: "0 */2 * * * *", + }; +} + +/** + * @deprecated this is only a workaround, we need to replace this by a correct implementation of testing CompositionRoot + */ +class MockCompositionRoot { + rules = { + get(): Promise { + return Promise.resolve(getSynchronizationRule()); + }, + + list(params: ListSyncRuleUseCaseParams): Promise<{ rows: SynchronizationRule[] }> { + const { schedulerEnabledFilter = null } = params.filters || {}; + const isSchedulerEnabled = schedulerEnabledFilter === "enabled"; + return Promise.resolve({ rows: [getSynchronizationRule(isSchedulerEnabled)] }); + }, + }; + + scheduler = { + getLastExecutionInfo(): FutureData { + return Future.success({ lastExecution: new Date() }); + }, + + updateExecutionInfo(_executionInfo: SchedulerExecutionInfo): FutureData { + return Future.success(undefined); + }, + }; + + sync = { + prepare(): Promise> { + return Promise.resolve(Either.success(undefined)); + }, + + metadata() { + return { + execute: async function* () { + yield { message: "Task started", done: false }; + yield { message: "Task finished", done: true, syncReport: { id: "report1" } }; + }, + }; + }, + }; + + reports = { + save(): Promise { + return Promise.resolve(); + }, + }; +} diff --git a/src/scheduler/SchedulerCLI/__tests__/data/getSynchronizationRule.ts b/src/scheduler/SchedulerCLI/__tests__/data/getSynchronizationRule.ts new file mode 100644 index 000000000..b91667655 --- /dev/null +++ b/src/scheduler/SchedulerCLI/__tests__/data/getSynchronizationRule.ts @@ -0,0 +1,53 @@ +import { SynchronizationRule, SynchronizationRuleData } from "../../../../domain/rules/entities/SynchronizationRule"; + +export function getSynchronizationRule(isSchedulerEnabled = false): SynchronizationRule { + const data: SynchronizationRuleData = { + id: "sync-rule-id", + code: "Sync Rule", + name: "Sync Rule", + type: "metadata", + user: { id: "user-id", name: "User Name" }, + created: new Date(), + enabled: isSchedulerEnabled, + frequency: "0 */2 * * * *", + description: "Sync Rule", + lastUpdated: new Date(), + lastExecuted: new Date(), + publicAccess: "rw------", + userAccesses: [], + lastUpdatedBy: { id: "user-id", name: "User Name" }, + lastExecutedBy: { id: "user-id", name: "User Name" }, + targetInstances: ["target-instance-id"], + userGroupAccesses: [], + lastSuccessfulSync: new Date(), + builder: { + dataParams: { + dryRun: false, + allTEIs: true, + strategy: "NEW_AND_UPDATES", + allEvents: true, + enableAggregation: false, + allAttributeCategoryOptions: true, + }, + syncParams: { + mergeMode: "MERGE", + atomicMode: "ALL", + importMode: "COMMIT", + enableMapping: false, + importStrategy: "CREATE_AND_UPDATE", + metadataModelsSyncAll: [], + includeSharingSettings: true, + removeOrgUnitReferences: false, + useDefaultIncludeExclude: true, + }, + excludedIds: [], + filterRules: [], + metadataIds: ["metadata-ids"], + metadataTypes: ["optionSets"], + originInstance: "LOCAL", + targetInstances: ["target-instance-id"], + }, + }; + + return SynchronizationRule.build(data); +} diff --git a/src/scheduler/SchedulerCLI/__tests__/mocks/MockLogger.ts b/src/scheduler/SchedulerCLI/__tests__/mocks/MockLogger.ts new file mode 100644 index 000000000..d190adae7 --- /dev/null +++ b/src/scheduler/SchedulerCLI/__tests__/mocks/MockLogger.ts @@ -0,0 +1,25 @@ +import { Logger } from "../../Logger"; + +export class MockLogger implements Logger { + trace(name: string, message: string): void { + console.debug(`[TRACE] ${name}: ${message}`); + } + fatal(name: string, message: string): void { + console.error(`[FATAL] ${name}: ${message}`); + } + mark(name: string, message: string): void { + console.debug(`[MARK] ${name}: ${message}`); + } + debug(...args: any[]): void { + console.debug(`[DEBUG]`, ...args); + } + error(...args: any[]): void { + console.error(`[ERROR]`, ...args); + } + info(...args: any[]): void { + console.debug(`[INFO]`, ...args); + } + warn(...args: any[]): void { + console.warn(`[WARN]`, ...args); + } +} diff --git a/src/scheduler/SchedulerCLI/__tests__/mocks/MockScheduler.ts b/src/scheduler/SchedulerCLI/__tests__/mocks/MockScheduler.ts new file mode 100644 index 000000000..4424f3e96 --- /dev/null +++ b/src/scheduler/SchedulerCLI/__tests__/mocks/MockScheduler.ts @@ -0,0 +1,26 @@ +import { ScheduledJob, DEFAULT_SCHEDULED_JOB_ID } from "../../../../domain/scheduler/entities/ScheduledJob"; +import { SchedulerContract } from "../../SchedulerContract"; + +export class MockScheduler implements SchedulerContract { + private jobs: ScheduledJob[] = []; + + getScheduledJobs(): ScheduledJob[] { + return this.jobs; + } + + scheduleJob(options: { jobId?: string; frequency?: string; jobCallback: () => Promise }): ScheduledJob { + const { jobId = DEFAULT_SCHEDULED_JOB_ID } = options; + + const job: ScheduledJob = { + id: jobId, + nextExecution: new Date(), + }; + + this.jobs = [...this.jobs, job]; + return job; + } + + cancelJob(id: string): void { + this.jobs = this.jobs.filter(job => job.id !== id); + } +} diff --git a/src/scheduler/cli.ts b/src/scheduler/cli.ts index c0631243c..9333816ad 100644 --- a/src/scheduler/cli.ts +++ b/src/scheduler/cli.ts @@ -1,24 +1,19 @@ import { command, option, run, string } from "cmd-ts"; import "dotenv/config"; import fs from "fs"; -import { configure, getLogger } from "log4js"; import path from "path"; -import { Future, FutureData } from "../domain/common/entities/Future"; import { Instance } from "../domain/instance/entities/Instance"; import { CompositionRoot } from "../presentation/CompositionRoot"; import { D2Api } from "../types/d2-api"; -import { ConfigModel, SchedulerConfig } from "./entities/SchedulerConfig"; -import Scheduler from "./scheduler"; +import { ConfigModel, SchedulerConfig } from "../domain/scheduler/entities/SchedulerConfig"; +import Scheduler from "./Scheduler"; +import LoggerLog4js from "./LoggerLog4js"; +import { SchedulerCLI } from "./SchedulerCLI/SchedulerCLI"; +import { Future, FutureData } from "../domain/common/entities/Future"; -const development = process.env.NODE_ENV === "development"; +// NOTICE: This file is refactored -configure({ - appenders: { - out: { type: "stdout" }, - file: { type: "file", filename: "debug.log" }, - }, - categories: { default: { appenders: ["file", "out"], level: development ? "all" : "debug" } }, -}); +const isDevelopment = process.env.NODE_ENV === "development"; const checkMigrations = (compositionRoot: CompositionRoot): FutureData => { return Future.fromPromise(compositionRoot.migrations.hasPending()) @@ -27,7 +22,9 @@ const checkMigrations = (compositionRoot: CompositionRoot): FutureData }) .flatMap(pendingMigrations => { if (pendingMigrations) { - return Future.error("There are pending migrations, unable to continue"); + return Future.error( + new Error("There are pending migrations. Please run them before starting the script") + ); } return Future.success(pendingMigrations); @@ -47,14 +44,17 @@ async function main() { }), }, handler: async args => { + const logger = new LoggerLog4js(isDevelopment); + try { const text = fs.readFileSync(args.config, "utf8"); const contents = JSON.parse(text); const config = ConfigModel.unsafeDecode(contents); - await start(config); - } catch (err) { - getLogger("main").fatal(err); + await start(config, logger); + } catch (error) { + const errorMessage = typeof error === "string" ? error : JSON.stringify(error, null, 2); + logger.fatal("main", `${errorMessage}`); process.exit(1); } }, @@ -63,10 +63,10 @@ async function main() { run(cmd, process.argv.slice(2)); } -const start = async (config: SchedulerConfig): Promise => { +const start = async (config: SchedulerConfig, logger: LoggerLog4js): Promise => { const { baseUrl, username, password, encryptionKey } = config; if (!baseUrl || !username || !password || !encryptionKey) { - getLogger("main").fatal("Missing fields from configuration file"); + logger.fatal("main", "Missing fields from configuration file"); return; } @@ -87,10 +87,18 @@ const start = async (config: SchedulerConfig): Promise => { await checkMigrations(compositionRoot).toPromise(); const welcomeMessage = `Script initialized on ${baseUrl} with user ${username}`; - getLogger("main").info("-".repeat(welcomeMessage.length)); - getLogger("main").info(welcomeMessage); + logger.info("main", "-".repeat(welcomeMessage.length)); + logger.info("main", welcomeMessage); + + const scheduler = new Scheduler(); + + const schedulerCLI = new SchedulerCLI({ + scheduler: scheduler, + logger: logger, + compositionRoot: compositionRoot, + }); - new Scheduler(api, compositionRoot).initialize(); + schedulerCLI.initialize(api.apiPath); }; main(); diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts deleted file mode 100644 index b3e5d51a8..000000000 --- a/src/scheduler/scheduler.ts +++ /dev/null @@ -1,114 +0,0 @@ -import cronstrue from "cronstrue"; -import _ from "lodash"; -import { getLogger } from "log4js"; -import moment from "moment"; -import schedule from "node-schedule"; -import { SynchronizationRule } from "../domain/rules/entities/SynchronizationRule"; -import { CompositionRoot } from "../presentation/CompositionRoot"; -import { D2Api } from "../types/d2-api"; - -const DEFAULT_CODE = "__default__"; - -export default class Scheduler { - constructor(private api: D2Api, private compositionRoot: CompositionRoot) {} - - private synchronizationTask = async (id: string): Promise => { - const rule = await this.compositionRoot.rules.get(id); - if (!rule) return; - - const { name, frequency, builder, id: syncRule, type = "metadata" } = rule; - - try { - const readableFrequency = cronstrue.toString(frequency || ""); - getLogger(name).debug(`Start ${type} rule with frequency: ${readableFrequency}`); - const result = await this.compositionRoot.sync.prepare(type, builder); - const sync = this.compositionRoot.sync[type]({ ...builder, syncRule }); - - const synchronize = async () => { - for await (const { message, syncReport, done } of sync.execute()) { - if (message) getLogger(name).debug(message); - if (syncReport) await this.compositionRoot.reports.save(syncReport); - if (done && syncReport && syncReport.id) { - const reportUrl = this.buildUrl(type, syncReport.id); - getLogger(name).debug(`Finished. Report available at ${reportUrl}`); - } else if (done) getLogger(name).warn(`Finished with errors`); - } - }; - - await result.match({ - success: async () => { - await synchronize(); - }, - error: async code => { - switch (code) { - case "PULL_REQUEST": - case "PULL_REQUEST_RESPONSIBLE": - getLogger(name).error("Metadata has a custodian, unable to proceed with sync"); - break; - case "INSTANCE_NOT_FOUND": - getLogger(name).error("Couldn't connect with instance"); - break; - case "NOT_AUTHORIZED": - getLogger(name).error("User is not authorized to one or more instances"); - break; - default: - getLogger(name).error("Unknown synchronization error"); - } - }, - }); - } catch (error: any) { - getLogger(name).error(`Failed executing rule`, error); - } - }; - - private fetchTask = async (): Promise => { - try { - const { rows: rules } = await this.compositionRoot.rules.list({ paging: false }); - - const jobs = _.filter(rules, rule => rule.enabled); - const enabledJobIds = jobs.map(({ id }) => id); - getLogger("scheduler").trace(`There are ${jobs.length} total jobs scheduled`); - - // Cancel disabled jobs that were scheduled - const currentJobIds = _.keys(schedule.scheduledJobs); - const newJobs = _.reject(jobs, ({ id }) => currentJobIds.includes(id)); - const idsToCancel = _.difference(currentJobIds, enabledJobIds, [DEFAULT_CODE]); - idsToCancel.forEach((id: string) => { - getLogger("scheduler").info(`Cancelling disabled rule with id ${id}`); - schedule.scheduledJobs[id].cancel(); - }); - - // Create or update enabled jobs - newJobs.forEach((syncRule: SynchronizationRule): void => { - const { id, name, frequency } = syncRule; - - if (id && frequency) { - const job = schedule.scheduleJob(id, frequency, (): Promise => this.synchronizationTask(id)); - // Format date to keep timezone offset - const nextDate = moment(job.nextInvocation().toISOString()).toISOString(true); - getLogger("scheduler").info(`Scheduling new sync rule ${name} (${id}) at ${nextDate}`); - } - }); - - const nextExecution = schedule.scheduledJobs[DEFAULT_CODE].nextInvocation(); - const lastExecution = await this.compositionRoot.scheduler.getLastExecution(); - await this.compositionRoot.scheduler.updateLastExecution({ ...lastExecution, nextExecution }); - } catch (error) { - getLogger("scheduler").error(error); - } - }; - - private buildUrl(type: string, id: string): string { - return `${this.api.apiPath}/apps/MetaData-Synchronization/index.html#/history/${type}/${id}`; - } - - public initialize(): void { - // Execute fetch task immediately - this.fetchTask(); - - // Schedule periodic fetch task every minute - schedule.scheduleJob(DEFAULT_CODE, "0 * * * * *", this.fetchTask); - - getLogger("main").info(`Loading synchronization rules from remote server`); - } -} diff --git a/src/types/d2-api.ts b/src/types/d2-api.ts index 116bf6f57..c9fba2220 100644 --- a/src/types/d2-api.ts +++ b/src/types/d2-api.ts @@ -11,3 +11,5 @@ export const getMockApi = getMockApiFromClass(D2Api); export const API_VERSION = 36; export type FieldsOf = GetOptionValue["fields"]; + +export { CancelableResponse } from "@eyeseetea/d2-api"; diff --git a/yarn.lock b/yarn.lock index 3eb41061a..c05086ed1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -9314,7 +9314,7 @@ flush-write-stream@^1.0.0, flush-write-stream@^1.0.2: inherits "^2.0.3" readable-stream "^2.3.6" -fluture@14.0.0, fluture@^14.0.0: +fluture@^14.0.0: version "14.0.0" resolved "https://registry.yarnpkg.com/fluture/-/fluture-14.0.0.tgz#81e1cae996a262e214aa2a8d7845fe1359018e3b" integrity sha512-pENtLF948a8DfduVKugT8edTAbFi4rBS94xjHwzLanQqIu5PYtLGl+xqs6H8TaIRL7z/B0cDpswdINzH/HRUGA== @@ -15842,6 +15842,11 @@ readline-sync@^1.4.1: resolved "https://registry.yarnpkg.com/readline-sync/-/readline-sync-1.4.10.tgz#41df7fbb4b6312d673011594145705bf56d8873b" integrity sha512-gNva8/6UAe8QYepIQH/jQ2qn91Qj0B9sYjMBBs3QOB8F2CXcKgLxQaJRP76sWVRQt+QU+8fAkCbCvjjMFu7Ycw== +real-cancellable-promise@^1.2.1: + version "1.2.1" + resolved "https://registry.yarnpkg.com/real-cancellable-promise/-/real-cancellable-promise-1.2.1.tgz#a6cc6c887597e44426c157534ca739ae148cbc30" + integrity sha512-JwhiWJTMMyzFYfpKsiSb8CyQktCi1MZ8ZBn3wXvq28qXDh8Y5dM7RYzgW3r6SV22JTEcof8pRsvDp4GxLmGIxg== + recompose@^0.26.0: version "0.26.0" resolved "https://registry.yarnpkg.com/recompose/-/recompose-0.26.0.tgz#9babff039cb72ba5bd17366d55d7232fbdfb2d30"