diff --git a/python/ray/llm/_internal/serve/core/configs/llm_config.py b/python/ray/llm/_internal/serve/core/configs/llm_config.py
index c4d25a6978ef..399d04cfba15 100644
--- a/python/ray/llm/_internal/serve/core/configs/llm_config.py
+++ b/python/ray/llm/_internal/serve/core/configs/llm_config.py
@@ -75,6 +75,7 @@ class LLMEngine(str, Enum):
     """Enum that represents an LLMEngine."""
 
     vLLM = "vLLM"
+    SGLang = "SGLang"
 
 
 class LoraConfig(BaseModelExtended):
diff --git a/python/ray/llm/_internal/serve/core/ingress/builder.py b/python/ray/llm/_internal/serve/core/ingress/builder.py
index b083b7133f5a..eedff77d6cc4 100644
--- a/python/ray/llm/_internal/serve/core/ingress/builder.py
+++ b/python/ray/llm/_internal/serve/core/ingress/builder.py
@@ -15,6 +15,7 @@
 )
 from ray.llm._internal.serve.core.server.builder import (
     build_llm_deployment,
+    build_sglang_deployment,
 )
 from ray.llm._internal.serve.observability.logging import get_logger
 from ray.serve.deployment import Application
@@ -134,3 +135,38 @@ def build_openai_app(builder_config: dict) -> Application:
     return serve.deployment(ingress_cls, **ingress_options).bind(
         llm_deployments=llm_deployments, **ingress_cls_config.ingress_extra_kwargs
     )
+
+
+def build_sglang_openai_app(builder_config: dict) -> Application:
+    """Build an OpenAI compatible app with the SGLang llm deployment setup
+    from the given builder configuration.
+
+    Args:
+        builder_config: The configuration for the builder. It has to conform
+            to the LLMServingArgs pydantic model.
+
+    Returns:
+        The configured Ray Serve Application router.
+    """
+
+    builder_config = LLMServingArgs.model_validate(builder_config)
+    llm_configs = builder_config.llm_configs
+
+    llm_deployments = [build_sglang_deployment(c) for c in llm_configs]
+
+    ingress_cls_config = builder_config.ingress_cls_config
+    ingress_options = ingress_cls_config.ingress_cls.get_deployment_options(llm_configs)
+
+    if builder_config.ingress_deployment_config:
+        ingress_options = deep_merge_dicts(
+            ingress_options, builder_config.ingress_deployment_config
+        )
+
+    ingress_cls = make_fastapi_ingress(ingress_cls_config.ingress_cls)
+
+    logger.info("============== Ingress Options ==============")
+    logger.info(pprint.pformat(ingress_options))
+
+    return serve.deployment(ingress_cls, **ingress_options).bind(
+        llm_deployments=llm_deployments, **ingress_cls_config.ingress_extra_kwargs
+    )
diff --git a/python/ray/llm/_internal/serve/core/server/builder.py b/python/ray/llm/_internal/serve/core/server/builder.py
index 032e195784a2..7cb09bb503cc 100644
--- a/python/ray/llm/_internal/serve/core/server/builder.py
+++ b/python/ray/llm/_internal/serve/core/server/builder.py
@@ -12,6 +12,7 @@
     LLMConfig,
 )
 from ray.llm._internal.serve.core.server.llm_server import LLMServer
+from ray.llm._internal.serve.engines.sglang.sglang_engine import SGLangServer
 from ray.llm._internal.serve.observability.logging import get_logger
 from ray.serve.deployment import Application
 
@@ -76,3 +77,52 @@ def build_llm_deployment(
     return serve.deployment(deployment_cls, **deployment_options).bind(
         llm_config=llm_config, **bind_kwargs
     )
+
+
+def build_sglang_deployment(
+    llm_config: LLMConfig,
+    *,
+    name_prefix: Optional[str] = None,
+    bind_kwargs: Optional[dict] = None,
+    override_serve_options: Optional[dict] = None,
+    deployment_cls: Optional[Type[LLMServer]] = None,
+) -> Application:
+    """Build an SGLangServer deployment.
+
+    Args:
+        llm_config: The LLMConfig to build the deployment.
+        name_prefix: The prefix to add to the deployment name.
+        bind_kwargs: The optional extra kwargs to pass to the deployment.
+            Used for customizing the deployment.
+        override_serve_options: The optional serve options to override the
+            default options.
+        deployment_cls: The deployment class to use. Defaults to SGLangServer.
+
+    Returns:
+        The Ray Serve Application for the SGLangServer deployment.
+    """
+    deployment_cls = deployment_cls or SGLangServer
+    name_prefix = name_prefix or f"{deployment_cls.__name__}:"
+    bind_kwargs = bind_kwargs or {}
+
+    deployment_options = deployment_cls.get_deployment_options(llm_config)
+
+    # Set the name of the deployment config to map to the model ID.
+    deployment_name = deployment_options.get("name", _get_deployment_name(llm_config))
+
+    if name_prefix:
+        deployment_options["name"] = name_prefix + deployment_name
+
+    if override_serve_options:
+        deployment_options.update(override_serve_options)
+
+    deployment_options = deep_merge_dicts(
+        DEFAULT_DEPLOYMENT_OPTIONS, deployment_options
+    )
+
+    logger.info("============== Deployment Options ==============")
+    logger.info(pprint.pformat(deployment_options))
+
+    return serve.deployment(deployment_cls, **deployment_options).bind(
+        _llm_config=llm_config, **bind_kwargs
+    )
diff --git a/python/ray/llm/_internal/serve/engines/sglang/sglang_engine.py b/python/ray/llm/_internal/serve/engines/sglang/sglang_engine.py
new file mode 100644
index 000000000000..b17795f271a5
--- /dev/null
+++ b/python/ray/llm/_internal/serve/engines/sglang/sglang_engine.py
@@ -0,0 +1,124 @@
+import time
+from typing import Any, AsyncGenerator, Optional
+
+from ray.llm._internal.serve.core.configs.llm_config import LLMConfig
+from ray.llm._internal.serve.observability.logging import get_logger
+from ray.serve.llm.openai_api_models import (
+    CompletionChoice,
+    CompletionResponse,
+    CompletionUsage,
+)
+
+logger = get_logger(__name__)
+
+
+# The @serve.deployment wrapper is applied by build_sglang_deployment(), so the
+# class itself stays undecorated.
+class SGLangServer:
+    """Minimal Ray Serve deployment that serves completions through SGLang."""
+
+    def __init__(self, _llm_config: LLMConfig):
+        self._llm_config = _llm_config
+        self.engine_kwargs = _llm_config.engine_kwargs
+
+        try:
+            import sglang
+        except ImportError as e:
+            raise ImportError(
+                "SGLang is not installed or failed to import. Please run "
+                "`pip install sglang[all]` to install required dependencies."
+            ) from e
+        self.engine = sglang.Engine(**self.engine_kwargs)
+
+    async def completions(self, request) -> AsyncGenerator[CompletionResponse, None]:
+        """Implements the LLMEngine.completions protocol for Ray LLM."""
+        logger.debug("SGLangServer.completions received request: %s", request)
+
+        # ---- 1. Get the prompt from the CompletionRequest ----
+        prompt = request.prompt
+        if isinstance(prompt, list):
+            # The OpenAI API allows a list of prompts; serve the first one.
+            prompt = prompt[0]
+
+        # ---- 2. Map OpenAI request fields onto SGLang sampling params ----
+        sampling_params = {
+            "max_new_tokens": request.max_tokens or 16,
+            "temperature": request.temperature if request.temperature is not None else 1.0,
+        }
+
+        # ---- 3. Call SGLang (non-streaming) ----
+        # async_generate returns a dict, or a list of dicts for batched prompts.
+        raw = await self.engine.async_generate(
+            prompt=prompt,
+            sampling_params=sampling_params,
+            stream=False,
+        )
+        if isinstance(raw, list):
+            raw = raw[0]
+
+        # SGLang output looks like:
+        # {
+        #     "text": "...",
+        #     "output_ids": [...],
+        #     "meta_info": {
+        #         "id": "...",
+        #         "finish_reason": {"type": "length"},
+        #         "prompt_tokens": 5,
+        #         "completion_tokens": 30,
+        #         ...
+        #     },
+        # }
+        text: str = raw.get("text", "")
+        meta: dict[str, Any] = raw.get("meta_info", {}) or {}
+        finish_reason_info = meta.get("finish_reason", {}) or {}
+
+        prompt_tokens = int(meta.get("prompt_tokens", 0))
+        completion_tokens = int(meta.get("completion_tokens", 0))
+        total_tokens = prompt_tokens + completion_tokens
+
+        # ---- 4. Build OpenAI-style response objects ----
+        usage = CompletionUsage(
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+        )
+
+        choice = CompletionChoice(
+            index=0,
+            text=text,
+            logprobs=None,
+            finish_reason=finish_reason_info.get("type", "length"),
+        )
+
+        resp = CompletionResponse(
+            id=meta.get("id", "sglang-completion"),
+            object="text_completion",
+            created=int(time.time()),
+            model=request.model,  # Echo the model name from the HTTP payload.
+            choices=[choice],
+            usage=usage,
+        )
+
+        # ---- 5. Yield the CompletionResponse (NOT a dict) ----
+        yield resp
+
+    async def llm_config(self) -> Optional[LLMConfig]:
+        return self._llm_config
+
+    @classmethod
+    def get_deployment_options(cls, _llm_config: "LLMConfig"):
+        # NOTE: These options are hard-coded for a single replica on H200 GPUs;
+        # they should eventually be derived from the LLMConfig.
+        return {
+            "autoscaling_config": {"min_replicas": 1, "max_replicas": 1},
+            "placement_group_bundles": [
+                {"CPU": 1, "GPU": 1, "accelerator_type:H200": 0.001},
+                {"GPU": 1, "accelerator_type:H200": 0.001},
+            ],
+            "placement_group_strategy": "PACK",
+            "ray_actor_options": {
+                "runtime_env": {
+                    "worker_process_setup_hook": "ray.llm._internal.serve._worker_process_setup_hook"
+                }
+            },
+        }
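
Usage sketch (not part of the diff): the snippet below shows one way the new build_sglang_openai_app entry point might be exercised end to end. The model id, model source, and engine_kwargs values are illustrative placeholders, and it assumes LLMServingArgs supplies a default ingress_cls_config when only llm_configs is given; only LLMConfig, the SGLang enum value, and build_sglang_openai_app come from this change.

    # Illustrative sketch only: model names, engine_kwargs, and the default
    # ingress_cls_config behavior are assumptions, not part of this diff.
    from ray import serve
    from ray.serve.llm import LLMConfig

    from ray.llm._internal.serve.core.ingress.builder import build_sglang_openai_app

    llm_config = LLMConfig(
        model_loading_config=dict(
            model_id="qwen-2.5-0.5b",                   # served model name (placeholder)
            model_source="Qwen/Qwen2.5-0.5B-Instruct",  # HF checkpoint (placeholder)
        ),
        llm_engine="SGLang",  # enum value added in this diff
        # Forwarded verbatim to sglang.Engine(**engine_kwargs) by SGLangServer.
        engine_kwargs=dict(model_path="Qwen/Qwen2.5-0.5B-Instruct"),
    )

    # Note: SGLangServer.get_deployment_options() currently pins placement to
    # H200 GPUs, so this only schedules on a cluster with that accelerator type.
    app = build_sglang_openai_app({"llm_configs": [llm_config]})
    serve.run(app, blocking=True)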