diff options
| -rw-r--r-- | nix/sources.json | 8 | ||||
| -rw-r--r-- | sensors.cabal | 2 | ||||
| -rw-r--r-- | src/Data/Sensor.hs | 816 | ||||
| -rw-r--r-- | test/AggregateSpec.hs | 78 | ||||
| -rw-r--r-- | test/PureSpec.hs | 2 |
5 files changed, 399 insertions, 507 deletions
diff --git a/nix/sources.json b/nix/sources.json index e36b267..d613746 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -1,14 +1,14 @@ { "nixpkgs": { - "branch": "release-24.05", + "branch": "release-25.05", "description": "Nix Packages collection", "homepage": null, "owner": "NixOS", "repo": "nixpkgs", - "rev": "36097d5a3f3ce6334a35717edeb90dba184d7081", - "sha256": "0ygi382h7sslzpzpcacbi9pdplldr5mqn5qq97lcwlxs9ksk1z64", + "rev": "682aaef638791a25f65c5979022bb49bc7fb67b6", + "sha256": "0ld0fagf748pck4d8mhskb3ziypi3d8dm2a9vf48586990m1h2y3", "type": "tarball", - "url": "https://github.com/NixOS/nixpkgs/archive/36097d5a3f3ce6334a35717edeb90dba184d7081.tar.gz", + "url": "https://github.com/NixOS/nixpkgs/archive/682aaef638791a25f65c5979022bb49bc7fb67b6.tar.gz", "url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz" } } diff --git a/sensors.cabal b/sensors.cabal index 844aae2..e86fa02 100644 --- a/sensors.cabal +++ b/sensors.cabal @@ -21,6 +21,8 @@ library build-depends: base, containers, + deepseq, + dunai, hspec, mtl, stm, diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs index 4bbc988..fcb07a0 100644 --- a/src/Data/Sensor.hs +++ b/src/Data/Sensor.hs @@ -1,494 +1,400 @@ -module Data.Sensor where - -import Control.Applicative +{-# LANGUAGE Arrows #-} +{-# LANGUAGE BlockArguments #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE PartialTypeSignatures #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE NoFieldSelectors #-} + +module Data.Sensor + ( Sensor, + reactimateS, + sensor, + -- + concatS, + withDefaultS, + feedbackS, + arrS, + -- + sample, + ) +where + +import Control.Concurrent.STM.TQueue (flushTQueue) import Control.Arrow import Control.Category (Category (..)) -import Control.Category qualified as C -import Control.Concurrent.STM (retry) -import Control.Exception (AsyncException (ThreadKilled)) -import Control.Exception qualified as E +import Control.DeepSeq import Control.Monad import Control.Monad.Reader import Data.Dynamic import Data.List -import Data.Map (Map) -import Data.Map qualified as M import Data.Maybe -import Data.Set (Set) +import Data.MonadicStreamFunction +import Data.MonadicStreamFunction.InternalCore import Data.Set qualified as S +import System.Environment +import System.IO.Unsafe (unsafePerformIO) +import Text.Printf import UnliftIO -import UnliftIO.Concurrent hiding (yield) +import UnliftIO.Concurrent import Prelude hiding (id, (.)) -newtype Sensor m a b = Sensor' - { unSensor :: MSF m (Stale (Value a)) (Stale (Value b)) +newtype SensorT m a b = SensorT' + { unSensorT :: MSF (ReaderT Config m) (V a) (V b) } -mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b -mkSensor = Sensor' . ignoreStaleInput - -ignoreStaleInput :: - (Monad m) => - MSF m (Value a) (Stale (Value b)) -> - MSF m (Stale (Value a)) (Stale (Value b)) -ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do - case (i, x') of - (Stale _, Just x) -> returnA -< (Stale x, x') - _ -> do - x <- sf -< i.unwrap - returnA -< (x, Just x.unwrap) - -instance (Monad m) => Functor (Sensor m a) where - f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh) - -(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b) -f <$$> x = fmap f <$> x - -infixl 4 <$$> - -instance (Monad m) => Applicative (Sensor m a) where - pure x = mkSensor do loop Fresh +sensorT :: (MonadIO m) => MSF (ReaderT Config m) (V a) (V b) -> SensorT m a b +sensorT = + SensorT' . go Nothing + where + go b' sf = MSF $ \a -> do + isStale <- stale a + if isStale && isJust b' + then do + _ <- (.unMSF) sf a + pure (fromJust b', go b' sf) + else do + (b, sf') <- (.unMSF) sf a + pure (b, go (Just b) sf') + +stale :: (MonadIO m) => V a -> ReaderT Config m Bool +stale (V gen _) = do + stateM <- asks (.stateM) + state <- atomically (readTMVar stateM) + pure (gen < state.currentGen) + +instance (MonadIO m) => Category (SensorT m) where + id = sensorT id + sf2 . sf1 = SensorT' (sf2.unSensorT . sf1.unSensorT) + +instance (MonadIO m) => Functor (SensorT m a) where + f `fmap` sf = SensorT' (fmap f `fmap` sf.unSensorT) + +instance (MonadIO m) => Applicative (SensorT m a) where + pure b = sensorT $ go Nothing where - loop stale = - MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale) - + go gen' = MSF $ \_ -> do + gen <- case gen' of + Just gen -> pure gen + Nothing -> do + stateM <- asks (.stateM) + state <- atomically (readTMVar stateM) + let gen = state.currentGen + pure gen + pure (V gen b, go (Just gen)) f <*> x = - mkSensor - ( liftA2 - (combineStaleValue ($)) - (f.unSensor <<< arr Fresh) - (x.unSensor <<< arr Fresh) - ) - -instance (Monad m) => Category (Sensor m) where - id = mkSensor (C.id <<< arr Fresh) + SensorT' $ + (\(V gen1 f) (V gen2 x) -> V (genMax gen1 gen2) (f x)) + <$> f.unSensorT + <*> x.unSensorT - sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh)) +instance (MonadIO m) => Arrow (SensorT m) where + arr f = sensorT (arr (fmap f)) + first = + SensorT' + . (\sf -> arr f >>> first sf >>> arr g) + . (.unSensorT) + where + f :: V (a, c) -> (V a, c) + f (V gen (a, c)) = (V gen a, c) -instance (Monad m) => Arrow (Sensor m) where - arr f = mkSensor (arr (f <$$>) <<< arr Fresh) + g :: (V a, c) -> V (a, c) + g (V gen a, c) = (V gen (a, c)) - first :: Sensor m a b -> Sensor m (a, d) (b, d) - first sf = - mkSensor - ( arr (\(x, d) -> (,d) <$$> x) - <<< first (sf.unSensor <<< arr Fresh) - <<< arr (\x -> (fst <$> x, snd (x.unwrap))) - ) +-- first :: a b c -> a (b, d) (c, d) -feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b -feedbackS c sf = - mkSensor . feedback c $ - arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap))) - <<< (sf.unSensor <<< arr Fresh) - <<< arr (\(x, d) -> (,d) <$> x) - -data Stale a - = Stale {unwrap :: a} - | Fresh {unwrap :: a} - | Input {unwrap :: a} - deriving (Functor, Show) - -instance (Monad m) => Monad (Sensor m a) where - m >>= f = - mkSensor $ switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v -> - (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh) - -switchOnFresh :: - (Monad m) => - MSF m a (Stale (Value b)) -> - MSF m a (Stale (Value b), Bool) -switchOnFresh = - fmap - ( \v -> - ( v, - case v of - Fresh _ -> True - _ -> False - ) - ) +type Sensor = SensorT IO -switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b -switch m f = feedback Nothing $ go m f - where - go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b)) - go m f = MSF $ \(i, k') -> do - ((x, me), m') <- m.unMSF i - let k = case (me, k') of - (False, Just k) -> k - _ -> f x - (z, k') <- k.unMSF i - pure ((z, (Just k')), go m' f) - -data Value a = Value - { dependsOn :: Set String, - atTick :: Map String Int, - unwrap :: a +data Config = Config + { stateM :: TMVar State, + logger :: TMVar (String -> ReaderT Config IO ()) } - deriving (Show, Functor, Eq) -combineStaleValue :: - (a -> b -> c) -> - Stale (Value a) -> - Stale (Value b) -> - Stale (Value c) -combineStaleValue f = - combineStale (combineValue f) +data Journal = Journal {unJournal :: S.Set String} -combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c -combineValue f a b = - Value - (S.union a.dependsOn b.dependsOn) - (M.unionWith max a.atTick b.atTick) - (f a.unwrap b.unwrap) +instance Semigroup Journal where + (Journal xs) <> (Journal ys) = Journal (xs <> ys) -combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c -combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap) -combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap) -combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap) -combineStale f a b = Fresh (f a.unwrap b.unwrap) +instance Monoid Journal where + mempty = Journal mempty + mconcat = Journal . mconcat . map (.unJournal) -class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where - aggregate :: s -> AggregateT s m () - -newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a} - -deriving instance (Functor m) => Functor (AggregateT s m) - -deriving instance (Applicative m) => Applicative (AggregateT s m) - -deriving instance (Monad m) => Monad (AggregateT s m) - -deriving instance (MonadIO m) => MonadIO (AggregateT s m) - -instance MonadTrans (AggregateT s) where - lift = AggregateT . lift - -deriving instance (MonadFail m) => MonadFail (AggregateT s m) - -deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m) - -deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m) - --- XXX aggregates should be keyed on something else than `String` -data AggregateE m = AggregateE - { readAggregatesM :: TMVar (S.Set String), - currentTickM :: TMVar Int, - localQueueT :: TChan (), - self :: String +data State = State + { threadPool :: ThreadPool, + inputQueue :: TQueue (), + currentGen :: Gen, + journal :: Journal } -yield :: (Aggregate m s a) => a -> AggregateT s m () -yield x = do - SensorE {liveAggregatesM, globalQueueT} <- lift askS - AggregateE {readAggregatesM, currentTickM, self} <- ask - atomically do - writeTChan globalQueueT () - liveAggregates <- readTMVar liveAggregatesM - valueT <- do - case M.lookup self liveAggregates of - Nothing -> retry - Just LiveAggregate {valueT} -> pure valueT - dependsOn <- takeTMVar readAggregatesM - putTMVar readAggregatesM S.empty - currentTick <- takeTMVar currentTickM - putTMVar currentTickM (currentTick + 1) - let atTick = M.singleton self currentTick - let v = Value (S.insert self dependsOn) atTick (toDyn x) - writeTVar valueT (Just v) - -sensor :: (Aggregate m s a) => s -> Sensor m () a -sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do - SensorE {liveAggregatesM, startAggregateT} <- askS - atomically do writeTQueue startAggregateT (AnyAggregate s) - atomically do - liveAggregates <- readTMVar liveAggregatesM - valueT <- case M.lookup (show s) liveAggregates of - Nothing -> retry - Just LiveAggregate {valueT} -> pure valueT - v <- - readTVar valueT >>= \case - Nothing -> retry - Just v -> pure v - let stale - | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh - | otherwise = Stale - pure (flip fromDyn (error "") <$$> stale v, Just (const () <$> v)) - -data Restart = Restart String SomeException deriving (Show) - -instance Exception Restart - --- XXX merge `sample'` and `sample` -sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m () -sample' f s = do - SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS - outputT <- newTVarIO Nothing - - _ <- forkIO $ forever do - startAggregate =<< atomically do readTQueue startAggregateT - - h <- - reactInit - ( ( arrM $ \v -> do - deadAggregates <- atomically do - case v of - Input _ -> error "Input" - Stale _ -> do - writeTVar outputT Nothing - pure M.empty - Fresh v -> do - writeTVar outputT (Just v.unwrap) - liveAggregates <- readTMVar liveAggregatesM - globalTick <- takeTMVar globalTickM - putTMVar globalTickM (globalTick + 1) - pure - ( M.filterWithKey - ( \key liveAggregate -> - not (S.member key v.dependsOn) - && liveAggregate.spawnedAt < globalTick - ) - liveAggregates - ) - mapM_ - ( \liveAggregate -> do - liftIO (killThread (liveAggregate.threadId)) - ) - deadAggregates - ) - <<< s.unSensor - <<< arr (Input . Value S.empty M.empty) - ) - let loop c = do - mask $ \restore -> - catch - ( restore do - react h - c' <- - maybe (pure c) (fmap Just . f c) - =<< atomically do readTVar outputT - _ <- atomically do readTChan globalQueueT - pure c' - ) - ( \(Restart s e) -> do - liftIO (print (s, e)) - pure c - ) - >>= loop - loop Nothing - -runSensorT :: (MonadIO m) => SensorT m a -> m a -runSensorT m = do - runReaderT m.unSensorT =<< sensorE - -sensorE :: (MonadIO m) => m (SensorE (t m)) -sensorE = do - liveAggregatesM <- newTMVarIO M.empty - startAggregateT <- newTQueueIO - globalQueueT <- newTChanIO - globalTickM <- newTMVarIO 0 - mainThread <- myThreadId - pure SensorE {..} - -sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a] -sample n s = do - SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS - outputsT <- newTVarIO [] - - _ <- forkIO $ forever do - startAggregate =<< atomically do readTQueue startAggregateT - - h <- - reactInit - ( ( arrM $ \v -> do - case v of - Input _ -> error "Input" - Stale _ -> pure () - Fresh v -> do - deadAggregates <- atomically do - writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT - liveAggregates <- readTMVar liveAggregatesM - globalTick <- takeTMVar globalTickM - putTMVar globalTickM (globalTick + 1) - pure - ( M.filterWithKey - ( \key liveAggregate -> - not (S.member key v.dependsOn) - && liveAggregate.spawnedAt < globalTick - ) - liveAggregates - ) - mapM_ (liftIO . killThread . (.threadId)) deadAggregates - ) - <<< s.unSensor - <<< arr (Input . Value S.empty M.empty) - ) - let loop = do - react h - xs <- atomically do readTVar outputsT - if length xs >= n - then pure (reverse xs) - else do - _ <- atomically do readTChan globalQueueT - loop - loop - -newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a} - -deriving instance (Functor m) => Functor (SensorT m) - -deriving instance (Applicative m) => Applicative (SensorT m) - -deriving instance (Monad m) => Monad (SensorT m) - -deriving instance (MonadIO m) => MonadIO (SensorT m) - -instance MonadTrans SensorT where - lift = SensorT . lift - -deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m) - -deriving instance (MonadFail m) => MonadFail (SensorT m) - -deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m) - -data SensorE m = SensorE - { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)), - startAggregateT :: TQueue (AnyAggregate m), - globalQueueT :: TChan (), - globalTickM :: TMVar Int, - mainThread :: ThreadId +data ThreadPool = ThreadPool + { threads :: [Thread] } -class (MonadUnliftIO m) => MonadSensor m where - askS :: m (SensorE m) - -instance (MonadUnliftIO m) => MonadSensor (SensorT m) where - askS = ask - -data AnyAggregate m where - AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m - -data LiveAggregate m a = LiveAggregate - { valueT :: TVar (Maybe (Value a)), +data Thread = Thread + { id :: String, threadId :: ThreadId, - spawnedAt :: Int + valueT :: TVar (Maybe (V Dynamic)) } -startAggregate :: (MonadSensor m) => AnyAggregate m -> m () -startAggregate (AnyAggregate s) = do - let self = show s - SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS - liveAggregates <- atomically do takeTMVar liveAggregatesM - if M.member self liveAggregates - then atomically do putTMVar liveAggregatesM liveAggregates - else do - readAggregatesM <- newTMVarIO S.empty - currentTickM <- newTMVarIO 0 - localQueueT <- atomically do dupTChan globalQueueT - - threadId <- - forkFinally - (runReaderT (aggregate s).unAggregateT AggregateE {..}) - ( either - ( \e -> - if - | fromException e == Just ThreadKilled -> do - dropAggregate self - | otherwise -> do - dropAggregate self - liftIO (E.throwTo mainThread (Restart self e)) - ) - pure +newtype Gen = Gen Int deriving (Eq, Ord, Show) + +genInc :: Gen -> Gen +genInc (Gen n) = Gen (n + 1) + +genMax :: Gen -> Gen -> Gen +genMax (Gen n) (Gen m) = Gen (max n m) + +data V a = V {gen :: Gen, unV :: a} deriving (Show, Functor) + +{- +withDefaultS :: (Monad m) => b -> MSF m a b -> MSF m (Maybe a) b +withDefaultS def sf = do + msf $ \case + V gen Nothing -> pure (V gen def, withDefault def sf) + V gen (Just x) -> do + (r, sf') <- (.unMSF) sf (V gen x) + pure (r, withDefault def sf')-} + +sample :: Show a => Int -> SensorT IO () a -> IO [a] +sample n sf = do + qT <- newTQueueIO + tId <- forkIO $ reactimateS (sf >>> arrS (atomically . writeTQueue qT)) + result <- atomically do + xs <- flushTQueue qT + checkSTM (length xs >= n) + pure (take n xs) + killThread tId + pure result + +reactimateS :: (Show b, NFData b) => SensorT IO () b -> IO () +reactimateS sf = do + let threadPool = ThreadPool [] + inputQueue <- liftIO newTQueueIO + let currentGen = Gen 0 + logger <- newTMVarIO (liftIO . putStrLn) + let journal = Journal S.empty + stateM <- newTMVarIO State {..} + catchSyncOrAsync + ( do + let incCurrentGen = SensorT' $ arrM $ \x -> do + bracketOnError + (atomically (takeTMVar stateM)) + (atomically . putTMVar stateM) + ( \state -> do + let state' = state {currentGen = genInc state.currentGen} + atomically (putTMVar stateM state') + ) + pure x + inputs = SensorT' $ constM $ do + state <- atomically (readTMVar stateM) + let gen = state.currentGen + log_ (printf "[gen] %s" (show gen)) + atomically do + V gen <$> case state.threadPool.threads of + [] -> pure () + _ -> readTQueue state.inputQueue + reactimateS' + (Config {..}) + ( incCurrentGen + >>> inputs + >>> sf + >>> ( SensorT' $ arrM $ \(V gen x) -> do + _ <- evaluate x + deepseq x `seq` log_ . printf "[output] %s" . show $ x + pure (V gen ()) + ) ) - valueT <- newTVarIO Nothing - atomically do - spawnedAt <- readTMVar globalTickM - putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates) + ) + ( \(e :: SomeException) -> do + bracketOnError + (atomically (takeTMVar stateM)) + (atomically . putTMVar stateM) + ( \state -> do + (mapM_ (\thread -> do killThread thread.threadId)) + state.threadPool.threads + let state' = state {threadPool = state.threadPool {threads = []}} + atomically (putTMVar stateM state') + ) + throwIO e + ) -dropAggregate :: (MonadSensor m) => String -> m () -dropAggregate self = do - SensorE {liveAggregatesM} <- askS - atomically do - liveAggregates <- takeTMVar liveAggregatesM - putTMVar liveAggregatesM (M.delete self liveAggregates) +reactimateS' :: Config -> SensorT IO () () -> IO () +reactimateS' r sf = do + (_, sf') <- + runReaderT + ( do + stateM <- asks (.stateM) + state <- atomically (readTMVar stateM) + let gen = state.currentGen + sf.unSensorT.unMSF (V gen ()) + ) + r + runReaderT prune r + reactimateS' r (SensorT' sf') + +prune :: ReaderT Config IO () +prune = do + stateM <- asks (.stateM) + bracketOnError + (atomically (takeTMVar stateM)) + (atomically . putTMVar stateM) + ( \state -> do + let Journal liveThreadIds = state.journal + mapM_ + (\thread -> killThread thread.threadId) + (filter (\thread -> not (S.member thread.id liveThreadIds)) state.threadPool.threads) + atomically (putTMVar stateM state {journal = Journal S.empty}) + ) --- XXX we should depend on `dunai` -data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)} +log_ :: String -> ReaderT Config IO () +log_ s = when env_SENSOR_DEBUG do + logger <- asks (.logger) + bracket + (atomically (takeTMVar logger)) + (atomically . putTMVar logger) + (\putStrLn -> putStrLn s) + +env_SENSOR_DEBUG :: Bool +env_SENSOR_DEBUG = + unsafePerformIO (isJust <$> lookupEnv "SENSOR_DEBUG") +{-# NOINLINE env_SENSOR_DEBUG #-} + +sensor :: + (Typeable b, Show b) => + (a -> String) -> + ( a -> + (b -> ReaderT Config IO ()) -> + ReaderT Config IO () + ) -> + SensorT IO a b +sensor mkId mkF = do + SensorT' $ arrM $ \(V _ input) -> do + let id = mkId input + f = mkF input + stateM <- asks (.stateM) + state <- atomically (readTMVar stateM) + valueT <- do + case find (\thread -> thread.id == id) state.threadPool.threads of + Nothing -> spawn id f + Just thread -> pure thread.valueT + value <- atomically do + maybeValue <- readTVar valueT + case maybeValue of + Nothing -> retrySTM + Just value -> pure (flip fromDyn (error "fromDyn") <$> value) + atomically do + state <- takeTMVar stateM + let state' = state {journal = Journal (S.singleton id) <> state.journal} + putTMVar stateM state' + pure value + where + spawn :: + (Typeable a, Show a) => + String -> + ( (a -> ReaderT Config IO ()) -> + ReaderT Config IO () + ) -> + (ReaderT Config IO (TVar (Maybe (V Dynamic)))) + spawn id f = do + stateM <- asks (.stateM) + bracketOnError + (atomically (takeTMVar stateM)) + (atomically . putTMVar stateM) + ( \state -> do + log_ (printf "[spawn] %s" id) + valueT <- newTVarIO Nothing + threadId <- + forkFinally + (f (yield id valueT)) + ( \case + (Left (e :: SomeException)) -> do + log_ (printf "[error] %s %s" id (show e)) + despawn id + Right _ -> pure () + ) + let state' = + state + { threadPool = + state.threadPool + { threads = (Thread {..}) : state.threadPool.threads + } + } + atomically (putTMVar stateM state') + pure valueT + ) -instance (Monad m) => Category (MSF m) where - id = go - where - go = MSF $ \a -> return (a, go) + yield :: + (Typeable a, Show a) => + String -> + TVar (Maybe (V Dynamic)) -> + a -> + ReaderT Config IO () + yield id valueT value = do + stateM <- asks (.stateM) + state <- atomically (readTMVar stateM) + let gen = state.currentGen + log_ (printf "[yield] %s %s" id (show (V gen value))) + atomically do + value' <- readTVar valueT + writeTVar valueT (Just (toDyn <$> V gen value)) + when (isJust value') do + writeTQueue state.inputQueue () + + despawn :: String -> ReaderT Config IO () + despawn id = do + stateM <- asks (.stateM) + bracketOnError + (atomically $ takeTMVar stateM) + (atomically . putTMVar stateM) + ( \state -> do + log_ (printf "[despawn] %s" id) + atomically . putTMVar stateM $ + state + { threadPool = + state.threadPool + { threads = + filter + (\thread -> thread.id /= id) + state.threadPool.threads + } + } + ) - sf2 . sf1 = MSF $ \a -> do - (b, sf1') <- unMSF sf1 a - (c, sf2') <- unMSF sf2 b - let sf' = sf2' . sf1' - c `seq` return (c, sf') +concatS :: (Monad m) => [SensorT m a b] -> SensorT m a [b] +concatS = SensorT' . fmap f . go . map (.unSensorT) + where + go :: (Monad m) => [MSF m a b] -> MSF m a [b] + go sfs = MSF $ \a -> do + (bs, sfs') <- unzip <$> mapM (\sf -> (.unMSF) sf a) sfs + pure (bs, go sfs') + f vs = V (maximum (map (.gen) vs)) (map (.unV) vs) + +withDefaultS :: (Monad m) => b -> SensorT m a b -> SensorT m (Maybe a) b +withDefaultS def = SensorT' . go . (.unSensorT) + where + go sf = MSF $ \case + V gen (Just a) -> do + (b, sf') <- sf.unMSF (V gen a) + pure (b, go sf') + V gen Nothing -> + pure (V gen def, go sf) + +feedbackS :: (Monad m) => c -> SensorT m (a, c) (b, c) -> SensorT m a b +feedbackS c = + SensorT' + . (\sf -> feedback c (arr g >>> sf >>> arr f)) + . (.unSensorT) + where + f :: V (a, c) -> (V a, c) + f (V gen (a, c)) = (V gen a, c) -instance (Monad m) => Arrow (MSF m) where - arr f = arrM (return . f) + g :: (V a, c) -> V (a, c) + g (V gen a, c) = (V gen (a, c)) - first = - morphGS $ \f (a, c) -> do - (b, msf') <- f a - return ((b, c), msf') - -instance (Monad m) => Functor (MSF m a) where - fmap f msf = msf >>> arr f - -instance (Functor m, Monad m) => Applicative (MSF m a) where - pure = arr . const - - fs <*> bs = (fs &&& bs) >>> arr (uncurry ($)) - -morphGS :: - (Monad m2) => - (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) -> - MSF m1 a1 b1 -> - MSF m2 a2 b2 -morphGS morph msf = MSF $ \a2 -> do - (b2, msf') <- morph (unMSF msf) a2 - return (b2, morphGS morph msf') - -feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b -feedback c sf = MSF $ \a -> do - ((b', c'), sf') <- unMSF sf (a, c) - return (b', feedback c' sf') - -embed :: (Monad m) => MSF m a b -> [a] -> m [b] -embed _ [] = return [] -embed sf (a : as) = do - (b, sf') <- unMSF sf a - bs <- embed sf' as - return (b : bs) - -reactimate :: (Monad m) => MSF m () () -> m () -reactimate sf = do - (_, sf') <- unMSF sf () - reactimate sf' - -arrM :: (Monad m) => (a -> m b) -> MSF m a b -arrM f = - morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id - -type ReactHandle m = IORef (MSF m () ()) - -reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m) -reactInit = liftIO . newIORef - -react :: (MonadIO m) => ReactHandle m -> m () -react handle = do - msf <- liftIO $ readIORef handle - (_, msf') <- unMSF msf () - liftIO $ writeIORef handle msf' - -instance (Monad m) => ArrowChoice (MSF m) where - left :: MSF m a b -> MSF m (Either a c) (Either b c) - left sf = MSF f - where - f (Left a) = do - (b, sf') <- unMSF sf a - return (Left b, left sf') - f (Right c) = return (Right c, left sf) +arrS :: (MonadIO m) => (a -> m b) -> SensorT m a b +arrS f = sensorT $ go + where + go = MSF $ \(V gen a) -> + lift (((,go) . V gen) <$> f a) diff --git a/test/AggregateSpec.hs b/test/AggregateSpec.hs index 71bf420..ae22560 100644 --- a/test/AggregateSpec.hs +++ b/test/AggregateSpec.hs @@ -4,66 +4,59 @@ import Control.Arrow import Control.Concurrent (threadDelay) import Control.Monad (forM_) import Control.Monad.Trans (MonadIO, liftIO) -import Data.List (intercalate) +import Data.List (intersperse) import Data.Sensor qualified as S import Test.Hspec +import Text.Printf spec :: SpecWith () spec = do describe "current time" do it "date" do - S.runSensorT (S.sample 1 date) >>= (`shouldBe` ["1970-01-01"]) + S.sample 1 date >>= (`shouldBe` ["1970-01-01"]) it "time" do - S.runSensorT (S.sample 1 time) >>= (`shouldBe` ["00:00:00"]) + S.sample 1 time >>= (`shouldBe` ["00:00:00"]) it "date and time" do - S.runSensorT (S.sample 1 (intercalate " " <$> sequence [date, time])) - >>= (`shouldBe` ["1970-01-01 00:00:00"]) + S.sample 1 (S.concatS (intersperse (pure " ") [date, time])) + >>= (`shouldBe` [["1970-01-01", " ", "00:00:00"]]) describe "timing" do it "fast, fast" do - S.runSensorT (S.sample 2 ((,) <$> count <*> count)) + S.sample 2 ((,) <$> count <*> count) >>= (`shouldBe` [(1, 1), (2, 2)]) it "fast, slow" do - S.runSensorT (S.sample 4 ((,) <$> count <*> slowCount)) + S.sample 4 ((,) <$> count <*> slowCount) >>= (`shouldBe` [(1, 1), (2, 1), (2, 2), (3, 2)]) - S.runSensorT (S.sample 3 (const <$> count <*> slowCount)) + S.sample 3 (const <$> count <*> slowCount) >>= (`shouldBe` [1, 2, 2]) - S.runSensorT (S.sample 3 (flip const <$> count <*> slowCount)) + S.sample 3 (flip const <$> count <*> slowCount) >>= (`shouldBe` [1, 1, 2]) describe "square count" do it "count" do - S.runSensorT (S.sample 2 count) >>= (`shouldBe` [1, 2]) + S.sample 2 count >>= (`shouldBe` [1, 2]) it "square" do - S.runSensorT (S.sample 2 (count >>= square)) >>= (`shouldBe` [1, 4]) + S.sample 2 (count >>> square) >>= (`shouldBe` [1, 4]) describe "diagram" do it "count diagram" do - S.runSensorT (S.sample 3 (diagram 2 count)) + S.sample 3 (diagram 2 count) >>= (`shouldBe` [[1 :: Int], [1, 2], [2, 3]]) - S.runSensorT (S.sample 5 (diagram 2 slowCount <* count)) + S.sample 5 (diagram 2 slowCount <* count) >>= (`shouldBe` [[1 :: Int], [1], [1, 2], [1, 2], [2, 3]]) -diagram :: (Monad m) => Int -> S.Sensor m () a -> S.Sensor m () [a] +diagram :: Int -> S.Sensor () a -> S.Sensor () [a] diagram n sf = S.feedbackS [] $ proc ((), xs) -> do x <- sf -< () returnA -< (reverse (x : xs), take (n - 1) (x : xs)) -count :: (S.MonadSensor m) => S.Sensor m () Int -count = S.sensor Count - -data Count = Count deriving (Show) - -instance (S.MonadSensor m) => S.Aggregate m Count Int where - aggregate _ = forM_ [1, 2 ..] $ \n -> do - S.yield n +count :: S.Sensor () Int +count = S.sensor (\_ -> "Count") $ \_ yield -> do + forM_ [1, 2 ..] $ \n -> do + yield n sleep -slowCount :: (S.MonadSensor m) => S.Sensor m () Int -slowCount = S.sensor SlowCount - -data SlowCount = SlowCount deriving (Show) - -instance (S.MonadSensor m) => S.Aggregate m SlowCount Int where - aggregate _ = forM_ [1, 2 ..] $ \n -> do - S.yield n +slowCount :: S.Sensor () Int +slowCount = S.sensor (\_ -> "SlowCount") $ \_ yield -> do + forM_ [1, 2 ..] $ \n -> do + yield n sleepLong sleep :: (MonadIO m) => m () @@ -72,19 +65,13 @@ sleep = liftIO (threadDelay 2_000) sleepLong :: (MonadIO m) => m () sleepLong = liftIO (threadDelay 3_000) -square :: (S.MonadSensor m) => Int -> S.Sensor m () Int -square = S.sensor . Square +square :: S.Sensor Int Int +square = S.sensor (\n -> printf "Square %d" n) $ \n yield -> + yield (n * n) -data Square = Square Int deriving (Show) - -instance (S.MonadSensor m) => S.Aggregate m Square Int where - aggregate (Square n) = do - S.yield (n * n) - -currentTime :: (S.MonadSensor m) => S.Sensor m () UTCTime -currentTime = S.sensor CurrentTime - -data CurrentTime = CurrentTime deriving (Show) +currentTime :: S.Sensor () UTCTime +currentTime = S.sensor (\_ -> "CurrentTime") $ \_ yield -> do + yield (UTCTime "1970-01-01" "00:00:00") data UTCTime = UTCTime { date :: String, @@ -92,11 +79,8 @@ data UTCTime = UTCTime } deriving (Show) -instance (S.MonadSensor m) => S.Aggregate m CurrentTime UTCTime where - aggregate _ = S.yield (UTCTime "1970-01-01" "00:00:00") - -date :: (S.MonadSensor m) => S.Sensor m () String +date :: S.Sensor () String date = (.date) <$> currentTime -time :: (S.MonadSensor m) => S.Sensor m () String +time :: S.Sensor () String time = (.time) <$> currentTime diff --git a/test/PureSpec.hs b/test/PureSpec.hs index 5f718e7..528167d 100644 --- a/test/PureSpec.hs +++ b/test/PureSpec.hs @@ -7,4 +7,4 @@ spec :: Spec spec = do describe "Pure" do it "pure" do - S.runSensorT (S.sample 1 (pure "")) >>= (`shouldBe` [""]) + S.sample 1 (pure "") >>= (`shouldBe` [""]) |
