Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Refactor Workerqueue, add waiting empty utility for testing #4296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
soulomoon wants to merge 18 commits into master
base: master
Choose a base branch
Loading
from soulomoon/make-workerqueue-cancellable-if-not-started
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
06e3b8f
add workerQueue
soulomoon Jun 8, 2024
ca84bb5
Fix build
soulomoon Jun 8, 2024
b839ba7
fix import order
soulomoon Jun 8, 2024
b69014d
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 9, 2024
bb6502b
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 9, 2024
6904335
stylish
soulomoon Jun 9, 2024
7c47722
add `waitUntilWorkerQueueEmpty`
soulomoon Jun 10, 2024
7989d98
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 10, 2024
c7eeffb
add `peekWorkerQueue` and `readWorkerQueue`
soulomoon Jun 10, 2024
ed5949c
fix comment
soulomoon Jun 10, 2024
3f8c4db
rename
soulomoon Jun 10, 2024
66e8075
refactor
soulomoon Jun 10, 2024
3b8ebf5
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 11, 2024
9f9e0b4
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 15, 2024
c87fbc2
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 18, 2024
5aea44e
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 25, 2024
de5a066
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 28, 2024
bf4c87a
Merge branch 'master' into soulomoon/make-workerqueue-cancellable-if-...
soulomoon Jun 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ghcide/session-loader/Development/IDE/Session.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ import Data.Void

import Control.Concurrent.STM.Stats (atomically, modifyTVar',
readTVar, writeTVar)
import Control.Concurrent.STM.TQueue
import Control.DeepSeq
import Control.Exception (evaluate)
import Control.Monad.IO.Unlift (MonadUnliftIO)
Expand All @@ -103,7 +102,8 @@ import qualified Data.HashSet as Set
import qualified Data.Set as OS
import Database.SQLite.Simple
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (awaitRunInThread,
import Development.IDE.Core.WorkerThread (WorkerQueue,
awaitRunInThread,
withWorkerQueue)
import qualified Development.IDE.GHC.Compat.Util as Compat
import Development.IDE.Session.Diagnostics (renderCradleError)
Expand Down Expand Up @@ -421,7 +421,7 @@ getHieDbLoc dir = do
-- components mapping to the same hie.yaml file are mapped to the same
-- HscEnv which is updated as new components are discovered.

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> WorkerQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
cradle_files <- newIORef []
Expand Down
4 changes: 3 additions & 1 deletion ghcide/src/Development/IDE/Core/Compile.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import Development.IDE.Core.ProgressReporting (ProgressReporting (..))
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Shake
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
import Development.IDE.GHC.Compat hiding (assert,
loadInterface,
parseHeader,
Expand All @@ -84,6 +85,7 @@ import Development.IDE.GHC.Compat hiding (assert,
import qualified Development.IDE.GHC.Compat as Compat
import qualified Development.IDE.GHC.Compat as GHC
import qualified Development.IDE.GHC.Compat.Util as Util
import Development.IDE.Core.ProgressReporting (ProgressReporting (..), progressReportingOutsideState)
import Development.IDE.GHC.CoreFile
import Development.IDE.GHC.Error
import Development.IDE.GHC.Orphans ()
Expand Down Expand Up @@ -795,7 +797,7 @@ indexHieFile se mod_summary srcPath !hash hf = do
-- hiedb doesn't use the Haskell src, so we clear it to avoid unnecessarily keeping it around
let !hf' = hf{hie_hs_src = mempty}
modifyTVar' indexPending $ HashMap.insert srcPath hash
writeTQueue indexQueue $ \withHieDb -> do
writeWorkerQueue indexQueue $ \withHieDb -> do
-- We are now in the worker thread
-- Check if a newer index of this file has been scheduled, and if so skip this one
newerScheduled <- atomically $ do
Expand Down
4 changes: 2 additions & 2 deletions ghcide/src/Development/IDE/Core/FileStore.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module Development.IDE.Core.FileStore(
) where

import Control.Concurrent.STM.Stats (STM, atomically)
import Control.Concurrent.STM.TQueue (writeTQueue)
import Control.Exception
import Control.Monad.Extra
import Control.Monad.IO.Class
Expand All @@ -40,6 +39,7 @@ import Development.IDE.Core.IdeConfiguration (isWorkspaceFile)
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Shake hiding (Log)
import qualified Development.IDE.Core.Shake as Shake
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
import Development.IDE.GHC.Orphans ()
import Development.IDE.Graph
import Development.IDE.Import.DependencyInformation
Expand Down Expand Up @@ -247,7 +247,7 @@ typecheckParentsAction recorder nfp = do
setSomethingModified :: VFSModified -> IdeState -> String -> IO [Key] -> IO ()
setSomethingModified vfs state reason actionBetweenSession = do
-- Update database to remove any files that might have been renamed/deleted
atomically $ writeTQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
atomically $ writeWorkerQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
void $ restartShakeSession (shakeExtras state) vfs reason [] actionBetweenSession

registerFileWatches :: [String] -> LSP.LspT Config IO Bool
Expand Down
10 changes: 5 additions & 5 deletions ghcide/src/Development/IDE/Core/Shake.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ data HieDbWriter
-- | Actions to queue up on the index worker thread
-- The inner `(HieDb -> IO ()) -> IO ()` wraps `HieDb -> IO ()`
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())
type IndexQueue = WorkerQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
, tRestartQueue :: WorkerQueue (IO ())
, tLoaderQueue :: WorkerQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
Expand Down Expand Up @@ -326,9 +326,9 @@ data ShakeExtras = ShakeExtras
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
, restartQueue :: WorkerQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
, loaderQueue :: WorkerQueue (IO ())
-- ^ Queue of loader actions to be run.
}

Expand Down
75 changes: 65 additions & 10 deletions ghcide/src/Development/IDE/Core/WorkerThread.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@ Description : This module provides an API for managing worker threads in the IDE
see Note [Serializing runs in separate thread]
-}
module Development.IDE.Core.WorkerThread
(withWorkerQueue, awaitRunInThread)
(withWorkerQueue
, awaitRunInThread
, withWorkerQueueOfOne
, WorkerQueue
, writeWorkerQueue
, waitUntilWorkerQueueEmpty)
where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Concurrent.Strict (newBarrier, signalBarrier,
waitBarrier)
import Control.Monad (forever)
import Control.Exception (finally)
import Control.Monad (forever, unless)
import Control.Monad.Cont (ContT (ContT))
import Control.Monad.IO.Class (liftIO)

