Skip to content

[RORDEV-1410] Data stream audit sink setup improvements #1089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: epic/RORDEV-1263
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,50 @@ import monix.eval.Task
import org.apache.logging.log4j.scala.Logging
import tech.beshu.ror.accesscontrol.domain.{DataStreamName, RorAuditDataStream, TemplateName}
import tech.beshu.ror.es.DataStreamService
import tech.beshu.ror.es.DataStreamService.DataStreamSettings
import tech.beshu.ror.es.DataStreamService.DataStreamSettings.*
import tech.beshu.ror.es.DataStreamService.{DataStreamSettings, DataStreamSetupResult}
import tech.beshu.ror.implicits.*
import tech.beshu.ror.utils.RefinedUtils.*

import java.util.concurrent.TimeUnit

final class AuditDataStreamCreator(services: NonEmptyList[DataStreamService]) extends Logging {

def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Unit] = {
services.toList.traverse(createIfNotExists(_, dataStreamName)).map((_: List[Unit]) => ())
def createIfNotExists(dataStreamName: RorAuditDataStream): Task[Either[String, Unit]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's create at least type def for the String (aka Message or sth like that)

services
.toList
.map(createIfNotExists(_, dataStreamName))
.sequence
.map(_.sequence.map((_: List[Unit]) => ()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any monoid here, so I guess we miss potential error messages (when more than one service fails)

}

private def createIfNotExists(service: DataStreamService, dataStreamName: RorAuditDataStream): Task[Unit] = {
private def createIfNotExists(service: DataStreamService, dataStreamName: RorAuditDataStream): Task[Either[String, Unit]] = {
service
.checkDataStreamExists(dataStreamName.dataStream)
.flatMap {
case true =>
Task.delay(logger.info(s"Data stream ${dataStreamName.dataStream.show} already exists"))
.as(Right(()))
case false =>
val settings = defaultSettingsFor(dataStreamName.dataStream)
setupDataStream(service, settings)
}
}

private def setupDataStream(service: DataStreamService, settings: DataStreamSettings): Task[Unit] = {
private def setupDataStream(service: DataStreamService, settings: DataStreamSettings): Task[Either[String, Unit]] = {
for {
_ <- Task.delay(logger.info(s"Trying to setup ROR audit data stream ${settings.dataStreamName.show} with settings.."))
_ <- service.fullySetupDataStream(settings)
_ <- Task.delay(logger.info(s"ROR audit data stream ${settings.dataStreamName.show} created."))
} yield ()
_ <- Task.delay(logger.info(s"Trying to setup ROR audit data stream ${settings.dataStreamName.show} with default settings.."))
result <- service.fullySetupDataStream(settings)
finalResult <- result match {
case DataStreamSetupResult.Success =>
Task.delay(logger.info(s"ROR audit data stream ${settings.dataStreamName.show} created."))
.as(Right(()))
case DataStreamSetupResult.Failure(reason) =>
val message = s"Failed to setup ROR audit data stream ${settings.dataStreamName.show}. Reason: ${reason.show}"
Task.delay(logger.error(message))
.as(Left(message))
}
} yield finalResult
}

private def defaultSettingsFor(dataStreamName: DataStreamName.Full) = {
Expand Down Expand Up @@ -112,4 +125,3 @@ final class AuditDataStreamCreator(services: NonEmptyList[DataStreamService]) ex
private def metadata(description: String) = Map("description" -> description)

}

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ object EsDataStreamBasedAuditSink {
auditSinkService
.dataStreamCreator
.createIfNotExists(rorAuditDataStream)
.map((_: Unit) => new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService))
.flatMap {
case Right(()) => Task.delay(new EsDataStreamBasedAuditSink(serializer, rorAuditDataStream, auditSinkService))
case Left(errorMsg) => Task.raiseError(new IllegalStateException(errorMsg))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it an illegal state? I think it's possible case

}
}
}
100 changes: 90 additions & 10 deletions core/src/main/scala/tech/beshu/ror/es/DataStreamService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,122 @@
*/
package tech.beshu.ror.es

import cats.data.NonEmptyList
import cats.data.{EitherT, NonEmptyList}
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import monix.eval.Task
import tech.beshu.ror.accesscontrol.domain.{DataStreamName, TemplateName}
import tech.beshu.ror.es.DataStreamService.CreationResult.*
import tech.beshu.ror.es.DataStreamService.DataStreamSettings.*
import tech.beshu.ror.es.DataStreamService.{CreationResult, DataStreamSettings}
import tech.beshu.ror.es.DataStreamService.{CreationResult, DataStreamSettings, DataStreamSetupResult}
import tech.beshu.ror.implicits.*
import tech.beshu.ror.utils.DurationOps.PositiveFiniteDuration
import tech.beshu.ror.utils.ScalaOps.retryBackoffEither

import scala.concurrent.duration.*

trait DataStreamService {

final def fullySetupDataStream(settings: DataStreamSettings): Task[Unit] = {
final def fullySetupDataStream(settings: DataStreamSettings): Task[DataStreamSetupResult] = {
for {
_ <- createIndexLifecyclePolicy(settings.lifecyclePolicy)
_ <- createComponentTemplateForMappings(settings.mappings)
_ <- createComponentTemplateForIndex(settings.componentSettings)
_ <- createIndexTemplate(settings.templateSettings)
_ <- createDataStream(settings.dataStreamName)
} yield ()
}
_ <- createIfAbsent(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can format it like that:

    _ <- createIfAbsent(
        checkIfResourceExists = checkIndexLifecyclePolicyExists(settings.lifecyclePolicy.id),
        createResource = createIndexLifecyclePolicy(settings.lifecyclePolicy),
        onNotAcknowledged = Failure(s"Unable to determine if the index lifecycle policy with ID '${settings.lifecyclePolicy.id.show}' has been created")
      )

checkIndexLifecyclePolicyExists(settings.lifecyclePolicy.id),
createIndexLifecyclePolicy(settings.lifecyclePolicy),
DataStreamSetupResult.Failure(s"Unable to determine if the index lifecycle policy with ID '${settings.lifecyclePolicy.id.show}' has been created")
)
_ <- createIfAbsent(
checkComponentTemplateExists(settings.mappings.templateName),
createComponentTemplateForMappings(settings.mappings),
DataStreamSetupResult.Failure(s"Unable to determine if component template with ID '${settings.mappings.templateName.show}' has been created")
)
_ <- createIfAbsent(
checkComponentTemplateExists(settings.componentSettings.templateName),
createComponentTemplateForIndex(settings.componentSettings),
DataStreamSetupResult.Failure(s"Unable to determine if component template with ID '${settings.componentSettings.templateName.show}' has been created")
)
_ <- createIfAbsent(
checkIndexTemplateExists(settings.templateSettings.templateName),
createIndexTemplate(settings.templateSettings),
DataStreamSetupResult.Failure(s"Unable to determine if index template with ID '${settings.templateSettings.templateName.show}' has been created")
)
_ <- createIfAbsent(
checkDataStreamExists(settings.dataStreamName),
createDataStream(settings.dataStreamName),
DataStreamSetupResult.Failure(s"Unable to determine if data stream with ID '${settings.dataStreamName.show}' has been created")
)
} yield DataStreamSetupResult.Success
}.merge

def checkDataStreamExists(dataStreamName: DataStreamName.Full): Task[Boolean]

protected def createDataStream(dataStreamName: DataStreamName.Full): Task[CreationResult]

protected def checkIndexLifecyclePolicyExists(policyId: NonEmptyString): Task[Boolean] = Task.pure(false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the default implementation? I'm not sure if we need it
(here, and below too)


protected def createIndexLifecyclePolicy(policy: LifecyclePolicy): Task[CreationResult]

protected def checkComponentTemplateExists(templateName: TemplateName): Task[Boolean] = Task.pure(false)

protected def createComponentTemplateForMappings(settings: ComponentTemplateMappings): Task[CreationResult]

protected def createComponentTemplateForIndex(settings: ComponentTemplateSettings): Task[CreationResult]

protected def checkIndexTemplateExists(templateName: TemplateName): Task[Boolean] = Task.pure(false)

protected def createIndexTemplate(settings: IndexTemplateSettings): Task[CreationResult]

private def createIfAbsent(checkIfResourceExists: Task[Boolean],
createResource: Task[CreationResult],
onNotAcknowledged: => DataStreamSetupResult.Failure): EitherT[Task, DataStreamSetupResult.Failure, Unit] = EitherT {
checkIfResourceExists
.flatMap {
case true =>
Task.pure(Acknowledged)
case false =>
createResourceWithConfirmation(checkIfResourceExists, createResource)
}
.map {
case CreationResult.Acknowledged => Right(())
case CreationResult.NotAcknowledged => Left(onNotAcknowledged)
}
}

private def createResourceWithConfirmation(checkIfResourceExists: Task[Boolean],
createResource: Task[CreationResult]): Task[CreationResult] = {
createResource
.flatMap {
case Acknowledged =>
Task.pure(Acknowledged)
case NotAcknowledged =>
withRetries(
checkIfResourceExists.map(exists => Either.cond(exists, Acknowledged, NotAcknowledged))
).map(_.merge)
}
}

private def withRetries[E, A](source: => Task[Either[E, A]]) =
retryBackoffEither(
source = source,
maxRetries = retryConfig.maxRetries,
firstDelay = retryConfig.initialDelay,
backOffScaler = retryConfig.backoffScaler
)

protected val retryConfig: RetryConfig = RetryConfig(initialDelay = 500.milliseconds, backoffScaler = 2, maxRetries = 5)

protected case class RetryConfig(initialDelay: FiniteDuration, backoffScaler: Int, maxRetries: Int)
}

object DataStreamService {

sealed trait DataStreamSetupResult

object DataStreamSetupResult {
case object Success extends DataStreamSetupResult

final case class Failure(reason: String) extends DataStreamSetupResult
}

final case class DataStreamSettings(dataStreamName: DataStreamName.Full,
lifecyclePolicy: LifecyclePolicy,
mappings: ComponentTemplateMappings,
Expand Down
Loading
Loading