Skip to content

Commit 2fe94b1

Browse files
committed
router pushes sse events; better streaming extraction
1 parent 843e891 commit 2fe94b1

File tree

3 files changed

+57
-27
lines changed

3 files changed

+57
-27
lines changed

oneping/curl.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,15 @@
1212
##
1313

1414
def prepare_url(prov, url=None, host=None, port=None):
15-
if host is None:
16-
host = 'localhost'
17-
if port is None:
18-
port = 8000
19-
if url is None:
20-
url = prov['url'].format(host=host, port=port)
21-
return url
15+
host = prov.get('host') if host is None else host
16+
port = prov.get('port') if port is None else port
17+
url = prov.get('url') if url is None else url
18+
return url.format(host=host, port=port)
2219

2320
def prepare_auth(prov, api_key=None):
2421
if (auth_func := prov.get('authorize')) is not None:
25-
if api_key is None and (api_key := os.environ.get(key_env := prov['api_key_env'])) is None:
26-
raise Exception('Cannot find API key in {key_env}')
22+
if (api_key := os.environ.get(prov['api_key_env'])) is None:
23+
raise Exception('Cannot find API key in {api_key_env}')
2724
headers_auth = auth_func(api_key)
2825
else:
2926
headers_auth = {}
@@ -36,7 +33,7 @@ def prepare_model(prov, model=None):
3633

