forked from sphinx-doc/sphinx-autobuild
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
84 lines (70 loc) · 2.74 KB
/
server.py
File metadata and controls
84 lines (70 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from __future__ import annotations

import asyncio
from concurrent.futures import ProcessPoolExecutor
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
from typing import TYPE_CHECKING

import watchfiles
from starlette.websockets import WebSocket

if TYPE_CHECKING:
    import os
    from collections.abc import AsyncIterator, Callable, Sequence

    from starlette.types import Receive, Scope, Send

    from sphinx_autobuild.filter import IgnoreFilter


class RebuildServer:
    """ASGI websocket application that rebuilds on file changes.

    Watches ``paths`` for changes, runs ``change_callback`` in a process pool
    for every batch of changes, and then sends the text ``"refresh"`` to the
    connected websocket clients.

    ``flag`` and ``should_exit`` are created by :meth:`lifespan`, so the
    lifespan context must be entered before the application handles
    connections.
    """

    def __init__(
        self,
        paths: list[os.PathLike[str]],
        ignore_filter: IgnoreFilter,
        change_callback: Callable[[Sequence[Path]], None],
    ) -> None:
        """Store the watch configuration.

        Args:
            paths: Paths to watch. Each one is resolved strictly, so a path
                that does not exist raises here.
            ignore_filter: Callable applied to a changed path; a truthy
                result means the change is ignored.
            change_callback: Called with the keyword argument
                ``changed_paths`` (a list of resolved paths) for every batch
                of changes.
        """
        self.paths = [Path(path).resolve(strict=True) for path in paths]
        self.ignore = ignore_filter
        self.change_callback = change_callback

    @asynccontextmanager
    async def lifespan(self, _app) -> AsyncIterator[None]:
        """Run the watcher for as long as the context is active.

        Creates the ``flag`` and ``should_exit`` events and starts
        :meth:`main` as a background task on entry. ``_app`` is unused.

        On exit, including when an exception or cancellation is thrown in
        at the ``yield``, ``should_exit`` is set and the background task is
        awaited, so an exception raised by the watcher surfaces here.
        """
        self.flag = asyncio.Event()
        self.should_exit = asyncio.Event()
        task = asyncio.create_task(self.main())
        try:
            yield
        finally:
            # Always ask the watcher to stop, even if the body failed;
            # otherwise the background task would be left running.
            self.should_exit.set()
            await task

    async def main(self) -> None:
        """Run :meth:`watch` until it finishes or ``should_exit`` is set."""
        await self._wait_first(
            asyncio.create_task(self.watch()),
            asyncio.create_task(self.should_exit.wait()),
        )

    async def watch(self) -> None:
        """Invoke the change callback for each batch of file changes.

        After the callback has finished, ``flag`` is set so that
        :meth:`watch_reloads` notifies the connected clients.
        """
        async for changes in watchfiles.awatch(
            *self.paths,
            # Report only changes whose path the ignore filter does not match.
            watch_filter=lambda _, path: not self.ignore(path),
        ):
            changed_paths = [Path(path).resolve() for (_, path) in changes]
            # A fresh pool is created per batch and shut down on leaving the
            # ``with`` block; the callback runs in a worker process.
            with ProcessPoolExecutor() as pool:
                fut = pool.submit(self.change_callback, changed_paths=changed_paths)
                await asyncio.wrap_future(fut)
            self.flag.set()

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Handle one websocket connection (ASGI entry point).

        Accepts the connection, then runs until either the client
        disconnects or :meth:`watch_reloads` stops.
        """
        assert scope["type"] == "websocket"
        ws = WebSocket(scope, receive, send)
        await ws.accept()
        await self._wait_first(
            asyncio.create_task(self.watch_reloads(ws)),
            asyncio.create_task(self.wait_client_disconnect(ws)),
        )

    async def watch_reloads(self, ws: WebSocket) -> None:
        """Send ``"refresh"`` over ``ws`` every time ``flag`` is set."""
        while True:
            await self.flag.wait()
            self.flag.clear()
            await ws.send_text("refresh")

    @staticmethod
    async def wait_client_disconnect(ws: WebSocket) -> None:
        """Drain incoming text messages until the iteration over ``ws`` ends."""
        async for _ in ws.iter_text():
            pass

    @staticmethod
    async def _wait_first(*tasks: asyncio.Task[object]) -> None:
        """Wait for the first of ``tasks`` to finish, then cancel the rest.

        Cancellation of the unfinished tasks is requested but not awaited.
        The result of every finished task is retrieved so that an exception
        raised inside it propagates to the caller.
        """
        try:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Also reached when the waiting coroutine itself is cancelled, so
            # the child tasks never outlive their parent.
            for task in tasks:
                if not task.done():
                    task.cancel()
        for task in done:
            task.result()