summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Data/Sensor.hs816
1 files changed, 361 insertions, 455 deletions
diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs
index 4bbc988..fcb07a0 100644
--- a/src/Data/Sensor.hs
+++ b/src/Data/Sensor.hs
@@ -1,494 +1,400 @@
-module Data.Sensor where
-
-import Control.Applicative
+{-# LANGUAGE Arrows #-}
+{-# LANGUAGE BlockArguments #-}
+{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE DerivingStrategies #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE PartialTypeSignatures #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE NoFieldSelectors #-}
+
+module Data.Sensor
+ ( Sensor,
+ reactimateS,
+ sensor,
+ --
+ concatS,
+ withDefaultS,
+ feedbackS,
+ arrS,
+ --
+ sample,
+ )
+where
+
+import Control.Concurrent.STM.TQueue (flushTQueue)
import Control.Arrow
import Control.Category (Category (..))
-import Control.Category qualified as C
-import Control.Concurrent.STM (retry)
-import Control.Exception (AsyncException (ThreadKilled))
-import Control.Exception qualified as E
+import Control.DeepSeq
import Control.Monad
import Control.Monad.Reader
import Data.Dynamic
import Data.List
-import Data.Map (Map)
-import Data.Map qualified as M
import Data.Maybe
-import Data.Set (Set)
+import Data.MonadicStreamFunction
+import Data.MonadicStreamFunction.InternalCore
import Data.Set qualified as S
+import System.Environment
+import System.IO.Unsafe (unsafePerformIO)
+import Text.Printf
import UnliftIO
-import UnliftIO.Concurrent hiding (yield)
+import UnliftIO.Concurrent
import Prelude hiding (id, (.))
-newtype Sensor m a b = Sensor'
- { unSensor :: MSF m (Stale (Value a)) (Stale (Value b))
+newtype SensorT m a b = SensorT'
+ { unSensorT :: MSF (ReaderT Config m) (V a) (V b)
}
-mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b
-mkSensor = Sensor' . ignoreStaleInput
-
-ignoreStaleInput ::
- (Monad m) =>
- MSF m (Value a) (Stale (Value b)) ->
- MSF m (Stale (Value a)) (Stale (Value b))
-ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
- case (i, x') of
- (Stale _, Just x) -> returnA -< (Stale x, x')
- _ -> do
- x <- sf -< i.unwrap
- returnA -< (x, Just x.unwrap)
-
-instance (Monad m) => Functor (Sensor m a) where
- f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh)
-
-(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
-f <$$> x = fmap f <$> x
-
-infixl 4 <$$>
-
-instance (Monad m) => Applicative (Sensor m a) where
- pure x = mkSensor do loop Fresh
+sensorT :: (MonadIO m) => MSF (ReaderT Config m) (V a) (V b) -> SensorT m a b
+sensorT =
+ SensorT' . go Nothing
+ where
+ go b' sf = MSF $ \a -> do
+ isStale <- stale a
+ if isStale && isJust b'
+ then do
+ _ <- (.unMSF) sf a
+ pure (fromJust b', go b' sf)
+ else do
+ (b, sf') <- (.unMSF) sf a
+ pure (b, go (Just b) sf')
+
+stale :: (MonadIO m) => V a -> ReaderT Config m Bool
+stale (V gen _) = do
+ stateM <- asks (.stateM)
+ state <- atomically (readTMVar stateM)
+ pure (gen < state.currentGen)
+
+instance (MonadIO m) => Category (SensorT m) where
+ id = sensorT id
+ sf2 . sf1 = SensorT' (sf2.unSensorT . sf1.unSensorT)
+
+instance (MonadIO m) => Functor (SensorT m a) where
+ f `fmap` sf = SensorT' (fmap f `fmap` sf.unSensorT)
+
+instance (MonadIO m) => Applicative (SensorT m a) where
+ pure b = sensorT $ go Nothing
where
- loop stale =
- MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale)
-
+ go gen' = MSF $ \_ -> do
+ gen <- case gen' of
+ Just gen -> pure gen
+ Nothing -> do
+ stateM <- asks (.stateM)
+ state <- atomically (readTMVar stateM)
+ let gen = state.currentGen
+ pure gen
+ pure (V gen b, go (Just gen))
f <*> x =
- mkSensor
- ( liftA2
- (combineStaleValue ($))
- (f.unSensor <<< arr Fresh)
- (x.unSensor <<< arr Fresh)
- )
-
-instance (Monad m) => Category (Sensor m) where
- id = mkSensor (C.id <<< arr Fresh)
+ SensorT' $
+ (\(V gen1 f) (V gen2 x) -> V (genMax gen1 gen2) (f x))
+ <$> f.unSensorT
+ <*> x.unSensorT
- sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh))
+instance (MonadIO m) => Arrow (SensorT m) where
+ arr f = sensorT (arr (fmap f))
+ first =
+ SensorT'
+ . (\sf -> arr f >>> first sf >>> arr g)
+ . (.unSensorT)
+ where
+ f :: V (a, c) -> (V a, c)
+ f (V gen (a, c)) = (V gen a, c)
-instance (Monad m) => Arrow (Sensor m) where
- arr f = mkSensor (arr (f <$$>) <<< arr Fresh)
+ g :: (V a, c) -> V (a, c)
+ g (V gen a, c) = (V gen (a, c))
- first :: Sensor m a b -> Sensor m (a, d) (b, d)
- first sf =
- mkSensor
- ( arr (\(x, d) -> (,d) <$$> x)
- <<< first (sf.unSensor <<< arr Fresh)
- <<< arr (\x -> (fst <$> x, snd (x.unwrap)))
- )
+-- first :: a b c -> a (b, d) (c, d)
-feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
-feedbackS c sf =
- mkSensor . feedback c $
- arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap)))
- <<< (sf.unSensor <<< arr Fresh)
- <<< arr (\(x, d) -> (,d) <$> x)
-
-data Stale a
- = Stale {unwrap :: a}
- | Fresh {unwrap :: a}
- | Input {unwrap :: a}
- deriving (Functor, Show)
-
-instance (Monad m) => Monad (Sensor m a) where
- m >>= f =
- mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v ->
- (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh)
-
-switchOnFresh ::
- (Monad m) =>
- MSF m a (Stale (Value b)) ->
- MSF m a (Stale (Value b), Bool)
-switchOnFresh =
- fmap
- ( \v ->
- ( v,
- case v of
- Fresh _ -> True
- _ -> False
- )
- )
+type Sensor = SensorT IO
-switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
-switch m f = feedback Nothing $ go m f
- where
- go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b))
- go m f = MSF $ \(i, k') -> do
- ((x, me), m') <- m.unMSF i
- let k = case (me, k') of
- (False, Just k) -> k
- _ -> f x
- (z, k') <- k.unMSF i
- pure ((z, (Just k')), go m' f)
-
-data Value a = Value
- { dependsOn :: Set String,
- atTick :: Map String Int,
- unwrap :: a
+data Config = Config
+ { stateM :: TMVar State,
+ logger :: TMVar (String -> ReaderT Config IO ())
}
- deriving (Show, Functor, Eq)
-combineStaleValue ::
- (a -> b -> c) ->
- Stale (Value a) ->
- Stale (Value b) ->
- Stale (Value c)
-combineStaleValue f =
- combineStale (combineValue f)
+data Journal = Journal {unJournal :: S.Set String}
-combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
-combineValue f a b =
- Value
- (S.union a.dependsOn b.dependsOn)
- (M.unionWith max a.atTick b.atTick)
- (f a.unwrap b.unwrap)
+instance Semigroup Journal where
+ (Journal xs) <> (Journal ys) = Journal (xs <> ys)
-combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
-combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
-combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
-combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap)
-combineStale f a b = Fresh (f a.unwrap b.unwrap)
+instance Monoid Journal where
+ mempty = Journal mempty
+ mconcat = Journal . mconcat . map (.unJournal)
-class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where
- aggregate :: s -> AggregateT s m ()
-
-newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a}
-
-deriving instance (Functor m) => Functor (AggregateT s m)
-
-deriving instance (Applicative m) => Applicative (AggregateT s m)
-
-deriving instance (Monad m) => Monad (AggregateT s m)
-
-deriving instance (MonadIO m) => MonadIO (AggregateT s m)
-
-instance MonadTrans (AggregateT s) where
- lift = AggregateT . lift
-
-deriving instance (MonadFail m) => MonadFail (AggregateT s m)
-
-deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)
-
-deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m)
-
--- XXX aggregates should be keyed on something else than `String`
-data AggregateE m = AggregateE
- { readAggregatesM :: TMVar (S.Set String),
- currentTickM :: TMVar Int,
- localQueueT :: TChan (),
- self :: String
+data State = State
+ { threadPool :: ThreadPool,
+ inputQueue :: TQueue (),
+ currentGen :: Gen,
+ journal :: Journal
}
-yield :: (Aggregate m s a) => a -> AggregateT s m ()
-yield x = do
- SensorE {liveAggregatesM, globalQueueT} <- lift askS
- AggregateE {readAggregatesM, currentTickM, self} <- ask
- atomically do
- writeTChan globalQueueT ()
- liveAggregates <- readTMVar liveAggregatesM
- valueT <- do
- case M.lookup self liveAggregates of
- Nothing -> retry
- Just LiveAggregate {valueT} -> pure valueT
- dependsOn <- takeTMVar readAggregatesM
- putTMVar readAggregatesM S.empty
- currentTick <- takeTMVar currentTickM
- putTMVar currentTickM (currentTick + 1)
- let atTick = M.singleton self currentTick
- let v = Value (S.insert self dependsOn) atTick (toDyn x)
- writeTVar valueT (Just v)
-
-sensor :: (Aggregate m s a) => s -> Sensor m () a
-sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
- SensorE {liveAggregatesM, startAggregateT} <- askS
- atomically do writeTQueue startAggregateT (AnyAggregate s)
- atomically do
- liveAggregates <- readTMVar liveAggregatesM
- valueT <- case M.lookup (show s) liveAggregates of
- Nothing -> retry
- Just LiveAggregate {valueT} -> pure valueT
- v <-
- readTVar valueT >>= \case
- Nothing -> retry
- Just v -> pure v
- let stale
- | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh
- | otherwise = Stale
- pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v))
-
-data Restart = Restart String SomeException deriving (Show)
-
-instance Exception Restart
-
--- XXX merge `sample'` and `sample`
-sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
-sample' f s = do
- SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS
- outputT <- newTVarIO Nothing
-
- _ <- forkIO $ forever do
- startAggregate =<< atomically do readTQueue startAggregateT
-
- h <-
- reactInit
- ( ( arrM $ \v -> do
- deadAggregates <- atomically do
- case v of
- Input _ -> error "Input"
- Stale _ -> do
- writeTVar outputT Nothing
- pure M.empty
- Fresh v -> do
- writeTVar outputT (Just v.unwrap)
- liveAggregates <- readTMVar liveAggregatesM
- globalTick <- takeTMVar globalTickM
- putTMVar globalTickM (globalTick + 1)
- pure
- ( M.filterWithKey
- ( \key liveAggregate ->
- not (S.member key v.dependsOn)
- && liveAggregate.spawnedAt < globalTick
- )
- liveAggregates
- )
- mapM_
- ( \liveAggregate -> do
- liftIO (killThread (liveAggregate.threadId))
- )
- deadAggregates
- )
- <<< s.unSensor
- <<< arr (Input . Value S.empty M.empty)
- )
- let loop c = do
- mask $ \restore ->
- catch
- ( restore do
- react h
- c' <-
- maybe (pure c) (fmap Just . f c)
- =<< atomically do readTVar outputT
- _ <- atomically do readTChan globalQueueT
- pure c'
- )
- ( \(Restart s e) -> do
- liftIO (print (s, e))
- pure c
- )
- >>= loop
- loop Nothing
-
-runSensorT :: (MonadIO m) => SensorT m a -> m a
-runSensorT m = do
- runReaderT m.unSensorT =<< sensorE
-
-sensorE :: (MonadIO m) => m (SensorE (t m))
-sensorE = do
- liveAggregatesM <- newTMVarIO M.empty
- startAggregateT <- newTQueueIO
- globalQueueT <- newTChanIO
- globalTickM <- newTMVarIO 0
- mainThread <- myThreadId
- pure SensorE {..}
-
-sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a]
-sample n s = do
- SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS
- outputsT <- newTVarIO []
-
- _ <- forkIO $ forever do
- startAggregate =<< atomically do readTQueue startAggregateT
-
- h <-
- reactInit
- ( ( arrM $ \v -> do
- case v of
- Input _ -> error "Input"
- Stale _ -> pure ()
- Fresh v -> do
- deadAggregates <- atomically do
- writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
- liveAggregates <- readTMVar liveAggregatesM
- globalTick <- takeTMVar globalTickM
- putTMVar globalTickM (globalTick + 1)
- pure
- ( M.filterWithKey
- ( \key liveAggregate ->
- not (S.member key v.dependsOn)
- && liveAggregate.spawnedAt < globalTick
- )
- liveAggregates
- )
- mapM_ (liftIO . killThread . (.threadId)) deadAggregates
- )
- <<< s.unSensor
- <<< arr (Input . Value S.empty M.empty)
- )
- let loop = do
- react h
- xs <- atomically do readTVar outputsT
- if length xs >= n
- then pure (reverse xs)
- else do
- _ <- atomically do readTChan globalQueueT
- loop
- loop
-
-newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a}
-
-deriving instance (Functor m) => Functor (SensorT m)
-
-deriving instance (Applicative m) => Applicative (SensorT m)
-
-deriving instance (Monad m) => Monad (SensorT m)
-
-deriving instance (MonadIO m) => MonadIO (SensorT m)
-
-instance MonadTrans SensorT where
- lift = SensorT . lift
-
-deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)
-
-deriving instance (MonadFail m) => MonadFail (SensorT m)
-
-deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m)
-
-data SensorE m = SensorE
- { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)),
- startAggregateT :: TQueue (AnyAggregate m),
- globalQueueT :: TChan (),
- globalTickM :: TMVar Int,
- mainThread :: ThreadId
+data ThreadPool = ThreadPool
+ { threads :: [Thread]
}
-class (MonadUnliftIO m) => MonadSensor m where
- askS :: m (SensorE m)
-
-instance (MonadUnliftIO m) => MonadSensor (SensorT m) where
- askS = ask
-
-data AnyAggregate m where
- AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m
-
-data LiveAggregate m a = LiveAggregate
- { valueT :: TVar (Maybe (Value a)),
+data Thread = Thread
+ { id :: String,
threadId :: ThreadId,
- spawnedAt :: Int
+ valueT :: TVar (Maybe (V Dynamic))
}
-startAggregate :: (MonadSensor m) => AnyAggregate m -> m ()
-startAggregate (AnyAggregate s) = do
- let self = show s
- SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS
- liveAggregates <- atomically do takeTMVar liveAggregatesM
- if M.member self liveAggregates
- then atomically do putTMVar liveAggregatesM liveAggregates
- else do
- readAggregatesM <- newTMVarIO S.empty
- currentTickM <- newTMVarIO 0
- localQueueT <- atomically do dupTChan globalQueueT
-
- threadId <-
- forkFinally
- (runReaderT (aggregate s).unAggregateT AggregateE {..})
- ( either
- ( \e ->
- if
- | fromException e == Just ThreadKilled -> do
- dropAggregate self
- | otherwise -> do
- dropAggregate self
- liftIO (E.throwTo mainThread (Restart self e))
- )
- pure
+newtype Gen = Gen Int deriving (Eq, Ord, Show)
+
+genInc :: Gen -> Gen
+genInc (Gen n) = Gen (n + 1)
+
+genMax :: Gen -> Gen -> Gen
+genMax (Gen n) (Gen m) = Gen (max n m)
+
+data V a = V {gen :: Gen, unV :: a} deriving (Show, Functor)
+
+{-
+withDefaultS :: (Monad m) => b -> MSF m a b -> MSF m (Maybe a) b
+withDefaultS def sf = do
+ msf $ \case
+ V gen Nothing -> pure (V gen def, withDefault def sf)
+ V gen (Just x) -> do
+ (r, sf') <- (.unMSF) sf (V gen x)
+ pure (r, withDefault def sf')-}
+
+sample :: Show a => Int -> SensorT IO () a -> IO [a]
+sample n sf = do
+ qT <- newTQueueIO
+ tId <- forkIO $ reactimateS (sf >>> arrS (atomically . writeTQueue qT))
+ result <- atomically do
+ xs <- flushTQueue qT
+ checkSTM (length xs >= n)
+ pure (take n xs)
+ killThread tId
+ pure result
+
+reactimateS :: (Show b, NFData b) => SensorT IO () b -> IO ()
+reactimateS sf = do
+ let threadPool = ThreadPool []
+ inputQueue <- liftIO newTQueueIO
+ let currentGen = Gen 0
+ logger <- newTMVarIO (liftIO . putStrLn)
+ let journal = Journal S.empty
+ stateM <- newTMVarIO State {..}
+ catchSyncOrAsync
+ ( do
+ let incCurrentGen = SensorT' $ arrM $ \x -> do
+ bracketOnError
+ (atomically (takeTMVar stateM))
+ (atomically . putTMVar stateM)
+ ( \state -> do
+ let state' = state {currentGen = genInc state.currentGen}
+ atomically (putTMVar stateM state')
+ )
+ pure x
+ inputs = SensorT' $ constM $ do
+ state <- atomically (readTMVar stateM)
+ let gen = state.currentGen
+ log_ (printf "[gen] %s" (show gen))
+ atomically do
+ V gen <$> case state.threadPool.threads of
+ [] -> pure ()
+ _ -> readTQueue state.inputQueue
+ reactimateS'
+ (Config {..})
+ ( incCurrentGen
+ >>> inputs
+ >>> sf
+ >>> ( SensorT' $ arrM $ \(V gen x) -> do
+ _ <- evaluate x
+ deepseq x `seq` log_ . printf "[output] %s" . show $ x
+ pure (V gen ())
+ )
)
- valueT <- newTVarIO Nothing
- atomically do
- spawnedAt <- readTMVar globalTickM
- putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)
+ )
+ ( \(e :: SomeException) -> do
+ bracketOnError
+ (atomically (takeTMVar stateM))
+ (atomically . putTMVar stateM)
+ ( \state -> do
+ (mapM_ (\thread -> do killThread thread.threadId))
+ state.threadPool.threads
+ let state' = state {threadPool = state.threadPool {threads = []}}
+ atomically (putTMVar stateM state')
+ )
+ throwIO e
+ )
-dropAggregate :: (MonadSensor m) => String -> m ()
-dropAggregate self = do
- SensorE {liveAggregatesM} <- askS
- atomically do
- liveAggregates <- takeTMVar liveAggregatesM
- putTMVar liveAggregatesM (M.delete self liveAggregates)
+reactimateS' :: Config -> SensorT IO () () -> IO ()
+reactimateS' r sf = do
+ (_, sf') <-
+ runReaderT
+ ( do
+ stateM <- asks (.stateM)
+ state <- atomically (readTMVar stateM)
+ let gen = state.currentGen
+ sf.unSensorT.unMSF (V gen ())
+ )
+ r
+ runReaderT prune r
+ reactimateS' r (SensorT' sf')
+
+prune :: ReaderT Config IO ()
+prune = do
+ stateM <- asks (.stateM)
+ bracketOnError
+ (atomically (takeTMVar stateM))
+ (atomically . putTMVar stateM)
+ ( \state -> do
+ let Journal liveThreadIds = state.journal
+ mapM_
+ (\thread -> killThread thread.threadId)
+ (filter (\thread -> not (S.member thread.id liveThreadIds)) state.threadPool.threads)
+ atomically (putTMVar stateM state {journal = Journal S.empty})
+ )
--- XXX we should depend on `dunai`
-data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}
+log_ :: String -> ReaderT Config IO ()
+log_ s = when env_SENSOR_DEBUG do
+ logger <- asks (.logger)
+ bracket
+ (atomically (takeTMVar logger))
+ (atomically . putTMVar logger)
+ (\putStrLn -> putStrLn s)
+
+env_SENSOR_DEBUG :: Bool
+env_SENSOR_DEBUG =
+ unsafePerformIO (isJust <$> lookupEnv "SENSOR_DEBUG")
+{-# NOINLINE env_SENSOR_DEBUG #-}
+
+sensor ::
+ (Typeable b, Show b) =>
+ (a -> String) ->
+ ( a ->
+ (b -> ReaderT Config IO ()) ->
+ ReaderT Config IO ()
+ ) ->
+ SensorT IO a b
+sensor mkId mkF = do
+ SensorT' $ arrM $ \(V _ input) -> do
+ let id = mkId input
+ f = mkF input
+ stateM <- asks (.stateM)
+ state <- atomically (readTMVar stateM)
+ valueT <- do
+ case find (\thread -> thread.id == id) state.threadPool.threads of
+ Nothing -> spawn id f
+ Just thread -> pure thread.valueT
+ value <- atomically do
+ maybeValue <- readTVar valueT
+ case maybeValue of
+ Nothing -> retrySTM
+ Just value -> pure (flip fromDyn (error "fromDyn") <$> value)
+ atomically do
+ state <- takeTMVar stateM
+ let state' = state {journal = Journal (S.singleton id) <> state.journal}
+ putTMVar stateM state'
+ pure value
+ where
+ spawn ::
+ (Typeable a, Show a) =>
+ String ->
+ ( (a -> ReaderT Config IO ()) ->
+ ReaderT Config IO ()
+ ) ->
+ (ReaderT Config IO (TVar (Maybe (V Dynamic))))
+ spawn id f = do
+ stateM <- asks (.stateM)
+ bracketOnError
+ (atomically (takeTMVar stateM))
+ (atomically . putTMVar stateM)
+ ( \state -> do
+ log_ (printf "[spawn] %s" id)
+ valueT <- newTVarIO Nothing
+ threadId <-
+ forkFinally
+ (f (yield id valueT))
+ ( \case
+ (Left (e :: SomeException)) -> do
+ log_ (printf "[error] %s %s" id (show e))
+ despawn id
+ Right _ -> pure ()
+ )
+ let state' =
+ state
+ { threadPool =
+ state.threadPool
+ { threads = (Thread {..}) : state.threadPool.threads
+ }
+ }
+ atomically (putTMVar stateM state')
+ pure valueT
+ )
-instance (Monad m) => Category (MSF m) where
- id = go
- where
- go = MSF $ \a -> return (a, go)
+ yield ::
+ (Typeable a, Show a) =>
+ String ->
+ TVar (Maybe (V Dynamic)) ->
+ a ->
+ ReaderT Config IO ()
+ yield id valueT value = do
+ stateM <- asks (.stateM)
+ state <- atomically (readTMVar stateM)
+ let gen = state.currentGen
+ log_ (printf "[yield] %s %s" id (show (V gen value)))
+ atomically do
+ value' <- readTVar valueT
+ writeTVar valueT (Just (toDyn <$> V gen value))
+ when (isJust value') do
+ writeTQueue state.inputQueue ()
+
+ despawn :: String -> ReaderT Config IO ()
+ despawn id = do
+ stateM <- asks (.stateM)
+ bracketOnError
+ (atomically $ takeTMVar stateM)
+ (atomically . putTMVar stateM)
+ ( \state -> do
+ log_ (printf "[despawn] %s" id)
+ atomically . putTMVar stateM $
+ state
+ { threadPool =
+ state.threadPool
+ { threads =
+ filter
+ (\thread -> thread.id /= id)
+ state.threadPool.threads
+ }
+ }
+ )
- sf2 . sf1 = MSF $ \a -> do
- (b, sf1') <- unMSF sf1 a
- (c, sf2') <- unMSF sf2 b
- let sf' = sf2' . sf1'
- c `seq` return (c, sf')
+concatS :: (Monad m) => [SensorT m a b] -> SensorT m a [b]
+concatS = SensorT' . fmap f . go . map (.unSensorT)
+ where
+ go :: (Monad m) => [MSF m a b] -> MSF m a [b]
+ go sfs = MSF $ \a -> do
+ (bs, sfs') <- unzip <$> mapM (\sf -> (.unMSF) sf a) sfs
+ pure (bs, go sfs')
+ f vs = V (maximum (map (.gen) vs)) (map (.unV) vs)
+
+withDefaultS :: (Monad m) => b -> SensorT m a b -> SensorT m (Maybe a) b
+withDefaultS def = SensorT' . go . (.unSensorT)
+ where
+ go sf = MSF $ \case
+ V gen (Just a) -> do
+ (b, sf') <- sf.unMSF (V gen a)
+ pure (b, go sf')
+ V gen Nothing ->
+ pure (V gen def, go sf)
+
+feedbackS :: (Monad m) => c -> SensorT m (a, c) (b, c) -> SensorT m a b
+feedbackS c =
+ SensorT'
+ . (\sf -> feedback c (arr g >>> sf >>> arr f))
+ . (.unSensorT)
+ where
+ f :: V (a, c) -> (V a, c)
+ f (V gen (a, c)) = (V gen a, c)
-instance (Monad m) => Arrow (MSF m) where
- arr f = arrM (return . f)
+ g :: (V a, c) -> V (a, c)
+ g (V gen a, c) = (V gen (a, c))
- first =
- morphGS $ \f (a, c) -> do
- (b, msf') <- f a
- return ((b, c), msf')
-
-instance (Monad m) => Functor (MSF m a) where
- fmap f msf = msf >>> arr f
-
-instance (Functor m, Monad m) => Applicative (MSF m a) where
- pure = arr . const
-
- fs <*> bs = (fs &&& bs) >>> arr (uncurry ($))
-
-morphGS ::
- (Monad m2) =>
- (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) ->
- MSF m1 a1 b1 ->
- MSF m2 a2 b2
-morphGS morph msf = MSF $ \a2 -> do
- (b2, msf') <- morph (unMSF msf) a2
- return (b2, morphGS morph msf')
-
-feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
-feedback c sf = MSF $ \a -> do
- ((b', c'), sf') <- unMSF sf (a, c)
- return (b', feedback c' sf')
-
-embed :: (Monad m) => MSF m a b -> [a] -> m [b]
-embed _ [] = return []
-embed sf (a : as) = do
- (b, sf') <- unMSF sf a
- bs <- embed sf' as
- return (b : bs)
-
-reactimate :: (Monad m) => MSF m () () -> m ()
-reactimate sf = do
- (_, sf') <- unMSF sf ()
- reactimate sf'
-
-arrM :: (Monad m) => (a -> m b) -> MSF m a b
-arrM f =
- morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id
-
-type ReactHandle m = IORef (MSF m () ())
-
-reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
-reactInit = liftIO . newIORef
-
-react :: (MonadIO m) => ReactHandle m -> m ()
-react handle = do
- msf <- liftIO $ readIORef handle
- (_, msf') <- unMSF msf ()
- liftIO $ writeIORef handle msf'
-
-instance (Monad m) => ArrowChoice (MSF m) where
- left :: MSF m a b -> MSF m (Either a c) (Either b c)
- left sf = MSF f
- where
- f (Left a) = do
- (b, sf') <- unMSF sf a
- return (Left b, left sf')
- f (Right c) = return (Right c, left sf)
+arrS :: (MonadIO m) => (a -> m b) -> SensorT m a b
+arrS f = sensorT $ go
+ where
+ go = MSF $ \(V gen a) ->
+ lift (((,go) . V gen) <$> f a)