Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
grouzen committed Aug 19, 2023
1 parent 939d21e commit 2f0f524
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package zio.telemetry.opentelemetry.context
import io.opentelemetry.context.Context
import zio._

trait ContextStorage {
sealed trait ContextStorage {

def get(implicit trace: Trace): UIO[Context]

Expand All @@ -20,6 +20,59 @@ trait ContextStorage {

object ContextStorage {

final class FiberRefContextStorage(private[zio] val ref: FiberRef[Context]) extends ContextStorage {

override def get(implicit trace: Trace): UIO[Context] =
ref.get

override def set(context: Context)(implicit trace: Trace): UIO[Unit] =
ref.set(context)

override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] =
ref.getAndSet(context)

override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] =
ref.updateAndGet(f)

override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit
trace: Trace
): ZIO[R, E, A] =
ref.locally(context)(zio)

override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] =
ref.locallyScoped(context)
}

final class OpenTelemetryContextStorage extends ContextStorage {

override def get(implicit trace: Trace): UIO[Context] =
ZIO.succeed(Context.current())

override def set(context: Context)(implicit trace: Trace): UIO[Unit] =
ZIO.succeed(context.makeCurrent()).unit

override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] =
ZIO.succeed {
val old = Context.current()
val _ = context.makeCurrent()
old
}.uninterruptible

override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] =
ZIO.succeed {
val updated = f(Context.current())
val _ = updated.makeCurrent()
updated
}.uninterruptible

override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, A] =
ZIO.acquireReleaseWith(get <* set(context))(set)(_ => zio)

override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] =
ZIO.acquireRelease(get <* set(context))(set).unit

}

/**
* The main one. Uses [[FiberRef]] as a [[ContextStorage]].
*/
Expand All @@ -28,29 +81,7 @@ object ContextStorage {
FiberRef
.make[Context](Context.root())
.flatMap { ref =>
ZIO.succeed {
new ContextStorage {
override def get(implicit trace: Trace): UIO[Context] =
ref.get

override def set(context: Context)(implicit trace: Trace): UIO[Unit] =
ref.set(context)

override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] =
ref.getAndSet(context)

override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] =
ref.updateAndGet(f)

override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit
trace: Trace
): ZIO[R, E, A] =
ref.locally(context)(zio)

override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] =
ref.locallyScoped(context)
}
}
ZIO.succeed(new FiberRefContextStorage(ref))
}
)