3734
def prepare_request(
3835
query, provider='local', system=None, prefill=None, prediction=None, history=None,
39-
url=None, port=None, api_key=None, model=None, max_tokens=DEFAULT_MAX_TOKENS, **kwargs
36+
url=None, host=None, port=None, api_key=None, model=None, max_tokens=DEFAULT_MAX_TOKENS, **kwargs
4037
):
4138
# external provider
4239
prov = get_provider(provider)
@@ -45,7 +42,7 @@ def prepare_request(
4542
max_tokens_name = prov.get('max_tokens_name', 'max_tokens')
4643

4744
# get full url
48-
url = prepare_url(prov, url=url, port=port)
45+
url = prepare_url(prov, url=url, host=host, port=port)
4946

5047
# get authorization headers
5148
headers_auth = prepare_auth(prov, api_key=api_key)
@@ -127,9 +124,9 @@ async def reply_async(query, provider='local', history=None, prefill=None, **kwa
127124
## stream requests
128125
##
129126

130-
def parse_stream_data(chunk):
131-
if chunk.startswith(b'data: '):
132-
text = chunk[6:]
127+
def parse_stream_data(line):
128+
if line.startswith(b'data: '):
129+
text = line[6:]
133130
if text != b'[DONE]' and len(text) > 0:
134131
return text
135132

@@ -172,7 +169,9 @@ def stream(query, provider='local', history=None, prefill=None, **kwargs):
172169
for line in response.iter_lines():
173170
if (data := parse_stream_data(line)) is not None:
174171
parsed = json.loads(data)
175-
yield extractor(parsed)
172+
text = extractor(parsed)
173+
if text is not None:
174+
yield text
176175

177176
async def stream_async(query, provider='local', history=None, prefill=None, **kwargs):
178177
# get provider
@@ -206,7 +205,9 @@ async def stream_async(query, provider='local', history=None, prefill=None, **kw
206205
async for line in lines:
207206
if (data := parse_stream_data(line)) is not None:
208207
parsed = json.loads(data)
209-
yield extractor(parsed)
208+
text = extractor(parsed)
209+
if text is not None:
210+
yield text
210211

211212
##
212213
## embeddings

oneping/providers.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
ANTHROPIC_MODEL = 'claude-3-5-sonnet-latest'
2222
FIREWORKS_MODEL = 'accounts/fireworks/models/llama-v3p1-70b-instruct'
2323
GROQ_MODEL = 'llama-3.1-70b-versatile'
24+
DEEPSEEK_MODEL = 'deepseek-chat'
2425

2526
##
2627
## environment key names
@@ -30,6 +31,7 @@
3031
ANTHROPIC_KEYENV = 'ANTHROPIC_API_KEY'
3132
FIREWORKS_KEYENV = 'FIREWORKS_API_KEY'
3233
GROQ_KEYENV = 'GROQ_API_KEY'
34+
DEEPSEEK_KEYENV = 'DEEPSEEK_API_KEY'
3335
AZURE_KEYENV = 'AZURE_OPENAI_API_KEY'
3436

3537
##
@@ -114,17 +116,23 @@ def response_anthropic(reply):
114116
content = reply['content'][0]
115117
return content['text']
116118

117-
def stream_oneping(chunk):
118-
return chunk
119+
##
120+
## stream handlers
121+
##
119122

120123
def stream_openai(chunk):
121124
return chunk['choices'][0]['delta'].get('content', '')
122125

123126
def stream_anthropic(chunk):
124127
if chunk['type'] == 'content_block_delta':
125128
return chunk['delta']['text']
126-
else:
127-
return ''
129+
130+
def stream_oneping(chunk):
131+
return chunk
132+
133+
##
134+
## native handlers
135+
##
128136

129137
def response_openai_native(reply):
130138
return reply.choices[0].message.content
@@ -145,6 +153,10 @@ def stream_anthropic_native(chunk):
145153
else:
146154
return ''
147155

156+
##
157+
## other modal handlers
158+
##
159+
148160
def embed_openai(reply):
149161
return reply['data'][0]['embedding']
150162

@@ -156,7 +168,6 @@ def transcribe_openai(audio):
156168
##
157169

158170
DEFAULT_PROVIDER = {
159-
'authorize': authorize_openai,
160171
'payload': payload_openai,
161172
'response': response_openai,
162173
'stream': stream_openai,
@@ -167,18 +178,21 @@ def transcribe_openai(audio):
167178
LLM_PROVIDERS = {
168179
'local': {
169180
'url': 'http://{host}:{port}/v1/chat/completions',
170-
'authorize': None,
181+
'host': 'localhost',
182+
'port': 8000,
171183
},
172184
'oneping': {
173185
'url': 'http://{host}:{port}/chat',
174186
'host': 'localhost',
175187
'port': 5000,
188+
'authorize': None,
176189
'payload': payload_oneping,
177190
'response': response_oneping,
178191
'stream': stream_oneping,
179192
},
180193
'openai': {
181194
'url': 'https://api.openai.com/v1/chat/completions',
195+
'authorize': authorize_openai,
182196
'max_tokens_name': 'max_completion_tokens',
183197
'api_key_env': OPENAI_KEYENV,
184198
'model': OPENAI_MODEL,
@@ -195,14 +209,22 @@ def transcribe_openai(audio):
195209
},
196210
'fireworks': {
197211
'url': 'https://api.fireworks.ai/inference/v1/chat/completions',
212+
'authorize': authorize_openai,
198213
'api_key_env': FIREWORKS_KEYENV,
199214
'model': FIREWORKS_MODEL,
200215
},
201216
'groq': {
202217
'url': 'https://api.groq.com/openai/v1/chat/completions',
218+
'authorize': authorize_openai,
203219
'api_key_env': GROQ_KEYENV,
204220
'model': GROQ_MODEL,
205221
},
222+
'deepseek': {
223+
'url': 'https://api.deepseek.com/chat/completions',
224+
'authorize': authorize_openai,
225+
'api_key_env': DEEPSEEK_KEYENV,
226+
'model': DEEPSEEK_MODEL,
227+
},
206228
}
207229

208230
def get_provider(provider):

oneping/server.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# llm servers
22

3+
import json
34
import subprocess
45
from itertools import chain
56

@@ -26,6 +27,12 @@ def patch_payload(data):
2627
data['provider'] = model
2728
return data
2829

30+
def generate_sse(stream):
31+
for chunk in stream:
32+
data = json.dumps(chunk)
33+
yield f'data: {data}\n\n'
34+
yield 'data: [DONE]\n\n'
35+
2936
def start_router(host='127.0.0.1', port=5000, allow_origins=DEFAULT_ALLOW_ORIGINS, **kwargs):
3037
import uvicorn
3138
from fastapi import FastAPI
@@ -61,11 +68,11 @@ async def chat(genreq: GenerateRequest):
6168
data = genreq.model_dump(exclude_none=True)
6269
patch = patch_payload(data)
6370
if patch.get('stream', False):
64-
ret = stream_api(**kwargs, **patch)
65-
return StreamingResponse(ret, media_type='text/plain')
71+
stream = stream_api(**kwargs, **patch)
72+
sse = generate_sse(stream)
73+
return StreamingResponse(sse, media_type='text/event-stream')
6674
else:
67-
ret = reply_api(**kwargs, **patch)
68-
text = ret[1] if type(ret) is tuple else ret
69-
return PlainTextResponse(text)
75+
reply = reply_api(**kwargs, **patch)
76+
return PlainTextResponse(reply)
7077

7178
uvicorn.run(app, host=host, port=port)

0 commit comments

Comments (0)