Lightweight concurrency

From HaskellWiki
Revision as of 18:55, 13 March 2012 by Kc


This page contains information about the design, implementation, problems and potential solutions for building user-level concurrency primitives in GHC.

Introduction

All of GHC's concurrency primitives are written in C and baked into the RTS. This precludes extensibility and makes the code difficult to maintain. Ideally, the concurrency libraries would be implemented entirely in Haskell, over a small set of primitive operations provided by the RTS. This would give Haskell programmers the ability to build custom schedulers and concurrency libraries. For an earlier attempt at this problem, see Peng Li's paper [1].

Substrate primitives

Substrate primitives are the primitives exposed by the RTS, on top of which user-level concurrency libraries are built.

data PTM a -- Primitive transactional memory
instance Monad PTM
unsafeIOToPTM :: IO a -> PTM a
atomically :: PTM a -> IO a

data PVar a -- Primitive transactional variable
newPVar :: a -> PTM (PVar a)
newPVarIO :: a -> IO (PVar a)
readPVar :: PVar a -> PTM a
writePVar :: PVar a -> a -> PTM ()

data SCont -- One-shot continuations
data ThreadStatus = BlockedOnConcDS -- Current thread is being blocked on a 
                                    -- user-level concurrent data structure
                  | BlockedOnSched  -- Current thread is being suspended on a
                                    -- user-level scheduler
                  | Completed       -- Current thread has completed execution
               -- | Running. Running is set implicitly after a context switch
newSCont :: IO () -> IO SCont
switch   :: (SCont -> PTM (SCont, ThreadStatus)) -> IO ()
{- For switch, target thread's status must be BlockedOn*. Otherwise, 
 - raises runtime error. After switching, target thread's status is implicitly 
 - set to Running, and current thread's status is set to ThreadStatus that was
 - passed.
 -}
getSCont :: PTM SCont
switchTo :: SCont -> ThreadStatus -> PTM ()

Concurrency libraries

To support the construction of extensible user-level schedulers in GHC, special care must be taken with blocking concurrency actions. When there is no default scheduler, the user-level scheduler must be made aware of the blocking action and, more interestingly, the blocking action must be made aware of the user-level scheduler.

Motivation

The interaction between user-level schedulers and blocking actions is best motivated through operations on MVars. The semantics of takeMVar is to block the calling thread if the MVar is empty, and to eventually unblock it and return the value when the MVar becomes full. Internally, when a thread blocks on an MVar, it switches to the next runnable thread. This assumes that takeMVar has knowledge of the scheduler. In particular, the current implementation of takeMVar knows how to perform the following:

  • Block action: blocking the current thread on a condition, and switching to another runnable thread.
  • Unblock action: placing the unblocked thread back into the scheduler data structure.

Proposal

The new, scheduler-agnostic version of takeMVar (say, takeMVarPrim) will have the type:

takeMVarPrim :: PTM () -> PTM () -> MVarPrim a -> IO a

where the first and second arguments are the block and unblock actions. If the blocking and unblocking actions are known, takeMVar with its usual type can be obtained simply by partial application:

takeMVar :: MVarPrim a -> IO a
takeMVar = takeMVarPrim blockAct unblockAct
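The following is a minimal, runnable sketch of how a scheduler-agnostic takeMVarPrim could thread the block and unblock actions through an MVar. It is a model under stated assumptions, not the proposed implementation: GHC's existing STM stands in for PTM, the MVState representation and the putMVarPrim and newEmptyMVarPrim helpers are hypothetical, and "being resumed by the scheduler" is imitated with retry because the switch primitive does not exist in stock GHC.

```haskell
import GHC.Conc (STM, TVar, atomically, forkIO, newTVarIO, readTVar, retry, writeTVar)

-- Assumption: STM stands in for the proposed PTM monad.
type PTM = STM

-- Hypothetical representation: an MVar is full, or empty with a FIFO queue of
-- blocked takers. Each taker leaves a private hole and its own unblock action.
data MVState a
  = Full a
  | Empty [(TVar (Maybe a), PTM ())]

newtype MVarPrim a = MVarPrim (TVar (MVState a))

newEmptyMVarPrim :: IO (MVarPrim a)
newEmptyMVarPrim = MVarPrim <$> newTVarIO (Empty [])

takeMVarPrim :: PTM () -> PTM () -> MVarPrim a -> IO a
takeMVarPrim blockAct unblockAct (MVarPrim ref) = do
  hole <- newTVarIO Nothing
  atomically $ do
    st <- readTVar ref
    case st of
      Full x -> do
        -- Value available: take it without involving the scheduler.
        writeTVar hole (Just x)
        writeTVar ref (Empty [])
      Empty waiters -> do
        -- Register as a waiter, then run the scheduler-specific block action.
        writeTVar ref (Empty (waiters ++ [(hole, unblockAct)]))
        blockAct
  -- Stand-in for being resumed: wait until a putter has filled the hole.
  atomically $ readTVar hole >>= maybe retry return

putMVarPrim :: MVarPrim a -> a -> IO ()
putMVarPrim (MVarPrim ref) x = atomically $ do
  st <- readTVar ref
  case st of
    Empty ((hole, unblockAct) : rest) -> do
      -- Hand the value to the oldest taker and run *its* unblock action.
      writeTVar hole (Just x)
      writeTVar ref (Empty rest)
      unblockAct
    Empty [] -> writeTVar ref (Full x)
    Full _ -> retry -- simplification: a real putMVar would block via its own actions

-- Usage: a taker blocks on an empty MVar and is then woken by a putter.
-- Returns the value taken and how often each action ran.
demo :: IO (Int, Int, Int)
demo = do
  blocks <- newTVarIO 0
  unblocks <- newTVarIO 0
  result <- newTVarIO Nothing
  mv <- newEmptyMVarPrim
  let bump t = readTVar t >>= writeTVar t . (+ 1)
  _ <- forkIO $ do
    v <- takeMVarPrim (bump blocks) (bump unblocks) mv
    atomically (writeTVar result (Just v))
  -- Wait until the taker has run its block action, so the blocking path is exercised.
  atomically $ readTVar blocks >>= \n -> if n == 1 then return () else retry
  putMVarPrim mv 42
  v <- atomically $ readTVar result >>= maybe retry return
  b <- atomically (readTVar blocks)
  u <- atomically (readTVar unblocks)
  return (v, b, u)
```

Note that the MVar code never inspects a scheduler: the putter simply runs whatever unblock action the taker registered, which is what would allow threads from different schedulers to share one MVar.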

Since the MVar implementation is independent of the schedulers, even threads from different schedulers can perform operations on the same MVar. The knowledge of schedulers is completely embedded in the block and unblock actions. A typical implementation of blockAct and unblockAct for a scheduler might look like:

data Scheduler

getBlockUnblockPair :: Scheduler -> PTM (PTM (), PTM ())
getBlockUnblockPair sched = do
  thread <- Substrate.getSCont
  let blockAction = do
        nextThread <- -- get next thread to run from sched
        switchTo nextThread Substrate.BlockedOnConcDS
  let unblockAction = -- enqueue thread to sched
  return (blockAction, unblockAction)
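As a hedged illustration of the elided parts, here is one way the block and unblock pair could look for a simple round-robin scheduler. Everything scheduler-specific is an assumption made for the sketch: STM stands in for PTM, the Scheduler record (a FIFO run queue plus a "current thread" cell) is hypothetical, SCont is modelled as a numbered token, and switchTo only records the new current thread instead of transferring control.

```haskell
import GHC.Conc (STM, TVar, atomically, newTVarIO, readTVar, retry, writeTVar)

-- Assumption: STM stands in for PTM; SCont is a token, not a real continuation.
type PTM = STM
newtype SCont = SCont Int deriving (Eq, Show)

-- Hypothetical round-robin scheduler state.
data Scheduler = Scheduler
  { runQueue :: TVar [SCont] -- threads waiting to run, oldest first
  , current  :: TVar SCont   -- thread currently running
  }

getSCont :: Scheduler -> PTM SCont
getSCont = readTVar . current

-- Stand-in for the substrate's switchTo: only records the new current thread.
switchTo :: Scheduler -> SCont -> PTM ()
switchTo sched = writeTVar (current sched)

getBlockUnblockPair :: Scheduler -> PTM (PTM (), PTM ())
getBlockUnblockPair sched = do
  thread <- getSCont sched
  let blockAction = do
        queue <- readTVar (runQueue sched)
        case queue of
          [] -> retry -- nothing runnable: wait for work to arrive
          next : rest -> do
            writeTVar (runQueue sched) rest
            switchTo sched next
      unblockAction = do
        -- Put the thread that created this pair at the back of the queue.
        queue <- readTVar (runQueue sched)
        writeTVar (runQueue sched) (queue ++ [thread])
  return (blockAction, unblockAction)

-- Usage: thread 0 blocks (thread 1 takes over), then thread 0 is unblocked.
demo :: IO (SCont, [SCont])
demo = do
  sched <- Scheduler <$> newTVarIO [SCont 1, SCont 2] <*> newTVarIO (SCont 0)
  atomically $ do
    (blockAct, unblockAct) <- getBlockUnblockPair sched
    blockAct
    unblockAct
    (,) <$> readTVar (current sched) <*> readTVar (runQueue sched)
```

The unblock action closes over the thread that requested the pair, so whoever runs it later does not need to know which scheduler that thread belongs to.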


Interaction with RTS

In the current GHC implementation, the runtime manages the threading system entirely. Moving the threading system to the user level means that several subtle interactions between threads and the RTS have to be handled differently. This section describes these interactions in detail, and lists the issues and potential solutions.

Up-call handlers

In order to support interaction between the scheduler and RTS, every Haskell thread must have the following up-call handlers:

switchToNext :: ThreadStatus -> IO ()
unblockThread :: SCont -> IO ()

switchToNext switches from the calling thread to the next thread on the calling thread's scheduler, suspending the calling thread with the given status. unblockThread enqueues the given thread on the current thread's scheduler. switchToNext and unblockThread are analogous to the block and unblock actions described under Concurrency libraries. unblockThread explicitly takes an SCont as an argument. This might seem strange at first, but this signature allows helper threads created by the RTS to inherit a scheduler. This mechanism will be explained later.

The up-call handlers are stored in the StgTSO thread structure so that the RTS can find them. They are traced during a GC as part of tracing the thread. It is the responsibility of schedulers to install the up-call handlers during thread creation. Currently, up-call handlers are installed using the following primitives exposed by the substrate:

setSwitchToNextClosure :: SCont -> (ThreadStatus -> IO ()) -> IO ()
setUnblockThreadClosure :: SCont -> (SCont -> IO ()) -> IO ()

where the given SCont is the target thread. Ideally, this should be part of the newSCont primitive.

Interaction with GC

In the vanilla GHC implementation, each capability maintains a list of runnable Haskell threads. Each generation in the GC also maintains a list of threads belonging to that generation. At the end of a generational collection, threads that survive are promoted to the next generation. Whenever a new thread is created, it is added to generation 0's thread list. During a GC, threads are classified into three categories:

  • Runnable threads: Threads that are on the runnable queues. These are considered to be GC roots.
  • Reachable threads: Threads that are reachable from runnable threads. These threads might be blocked (on MVars, STM actions, etc.), complete, or killed.
  • Unreachable threads: Threads that are unreachable. Unreachable threads might be blocked, complete, or killed.

At the end of a GC, all unreachable threads that are blocked are prepared with a BlockedIndefinitely exception and added to their capability's run queue. Note that complete and killed reachable threads survive a collection along with runnable threads, since asynchronous exceptions can still be thrown to them.
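The classification above can be sketched as a small pure model. This is only an illustration under simplifying assumptions: the heap graph is collapsed into direct thread-to-thread references, and the Thread record, the state names, and the function names are hypothetical rather than RTS definitions.

```haskell
-- Hypothetical, simplified view of a thread as the GC sees it.
data ThreadState = Runnable | Blocked | Complete | Killed deriving (Eq, Show)

data Thread = Thread
  { tid         :: Int
  , threadState :: ThreadState
  , refs        :: [Int] -- ids of threads reachable from this thread's heap data
  }

data GcClass = Root | Reachable | Unreachable deriving (Eq, Show)

-- Transitive closure of the reference relation, starting from the given ids.
reachableFrom :: [Thread] -> [Int] -> [Int]
reachableFrom threads = go []
  where
    go seen [] = seen
    go seen (t : rest)
      | t `elem` seen = go seen rest
      | otherwise = go (t : seen) (edges t ++ rest)
    edges t = concat [refs th | th <- threads, tid th == t]

-- Runnable threads are roots; everything else is judged by reachability from them.
classify :: [Thread] -> [(Int, GcClass)]
classify threads = [(tid th, cls th) | th <- threads]
  where
    roots = [tid th | th <- threads, threadState th == Runnable]
    live = reachableFrom threads roots
    cls th
      | threadState th == Runnable = Root
      | tid th `elem` live = Reachable
      | otherwise = Unreachable

-- Threads that are both unreachable and blocked are the ones that would
-- receive the BlockedIndefinitely exception at the end of the GC.
indefinitelyBlocked :: [Thread] -> [Int]
indefinitelyBlocked threads =
  [ tid th
  | th <- threads
  , threadState th == Blocked
  , lookup (tid th) classes == Just Unreachable
  ]
  where
    classes = classify threads
```

For example, with a runnable thread 1 that references a blocked thread 2, and a blocked thread 3 that nothing live references, only thread 3 is reported by indefinitelyBlocked.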

In the lightweight concurrency (LWC) implementation, each capability has just a single runnable thread. Each generation still maintains a list of threads belonging to that generation. During a GC, threads are classified as reachable or unreachable. The RTS knows whether a thread is blocked or complete, since this is made explicit in the switch primitive.

Problem 1 - Indefinitely blocked threads

In the LWC implementation, a capability has no notion of a queue of runnable threads. How, then, do we raise the BlockedIndefinitely exception?

Proposal

Let us first assume that each thread has its blockAction and unblockAction saved on the TSO structure, so that the RTS can pick them off the TSO and evaluate them. Secondly, we need to distinguish between a thread blocked on an unreachable concurrent data structure and a thread blocked on an unreachable scheduler. The programmer makes this distinction explicit through the thread status argument of the context switch primitives.

Blocked on an unreachable concurrent data structure

If the MVar is unreachable, the scheduler might still be reachable, and some runnable thread is potentially waiting to pull work off this scheduler. Hence, we can prepare the blocked thread for raising the asynchronous exception, as we do in the vanilla implementation. Subsequently, the RTS only needs to evaluate the blocked thread's unblock action, which will enqueue the blocked thread on its scheduler. But on which thread do we execute the unblock action?

In the LWC implementation, each capability only ever has one thread in its run queue. Hence, we create an array of IO () actions with the following structure:

[unblock_t0, unblock_t1, ..., unblock_tn, unblock_currentThread, block_currentThread]

This array is passed to a new thread with the following closure:

rtsSchedulerBatchIO :: Array (IO ()) -> IO ()

which, when given an array of IO () actions, performs them one by one.
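A minimal sketch of the batching closure follows. It assumes a plain list in place of the Array in the signature above, and the logging actions in demo are placeholders for the real unblock and block actions.

```haskell
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Assumption: a list stands in for the Array of actions.
-- Runs the actions strictly in order, discarding their unit results.
rtsSchedulerBatchIO :: [IO ()] -> IO ()
rtsSchedulerBatchIO = sequence_

-- Usage: placeholder actions that only record their names, in execution order.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  let record msg = modifyIORef logRef (msg :)
  rtsSchedulerBatchIO
    [ record "unblock_t0"
    , record "unblock_t1"
    , record "unblock_currentThread"
    , record "block_currentThread"
    ]
  reverse <$> readIORef logRef
```

The order of the batch matters: the current thread is unblocked (re-enqueued) before its block action runs, so control can eventually return to it after the woken threads have been scheduled.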

Blocked on an unreachable scheduler

This case is a bit tricky. If a thread is blocked on an unreachable scheduler, we need to find a scheduler on which this thread can execute. But which scheduler? The RTS does not know about any other user-level schedulers.

We might fall back to vanilla GHC's solution here, which is to prepare the blocked thread for an asynchronous exception and add it to the current capability's queue of threads blocked on a scheduler. At the end of the GC, the RTS first raises a BlockedIndefinitelyOnScheduler exception on every thread in that queue, and then switches to the actual computation (the current thread). This solution is not ideal, since it does not eliminate schedulers from the RTS completely.

Notes

  • What is the type of TSO->blockAction and TSO->unblockAction? At the user level, the block and unblock actions are of type PTM (). The RTS versions of the actions should be of type IO (), so that the existing RTS functions for evaluating IO () can be used to evaluate the scheduler actions.
  • The RTS versions of the scheduler actions should be traced by the GC.

Problem 2 - Detecting deadlock

In the vanilla implementation, whenever the RTS finds that there are no runnable threads, it forces a GC, which might release the deadlock: any indefinitely blocked threads are woken up with asynchronous exceptions. In the LWC implementation, how would the runtime distinguish between a scheduler that is actively spinning, looking for more work, and a thread that is executing a computation? There might be multiple schedulers running on a capability, and no single scheduler might know that all schedulers on the capability are waiting for work. It might not be a good idea to trigger a GC whenever a scheduler runs out of work.

Proposal 1

Every capability keeps two counts: the number of SConts spawned as schedulers, and the number of schedulers that are currently empty. When these counts become equal, a GC is triggered. If no threads are unblocked by this GC, then we are really deadlocked. This scheme can produce false positives, since a scheduler might be slow to let the RTS know that it has in fact found work. How do we deal with such momentary false positives?
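The counting scheme can be sketched as a small pure model of the per-capability bookkeeping. The record and function names are hypothetical, and the extra condition that at least one scheduler exists before a GC is triggered is an assumption of the sketch, not something the proposal states.

```haskell
-- Hypothetical per-capability counters for Proposal 1.
data SchedCounts = SchedCounts
  { spawnedSchedulers :: Int -- SConts spawned as schedulers
  , emptySchedulers   :: Int -- schedulers that currently have no work
  } deriving (Eq, Show)

noSchedulers :: SchedCounts
noSchedulers = SchedCounts 0 0

schedulerSpawned :: SchedCounts -> SchedCounts
schedulerSpawned c = c { spawnedSchedulers = spawnedSchedulers c + 1 }

schedulerBecameEmpty :: SchedCounts -> SchedCounts
schedulerBecameEmpty c = c { emptySchedulers = emptySchedulers c + 1 }

-- A scheduler reports that it found work again (never drops below zero).
schedulerFoundWork :: SchedCounts -> SchedCounts
schedulerFoundWork c = c { emptySchedulers = max 0 (emptySchedulers c - 1) }

-- Trigger a GC when every spawned scheduler is empty.
-- Assumption: with no schedulers at all, there is nothing to detect.
shouldTriggerGC :: SchedCounts -> Bool
shouldTriggerGC c =
  spawnedSchedulers c > 0 && emptySchedulers c == spawnedSchedulers c
```

The false-positive window described above corresponds to the gap between a scheduler actually finding work and its call to schedulerFoundWork being recorded.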

Proposal 2

Treat the first Haskell thread (proto_thread) created on any capability as a special thread whose only job is to create threads to execute work. It enqueues itself on the scheduler it creates. But none of the threads on the scheduler will switch to this proto_thread, unless there is nothing else to switch to. The proto_thread, when resumed, will force a GC. However, this solution assumes there is a single scheduler data structure at the lowest level per capability.

A capability might really not be deadlocked, since work might be generated by other cores. For example, MVar synchronization might release threads that will be enqueued to schedulers on this capability. Should performing GC on a deadlock be a global property of all capabilities?

Bound threads

Bound threads [2] are bound to operating system threads (tasks), and only the task to which a Haskell thread is bound can run it. Under vanilla GHC, every capability has a pool of tasks (workers), only one of which can own the capability at any given time. Ideally, there is a one-to-one mapping between capabilities and cores. If the next Haskell thread to be run is a bound thread, and if the bound thread is bound to:

  • the current task, run it.
  • a task belonging to the current capability, add the thread to the front of the run queue, and pass the current capability to the bound thread's task (which is currently suspended). This suspends the current task, and wakes up the target task.
  • a task belonging to a different capability, add the thread to that capability's run queue, and switch to the next task.
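The three cases can be summarised as a decision function. This is a sketch of the dispatch logic only: Task and Capability are modelled as numbered tokens, and the Bound record and Dispatch constructors are hypothetical names introduced for illustration.

```haskell
newtype Task = Task Int deriving (Eq, Show)
newtype Capability = Capability Int deriving (Eq, Show)

-- Hypothetical description of a bound thread: the task it is bound to,
-- and the capability that task belongs to.
data Bound = Bound
  { boundTask      :: Task
  , taskCapability :: Capability
  }

-- What the vanilla scheduler does with a bound thread that is next to run.
data Dispatch
  = RunHere                 -- bound to the current task: just run it
  | HandOverCapability Task -- same capability: pass the capability to that task
  | MigrateTo Capability    -- other capability: enqueue on its run queue
  deriving (Eq, Show)

dispatchBound :: Task -> Capability -> Bound -> Dispatch
dispatchBound currentTask currentCap b
  | boundTask b == currentTask = RunHere
  | taskCapability b == currentCap = HandOverCapability (boundTask b)
  | otherwise = MigrateTo (taskCapability b)
```

Only the MigrateTo case relies on a per-capability run queue, which is the case that becomes problematic once scheduling moves to the user level.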

Problem

Under the lightweight concurrency implementation, the first two cases can be handled just as in the vanilla implementation. For the last case, since there is no notion of a run queue per capability, we are stuck.

Proposal (Partial)

Expose the notion of boundedness to the programmer, and put the onus on the programmer not to switch to a bound thread that belongs to a different capability. The additional substrate primitives would be:

newBoundSCont :: IO () -> IO SCont -- new SCont will be bound to the current capability

data Capability deriving Eq
getMyCapability :: IO Capability
getCapability :: SCont -> IO Capability

On a scheduler that could potentially hold bound threads, the programmer can check whether it is safe to run the next thread. If the bound thread belongs to a different capability, we put it back into the scheduler, in the hope that it will be picked up by the capability to which the SCont is bound. The solution is partial, since we do not know whether any scheduler running on the correct capability will pick up the bound thread. Or is it the programmer's responsibility to make sure that all the threads created on the scheduler are run to completion?
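The check described above could look roughly as follows in a scheduler's dequeue path. This is a pure sketch with assumed names: SCont and Capability are tokens, a queue entry pairs each thread with the capability it is bound to (Nothing for unbound threads, standing in for a call to getCapability), and the requeue policy of moving skipped threads to the back is one choice among several.

```haskell
newtype Capability = Capability Int deriving (Eq, Show)
newtype SCont = SCont Int deriving (Eq, Show)

-- Hypothetical queue entry: a thread and, if it is bound, its capability.
type Entry = (SCont, Maybe Capability)

-- A thread is safe to run here if it is unbound or bound to this capability.
safeToRun :: Capability -> Entry -> Bool
safeToRun _ (_, Nothing) = True
safeToRun myCap (_, Just cap) = cap == myCap

-- Pick the first thread that is safe to run on this capability. Threads that
-- were skipped are put back at the end of the queue, in their original order.
pickNext :: Capability -> [Entry] -> (Maybe SCont, [Entry])
pickNext myCap queue =
  case break (safeToRun myCap) queue of
    (skipped, (sc, _) : rest) -> (Just sc, rest ++ skipped)
    (skipped, []) -> (Nothing, skipped)
```

If every queued thread is bound to another capability, pickNext returns Nothing and leaves the queue unchanged, which is exactly the situation in which the proposal offers no guarantee of progress.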

Safe foreign calls

Unlike an unsafe foreign call, a safe foreign call does not impede the execution of other Haskell threads on the same scheduler if the foreign call blocks. A safe foreign call is typically more expensive than its unsafe counterpart, since it potentially involves switching between Haskell threads. At the very least, a safe foreign call involves releasing and re-acquiring the capability.

Anatomy of a safe foreign call

Every capability, among other things, has a list of tasks (returning_tasks) that have completed their safe foreign call. The following are the steps involved in invoking a safe foreign call:

  • Before the foreign call, release the current capability to another worker task.
  • Perform the foreign call.
  • Add the current task to returning_tasks list.
  • Reacquire the capability.
  • Resume execution of Haskell thread.

The first action performed by the worker task that acquired the capability is to check whether returning_tasks is non-empty. If so, the worker yields the capability to the first task in the returning_tasks list (fast path). Otherwise, the worker proceeds to run the next task from the run queue (slow path). Thus, on the fast path, the Haskell thread never switches.

Problem

In the LWC implementation, the worker does not have a reference to the scheduler from which to pick the next task. For the same reason, when the task returns from the foreign call, it needs to know what to do with the Haskell thread: whether to switch to it (fast path) or to add it to the scheduler data structure, to which it has no reference. Even if the RTS had a reference to the scheduler data structure, the data structure would have to be implemented in such a way that it is operable by both C and Haskell code.

Proposal

Rather than manipulating the scheduler, we might build on top of the solutions for implementing concurrency primitives. In particular, for a safe foreign call, let us assume that the RTS has references to the block and unblock actions of the thread making the foreign call. The steps involved in a safe foreign call would be:

  • Before the foreign call, release the current capability to a worker, along with the blockAction.
  • Perform the foreign call.
  • Add the current task to returning_tasks list.
  • Reacquire the capability.
  • If on the fast path (i.e., the worker did not get to execute blockAction), resume execution of the Haskell thread.
  • Otherwise (slow path), execute unblockAction to enqueue the Haskell thread on the scheduler.

At the worker:

  • Try to acquire the capability.
  • If the returning_tasks list is not empty, yield capability to the task from the head of the list (fast path).
  • Otherwise (slow path), execute the blockAction, which will switch control to the next Haskell thread.
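The two halves of this protocol can be sketched as follows. This is a single-threaded model under stated assumptions, not RTS code: returning_tasks is an IORef holding a list, capability hand-over is reduced to a callback, and the names Path, workerStep and resumeAfterCall are hypothetical.

```haskell
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, writeIORef)

data Path = FastPath | SlowPath deriving (Eq, Show)

-- Worker side, after acquiring the capability: yield to a returning task if
-- there is one (fast path), otherwise run the caller's block action (slow path).
workerStep :: IORef [task] -> (task -> IO ()) -> IO () -> IO Path
workerStep returningTasks yieldTo blockAction = do
  tasks <- readIORef returningTasks
  case tasks of
    t : rest -> do
      writeIORef returningTasks rest
      yieldTo t
      return FastPath
    [] -> do
      blockAction
      return SlowPath

-- Returning task, after reacquiring the capability: resume the Haskell thread
-- directly on the fast path, or re-enqueue it via its unblock action otherwise.
resumeAfterCall :: Path -> IO () -> IO () -> IO ()
resumeAfterCall FastPath resumeThread _ = resumeThread
resumeAfterCall SlowPath _ unblockAction = unblockAction

-- Usage: one call that returns before the worker runs, then one that does not.
demo :: IO [String]
demo = do
  logRef <- newIORef []
  returning <- newIORef ["task1"]
  let record msg = modifyIORef logRef (msg :)
      step = workerStep returning (\t -> record ("yield to " ++ t)) (record "blockAction")
  fast <- step
  resumeAfterCall fast (record "resume thread") (record "unblockAction")
  slow <- step
  resumeAfterCall slow (record "resume thread") (record "unblockAction")
  reverse <$> readIORef logRef
```

In this model neither side touches a scheduler data structure directly: the worker and the returning task only ever run the block and unblock actions supplied by the thread that made the call.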