Skip to content

Commit 51d6d80

Browse files
authored
Merge pull request #5631 from unisonweb/25-03-21-dont-enqueue-events
tweak: dont enqueue filesystem events
2 parents a877cd7 + 5a8802b commit 51d6d80

File tree

5 files changed

+295
-288
lines changed

5 files changed

+295
-288
lines changed

unison-cli/package.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ library:
2727
- aeson >= 2.0.0.0
2828
- aeson-pretty
2929
- ansi-terminal
30-
- async
3130
- bytestring
3231
- cmark
3332
- co-log-core
+90-140
Original file line numberDiff line numberDiff line change
@@ -1,153 +1,103 @@
1-
{-# LANGUAGE OverloadedStrings #-}
2-
{-# LANGUAGE PatternSynonyms #-}
3-
{-# LANGUAGE TypeApplications #-}
4-
5-
module Unison.Codebase.Watch where
6-
7-
import Control.Concurrent
8-
( forkIO,
9-
killThread,
10-
threadDelay,
1+
module Unison.Codebase.Watch
2+
( watchDirectory,
113
)
4+
where
5+
6+
import Control.Concurrent (threadDelay)
127
import Control.Concurrent.STM qualified as STM
8+
import Control.Exception (MaskingState (..))
9+
import Data.IORef (newIORef, readIORef, writeIORef)
1310
import Data.Map qualified as Map
14-
import Data.Time.Clock
15-
( UTCTime,
16-
diffUTCTime,
17-
)
11+
import Data.Time.Clock (UTCTime, diffUTCTime)
12+
import GHC.Conc (registerDelay)
13+
import GHC.IO (unsafeUnmask)
14+
import Ki qualified
1815
import System.FSNotify (Event (Added, Modified))
1916
import System.FSNotify qualified as FSNotify
2017
import Unison.Prelude
21-
import Unison.Util.TQueue (TQueue)
22-
import Unison.Util.TQueue qualified as TQueue
23-
import UnliftIO.Exception (catch)
24-
import UnliftIO.IORef
25-
( newIORef,
26-
readIORef,
27-
writeIORef,
28-
)
29-
import UnliftIO.MVar
30-
( newEmptyMVar,
31-
putMVar,
32-
takeMVar,
33-
tryPutMVar,
34-
tryTakeMVar,
35-
)
18+
import UnliftIO.Exception (finally, tryAny)
3619
import UnliftIO.STM (atomically)
3720

38-
untilJust :: (Monad m) => m (Maybe a) -> m a
39-
untilJust act = act >>= maybe (untilJust act) return
21+
watchDirectory :: Ki.Scope -> FSNotify.WatchManager -> FilePath -> (FilePath -> Bool) -> IO (IO (FilePath, Text))
22+
watchDirectory scope mgr dir allow = do
23+
eventQueue <- forkDirWatcherThread scope mgr dir allow
24+
25+
-- Await an event from the event queue with the following simple debounce logic, which is intended to work around the
26+
-- tendency for modern editors to create a flurry of rapid filesystem events when a file is saved:
27+
--
28+
-- 1. Block until an event arrives.
29+
-- 2. Keep consuming events until 50ms elapse without an event.
30+
-- 3. Return only the last event.
31+
--
32+
-- Note we don't have any smarts here for a flurry of events that are related to more than one file; we just throw
33+
-- everything away except the last event. In practice, this has seemed to work fine.
34+
let awaitEvent0 :: IO (FilePath, UTCTime)
35+
awaitEvent0 = do
36+
let go :: (FilePath, UTCTime) -> IO (FilePath, UTCTime)
37+
go event0 = do
38+
var <- registerDelay 50_000
39+
(join . atomically . asum)
40+
[ do
41+
event1 <- STM.readTQueue eventQueue
42+
pure (go event1),
43+
do
44+
STM.readTVar var >>= STM.check
45+
pure (pure event0)
46+
]
47+
event <- atomically (STM.readTQueue eventQueue)
48+
go event
49+
50+
-- Enhance the previous "await event" action with a small file cache that serves as a second debounce implementation.
51+
-- We keep in memory the file contents of previously-saved files, so that we can avoid emitting events for files that
52+
-- last changed less than 500ms ago, and whose contents haven't changed.
53+
previousFilesRef <- newIORef Map.empty
54+
let awaitEvent1 :: IO (FilePath, Text)
55+
awaitEvent1 = do
56+
(file, t) <- awaitEvent0
57+
tryAny (readUtf8 file) >>= \case
58+
-- Somewhat-expected read error from a file that was just written. Just ignore the event and try again.
59+
Left _ -> awaitEvent1
60+
Right contents -> do
61+
previousFiles <- readIORef previousFilesRef
62+
case Map.lookup file previousFiles of
63+
Just (contents0, t0) | contents == contents0 && (t `diffUTCTime` t0) < 0.5 -> awaitEvent1
64+
_ -> do
65+
writeIORef previousFilesRef $! Map.insert file (contents, t) previousFiles
66+
pure (file, contents)
67+
68+
-- Enhance the previous "await" event action by first clearing the whole event queue (tossing old filesystem events
69+
-- we may have accumulated while e.g. running a long-running IO action), and *then* waiting.
70+
let awaitEvent2 :: IO (FilePath, Text)
71+
awaitEvent2 = do
72+
_ <- STM.atomically (STM.flushTQueue eventQueue)
73+
awaitEvent1
74+
75+
pure awaitEvent2
76+
77+
-- | `forkDirWatcherThread scope mgr dir allow` forks a background thread into `scope` that, using "file watcher
78+
-- manager" `mgr` (just a boilerplate argument the caller is responsible for creating), watches directory `dir` for
79+
-- all "added" and "modified" filesystem events that occur on files that pass the `allow` predicate. It returns a queue
80+
-- of such event that is (obviously) meant to be read or flushed, never written.
81+
forkDirWatcherThread :: Ki.Scope -> FSNotify.WatchManager -> FilePath -> (FilePath -> Bool) -> IO (STM.TQueue (FilePath, UTCTime))
82+
forkDirWatcherThread scope mgr dir allow = do
83+
queue <- STM.newTQueueIO
4084

41-
watchDirectory' ::
42-
forall m. (MonadIO m) => FilePath -> m (IO (), IO (FilePath, UTCTime))
43-
watchDirectory' d = do
44-
mvar <- newEmptyMVar
4585
let handler :: Event -> IO ()
46-
handler e = case e of
47-
Added fp t FSNotify.IsFile -> doIt fp t
48-
Modified fp t FSNotify.IsFile -> doIt fp t
86+
handler = \case
87+
Added fp t FSNotify.IsFile | allow fp -> atomically (STM.writeTQueue queue (fp, t))
88+
Modified fp t FSNotify.IsFile | allow fp -> atomically (STM.writeTQueue queue (fp, t))
4989
_ -> pure ()
50-
where
51-
doIt fp t = do
52-
_ <- tryTakeMVar mvar
53-
putMVar mvar (fp, t)
54-
-- janky: used to store the cancellation action returned
55-
-- by `watchDir`, which is created asynchronously
56-
cleanupRef <- newEmptyMVar
57-
-- we don't like FSNotify's debouncing (it seems to drop later events)
58-
-- so we will be doing our own instead
59-
let config = FSNotify.defaultConfig
60-
cancel <- liftIO $
61-
forkIO $
62-
FSNotify.withManagerConf config $ \mgr -> do
63-
cancelInner <- FSNotify.watchDir mgr d (const True) handler <|> (pure (pure ()))
64-
putMVar cleanupRef $ liftIO cancelInner
65-
forever $ threadDelay 1000000
66-
let cleanup :: IO ()
67-
cleanup = join (takeMVar cleanupRef) >> killThread cancel
68-
pure (cleanup, takeMVar mvar)
69-
70-
collectUntilPause :: forall a. TQueue a -> Int -> IO [a]
71-
collectUntilPause queue minPauseµsec = do
72-
-- 1. wait for at least one element in the queue
73-
void . atomically $ TQueue.peek queue
7490

75-
let go :: IO [a]
76-
go = do
77-
before <- atomically $ TQueue.enqueueCount queue
78-
threadDelay minPauseµsec
79-
after <- atomically $ TQueue.enqueueCount queue
80-
-- if nothing new is on the queue, then return the contents
81-
if before == after
82-
then atomically $ TQueue.flush queue
83-
else go
84-
go
91+
-- A bit of a "one too many threads" situation but there's not much we can easily do about it. The `fsnotify` API
92+
-- doesn't expose any synchronous API; the only option is to fork a background thread with a callback. So, we spawn
93+
-- a thread that spawns *that* thread, then waits forever. The purpose here is to simply leverage `ki` exception
94+
-- propagation machinery to ensure that the `fsnotify` thread is properly cleaned up.
95+
Ki.forkWith_ scope Ki.defaultThreadOptions {Ki.maskingState = MaskedUninterruptible} do
96+
-- The goal here is to prevent spawning this background watching thread before installing an exception handler that
97+
-- guarantees it's killed. Unfortunately the fsnotify API doesn't seem to make that possible (hence the first
98+
-- `unsafeUnmask` here), since we do need the thread *it* spawns to be killable, and (at least as of version
99+
-- 0.4.2.0) they don't take care to guarantee that; it just inherits the masking state.
100+
stopListening <- unsafeUnmask (FSNotify.watchDir mgr dir (const True) handler) <|> pure (pure ())
101+
unsafeUnmask (forever (threadDelay maxBound)) `finally` stopListening
85102

86-
watchDirectory ::
87-
forall m.
88-
(MonadIO m) =>
89-
FilePath ->
90-
(FilePath -> Bool) ->
91-
m (IO (), IO (FilePath, Text))
92-
watchDirectory dir allow = do
93-
previousFiles <- newIORef Map.empty
94-
(cancelWatch, watcher) <- watchDirectory' dir
95-
let process :: FilePath -> UTCTime -> IO (Maybe (FilePath, Text))
96-
process file t =
97-
if allow file
98-
then
99-
let handle :: IOException -> IO ()
100-
handle _e =
101-
-- Sometimes we notice a change and try to read a file while it's being written.
102-
-- This typically occurs when UCM is writing to the scratch file and can be
103-
-- ignored anyways.
104-
pure ()
105-
go :: IO (Maybe (FilePath, Text))
106-
go = liftIO $ do
107-
contents <- readUtf8 file
108-
prevs <- readIORef previousFiles
109-
case Map.lookup file prevs of
110-
-- if the file's content's haven't changed and less than .5s
111-
-- have elapsed, wait for the next update
112-
Just (contents0, t0)
113-
| contents == contents0 && (t `diffUTCTime` t0) < 0.5 ->
114-
return Nothing
115-
_ ->
116-
Just (file, contents)
117-
<$ writeIORef previousFiles (Map.insert file (contents, t) prevs)
118-
in catch go (\e -> Nothing <$ handle e)
119-
else return Nothing
120-
queue <- TQueue.newIO
121-
gate <- liftIO newEmptyMVar
122-
-- We spawn a separate thread to siphon the file change events
123-
-- into a queue, which can be debounced using `collectUntilPause`
124-
enqueuer <- liftIO . forkIO $ do
125-
takeMVar gate -- wait until gate open before starting
126-
forever $ do
127-
event@(file, _) <- watcher
128-
when (allow file) $
129-
STM.atomically $
130-
TQueue.enqueue queue event
131-
pending <- newIORef []
132-
let await :: IO (FilePath, Text)
133-
await =
134-
untilJust $
135-
readIORef pending >>= \case
136-
[] -> do
137-
-- open the gate
138-
_ <- tryPutMVar gate ()
139-
-- this debounces the events, waits for 50ms pause
140-
-- in file change events
141-
events <- collectUntilPause queue 50000
142-
-- traceM $ "Collected file change events" <> show events
143-
case events of
144-
[] -> pure Nothing
145-
-- we pick the last of the events within the 50ms window
146-
-- TODO: consider enqueing other events if there are
147-
-- multiple events for different files
148-
_ -> uncurry process $ last events
149-
((file, t) : rest) -> do
150-
writeIORef pending rest
151-
process file t
152-
cancel = cancelWatch >> killThread enqueuer
153-
pure (cancel, await)
103+
pure queue

unison-cli/src/Unison/CommandLine.hs

+1-14
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ module Unison.CommandLine
1010
parseInput,
1111
prompt,
1212
reportParseFailure,
13-
watchFileSystem,
1413
)
1514
where
1615

17-
import Control.Concurrent (forkIO, killThread)
1816
import Control.Lens hiding (aside)
1917
import Control.Monad.Except
2018
import Control.Monad.Trans.Except
@@ -31,11 +29,10 @@ import Text.Regex.TDFA ((=~))
3129
import Unison.Codebase (Codebase)
3230
import Unison.Codebase.Branch (Branch0)
3331
import Unison.Codebase.Branch qualified as Branch
34-
import Unison.Codebase.Editor.Input (Event (..), Input (..))
32+
import Unison.Codebase.Editor.Input (Input (..))
3533
import Unison.Codebase.Editor.Output (NumberedArgs)
3634
import Unison.Codebase.Editor.StructuredArgument (StructuredArgument)
3735
import Unison.Codebase.ProjectPath qualified as PP
38-
import Unison.Codebase.Watch qualified as Watch
3936
import Unison.CommandLine.FZFResolvers qualified as FZFResolvers
4037
import Unison.CommandLine.FuzzySelect qualified as Fuzzy
4138
import Unison.CommandLine.Helpers (warn)
@@ -46,8 +43,6 @@ import Unison.Parser.Ann (Ann)
4643
import Unison.Prelude
4744
import Unison.Symbol (Symbol)
4845
import Unison.Util.Pretty qualified as P
49-
import Unison.Util.TQueue qualified as Q
50-
import UnliftIO.STM
5146
import Prelude hiding (readFile, writeFile)
5247

5348
allow :: FilePath -> Bool
@@ -56,14 +51,6 @@ allow p =
5651
not (".#" `isPrefixOf` takeFileName p)
5752
&& (isSuffixOf ".u" p || isSuffixOf ".uu" p)
5853

59-
watchFileSystem :: Q.TQueue Event -> FilePath -> IO (IO ())
60-
watchFileSystem q dir = do
61-
(cancel, watcher) <- Watch.watchDirectory dir allow
62-
t <- forkIO . forever $ do
63-
(filePath, text) <- watcher
64-
atomically . Q.enqueue q $ UnisonFileChanged (Text.pack filePath) text
65-
pure (cancel >> killThread t)
66-
6754
data ExpansionFailure
6855
= TooManyArguments (NonEmpty InputPattern.Argument)
6956
| UnexpectedStructuredArgument StructuredArgument

0 commit comments

Comments
 (0)