Skip to content

Fixes: Enhance Kotlin Support for Suppliers, Functions, and Consumers #1277 #1278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 158 additions & 12 deletions docs/modules/ROOT/pages/spring-cloud-function/programming-model.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -705,31 +705,177 @@ However, given that `org.springframework.cloud.function.json.JsonMapper` is also
[[kotlin-lambda-support]]
== Kotlin Lambda support

We also provide support for Kotlin lambdas (since v2.0).
Consider the following:
Spring Cloud Function provides first-class support for Kotlin, allowing developers to leverage idiomatic Kotlin features, including coroutines and Flow, alongside imperative and Reactor-based programming models.

[source, java]
=== Defining Functions in Kotlin

You can define Suppliers, Functions, and Consumers in Kotlin and register them as Spring beans using several approaches:

* **Kotlin Lambdas:** Define functions directly as lambda expressions within `@Bean` definitions. This is concise for simple functions.
[source, kotlin]
----
@Configuration
class MyKotlinConfiguration {

@Bean
fun kotlinSupplier(): () -> String = { "Hello from Kotlin Lambda" }

@Bean
fun kotlinFunction(): (String) -> String = { it.uppercase() }

@Bean
fun kotlinConsumer(): (String) -> Unit = { println("Consumed by Kotlin Lambda: $it") }
}
----

* **Kotlin Classes implementing Kotlin Functional Types:** Define a class that directly implements the desired Kotlin functional type (e.g., `(String) -> String`, `suspend () -> Flow<Int>`).
[source, kotlin]
----
@Component
class UppercaseFunction : (String) -> String {
override fun invoke(p1: String): String = p1.uppercase()
}

// Can also be registered via @Bean
----

* **Kotlin Classes implementing `java.util.function` Interfaces:** Define a Kotlin class that implements the standard Java `Supplier`, `Function`, or `Consumer` interfaces.
[source, kotlin]
----
@Component
class ReverseFunction : Function<String, String> {
override fun apply(t: String): String = t.reversed()
}
----

Regardless of the definition style, beans of these types are registered with the `FunctionCatalog`, benefiting from features like type conversion and composition.

=== Coroutine Support (`suspend` and `Flow`)

A key feature is the seamless integration with Kotlin Coroutines. You can use `suspend` functions and `kotlinx.coroutines.flow.Flow` directly in your function signatures. The framework automatically handles the coroutine context and reactive stream conversions.

* **`suspend` Functions:** Functions marked with `suspend` can perform non-blocking operations using coroutine delays or other suspending calls.
[source, kotlin]
----
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
fun suspendingFunction(): suspend (String) -> Int = {
delay(100) // Non-blocking delay
it.length
}

@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
fun suspendingSupplier(): suspend () -> String = {
delay(50)
"Data from suspend"
}

@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
fun suspendingConsumer(): suspend (String) -> Unit = {
delay(20)
println("Suspend consumed: $it")
}
----

* **`Flow` Integration:** Kotlin `Flow` can be used for reactive stream processing, similar to Reactor's `Flux`.
[source, kotlin]
----
The above represents Kotlin lambdas configured as Spring beans. The signature of each maps to a Java equivalent of `Supplier`, `Function` and `Consumer`, and thus supported/recognized signatures by the framework.
While mechanics of Kotlin-to-Java mapping are outside of the scope of this documentation, it is important to understand that the same rules for signature transformation outlined in "Java 8 function support" section are applied here as well.
@Bean
fun flowFunction(): (Flow<String>) -> Flow<Int> = { flow ->
flow.map { it.length } // Process the stream reactively
}

@Bean
fun flowSupplier(): () -> Flow<String> = {
flow { // kotlinx.coroutines.flow.flow builder
emit("a")
delay(10)
emit("b")
}
}

// Consumer example taking a Flow
@Bean
fun flowConsumer(): suspend (Flow<String>) -> Unit = { flow ->
flow.collect { item -> // Collect must happen within a coroutine scope
println("Flow consumed: $item")
}
}
----

* **Combining `suspend` and `Flow`:** You can combine `suspend` and `Flow` for complex asynchronous and streaming logic.
[source, kotlin]
----
@Bean
fun suspendingFlowFunction(): suspend (Flow<String>) -> Flow<String> = { incoming ->
flow {
delay(50) // Initial suspend
incoming.collect {
emit(it.uppercase()) // Process and emit
}
}
}

@Bean
fun suspendingFlowSupplier(): suspend () -> Flow<Int> = {
flow {
repeat(3) {
delay(100)
emit(it)
}
}
}
----

=== Reactive Types (`Mono`/`Flux`)

Kotlin functions can seamlessly use Reactor's `Mono` and `Flux` types, just like Java functions.
[source, kotlin]
----
@Bean
fun reactorFunction(): (Flux<String>) -> Mono<Int> = { flux ->
flux.map { it.length }.reduce(0) { acc, i -> acc + i }
}

@Bean
fun monoSupplier(): () -> Mono<String> = {
Mono.just("Reactive Hello")
}
----

=== `Message<T>` Support

Kotlin functions can also operate directly on `org.springframework.messaging.Message<T>` to access headers, including combinations with `suspend` and `Flow`.
[source, kotlin]
----
@Bean
fun messageFunction(): (Message<String>) -> Message<Int> = { msg ->
MessageBuilder.withPayload(msg.payload.length)
.copyHeaders(msg.headers)
.setHeader("processed", true)
.build()
}