{-
Note [Serializing runs in separate thread]
Expand All @@ -28,27 +35,75 @@ Originally we used various ways to implement this, but it was hard to maintain a
Moreover, we can not stop these threads uniformly when we are shutting down the server.
-}

-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
data WorkerQueue a = WorkerQueueOfOne (TMVar a) | WorkerQueueOfMany (TQueue a)

-- | peekWorkerQueue returns the next action in the queue without removing it.
peekWorkerQueue :: WorkerQueue a -> STM a
peekWorkerQueue (WorkerQueueOfOne tVar) = readTMVar tVar
peekWorkerQueue (WorkerQueueOfMany tQueue) = peekTQueue tQueue

-- | readWorkerQueue returns the next action in the queue and removes it.
readWorkerQueue :: WorkerQueue a -> STM a
readWorkerQueue (WorkerQueueOfOne tVar) = takeTMVar tVar
readWorkerQueue (WorkerQueueOfMany tQueue) = readTQueue tQueue

writeWorkerQueue :: WorkerQueue a -> a -> STM ()
writeWorkerQueue (WorkerQueueOfOne tVar) action = putTMVar tVar action
writeWorkerQueue (WorkerQueueOfMany tQueue) action = writeTQueue tQueue action

-- | waitUntilWorkerQueueEmpty blocks until the worker queue is empty.
waitUntilWorkerQueueEmpty :: WorkerQueue a -> STM ()
waitUntilWorkerQueueEmpty (WorkerQueueOfOne tVar) = do
isEmpty <- isEmptyTMVar tVar
unless isEmpty retry
waitUntilWorkerQueueEmpty (WorkerQueueOfMany queue) = do
isEmpty <- isEmptyTQueue queue
unless isEmpty retry

newWorkerQueue :: STM (WorkerQueue a)
newWorkerQueue = WorkerQueueOfMany <$> newTQueue

newWorkerQueueOfOne :: STM (WorkerQueue a)
newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar

-- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
-- thread which polls the queue for requests and runs the given worker
-- function on them.
withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t)
withWorkerQueue workerAction = ContT $ \mainAction -> do
q <- newTQueueIO
withWorkerQueue :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueue workerAction = do
q <- liftIO $ atomically newWorkerQueue
runWorkerQueue q workerAction

-- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
-- and one action can only be queued after the previous action has been done.
-- this is useful when we want to cancel the action waiting to be enqueue if it's thread is cancelled.
-- e.g. session loading in session loader. When a shake session is restarted
-- , we want to cancel the previous pending session loading.
-- since the hls-graph can handle the retrying of the session loading.
withWorkerQueueOfOne :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueueOfOne workerAction = do
q <- liftIO $ atomically newWorkerQueueOfOne
runWorkerQueue q workerAction

runWorkerQueue :: WorkerQueue t -> (t -> IO a) -> ContT () IO (WorkerQueue t)
runWorkerQueue q workerAction = ContT $ \mainAction -> do
withAsync (writerThread q) $ \_ -> mainAction q
where
writerThread q =
forever $ do
l <- atomically $ readTQueue q
workerAction l
-- peek the action from the queue, run it and then remove it from the queue
l <- atomically $ peekWorkerQueue q
workerAction l `finally` atomically (readWorkerQueue q)


-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
-- and then blocks until the result is computed.
awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result
awaitRunInThread :: WorkerQueue (IO ()) -> IO result -> IO result
awaitRunInThread q act = do
-- Take an action from TQueue, run it and
-- use barrier to wait for the result
barrier <- newBarrier
atomically $ writeTQueue q $ do
atomically $ writeWorkerQueue q $ do
res <- act
signalBarrier barrier res
waitBarrier barrier
5 changes: 3 additions & 2 deletions ghcide/src/Development/IDE/LSP/LanguageServer.hs
View file Open in desktop
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import Control.Monad.Trans.Cont (evalContT)
import Development.IDE.Core.IdeConfiguration
import Development.IDE.Core.Shake hiding (Log)
import Development.IDE.Core.Tracing
import Development.IDE.Core.WorkerThread (withWorkerQueue)
import Development.IDE.Core.WorkerThread (withWorkerQueue,
withWorkerQueueOfOne)
import qualified Development.IDE.Session as Session
import Development.IDE.Types.Shake (WithHieDb,
WithHieDbShield (..))
Expand Down Expand Up @@ -261,7 +262,7 @@ handleInit recorder defaultRoot getHieDbLoc getIdeState lifetime exitClientMsg c
runWithWorkerThreads :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO ()
runWithWorkerThreads recorder dbLoc f = evalContT $ do
sessionRestartTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueueOfOne id
(WithHieDbShield hiedb, threadQueue) <- runWithDb recorder dbLoc
liftIO $ f hiedb (ThreadQueue threadQueue sessionRestartTQueue sessionLoaderTQueue)

Expand Down

AltStyle によって変換されたページ (->オリジナル) /