summaryrefslogtreecommitdiffstats
path: root/src/Data/Sensor.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data/Sensor.hs')
-rw-r--r--src/Data/Sensor.hs54
1 files changed, 24 insertions, 30 deletions
diff --git a/src/Data/Sensor.hs b/src/Data/Sensor.hs
index 787d919..96c48ed 100644
--- a/src/Data/Sensor.hs
+++ b/src/Data/Sensor.hs
@@ -21,10 +21,10 @@ import UnliftIO.Concurrent hiding (yield)
import Prelude hiding (id, (.))
newtype Sensor m a b = Sensor'
- { unSensor :: MSF m (Stale (Value a)) (Stale (Value b))
+ { unSensor :: MSF (SensorT m) (Stale (Value a)) (Stale (Value b))
}
-mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b
+mkSensor :: (Monad m) => MSF (SensorT m) (Value a) (Stale (Value b)) -> Sensor m a b
mkSensor = Sensor' . ignoreStaleInput
ignoreStaleInput ::
@@ -148,10 +148,10 @@ combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap)
combineStale f a b = Fresh (f a.unwrap b.unwrap)
-class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where
- aggregate :: s -> AggregateT s m ()
+class (MonadIO m, Show s, Typeable a) => Aggregate m s a | s -> a where
+ aggregate :: s -> AggregateT s (SensorT m) ()
-newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a}
+newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT AggregateE m a}
deriving instance (Functor m) => Functor (AggregateT s m)
@@ -168,19 +168,19 @@ deriving instance (MonadFail m) => MonadFail (AggregateT s m)
deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)
-deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m)
+deriving instance (Monad m) => MonadReader AggregateE (AggregateT s m)
-- XXX aggregates should be keyed on something else than `String`
-data AggregateE m = AggregateE
+data AggregateE = AggregateE
{ readAggregatesM :: TMVar (S.Set String),
currentTickM :: TMVar Int,
localQueueT :: TChan (),
self :: String
}
-yield :: (Aggregate m s a) => a -> AggregateT s m ()
+yield :: (Aggregate m s a) => a -> AggregateT s (SensorT m) ()
yield x = do
- SensorE {liveAggregatesM, globalQueueT} <- lift askS
+ SensorE {liveAggregatesM, globalQueueT} <- lift ask
AggregateE {readAggregatesM, currentTickM, self} <- ask
atomically do
writeTChan globalQueueT ()
@@ -199,7 +199,7 @@ yield x = do
sensor :: (Aggregate m s a) => s -> Sensor m () a
sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
- SensorE {liveAggregatesM, startAggregateT} <- askS
+ SensorE {liveAggregatesM, startAggregateT} <- ask
atomically do writeTQueue startAggregateT (AnyAggregate s)
atomically do
liveAggregates <- readTMVar liveAggregatesM
@@ -220,9 +220,9 @@ data Restart = Restart String SomeException deriving (Show)
instance Exception Restart
-- XXX merge `sample'` and `sample`
-sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
-sample' f s = do
- SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS
+sample' :: (MonadUnliftIO m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
+sample' f s = runSensorT do
+ SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- ask
outputT <- newTVarIO Nothing
_ <- forkIO $ forever do
@@ -265,7 +265,7 @@ sample' f s = do
( restore do
react h
c' <-
- maybe (pure c) (fmap Just . f c)
+ maybe (pure c) (lift . fmap Just . f c)
=<< atomically do readTVar outputT
_ <- atomically do readTChan globalQueueT
pure c'
@@ -278,7 +278,7 @@ runSensorT :: (MonadIO m) => SensorT m a -> m a
runSensorT m = do
runReaderT m.unSensorT =<< sensorE
-sensorE :: (MonadIO m) => m (SensorE (t m))
+sensorE :: (MonadIO m) => m (SensorE m)
sensorE = do
liveAggregatesM <- newTMVarIO M.empty
startAggregateT <- newTQueueIO
@@ -287,9 +287,9 @@ sensorE = do
mainThread <- myThreadId
pure SensorE {..}
-sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a]
-sample n s = do
- SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS
+sample :: (MonadUnliftIO m) => Int -> Sensor m () a -> m [a]
+sample n s = runSensorT do
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- ask
outputsT <- newTVarIO []
_ <- forkIO $ forever do
@@ -330,7 +330,7 @@ sample n s = do
loop
loop
-newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a}
+newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE m) m a}
deriving instance (Functor m) => Functor (SensorT m)
@@ -347,7 +347,7 @@ deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)
deriving instance (MonadFail m) => MonadFail (SensorT m)
-deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m)
+deriving instance (Monad m) => MonadReader (SensorE m) (SensorT m)
data SensorE m = SensorE
{ liveAggregatesM :: TMVar (Map String (LiveAggregate Dynamic)),
@@ -357,12 +357,6 @@ data SensorE m = SensorE
mainThread :: ThreadId
}
-class (MonadUnliftIO m) => MonadSensor m where
- askS :: m (SensorE m)
-
-instance (MonadUnliftIO m) => MonadSensor (SensorT m) where
- askS = ask
-
data AnyAggregate m where
AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m
@@ -372,10 +366,10 @@ data LiveAggregate a = LiveAggregate
spawnedAt :: Int
}
-startAggregate :: (MonadSensor m) => AnyAggregate m -> m ()
+startAggregate :: (MonadUnliftIO m) => AnyAggregate m -> SensorT m ()
startAggregate (AnyAggregate s) = do
let self = show s
- SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS
+ SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- ask
liveAggregates <- atomically do takeTMVar liveAggregatesM
if M.member self liveAggregates
then atomically do putTMVar liveAggregatesM liveAggregates
@@ -403,9 +397,9 @@ startAggregate (AnyAggregate s) = do
spawnedAt <- readTMVar globalTickM
putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)
-dropAggregate :: (MonadSensor m) => String -> m ()
+dropAggregate :: (MonadIO m) => String -> SensorT m ()
dropAggregate self = do
- SensorE {liveAggregatesM} <- askS
+ SensorE {liveAggregatesM} <- ask
atomically do
liveAggregates <- takeTMVar liveAggregatesM
putTMVar liveAggregatesM (M.delete self liveAggregates)