@Bean
fun suspendMessageFunction(): suspend (Message<String>) -> Message<String> = { msg ->
delay(100)
MessageBuilder.withPayload(msg.payload.reversed())
.copyHeaders(msg.headers)
.build()
}

@Bean
fun flowMessageFunction(): (Flow<Message<String>>) -> Flow<Message<Int>> = { flow ->
flow.map { msg ->
MessageBuilder.withPayload(msg.payload.hashCode())
.copyHeaders(msg.headers)
.build()
}
}
----

=== Kotlin Sample Project

To enable Kotlin support all you need is to add Kotlin SDK libraries on the classpath which will trigger appropriate autoconfiguration and supporting classes.
For a comprehensive set of runnable examples showcasing these Kotlin features, please refer to the `src/test/kotlin/org/springframework/cloud/function/kotlin/arity` directory within the Spring Cloud Function repository. These examples demonstrate a wide range of function signatures with different arities, including regular functions, suspend functions (coroutines), and various reactive types (Flow, Mono, Flux).

[[function-component-scan]]
== Function Component Scan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration;
import org.springframework.cloud.function.context.wrapper.KotlinFunctionWrapper;
import org.springframework.core.KotlinDetector;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
Expand Down Expand Up @@ -118,7 +118,7 @@ public FunctionRegistration<T> properties(Map<String, String> properties) {

public FunctionRegistration<T> type(Type type) {
this.type = type;
if (KotlinDetector.isKotlinPresent() && this.target instanceof KotlinLambdaToFunctionAutoConfiguration.KotlinFunctionWrapper) {
if (KotlinDetector.isKotlinPresent() && this.target instanceof KotlinFunctionWrapper) {
return this;
}
Type discoveredFunctionType = type; //FunctionTypeUtils.discoverFunctionTypeFromClass(this.target.getClass());
Expand Down Expand Up @@ -174,15 +174,6 @@ public FunctionRegistration<T> names(String... names) {
return this.names(Arrays.asList(names));
}

/**
* Transforms (wraps) function identified by the 'target' to its {@code Flux}
* equivalent unless it already is. For example, {@code Function<String, String>}
* becomes {@code Function<Flux<String>, Flux<String>>}
* @param <S> the expected target type of the function (e.g., FluxFunction)
* @return {@code FunctionRegistration} with the appropriately wrapped target.
*
*/

@Override
public void setBeanName(String name) {
if (CollectionUtils.isEmpty(this.names)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration;
import org.springframework.cloud.function.context.config.KotlinLambdaToFunctionFactory;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
Expand Down Expand Up @@ -120,7 +121,8 @@ public <T> T lookup(Class<?> type, String functionDefinition, String... expected
functionDefinition = StringUtils.hasText(functionDefinition)
? functionDefinition
: this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, "");
if (!this.applicationContext.containsBean(functionDefinition) || !KotlinUtils.isKotlinType(this.applicationContext.getBean(functionDefinition))) {

if (!this.applicationContext.containsBean(functionDefinition) || !isKotlinType(functionDefinition)) {
functionDefinition = this.normalizeFunctionDefinition(functionDefinition);
}
if (!isFunctionDefinitionEligible(functionDefinition)) {
Expand Down Expand Up @@ -160,12 +162,9 @@ public <T> T lookup(Class<?> type, String functionDefinition, String... expected
else if (functionCandidate instanceof BiFunction || functionCandidate instanceof BiConsumer) {
functionRegistration = this.registerMessagingBiFunction(functionCandidate, functionName);
}
else if (KotlinUtils.isKotlinType(functionCandidate)) {
KotlinLambdaToFunctionAutoConfiguration.KotlinFunctionWrapper wrapper =
new KotlinLambdaToFunctionAutoConfiguration.KotlinFunctionWrapper(functionCandidate);
wrapper.setName(functionName);
wrapper.setBeanFactory(this.applicationContext.getBeanFactory());
functionRegistration = wrapper.getFunctionRegistration();
else if (isKotlinType(functionName, functionCandidate)) {
KotlinLambdaToFunctionFactory kotlinFactory = new KotlinLambdaToFunctionFactory(functionCandidate, this.applicationContext.getBeanFactory());
functionRegistration = kotlinFactory.getFunctionRegistration(functionName);
}
else if (this.isFunctionPojo(functionCandidate, functionName)) {
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
Expand Down Expand Up @@ -203,6 +202,17 @@ else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
return (T) function;
}

private boolean isKotlinType(String functionDefinition) {
Object fonctionBean = this.applicationContext.getBean(functionDefinition);
return isKotlinType(functionDefinition, fonctionBean);
}

private boolean isKotlinType(String functionDefinition, Object fonctionBean) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
Type functionType = FunctionContextUtils.findType(functionDefinition, beanFactory);
return KotlinUtils.isKotlinType(fonctionBean, functionType);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration registerMessagingBiFunction(Object userFunction, String functionName) {
Type biFunctionType = FunctionContextUtils.findType(this.applicationContext.getBeanFactory(), functionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -234,6 +235,10 @@ else if (Function0.class.isAssignableFrom(functionalClass)) {
ResolvableType kotlinType = ResolvableType.forClass(functionalClass).as(Function0.class);
return GenericTypeResolver.resolveType(kotlinType.getType(), functionalClass);
}
else if (Function2.class.isAssignableFrom(functionalClass)) {
ResolvableType kotlinType = ResolvableType.forClass(functionalClass).as(Function2.class);
return GenericTypeResolver.resolveType(kotlinType.getType(), functionalClass);
}
}
Type typeToReturn = null;
if (Function.class.isAssignableFrom(functionalClass)) {
Expand Down
Loading