This document provides detailed proposals for fixing the issues identified in BUGS.md. Each proposal includes code examples, implementation strategies, and potential trade-offs.
class Actor {
private val stateMutex = Mutex()
private val _state = AtomicReference(ActorState.INITIALIZED)
suspend fun send(message: ActorMessage) {
stateMutex.withLock {
if (_state.get() == ActorState.RUNNING) {
withTimeout(5000L) { // configurable timeout
actorChannel.send(message)
}
} else {
throw IllegalStateException("Cannot send message; actor is not running. Current state: ${_state.get()}")
}
}
}
}class Actor {
suspend fun recover() {
stateMutex.withLock {
if (_state.get() == ActorState.ERROR) {
// Perform cleanup
clearPendingMessages()
resetMetrics()
_state.set(ActorState.INITIALIZED)
// Restart actor
start()
}
}
}
private suspend fun clearPendingMessages() {
// Clear any pending messages in the channel
while (actorChannel.tryReceive().isSuccess) {
metrics.recordDroppedMessage()
}
}
}class Actor(
private val config: ActorConfig = ActorConfig()
) {
data class ActorConfig(
val channelCapacity: Int = 100,
val processingTimeout: Duration = 30.seconds,
val backpressureStrategy: BackpressureStrategy = BackpressureStrategy.BUFFER
)
private val actorChannel = Channel<ActorMessage>(
capacity = when (config.backpressureStrategy) {
BackpressureStrategy.BUFFER -> config.channelCapacity
BackpressureStrategy.DROP -> Channel.CONFLATED
BackpressureStrategy.BLOCK -> Channel.RENDEZVOUS
}
)
suspend fun send(message: ActorMessage) {
withTimeout(config.processingTimeout) {
stateMutex.withLock {
if (_state.get() == ActorState.RUNNING) {
when (config.backpressureStrategy) {
BackpressureStrategy.DROP -> actorChannel.trySend(message)
else -> actorChannel.send(message)
}
}
}
}
}
}class Actor : AutoCloseable {
private val resources = mutableListOf<AutoCloseable>()
private val cleanupScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
override fun close() {
runBlocking {
stateMutex.withLock {
_state.set(ActorState.STOPPED)
// Cancel all ongoing operations
cleanupScope.launch {
actorChannel.close()
// Wait for pending messages to complete with timeout
withTimeout(5000L) {
while (!actorChannel.isEmpty) {
delay(100)
}
}
// Cleanup resources in reverse order
resources.asReversed().forEach { resource ->
try {
resource.close()
} catch (e: Exception) {
// Log cleanup error but continue
metrics.recordCleanupError()
}
}
}.join()
}
}
}
}interface Port<T : Any> {
companion object {
private val idCounter = AtomicLong(0)
fun generateId(): String = buildString {
append("port-")
append(System.currentTimeMillis())
append("-")
append(idCounter.incrementAndGet())
append("-")
append(UUID.randomUUID().toString().take(8))
}
}
}class PortConnection<T : Any> {
inline fun <reified S : T> connect(
output: OutputPort<S>,
input: InputPort<T>
) {
// Verify type compatibility at runtime
require(input.type.isInstance(output.type)) {
"Type mismatch: Cannot connect ${output.type} to ${input.type}"
}
// Proceed with connection
}
}@Test
fun `test concurrent message processing`() = runTest {
val actor = TestActor()
val messages = List(100) { index ->
Actor.ActorMessage(type = "test", payload = "Message $index")
}
actor.start()
coroutineScope {
messages.forEach { message ->
launch {
actor.send(message)
}
}
}
// Verify all messages were processed
assertEquals(100, actor.processedCount)
assertTrue(actor.processedMessages.containsAll(messages))
}
@Test
fun `test memory pressure`() = runTest {
val actor = TestActor(ActorConfig(channelCapacity = 10))
val largeMsgCount = 1000
actor.start()
// Should not throw OOM
repeat(largeMsgCount) {
actor.send(Actor.ActorMessage(type = "test", payload = ByteArray(1024 * 1024)))
}
assertTrue(actor.metrics.droppedMessages > 0)
}data class ActorConfiguration(
val id: String = UUID.randomUUID().toString(),
val processingConfig: ProcessingConfig = ProcessingConfig(),
val metricConfig: MetricConfig = MetricConfig(),
val errorConfig: ErrorConfig = ErrorConfig()
) {
data class ProcessingConfig(
val channelCapacity: Int = 100,
val processingTimeout: Duration = 30.seconds,
val backpressureStrategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
val maxConcurrentMessages: Int = 1
)
data class MetricConfig(
val enabled: Boolean = true,
val exporterClass: KClass<out MetricExporter>? = null,
val samplingRate: Double = 1.0
)
data class ErrorConfig(
val maxRetries: Int = 3,
val retryBackoff: Duration = 1.seconds,
val errorHandler: suspend (Exception, ActorMessage) -> Unit = { _, _ -> }
)
}class ActorMetrics {
private val messageLatency = histogram("message_processing_latency")
private val messageCount = counter("message_count")
private val errorCount = counter("error_count")
private val memoryUsage = gauge("memory_usage")
fun export(): MetricsSnapshot {
return MetricsSnapshot(
latencies = messageLatency.snapshot(),
counts = messageCount.value,
errors = errorCount.value,
memory = memoryUsage.value
)
}
// Prometheus integration
fun toPrometheusFormat(): String {
return buildString {
appendLine("# HELP actor_message_latency Message processing latency")
appendLine("# TYPE actor_message_latency histogram")
// Add metric data in Prometheus format
}
}
}-
Immediate (Week 1-2)
- Fix race conditions in state management
- Implement proper resource cleanup
- Add basic error recovery
-
Short-term (Week 3-4)
- Enhance configuration system
- Implement backpressure mechanisms
- Improve port type safety
-
Medium-term (Month 2)
- Enhance metrics system
- Add comprehensive test suite
- Implement monitoring integration
-
Long-term (Month 3+)
- Implement advanced features from documentation
- Performance optimization
- Documentation updates
- Create new versions of affected classes with
V2suffix - Deprecate old implementations
- Provide migration guides
- Allow grace period for updates
- Remove deprecated code in next major version
- Unit tests for all new implementations
- Integration tests for actor interactions
- Performance tests for memory and concurrency
- Migration tests for backward compatibility
- All changes maintain backward compatibility where possible
- New features are introduced behind feature flags
- Performance impact is measured before/after changes
- Documentation is updated alongside code changes
- Consider adding support for distributed actors
- Plan for Kotlin 2.0 migration
- Evaluate integration with other actor frameworks
- Consider implementing supervision hierarchies
Please update this document as implementation progresses and new requirements are discovered.