See stm on Hackage for more information.
haskell / stm
Software Transactional Memory
License: Other
The stm package has the rather nice registerDelay API.
registerDelay :: Int -> IO (TVar Bool)
Set the value of returned TVar to True after a given number of microseconds. The caveats associated with threadDelay also apply.
This is nice, but we could provide a more extensive timer API based on the underlying GHC timer API. The really nice thing about registerDelay is that, being based on STM, it is composable. We just need a bit more for use cases like network protocols, where you need to be able to push back a timeout. Cancelling is also useful. The GHC timer API can do all of these things.
I would like to suggest, and get feedback on, the following API and implementation. If we go for it, it might be best to add it to a new module, Control.Concurrent.STM.Timer. I can make a PR based on feedback.
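As an illustration of the composability mentioned above, here is a minimal sketch that combines an expiry flag (such as the one returned by registerDelay) with a blocking read using orElse. The names takeUnlessExpired and takeWithTimeout are hypothetical helpers, not part of stm, and registerDelay may require the threaded runtime.

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper (not part of stm): take from the 'TMVar' if a value
-- is available, otherwise wait until the expiry flag becomes 'True'.
takeUnlessExpired :: TVar Bool -> TMVar a -> STM (Maybe a)
takeUnlessExpired expired box =
  (Just <$> takeTMVar box)
    `orElse` (readTVar expired >>= check >> pure Nothing)

-- | Hypothetical wrapper: the flag comes from the existing 'registerDelay'.
-- Assumption: the program is linked with the threaded runtime.
takeWithTimeout :: Int -> TMVar a -> IO (Maybe a)
takeWithTimeout usec box = do
  expired <- registerDelay usec
  atomically (takeUnlessExpired expired box)
```

Because the waiting logic lives in STM, it can be combined with further alternatives in the same transaction, which is the property the proposed timer API aims to keep.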
API:
data TimerState = TimerPending | TimerFired | TimerCancelled
data Timer
type Microseconds = Int
-- | Create a new timer which will fire at the given time duration in
-- the future.
--
-- The timer will start in the 'TimerPending' state and either
-- fire at or after the given time leaving it in the 'TimerFired' state,
-- or it may be cancelled with 'cancelTimer', leaving it in the
-- 'TimerCancelled' state.
--
-- Timers /cannot/ be reset to the pending state once fired or cancelled
-- (as this would be very racy). You should create a new timer if you need
-- this functionality.
--
newTimer :: Microseconds -> IO Timer
-- | Read the current state of a timer. This does not block, but returns
-- the current state. It is your responsibility to use 'retry' to wait.
--
-- Alternatively you may wish to use the convenience utility 'awaitTimer'
-- to wait for just the fired or cancelled outcomes.
--
-- You should consider the cancelled state if you plan to use 'cancelTimer'.
--
readTimer :: Timer -> STM TimerState
-- | Adjust when this timer will fire, to the given duration into the future.
--
-- It is safe to race this concurrently against the timer firing. It will
-- have no effect if the timer fires first.
--
-- The new time can be before or after the original expiry time, though
-- arguably it is an application design flaw to move timers sooner.
--
updateTimer :: Timer -> Microseconds -> IO ()
-- | Cancel a timer (unless it has already fired), putting it into the
-- 'TimerCancelled' state. Code reading and acting on the timer state
-- need to handle such cancellation appropriately.
--
-- It is safe to race this concurrently against the timer firing. It will
-- have no effect if the timer fires first.
--
cancelTimer :: Timer -> IO ()
And an implementation in terms of the GHC timeout manager (which is what registerDelay uses):
data Timer = Timer !(STM.TVar TimerState) !GHC.TimeoutKey
readTimer (Timer var _key) = STM.readTVar var
newTimer = \usec -> do
    var <- STM.newTVarIO TimerPending
    mgr <- GHC.getSystemTimerManager
    key <- GHC.registerTimeout mgr usec (STM.atomically (timerAction var))
    return (Timer var key)
  where
    timerAction var = do
      x <- STM.readTVar var
      case x of
        TimerPending   -> STM.writeTVar var TimerFired
        TimerFired     -> error "MonadTimer(IO): invariant violation"
        TimerCancelled -> return ()
-- In GHC's TimerManager this has no effect if the timer already fired.
-- It is safe to race against the timer firing.
updateTimer (Timer _var key) usec = do
    mgr <- GHC.getSystemTimerManager
    GHC.updateTimeout mgr key usec
cancelTimer (Timer var key) = do
    STM.atomically $ do
      x <- STM.readTVar var
      case x of
        TimerPending   -> STM.writeTVar var TimerCancelled
        TimerFired     -> return ()
        TimerCancelled -> return ()
    mgr <- GHC.getSystemTimerManager
    GHC.unregisterTimeout mgr key
Plus one handy derived utility:
-- | Returns @True@ when the timer is fired, or @False@ if it is cancelled.
awaitTimer :: Timer -> STM Bool
awaitTimer t = do
    s <- readTimer t
    case s of
      TimerPending   -> retry
      TimerFired     -> return True
      TimerCancelled -> return False
With a TBQueue of size 2, the first two writes succeed and the 3rd blocks until another thread reads from the queue.
With a TBQueue of size 1, the first write succeeds and the 2nd blocks until another thread reads from the queue:
Control.Concurrent.STM.TBQueue> q <- newTBQueueIO 1 :: IO (TBQueue ())
Control.Concurrent.STM.TBQueue> a <- async $ (atomically $ writeTBQueue q ()) >> print "wrote"
"wrote"
Control.Concurrent.STM.TBQueue> a <- async $ (atomically $ writeTBQueue q ()) >> print "wrote"
Control.Concurrent.STM.TBQueue> atomically $ readTBQueue q
"wrote"
Control.Concurrent.STM.TBQueue>
Therefore, with a TBQueue of size 0, you would expect the 1st write to block until another thread reads from the queue. However, this doesn't work:
Control.Concurrent.STM.TBQueue> q <- newTBQueueIO 0 :: IO (TBQueue ())
Control.Concurrent.STM.TBQueue> a <- async $ (atomically $ writeTBQueue q ()) >> print "wrote"
Control.Concurrent.STM.TBQueue> atomically $ readTBQueue q
--- hangs forever
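When exploring capacity behaviour like this, a non-blocking probe avoids hanging the session. The following is a minimal sketch built only from writeTBQueue and orElse; the name offerTBQueue is hypothetical and not part of stm.

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper (not part of stm): try to write without blocking.
-- Returns 'False' instead of retrying when the queue has no free slot.
offerTBQueue :: TBQueue a -> a -> STM Bool
offerTBQueue q x = (writeTBQueue q x >> pure True) `orElse` pure False
```

With a queue of size 1, the first offer is accepted and the second is rejected rather than blocking.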
From Control.Concurrent.STM.TVar:
stateTVar :: TVar s -> (s -> (a, s)) -> STM a
stateTVar var f = do
   s <- readTVar var
   let (a, s') = f s -- since we destructure this, we are strict in f
   writeTVar var s'
   return a
The comment is incorrect, since let-bindings pattern-match lazily. In order to make this strict, a bang pattern is needed:
   let !(a, s') = f s -- since we destructure this, we are strict in f
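The difference can be observed in isolation. This is a small standalone sketch, independent of stm, with hypothetical function names; it checks whether binding a tuple pattern in a let forces the right-hand side.

```haskell
{-# LANGUAGE BangPatterns #-}
import Control.Exception (SomeException, evaluate, try)

-- Plain let: the tuple pattern is matched lazily, so the pair is never forced.
lazyLet :: (Int, Int) -> ()
lazyLet p = let (_a, _b) = p in ()

-- Banged let: the pair is forced to weak head normal form before the body runs.
strictLet :: (Int, Int) -> ()
strictLet p = let !(_a, _b) = p in ()

-- | 'True' if applying the function to 'undefined' throws, i.e. it is strict.
forces :: ((Int, Int) -> ()) -> IO Bool
forces f = do
  r <- try (evaluate (f undefined)) :: IO (Either SomeException ())
  pure (either (const True) (const False) r)
```

Here forces lazyLet yields False and forces strictLet yields True.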
The GHC build, which builds with -Werror, fails after bumping to 2.5.0.1 due to:
libraries/stm/Control/Concurrent/STM/TBQueue.hs:133:15: error: [-Wincomplete-uni-patterns, -Werror=incomplete-uni-patterns]
Pattern match(es) are non-exhaustive
In a pattern binding: Patterns of type ‘[a]’ not matched: []
|
133 | let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
| ^^^^^^^^^^^^^^^^^^^
libraries/stm/Control/Concurrent/STM/TBQueue.hs:173:15: error: [-Wincomplete-uni-patterns, -Werror=incomplete-uni-patterns]
Pattern match(es) are non-exhaustive
In a pattern binding: Patterns of type ‘[a]’ not matched: []
|
173 | let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
| ^^^^^^^^^^^^^^^^^^^
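One possible way to keep the binding lazy while avoiding the warning is to move the partial match into a helper that returns a tuple, since a tuple pattern in a let is exhaustive. This is only a sketch; unconsReversed is a hypothetical name and this is not necessarily how the package resolved it.

```haskell
-- | Hypothetical helper: split the reversed list into its head and tail.
-- The empty case is made explicit, so the match is exhaustive.
unconsReversed :: [a] -> (a, [a])
unconsReversed ys = case reverse ys of
  (z : zs) -> (z, zs)
  []       -> error "unconsReversed: empty list"

-- | Usage: the tuple pattern is still matched lazily, so nothing is
-- reversed until 'z' or 'zs' is demanded.
lastAndRest :: [a] -> (a, [a])
lastAndRest ys =
  let (z, zs) = unconsReversed ys
   in (z, zs)
```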
According to clang, the CPP guards in #66 rely on undefined behavior:
/private/var/lib/gitlab-runner/builds/7KQsmDei/0/ghc/ghc/libraries/stm/Control/Concurrent/STM/TArray.hs:9:5: error:
error: macro expansion producing 'defined' has undefined behavior [-Werror,-Wexpansion-to-defined]
|
9 | #if HAS_UNLIFTED_ARRAY
| ^
#if HAS_UNLIFTED_ARRAY
^
/private/var/lib/gitlab-runner/builds/7KQsmDei/0/ghc/ghc/libraries/stm/Control/Concurrent/STM/TArray.hs:7:28: error:
note: expanded from macro 'HAS_UNLIFTED_ARRAY'
|
7 | #define HAS_UNLIFTED_ARRAY defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ >= 904
| ^
#define HAS_UNLIFTED_ARRAY defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ >= 904
^
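A common way to avoid this diagnostic is to evaluate defined(...) directly in an #if and define the macro to a plain constant. The sketch below shows that pattern; the backend binding is hypothetical and exists only to make the example observable.

```haskell
{-# LANGUAGE CPP #-}

-- Evaluate 'defined' in the #if itself, so that no macro expands to it.
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ >= 904
#define HAS_UNLIFTED_ARRAY 1
#else
#define HAS_UNLIFTED_ARRAY 0
#endif

-- | Hypothetical marker showing which branch was selected.
backend :: String
#if HAS_UNLIFTED_ARRAY
backend = "unlifted"
#else
backend = "boxed"
#endif
```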
I expected TBQueue to be backed by an array instead of a pair of lists. This would get rid of the amortization and should cause fewer allocations. Have there been any benchmarks showing that the current version is faster?
It would be very handy to have a MonadFail instance that retries. As a minimal motivating example, consider:
pop :: TVar [a] -> STM a
pop v = do
  a:as <- readTVar v
  writeTVar v as
  return a
With a MonadFail instance, this is a very convenient way of writing a transaction that waits until there's a value available in the stack.
More generally, such an instance would allow one to write a transaction as if all your favorite patterns matched, and the transaction would then efficiently block until they do.
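For comparison, the same behaviour can be written today with an explicit case and retry, without relying on any MonadFail instance. This is a sketch of the equivalent transaction, named popRetry to keep it distinct from the example above.

```haskell
import Control.Concurrent.STM

-- | Pop the top of the stack, retrying (blocking) while it is empty.
popRetry :: TVar [a] -> STM a
popRetry v = do
  xs <- readTVar v
  case xs of
    []     -> retry
    a : as -> do
      writeTVar v as
      return a
```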
It would be helpful to get the current number of elements written to a TBQueue.
lengthTBQueue :: TBQueue a -> STM Int
I looked at the source; this isn't possible in O(1), since it's length readEnd + length writeEnd.
However, if we store the initial capacity, then it's just cap - r - w:
data TBQueue a
   = TBQueue (TVar Int)  -- Read capacity
             (TVar [a])  -- Read end
             (TVar Int)  -- Write capacity
             (TVar [a])  -- Write end
             Int         -- Initial capacity
lengthTBQueue :: TBQueue a -> STM Int
lengthTBQueue (TBQueue rsize _ wsize _ cap) = do
  r <- readTVar rsize
  w <- readTVar wsize
  return (cap - r - w)
I wonder if something could be done to improve understanding of
Prelude Control.Concurrent.STM> atomically $ do v <- newEmptyTMVar; takeTMVar v
*** Exception: thread blocked indefinitely in an STM transaction
How can I
This is not about changing the semantics, just documentation.
A link to https://hackage.haskell.org/package/base-4.11.1.0/docs/Control-Exception-Base.html#t:BlockedIndefinitelyOnSTM would help?
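For documentation purposes, it may help to show that this is an ordinary exception that can be caught. The sketch below wraps the same transaction in try; the name blockedDemo is hypothetical, and the example assumes the runtime detects the deadlock as it does in the session above.

```haskell
import Control.Concurrent.STM
import Control.Exception (BlockedIndefinitelyOnSTM (..), try)

-- | Run a transaction that can never make progress: the TMVar is created
-- inside the transaction, so no other thread can ever fill it.
blockedDemo :: IO (Either BlockedIndefinitelyOnSTM ())
blockedDemo = try (atomically (newEmptyTMVar >>= takeTMVar))
```

The Left result carries the BlockedIndefinitelyOnSTM value that would otherwise terminate the thread.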
The fix was committed 5 months ago; why has it not been released yet? I've just run into this bug myself and can confirm that the above commit does fix the issue.
Even if the non-threaded runtime doesn't provide tools to implement it as efficiently, it's still possible to implement registerDelay with correct semantics. That would be a lot more pleasant to work with than an exception at run time.
On a side note, that current behavior isn't even mentioned in the haddocks, which makes it even more surprising.
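To make the suggestion concrete, here is a minimal sketch of a less efficient fallback with the same observable semantics, using a helper thread. The name registerDelayPortable is hypothetical; this is not the implementation used by stm or base.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- | Hypothetical fallback: return a 'TVar' that is set to 'True' after
-- (at least) the given number of microseconds, by sleeping in a new thread.
registerDelayPortable :: Int -> IO (TVar Bool)
registerDelayPortable usec = do
  fired <- newTVarIO False
  _ <- forkIO $ do
    threadDelay usec
    atomically (writeTVar fired True)
  pure fired
```

This costs one thread per delay, which is the efficiency trade-off mentioned above.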
As per this accepted GHC proposal, GHC will add incomplete-uni-patterns and incomplete-record-updates to -Wall. Could you please ensure that the following files do not give any warnings for incomplete-uni-patterns, perhaps by setting the pragma below? (A clean compile of stm is needed to build GHC.)
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns #-}
Control/Concurrent/STM/TBQueue.hs
Control/Concurrent/STM/TQueue.hs
I've managed to reproduce one issue in #76 and provide a fix for it in #77. However, I'm experiencing other race conditions on 2.5.2.1 which were not present on 2.5.1.0. They are hard to reproduce, though.
Anyway, this means that there are at least two bugs in the reimplementation and that it has no test coverage, so there may be more.
Until the new implementation is fixed and covered with exhaustive tests, I suggest rolling back and deprecating the 2.5.2.0 and 2.5.2.1 releases.
Regarding an exhaustive test model for the new implementation, I've made a suggestion here: #76 (comment).
In #17 / 2221948, the type for the capacity of TBQueues was changed from Int to Natural, to avoid negative capacities. With #70, the capacity is now internally stored as an Int (since array lengths are Ints), so IMO it would make sense to change the public API back to Ints. Moreover, we need to check that the capacity is greater than 0 anyway, and currently we additionally have to check that the capacity is not greater than maxBound :: Word. This would also avoid potentially having to unbox/box Naturals.
Could this MonadSTM type class be adopted by the stm package?
It would be useful for certain abstractions such as the "console region" defined in concurrent-output, which currently defines its own equivalent type class.
My use case involves a monad transformer atop such a thing; I want to write a type signature like:
(MonadReader Env m, MonadSTM m) => m ()
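To make the request concrete, here is a minimal sketch of what such a class could look like. The class, its method name liftSTM, and the instances are assumptions for illustration, not an existing stm API; the ReaderT instance uses the transformers package.

```haskell
import Control.Concurrent.STM
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)

-- | Hypothetical class: monads into which an STM action can be lifted.
class Monad m => MonadSTM m where
  liftSTM :: STM a -> m a

instance MonadSTM STM where
  liftSTM = id

-- In IO, each 'liftSTM' call runs as its own transaction.
instance MonadSTM IO where
  liftSTM = atomically

instance MonadSTM m => MonadSTM (ReaderT r m) where
  liftSTM = lift . liftSTM

-- | Usage: increment a counter taken from the environment.
bump :: MonadSTM m => ReaderT (TVar Int) m Int
bump = do
  counter <- ask
  liftSTM $ do
    modifyTVar' counter (+ 1)
    readTVar counter
```

The same bump action then runs over IO or inside a larger STM transaction.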
I can't find an answer in the stm documentation on Hackage. It seems that the memory used by a TChan is released, but I am not sure. Why is there no function such as closeTchan, analogous to closing a socket?
stm is a basic library that used to have a tiny dependency footprint. However, with hashable-1.4.0.0 this changed considerably. Some of the dependencies I'm less than excited about are:
base-orphans
functor-classes-compat
ghc-bignum-orphans
transformers-compat
I'm not sure why hashable has to depend on those, but it can lead to all kinds of build issues that are hard to diagnose (depending on the install plan you end up with). In my specific case, I have code that works on CI (with --enable-tests) but fails for older compilers when installed from Hackage.
Is there a chance that we can address this, either in stm, nats or hashable?
The Haddock documentation for TBQueue.writeTBQueue and TBQueue.ungetTBQueue both say "blocks if the queue is full". This should be "retries if the queue is full".
The documentation for the newBroadcastTChan function says that the only way to read the channel is to duplicate it with dupTChan. In my case I would prefer to use cloneTChan instead, to get the complete content, but I am not sure whether it is safe to do so.
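For reference, the documented dupTChan pattern looks like the sketch below (broadcastDemo is a hypothetical name). It only shows the supported usage and does not settle the question about cloneTChan.

```haskell
import Control.Concurrent.STM

-- | Write to a broadcast channel and read the items back through a
-- duplicate created before the writes.
broadcastDemo :: IO (Int, Int)
broadcastDemo = do
  bc <- newBroadcastTChanIO
  reader <- atomically (dupTChan bc)
  atomically (writeTChan bc 1 >> writeTChan bc 2)
  atomically ((,) <$> readTChan reader <*> readTChan reader)
```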
$ git clone git@github.com:haskell/stm.git
Cloning into 'stm'...
...
warning: the following paths have collided (e.g. case-sensitive paths
on a case-insensitive filesystem) and only one from the same
colliding group is in the working tree:
'bench/ChanBench.hs'
'bench/chanbench.hs'
Any Linux-capable maintainer, please fix this to include contributors from other cultural contexts.
ATTN: @bgamari
It acts weirdly, causing random halts, among other things.
One issue that is easily reproducible is that the following test fails:
describe "TBQueue" do
  describe "flushTBQueue" do
    it "Affects the length" do
      queue <- newTBQueueIO 100
      length <- atomically $ do
        writeTBQueue queue 1
        writeTBQueue queue 2
        flushTBQueue queue
        lengthTBQueue queue
      shouldBe length 0
Replacing the implementation with the following suboptimal one can serve as a quick fix for the time being:
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue queue =
  go []
  where
    go !acc = do
      element <- tryReadTBQueue queue
      case element of
        Just element -> go $ element : acc
        Nothing -> return $ reverse acc
I guess the bug was introduced in #70. So @konsumlamm please take a look.
We need stricter quality assurance standards for this package. Bugs in packages as central as this one can have a very dire impact on the stability of the whole ecosystem and on the reputation of the language. There are 13496 indirect dependencies on Hackage alone, meaning that virtually any application can be affected by bugs in this package. Another issue is that people don't expect bugs in such central packages; e.g., I've lost quite some time debugging this simply because I could not believe that the bug would come from "stm".
I suggest that rewrites be done with extreme caution and be exhaustively covered with tests. As far as I can see, #70 came with 0 tests.
Currently, a TArray is a boxed Array of TVars. This seems kind of silly. If TArray is used at all, the array should surely be unboxed to reduce indirection. Nowadays, that means futzing with ArrayArray#, but we'll soon have UnliftedArray#.
I ran the following code, but it didn't print "end".
main=do
initFFmpeg
(getFrame,cleanup)<- imageReaderTime (File "bunny.mp4"):: IO (IO (Maybe (Image PixelRGB8,Double)),IO() )
queue<- atomically $ newTQueue
forkIO $ do
fra<- getFrame
atomically $ writeTQueue queue 1
putStrLn "end "
Is it because of running out of memory?
When I comment out fra<-getFrame, it runs well. When I comment out atomically $ writeTQueue queue 1, it also runs well.
Currently it is not possible to determine the capacity of a TBQueue; it has to be stored outside.
Can we add sizeTBQueue to return the maximum number of elements that the queue can hold?
The implementation is trivial and, given that the capacity is fixed, it doesn't even need to be in the STM monad, unless I am missing something:
sizeTBQueue :: TBQueue a -> Natural
sizeTBQueue (TBQueue _ _ _ _ size) = size
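Until something like this is available, the "stored outside" workaround can be packaged as a small wrapper. This is a sketch with hypothetical names (SizedTBQueue, newSizedTBQueueIO); fromIntegral is used so that it compiles whether newTBQueueIO takes an Int or a Natural.

```haskell
import Control.Concurrent.STM

-- | Hypothetical wrapper that remembers the capacity next to the queue.
data SizedTBQueue a = SizedTBQueue
  { sizedCapacity :: Int
  , sizedQueue    :: TBQueue a
  }

-- | Create the queue and record the capacity it was created with.
newSizedTBQueueIO :: Int -> IO (SizedTBQueue a)
newSizedTBQueueIO n = SizedTBQueue n <$> newTBQueueIO (fromIntegral n)
```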
Currently there is no way to efficiently snapshot the state of a TBQueue without flushing it and rewriting it (nor is there a way to efficiently create a TBQueue from a list). This is what is currently required to take a snapshot:
snapshotTBQueue :: TBQueue a -> STM [a]
snapshotTBQueue q = do
  xs <- flushTBQueue q
  mapM_ (writeTBQueue q) xs
  pure xs
But the snapshot is part of flushTBQueue, and if the TBQueue constructor were exported, it could be implemented externally like this:
snapshotTBQueue :: TBQueue a -> STM [a]
snapshotTBQueue (TBQueue _ read _ write _) = do
  xs <- readTVar read
  ys <- readTVar write
  return $ if null xs && null ys then [] else xs ++ reverse ys
The use case for snapshot is dumping a state of the queue(s) to the hard-drive, so it can be efficiently snapshotted before dump starts (so that the state is consistent).
Could we add snapshot to STM? Alternatively, could we export TBQueue constructor?
Control.Concurrent.STM.TSem provides signalTSemN to release multiple units, but no waitTSemN to acquire multiple units. It would try to acquire all units at once (like waitQSemN).
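In the meantime, a rough equivalent can be built from waitTSem alone, because a transaction either acquires every unit or retries as a whole. This is a sketch under that assumption; acquireTSemN is a hypothetical name, and it takes O(n) steps rather than adjusting the counter once.

```haskell
import Control.Concurrent.STM
import Control.Concurrent.STM.TSem (TSem, newTSem, waitTSem)
import Control.Monad (replicateM_)

-- | Hypothetical stand-in for the requested function: acquire n units
-- atomically, blocking until all of them are available.
acquireTSemN :: TSem -> Int -> STM ()
acquireTSemN sem n = replicateM_ n (waitTSem sem)

-- | Non-blocking variant for demonstration: 'False' if fewer than n units.
tryAcquireTSemN :: TSem -> Int -> STM Bool
tryAcquireTSemN sem n = (acquireTSemN sem n >> pure True) `orElse` pure False
```

A failed attempt leaves the semaphore untouched, since the partial acquisition is rolled back together with the rest of the transaction.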
It appears that catchSTM does not roll back newTVar when it encounters an exception. In the following code snippet, the transaction is only partially rolled back.
import Control.Concurrent.STM
import Control.Exception
data Error = Error (TVar String)
instance Show Error where
  show (Error t) = "Error"
instance Exception Error
act = catchSTM (do
                  t <- newTVar "Hello"
                  writeTVar t "Hi"
                  throwSTM $ Error t)
               (\(Error t) -> readTVar t)
main = atomically act >>= putStrLn
This outputs Hello.
This appears to violate atomicity.
Question or bug report, I'm not sure which. On Hackage, stm-2.5.0.0 has the bound base (>=4.3 && <4.14). But I'm using the stm package in a project that builds with base-4.15. How does this work? Is it because there is a more up-to-date version of stm bundled with GHC? Can/should Hackage be updated to reflect the latest knowledge?
It's trivial to write
instance MArray TArray e IO
just like the instance for STM. Creation and reading operations can use the IO variants of TVar operations, and writes can use atomically.