1 change: 0 additions & 1 deletion .github/workflows/unit-test.yml
@@ -64,7 +64,6 @@ jobs:
python3 -m pip install /root/packages/cu118/flash_attn-*.whl
python3 -m pip install -r requirements_cuda.txt -r requirements/test.txt
python3 -m pip install -e .
- python3 -m pip install -U 'numpy<2.0'
- name: Check env
run: |
python3 -m pip list
1 change: 0 additions & 1 deletion docker/prepare_wheel.sh
@@ -17,7 +17,6 @@ if [[ ${PYTHON_VERSION} = "3.13" ]]; then

pip install setuptools_rust
pip wheel -v --no-build-isolation --no-deps -w /wheels "git+https://github.com/google/[email protected]#subdirectory=python"
- pip wheel -v --no-build-isolation --no-deps -w /wheels --use-deprecated=legacy-resolver outlines_core==0.1.26
fi

if [[ "${CUDA_VERSION_SHORT}" != "cu118" ]]; then
4 changes: 3 additions & 1 deletion lmdeploy/pytorch/engine/engine.py
@@ -551,7 +551,7 @@ def _on_end_session(self, reqs: List[Request], **kwargs):
if len(msgs) > 0 and msgs[0].preserve_cache:
self.scheduler._set_message_status(msgs[0], MessageStatus.TO_BE_MIGRATED)
else:
- self.scheduler.end_session(session_id)
+ self.end_session(session_id)
resp_type = ResponseType.SUCCESS
if resp:
self._response(req.resp, resp_type)
@@ -912,6 +912,7 @@ def __need_logits(seqs: SeqList):
stopping_criteria = self.model_agent_strategy.make_stopping_criteria(running)

sync_long_context = inputs.input_ids.numel() > self.cache_config.max_prefill_token_num

return dict(
running=running,
inputs=inputs,
@@ -1237,6 +1238,7 @@ def start_loop(self):
def end_session(self, session_id: int):
"""End session."""
if session_id in self.scheduler.sessions:
+ self.sampling_strategy.on_session_end(session_id)
self.scheduler.end_session(session_id)
return True
return False
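
For reference, the added `on_session_end` hook gives the sampling strategy a chance to release per-session guided-decoding state before the scheduler drops the session. Below is a minimal, purely illustrative sketch of such a hook; the actual `SamplingStrategy` implementation is not part of this diff, and `guided_manager` stands in for the `GuidedDecodingMangager` introduced in the next file:

class IllustrativeSamplingStrategy:

    def __init__(self, guided_manager):
        # Assumed: a reference to the GuidedDecodingMangager from guided_process.py.
        self.guided_manager = guided_manager

    def on_session_end(self, session_id: int):
        # Drop the grammar matchers cached for this session so they are not
        # accidentally reused by a later session with the same id.
        if self.guided_manager is not None:
            self.guided_manager.remove_processor(session_id)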
240 changes: 92 additions & 148 deletions lmdeploy/pytorch/engine/guided_process.py
@@ -1,161 +1,105 @@
-# Copyright 2024- the Outlines developers
-# This file is adapted from
-# https://github.com/outlines-dev/outlines/blob/main/outlines/serve/vllm.py
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import copy
-import math
-from collections import defaultdict
-from functools import lru_cache
-from typing import DefaultDict, Dict, List, Union
-
-import torch
-from outlines.fsm.guide import CFGGuide, Generate, RegexGuide, Write
-from outlines.fsm.json_schema import build_regex_from_schema
-from pydantic import BaseModel
-from transformers import PreTrainedTokenizerBase
-
-
-class BaseLogitsProcessor:
-
-    def init_state(self):
-        """Initialize the FSM states."""
-        self.fsm_state: DefaultDict[int, int] = defaultdict(int)
-
-    def __call__(self, input_ids: List[int], scores: torch.Tensor) -> torch.Tensor:
-        """Use the FSM to bias the logits before sampling the next token."""
-
-        seq_id = hash(tuple(input_ids))
-
-        if len(input_ids) == 0:
-            self.init_state()
-        else:
-            last_token = input_ids[-1]
-            last_seq_id = hash(tuple(input_ids[:-1]))
-            self.fsm_state[seq_id] = self.fsm.get_next_state(state=self.fsm_state[last_seq_id], token_id=last_token)
-
-        instruction = self.fsm.get_next_instruction(self.fsm_state[seq_id])
-
-        if type(instruction) == Generate:
-            allowed_tokens = instruction.tokens
-        elif type(instruction) == Write:
-            # TODO: support fast forward tokens
-            allowed_tokens = [instruction.tokens[0]]
-        else:
-            raise TypeError(f'Unsupported instruction type {type(instruction)}')
-
-        mask = torch.full((scores.shape[-1], ), -math.inf, device=scores.device)
-        mask[allowed_tokens] = 0
-        scores.add_(mask)
-
-        return scores
-
-    def adapt_tokenizer(self, tokenizer):
-        """Adapt tokenizer to use to compile the FSM.
-
-        The API of Outlines tokenizers is slightly different to that of `transformers`. In addition we need to handle
-        the missing spaces to Llama's tokenizer to be able to compile FSMs for this model.
-        """
-        from outlines.integrations.utils import adapt_tokenizer
-        tokenizer = adapt_tokenizer(tokenizer)
-        # vocab size greater than logits shape because of '[UNUSED_TOKEN_...]'
-        if hasattr(tokenizer, '_tokenizer'):
-            tokenizer.vocabulary = tokenizer._tokenizer.get_vocab(with_added_tokens=False)
-        return tokenizer
-
-
-class RegexLogitsProcessor(BaseLogitsProcessor):
-
-    def __init__(self, regex_string: str, tokenizer):
-        """Compile the FSM that drives the regex-structured generation.
-
-        Args:
-            regex_string: A string that represents a regular expression
-            tokenizer: The model's tokenizer
-        """
-        tokenizer = self.adapt_tokenizer(copy.deepcopy(tokenizer))
-        fsm = RegexGuide(regex_string, tokenizer)
-        self.fsm = fsm
-
-
-class JSONLogitsProcessor(RegexLogitsProcessor):
-
-    def __init__(self, schema: Union[str, Dict, BaseModel], tokenizer):
-        """Compile the FSM that drives the JSON-guided generation.
-
-        Args:
-            schema: A str schema that encodes the structure we want the model
-                to generate
-            tokenizer: The model's tokenizer
-        """
-        regex_string = build_regex_from_schema(schema)
-        super().__init__(regex_string, tokenizer)
-
-
-class CFGLogitsProcessor(BaseLogitsProcessor):
-
-    def __init__(self, cfg: str, tokenizer: PreTrainedTokenizerBase):
-        """Compile the FSM that drives the context free grammar generation.
-
-        Parameters
-        ----------
-        cfg
-            A string that represents a context-free grammar
-        tokenizer
-            The model's tokenizer
-        """
-        tokenizer = self.adapt_tokenizer(tokenizer)
-        fsm = CFGGuide(cfg, tokenizer)
-        self.fsm = fsm
-
-
-# copied from https://github.com/vllm-project/vllm/blob/a7f65c2be93f491771aca31106f790bf381c0bad/vllm/model_executor/guided_decoding/outlines_decoding.py#L31 # noqa
-JSON_GRAMMAR = r"""
-?start: object | array
-
-?value: object
-      | array
-      | UNESCAPED_STRING
-      | SIGNED_NUMBER -> number
-      | "true" -> true
-      | "false" -> false
-      | "null" -> null
-
-array : "[" [value ("," value)*] "]"
-object : "{" [pair ("," pair)*] "}"
-pair : UNESCAPED_STRING ":" value
-
-%import common.UNESCAPED_STRING
-%import common.SIGNED_NUMBER
-%import common.WS
-
-%ignore WS
-"""
-
-
-@lru_cache(maxsize=32)
-def _get_guided_logits_processor(guide: str, tokenizer: PreTrainedTokenizerBase, type: str):
-    try:
-        if type == 'json_object':
-            return CFGLogitsProcessor(guide, tokenizer)
-        elif type == 'json_schema':
-            return JSONLogitsProcessor(guide, tokenizer)
-        elif type == 'regex_schema':
-            return RegexLogitsProcessor(guide, tokenizer)
-        else:
-            return None
-    except Exception as e:
-        from lmdeploy.utils import get_logger
-        logger = get_logger('lmdeploy')
-        logger.error(e)
-        return None
+# Copyright (c) OpenMMLab. All rights reserved.
+import json
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+import torch
+import xgrammar as xgr
+from transformers import PreTrainedTokenizerBase
+
+logger = logging.getLogger('lmdeploy')
+
+
+class GuidedDecodingMangager:
+    processors = {}
+
+    def __init__(self, tokenizer: PreTrainedTokenizerBase, vocab_size: Optional[int]):
+        if vocab_size is None:
+            vocab_size = tokenizer.vocab_size
+
+        tokenizer_info = xgr.TokenizerInfo.from_huggingface(tokenizer, vocab_size=vocab_size)
+        self.compiler = xgr.GrammarCompiler(tokenizer_info)
+        self.vocab_size = vocab_size
+
+    def get_processors(self, session_ctx: List[Dict[str, Any]],
+                       response_formats: Tuple[Dict]) -> Dict[int, xgr.GrammarMatcher]:
+        processors = {}
+        for i, _format in enumerate(response_formats):
+            if isinstance(_format, Dict) and _format.get('type', 'text') != 'text':
+                if _format['type'] == 'json_schema':
+                    schema = _format['json_schema']
+                    if isinstance(schema, Dict):
+                        for key in ['json_schema', 'schema']:
+                            if key in schema:
+                                schema = json.dumps(schema[key], ensure_ascii=False)
+
+                    if not isinstance(schema, str):
+                        raise ValueError(f'Cannot parse schema {schema}. The schema must be '
+                                         'either a dictionary or a string that contains the'
+                                         ' JSON Schema specification')
+                elif _format['type'] == 'regex_schema':
+                    schema = _format.get('regex_schema', '')
+                else:
+                    raise ValueError(f"unsupported format type: {_format['type']}")
+
+                session_id = session_ctx[i]['session_id']
+                seq_id = session_ctx[i]['seq_id']
+
+                processors[i] = self.get_processor(session_id, seq_id, schema, _format['type'])
+
+        return processors
+
+    def get_processor(self, session_id: int, seq_id: int, schema: str, type: str) -> xgr.GrammarMatcher:
+        if session_id in self.processors:
+            session_dict = self.processors[session_id]
+            if seq_id in session_dict:
+                processor = session_dict[seq_id]
+                return processor
+
+        if type == 'json_schema':
+            if isinstance(schema, str):
+                schema = json.loads(schema)
+
+            assert isinstance(schema, dict)
+            compiled = self.compiler.compile_json_schema(schema)
+        elif type == 'regex_schema':
+            compiled = self.compiler.compile_regex_grammar(schema)
+        else:
+            assert False, f'Do not support schema type {type}'
+
+        processor = xgr.GrammarMatcher(compiled, terminate_without_stop_token=True)
+        self.processors.setdefault(session_id, {})[seq_id] = processor
+        logger.info(f'create guided processor for session_id={session_id}, seq_id={seq_id}, and '
+                    f'total_processors={len(self.processors)}')
+        return processor
+
+    def remove_processor(self, session_id: int):
+        if session_id in self.processors:
+            del self.processors[session_id]
+            logger.info(
+                f'delete guided processor for session_id={session_id}, and total_processors={len(self.processors)}')
+
+    def allocate_batched_bitmap(self, batch_size: int) -> torch.Tensor:
+        return xgr.allocate_token_bitmask(batch_size, self.vocab_size)
+
+    def fill_bitmap(self, processor: xgr.GrammarMatcher, guided_bitmask: torch.Tensor, index: int) -> None:
+        processor.fill_next_token_bitmask(guided_bitmask, index)
+
+    def accept_token(self, processor: xgr.GrammarMatcher, token: int) -> None:
+        processor.accept_token(token)
+
+    def apply_batched_bitmap(self, logits: torch.Tensor, guided_bitmask: torch.Tensor) -> None:
+        device = logits.device
+        dtype = logits.dtype
+
+        if device.type in {'cpu', 'cuda'}:
+            xgr.apply_token_bitmask_inplace(logits, guided_bitmask.to(device))
+        else:
+            cpu_logits = logits.cpu().float()
+            cpu_mask = guided_bitmask.cpu()
+            xgr.apply_token_bitmask_inplace(cpu_logits, cpu_mask)
+            logits.copy_(cpu_logits.to(device, dtype))
+
+    def clear(self) -> None:
+        self.processors.clear()
+        logger.info(f'clear guided processors, total_processors={len(self.processors)}')
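
A minimal usage sketch of the xgrammar-based `GuidedDecodingMangager` added above. This is not part of the PR: the tokenizer name, the toy schema, the session/sequence ids, and the random logits are placeholders, and error handling is omitted.

import torch
from transformers import AutoTokenizer

# Placeholder tokenizer; any HF tokenizer matching the served model works here.
tokenizer = AutoTokenizer.from_pretrained('internlm/internlm2_5-7b-chat')
manager = GuidedDecodingMangager(tokenizer, vocab_size=None)

# Compile (and cache) a grammar matcher for one session/sequence.
schema = '{"type": "object", "properties": {"name": {"type": "string"}}}'
processor = manager.get_processor(session_id=0, seq_id=0, schema=schema, type='json_schema')

# One decoding step: build the token bitmask, mask the logits in place,
# then feed the sampled token back into the matcher.
logits = torch.randn(1, manager.vocab_size)
bitmask = manager.allocate_batched_bitmap(batch_size=1)
manager.fill_bitmap(processor, bitmask, index=0)
manager.apply_batched_bitmap(logits, bitmask)
next_token = int(logits.argmax(-1))
manager.accept_token(processor, next_token)

# Release the cached matchers once the session ends.
manager.remove_processor(session_id=0)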