diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Data/Sensor.hs | 54 |
1 files changed, 24 insertions, 30 deletions
diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs index 787d919..96c48ed 100644 --- a/src/Data/Sensor.hs +++ b/src/Data/Sensor.hs @@ -21,10 +21,10 @@ import UnliftIO.Concurrent hiding (yield) import Prelude hiding (id, (.)) newtype Sensor m a b = Sensor' - { unSensor :: MSF m (Stale (Value a)) (Stale (Value b)) + { unSensor :: MSF (SensorT m) (Stale (Value a)) (Stale (Value b)) } -mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b +mkSensor :: (Monad m) => MSF (SensorT m) (Value a) (Stale (Value b)) -> Sensor m a b mkSensor = Sensor' . ignoreStaleInput ignoreStaleInput :: @@ -148,10 +148,10 @@ combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap) combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap) combineStale f a b = Fresh (f a.unwrap b.unwrap) -class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where - aggregate :: s -> AggregateT s m () +class (MonadIO m, Show s, Typeable a) => Aggregate m s a | s -> a where + aggregate :: s -> AggregateT s (SensorT m) () -newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a} +newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT AggregateE m a} deriving instance (Functor m) => Functor (AggregateT s m) @@ -168,19 +168,19 @@ deriving instance (MonadFail m) => MonadFail (AggregateT s m) deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m) -deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m) +deriving instance (Monad m) => MonadReader AggregateE (AggregateT s m) -- XXX aggregates should be keyed on something else than `String` -data AggregateE m = AggregateE +data AggregateE = AggregateE { readAggregatesM :: TMVar (S.Set String), currentTickM :: TMVar Int, localQueueT :: TChan (), self :: String } -yield :: (Aggregate m s a) => a -> AggregateT s m () +yield :: (Aggregate m s a) => a -> AggregateT s (SensorT m) () yield x = do - SensorE {liveAggregatesM, globalQueueT} <- lift askS + SensorE {liveAggregatesM, globalQueueT} <- lift ask AggregateE {readAggregatesM, currentTickM, self} <- ask atomically do writeTChan globalQueueT () @@ -199,7 +199,7 @@ yield x = do sensor :: (Aggregate m s a) => s -> Sensor m () a sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do - SensorE {liveAggregatesM, startAggregateT} <- askS + SensorE {liveAggregatesM, startAggregateT} <- ask atomically do writeTQueue startAggregateT (AnyAggregate s) atomically do liveAggregates <- readTMVar liveAggregatesM @@ -220,9 +220,9 @@ data Restart = Restart String SomeException deriving (Show) instance Exception Restart -- XXX merge `sample'` and `sample` -sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m () -sample' f s = do - SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS +sample' :: (MonadUnliftIO m) => (Maybe c -> a -> m c) -> Sensor m () a -> m () +sample' f s = runSensorT do + SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- ask outputT <- newTVarIO Nothing _ <- forkIO $ forever do @@ -265,7 +265,7 @@ sample' f s = do ( restore do react h c' <- - maybe (pure c) (fmap Just . f c) + maybe (pure c) (lift . fmap Just . f c) =<< atomically do readTVar outputT _ <- atomically do readTChan globalQueueT pure c' @@ -278,7 +278,7 @@ runSensorT :: (MonadIO m) => SensorT m a -> m a runSensorT m = do runReaderT m.unSensorT =<< sensorE -sensorE :: (MonadIO m) => m (SensorE (t m)) +sensorE :: (MonadIO m) => m (SensorE m) sensorE = do liveAggregatesM <- newTMVarIO M.empty startAggregateT <- newTQueueIO @@ -287,9 +287,9 @@ sensorE = do mainThread <- myThreadId pure SensorE {..} -sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a] -sample n s = do - SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS +sample :: (MonadUnliftIO m) => Int -> Sensor m () a -> m [a] +sample n s = runSensorT do + SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- ask outputsT <- newTVarIO [] _ <- forkIO $ forever do @@ -330,7 +330,7 @@ sample n s = do loop loop -newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a} +newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE m) m a} deriving instance (Functor m) => Functor (SensorT m) @@ -347,7 +347,7 @@ deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m) deriving instance (MonadFail m) => MonadFail (SensorT m) -deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m) +deriving instance (Monad m) => MonadReader (SensorE m) (SensorT m) data SensorE m = SensorE { liveAggregatesM :: TMVar (Map String (LiveAggregate Dynamic)), @@ -357,12 +357,6 @@ data SensorE m = SensorE mainThread :: ThreadId } -class (MonadUnliftIO m) => MonadSensor m where - askS :: m (SensorE m) - -instance (MonadUnliftIO m) => MonadSensor (SensorT m) where - askS = ask - data AnyAggregate m where AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m @@ -372,10 +366,10 @@ data LiveAggregate a = LiveAggregate spawnedAt :: Int } -startAggregate :: (MonadSensor m) => AnyAggregate m -> m () +startAggregate :: (MonadUnliftIO m) => AnyAggregate m -> SensorT m () startAggregate (AnyAggregate s) = do let self = show s - SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS + SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- ask liveAggregates <- atomically do takeTMVar liveAggregatesM if M.member self liveAggregates then atomically do putTMVar liveAggregatesM liveAggregates @@ -403,9 +397,9 @@ startAggregate (AnyAggregate s) = do spawnedAt <- readTMVar globalTickM putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates) -dropAggregate :: (MonadSensor m) => String -> m () +dropAggregate :: (MonadIO m) => String -> SensorT m () dropAggregate self = do - SensorE {liveAggregatesM} <- askS + SensorE {liveAggregatesM} <- ask atomically do liveAggregates <- takeTMVar liveAggregatesM putTMVar liveAggregatesM (M.delete self liveAggregates) |