Expand All @@ -59,35 +90,6 @@ object ContextStorage {
* [[https://github.com/open-telemetry/opentelemetry-java-instrumentation OTEL instrumentation agent]] is used.
*/
val openTelemetryContext: ULayer[ContextStorage] =
ZLayer.succeed {
new ContextStorage {
override def get(implicit trace: Trace): UIO[Context] =
ZIO.succeed(Context.current())

override def set(context: Context)(implicit trace: Trace): UIO[Unit] =
ZIO.succeed(context.makeCurrent()).unit

override def getAndSet(context: Context)(implicit trace: Trace): UIO[Context] =
ZIO.succeed {
val old = Context.current()
val _ = context.makeCurrent()
old
}.uninterruptible

override def updateAndGet(f: Context => Context)(implicit trace: Trace): UIO[Context] =
ZIO.succeed {
val updated = f(Context.current())
val _ = updated.makeCurrent()
updated
}.uninterruptible

override def locally[R, E, A](context: Context)(zio: ZIO[R, E, A])(implicit trace: Trace): ZIO[R, E, A] =
ZIO.acquireReleaseWith(get <* set(context))(set)(_ => zio)

override def locallyScoped(context: Context)(implicit trace: Trace): ZIO[Scope, Nothing, Unit] =
ZIO.acquireRelease(get <* set(context))(set).unit

}
}
ZLayer.succeed(new OpenTelemetryContextStorage)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zio.telemetry.opentelemetry.logging

import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.api.logs.{Logger, LoggerProvider}
import io.opentelemetry.context.Context
import zio._
import zio.telemetry.opentelemetry.context.ContextStorage

object Logging {

def live(
instrumentationScopeName: String,
logLevel: LogLevel = LogLevel.Info
): ZLayer[ContextStorage with LoggerProvider, Nothing, Unit] =
ZLayer
.fromZIO(
for {
loggerProvider <- ZIO.service[LoggerProvider]
contextStorage <- ZIO.service[ContextStorage]
logger <- ZIO.succeed(
zioLogger(instrumentationScopeName)(contextStorage, loggerProvider)
.filterLogLevel(l => l >= logLevel)
)
} yield logger
)
.flatMap(env => Runtime.addLogger(env.get))

private def zioLogger(instrumentationScopeName: String)(
contextStorage: ContextStorage,
loggerProvider: LoggerProvider
): ZLogger[String, Unit] =
new ZLogger[String, Unit] {

val logger: Logger = loggerProvider.get(instrumentationScopeName)

override def apply(
trace: Trace,
fiberId: FiberId,
logLevel: LogLevel,
message: () => String,
cause: Cause[Any],
context: FiberRefs,
spans: List[LogSpan],
annotations: Map[String, String]
): Unit = {
val builder = logger.logRecordBuilder()

builder.setBody(message())
builder.setSeverityText(logLevel.label)
annotations.foreach { case (k, v) => builder.setAttribute(AttributeKey.stringKey(k), v) }

contextStorage match {
case cs: ContextStorage.FiberRefContextStorage =>
context.get(cs.ref).foreach(builder.setContext)
case _: ContextStorage.OpenTelemetryContextStorage =>
builder.setContext(Context.current())
}

builder.emit()
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package zio.telemetry.opentelemetry.logging

import io.opentelemetry.api.logs.LoggerProvider
import io.opentelemetry.sdk.logs.SdkLoggerProvider
import io.opentelemetry.sdk.logs.`export`.SimpleLogRecordProcessor
import io.opentelemetry.sdk.logs.data.LogRecordData
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter
import zio._
import zio.telemetry.opentelemetry.context.ContextStorage
import zio.telemetry.opentelemetry.tracing.{Tracing, TracingTest}
import zio.test._
import zio.test.Assertion._

import scala.jdk.CollectionConverters._

object LoggingTest extends ZIOSpecDefault {

val inMemoryLogLoggerProvider =
for {
logRecordExporter <- ZIO.succeed(InMemoryLogRecordExporter.create())
logRecordProcessor <- ZIO.succeed(SimpleLogRecordProcessor.create(logRecordExporter))
loggerProvider <- ZIO.succeed(SdkLoggerProvider.builder().addLogRecordProcessor(logRecordProcessor).build())
} yield (logRecordExporter, loggerProvider)

val inMemoryLoggerProviderLayer: ULayer[InMemoryLogRecordExporter with LoggerProvider] =
ZLayer.fromZIOEnvironment(inMemoryLogLoggerProvider.map { case (inMemoryLogRecordExporter, loggerProvider) =>
ZEnvironment(inMemoryLogRecordExporter).add(loggerProvider)
})

def loggingMockLayer(
instrumentationScopeName: String
): URLayer[ContextStorage, InMemoryLogRecordExporter with LoggerProvider] =
inMemoryLoggerProviderLayer >>> (
Logging.live(instrumentationScopeName) ++ inMemoryLoggerProviderLayer
)

def getFinishedLogRecords: ZIO[InMemoryLogRecordExporter, Nothing, List[LogRecordData]] =
ZIO.service[InMemoryLogRecordExporter].map(_.getFinishedLogRecordItems.asScala.toList)

override def spec: Spec[TestEnvironment with Scope, Any] =
suite("zio opentelemetry")(
suite("Logging")(
test("works with empty tracing context") {
for {
_ <- ZIO.logInfo("test")
logRecords <- getFinishedLogRecords
} yield {
val r = logRecords.head
val body = r.getBody.asString()
val severityNumber = r.getSeverity.getSeverityNumber
val severityText = r.getSeverityText
val instrumentationScopeName = r.getInstrumentationScopeInfo.getName
val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString }
val traceId = r.getSpanContext.getTraceId
val spanId = r.getSpanContext.getSpanId

assert(logRecords.length)(equalTo(1)) &&
assert(body)(equalTo("test")) &&
assert(severityNumber)(equalTo(0)) && // TODO: set it
assert(severityText)(equalTo("INFO")) &&
assert(instrumentationScopeName)(equalTo("test1")) &&
assert(attributes)(equalTo(Map.empty[String, String])) &&
assert(traceId)(equalTo("00000000000000000000000000000000")) &&
assert(spanId)(equalTo("0000000000000000"))
}
}.provide(Runtime.removeDefaultLoggers >>> loggingMockLayer("test1"), ContextStorage.fiberRef),
test("works in a tracing context (fiberRef)") {
ZIO.serviceWithZIO[Tracing] { tracing =>
tracing.root("ROOT")(
for {
spanCtx <- tracing.getCurrentSpanContextUnsafe
_ <- ZIO.logInfo("test")
logRecords <- getFinishedLogRecords
} yield {
val r = logRecords.head
val body = r.getBody.asString()
val severityNumber = r.getSeverity.getSeverityNumber
val severityText = r.getSeverityText
val instrumentationScopeName = r.getInstrumentationScopeInfo.getName
val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString }
val traceId = r.getSpanContext.getTraceId
val spanId = r.getSpanContext.getSpanId

assert(logRecords.length)(equalTo(1)) &&
assert(body)(equalTo("test")) &&
assert(severityNumber)(equalTo(0)) && // TODO: set it
assert(severityText)(equalTo("INFO")) &&
assert(instrumentationScopeName)(equalTo("test2")) &&
assert(attributes)(equalTo(Map.empty[String, String])) &&
assert(traceId)(equalTo(spanCtx.getTraceId)) &&
assert(spanId)(equalTo(spanCtx.getSpanId))
}
)
}
}.provide(
Runtime.removeDefaultLoggers >>> loggingMockLayer("test2"),
TracingTest.tracingMockLayer,
ContextStorage.fiberRef
),
test("works in a tracing context (openTelemtryContext") {
ZIO.serviceWithZIO[Tracing] { tracing =>
tracing.root("ROOT")(
for {
spanCtx <- tracing.getCurrentSpanContextUnsafe
_ <- ZIO.logInfo("test")
logRecords <- getFinishedLogRecords
} yield {
val r = logRecords.head
val body = r.getBody.asString()
val severityNumber = r.getSeverity.getSeverityNumber
val severityText = r.getSeverityText
val instrumentationScopeName = r.getInstrumentationScopeInfo.getName
val attributes = r.getAttributes.asMap().asScala.toMap.map { case (k, v) => k.getKey -> v.toString }
val traceId = r.getSpanContext.getTraceId
val spanId = r.getSpanContext.getSpanId

assert(logRecords.length)(equalTo(1)) &&
assert(body)(equalTo("test")) &&
assert(severityNumber)(equalTo(0)) && // TODO: set it
assert(severityText)(equalTo("INFO")) &&
assert(instrumentationScopeName)(equalTo("test3")) &&
assert(attributes)(equalTo(Map.empty[String, String])) &&
assert(traceId)(equalTo(spanCtx.getTraceId)) &&
assert(spanId)(equalTo(spanCtx.getSpanId))
}
)
}
}.provide(
Runtime.removeDefaultLoggers >>> loggingMockLayer("test3"),
TracingTest.tracingMockLayer,
ContextStorage.openTelemetryContext
)
)
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ object TracingTest extends ZIOSpecDefault {
tracer = tracerProvider.get("TracingTest")
} yield (spanExporter, tracer)

val inMemoryTracerLayer: ULayer[InMemorySpanExporter with Tracer with ContextStorage] =
val inMemoryTracerLayer: ULayer[InMemorySpanExporter with Tracer] =
ZLayer.fromZIOEnvironment(inMemoryTracer.map { case (inMemorySpanExporter, tracer) =>
ZEnvironment(inMemorySpanExporter).add(tracer)
}) ++ ContextStorage.fiberRef
})

val tracingMockLayer: ULayer[Tracing with InMemorySpanExporter with Tracer] =
val tracingMockLayer: URLayer[ContextStorage, Tracing with InMemorySpanExporter with Tracer] =
inMemoryTracerLayer >>> (Tracing.live ++ inMemoryTracerLayer)

def getFinishedSpans: ZIO[InMemorySpanExporter, Nothing, List[SpanData]] =
Expand All @@ -55,7 +55,7 @@ object TracingTest extends ZIOSpecDefault {
_ <- ZIO.scoped(Tracing.live.build)
finishedSpans <- getFinishedSpans
} yield assert(finishedSpans)(hasSize(equalTo(0)))
}.provideLayer(inMemoryTracerLayer)
}.provide(inMemoryTracerLayer, ContextStorage.fiberRef)
)

private val spansSpec =
Expand Down Expand Up @@ -554,7 +554,7 @@ object TracingTest extends ZIOSpecDefault {
} yield assert(ko)(isSome(failureAssertion)) && assert(ok)(isSome(successAssertion))
}
}
).provideLayer(tracingMockLayer)
).provide(tracingMockLayer, ContextStorage.fiberRef)

private val spanScopedSpec =
suite("scoped spans")(
Expand Down Expand Up @@ -656,5 +656,5 @@ object TracingTest extends ZIOSpecDefault {
} yield assert(tags.get(AttributeKey.stringKey("string")))(equalTo("bar"))
}
}
).provideLayer(tracingMockLayer)
).provide(tracingMockLayer, ContextStorage.fiberRef)
}

0 comments on commit 2f0f524

Please sign in to comment.