-- | Sensors: stream functions whose values carry freshness and dependency
-- metadata, fed by background "aggregates" that publish values via 'yield'.
--
-- The module also contains a minimal re-implementation of the monadic stream
-- function ('MSF') core it builds upon.
module Data.Sensor where

import Control.Applicative
import Control.Arrow
import Control.Category (Category (..))
import Control.Category qualified as C
import Control.Concurrent.STM (retry)
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception qualified as E
import Control.Monad
import Control.Monad.Reader
import Data.Dynamic
import Data.List
import Data.Map (Map)
import Data.Map qualified as M
import Data.Maybe
import Data.Set (Set)
import Data.Set qualified as S
import UnliftIO
import UnliftIO.Concurrent hiding (yield)
import Prelude hiding (id, (.))

-- | A stream function from @a@ to @b@ whose input and output are both wrapped
-- in 'Value' (dependency metadata) and 'Stale' (freshness marker).
newtype Sensor m a b = Sensor'
  { unSensor :: MSF m (Stale (Value a)) (Stale (Value b))
  }

-- | Build a 'Sensor' from a stream function that only wants to see the
-- unwrapped 'Value' input; stale inputs are handled by 'ignoreStaleInput'.
mkSensor :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> Sensor m a b
mkSensor = Sensor' . ignoreStaleInput

-- | Skip running the wrapped stream function when the input is 'Stale' and a
-- previous output is available: the remembered output is re-emitted, marked
-- 'Stale'. In every other case the function is stepped on the unwrapped input
-- and its output is remembered for later steps.
ignoreStaleInput :: (Monad m) => MSF m (Value a) (Stale (Value b)) -> MSF m (Stale (Value a)) (Stale (Value b))
ignoreStaleInput sf = feedback Nothing $ proc (i, x') -> do
  case (i, x') of
    (Stale _, Just x) -> returnA -< (Stale x, x')
    _ -> do
      x <- sf -< i.unwrap
      returnA -< (x, Just x.unwrap)

-- | Maps over the payload, leaving the 'Stale' and 'Value' wrappers intact.
instance (Monad m) => Functor (Sensor m a) where
  f `fmap` s = mkSensor (fmap f <$$> s.unSensor <<< arr Fresh)

-- | 'fmap' through two nested functor layers.
(<$$>) :: (Functor f, Functor g) => (a -> b) -> f (g a) -> f (g b)
f <$$> x = fmap f <$> x

infixl 4 <$$>

-- | 'pure' emits its value as 'Fresh' on the first step (with empty metadata)
-- and as 'Stale' on every later step. '<*>' steps both sides and merges their
-- outputs with 'combineStaleValue'.
instance (Monad m) => Applicative (Sensor m a) where
  pure x = mkSensor do loop Fresh
    where
      loop stale = MSF $ \_ -> pure (stale (Value S.empty M.empty x), loop Stale)
  f <*> x =
    mkSensor
      ( liftA2
          (combineStaleValue ($))
          (f.unSensor <<< arr Fresh)
          (x.unSensor <<< arr Fresh)
      )

instance (Monad m) => Category (Sensor m) where
  -- The identity stream function must be supplied explicitly: a bare
  -- @(<<< arr Fresh)@ section is a function, not an 'MSF', and does not
  -- type-check as an argument to 'mkSensor'.
  id = mkSensor (C.id <<< arr Fresh)
  sf2 . sf1 = mkSensor (sf2.unSensor . (sf1.unSensor <<< arr Fresh))

instance (Monad m) => Arrow (Sensor m) where
  arr f = mkSensor (arr (f <$$>) <<< arr Fresh)
  first :: Sensor m a b -> Sensor m (a, d) (b, d)
  first sf =
    mkSensor
      ( -- Re-attach the passthrough component to the wrapped output.
        arr (\(x, d) -> (,d) <$$> x)
          <<< first (sf.unSensor <<< arr Fresh)
          -- Split the incoming 'Value' into the part fed to @sf@ and the
          -- passthrough component.
          <<< arr (\x -> (fst <$> x, snd (x.unwrap)))
      )

-- | 'feedback' lifted to 'Sensor': the second output component of one step is
-- fed back as the second input component of the next step, starting from the
-- given initial value.
feedbackS :: (Monad m) => c -> Sensor m (a, c) (b, c) -> Sensor m a b
feedbackS c sf =
  mkSensor . feedback c $
    arr (\x -> (fst <$$> x, snd (x.unwrap.unwrap)))
      <<< (sf.unSensor <<< arr Fresh)
      <<< arr (\(x, d) -> (,d) <$> x)

-- | Freshness marker around a value. Every constructor carries the same
-- single field, 'unwrap'.
data Stale a
  = Stale {unwrap :: a}
  | Fresh {unwrap :: a}
  | Input {unwrap :: a}
  deriving (Functor, Show)

-- | Whenever the left-hand sensor emits a 'Fresh' value, the continuation is
-- rebuilt from that value (see 'switch'); otherwise the previously built
-- continuation keeps running. Metadata of the triggering value is merged into
-- the continuation's outputs via 'combineValue'.
instance (Monad m) => Monad (Sensor m a) where
  m >>= f =
    mkSensor $
      switch (switchOnFresh (m.unSensor <<< arr Fresh)) $ \v ->
        (combineValue (const id) v.unwrap) <$$> ((f v.unwrap.unwrap).unSensor <<< arr Fresh)

-- | Pair every output with a flag that is 'True' exactly when the output is
-- 'Fresh'.
switchOnFresh :: (Monad m) => MSF m a (Stale (Value b)) -> MSF m a (Stale (Value b), Bool)
switchOnFresh =
  fmap
    ( \v ->
        ( v,
          case v of
            Fresh _ -> True
            _ -> False
        )
    )

-- | Step the first stream function; when its flag is 'True', or when no
-- continuation exists yet, build a new continuation from its output.
-- Otherwise keep stepping the continuation carried over from the last step.
switch :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m a b
switch m f = feedback Nothing $ go m f
  where
    go :: (Monad m) => MSF m a (c, Bool) -> (c -> MSF m a b) -> MSF m (a, Maybe (MSF m a b)) (b, Maybe (MSF m a b))
    go m f = MSF $ \(i, k') -> do
      ((x, me), m') <- m.unMSF i
      let k = case (me, k') of
            (False, Just k) -> k
            _ -> f x
      (z, k') <- k.unMSF i
      pure ((z, (Just k')), go m' f)

-- | A payload together with the set of aggregate keys it depends on and a
-- per-aggregate tick map.
data Value a = Value
  { dependsOn :: Set String,
    atTick :: Map String Int,
    unwrap :: a
  }
  deriving (Show, Functor, Eq)

-- | Combine two stale-wrapped values: freshness via 'combineStale', metadata
-- and payload via 'combineValue'.
combineStaleValue :: (a -> b -> c) -> Stale (Value a) -> Stale (Value b) -> Stale (Value c)
combineStaleValue f = combineStale (combineValue f)

-- | Combine two values: union of dependencies, per-key maximum of ticks, and
-- the given function applied to the payloads.
combineValue :: (a -> b -> c) -> Value a -> Value b -> Value c
combineValue f a b =
  Value
    (S.union a.dependsOn b.dependsOn)
    (M.unionWith max a.atTick b.atTick)
    (f a.unwrap b.unwrap)

-- | The result is 'Stale' when both sides are 'Stale', or when one side is
-- 'Stale' and the other is 'Input'; every other combination is 'Fresh'.
combineStale :: (a -> b -> c) -> Stale a -> Stale b -> Stale c
combineStale f a@(Stale _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
combineStale f a@(Input _) b@(Stale _) = Stale (f a.unwrap b.unwrap)
combineStale f a@(Stale _) b@(Input _) = Stale (f a.unwrap b.unwrap)
combineStale f a b = Fresh (f a.unwrap b.unwrap)

-- | An aggregate is described by a key type @s@ (which determines the type
-- @a@ of the values it yields) and an action run in its own thread by
-- 'startAggregate'.
class (MonadSensor m, Show s, Typeable a) => Aggregate m s a | s -> a where
  aggregate :: s -> AggregateT s m ()

-- | Reader transformer giving an aggregate access to its 'AggregateE'.
newtype AggregateT s m a = AggregateT {unAggregateT :: ReaderT (AggregateE (AggregateT s m)) m a}

deriving instance (Functor m) => Functor (AggregateT s m)

deriving instance (Applicative m) => Applicative (AggregateT s m)

deriving instance (Monad m) => Monad (AggregateT s m)

deriving instance (MonadIO m) => MonadIO (AggregateT s m)

instance MonadTrans (AggregateT s) where
  lift = AggregateT . lift

deriving instance (MonadFail m) => MonadFail (AggregateT s m)

deriving instance (MonadUnliftIO m) => MonadUnliftIO (AggregateT s m)

deriving instance (Monad m) => MonadReader (AggregateE (AggregateT s m)) (AggregateT s m)

-- XXX aggregates should be keyed on something else than `String`

-- | Per-aggregate environment, created by 'startAggregate'.
data AggregateE m = AggregateE
  { readAggregatesM :: TMVar (S.Set String),
    currentTickM :: TMVar Int,
    localQueueT :: TChan (),
    self :: String
  }

-- | Publish a new value for the running aggregate.
--
-- In a single STM transaction this signals the global queue, waits (via
-- 'retry') until the aggregate is registered in the live-aggregate map, resets
-- the set of read aggregates, increments the aggregate's tick and stores the
-- value (as a 'Dynamic') in the aggregate's 'TVar'.
yield :: (Aggregate m s a) => a -> AggregateT s m ()
yield x = do
  SensorE {liveAggregatesM, globalQueueT} <- lift askS
  AggregateE {readAggregatesM, currentTickM, self} <- ask
  atomically do
    writeTChan globalQueueT ()
    liveAggregates <- readTMVar liveAggregatesM
    valueT <- do
      case M.lookup self liveAggregates of
        Nothing -> retry
        Just LiveAggregate {valueT} -> pure valueT
    dependsOn <- takeTMVar readAggregatesM
    putTMVar readAggregatesM S.empty
    currentTick <- takeTMVar currentTickM
    putTMVar currentTickM (currentTick + 1)
    let atTick = M.singleton self currentTick
    let v = Value (S.insert self dependsOn) atTick (toDyn x)
    writeTVar valueT (Just v)

-- | A sensor reading the latest value of the aggregate keyed by @show s@.
--
-- Each step requests that the aggregate be started, then blocks until the
-- aggregate is registered and has yielded at least one value. The output is
-- 'Fresh' when there was no previous step or when the value's metadata differs
-- from the metadata seen on the previous step, and 'Stale' otherwise.
sensor :: (Aggregate m s a) => s -> Sensor m () a
sensor s = mkSensor $ feedback Nothing $ arrM $ \(_, v') -> do
  SensorE {liveAggregatesM, startAggregateT} <- askS
  atomically do writeTQueue startAggregateT (AnyAggregate s)
  atomically do
    liveAggregates <- readTMVar liveAggregatesM
    valueT <- case M.lookup (show s) liveAggregates of
      Nothing -> retry
      Just LiveAggregate {valueT} -> pure valueT
    v <-
      readTVar valueT >>= \case
        Nothing -> retry
        Just v -> pure v
    -- Only the metadata is compared: the payload is replaced by @()@.
    let stale
          | (isNothing v' || (Just (const () <$> v) /= v')) = Fresh
          | otherwise = Stale
    pure
      ( flip fromDyn (error "Data.Sensor.sensor: aggregate value has an unexpected type") <$$> stale v,
        Just (const () <$> v)
      )

-- | Thrown to the main thread when an aggregate thread dies with an exception
-- other than 'ThreadKilled'; carries the aggregate key and the exception.
data Restart = Restart String SomeException
  deriving (Show)

instance Exception Restart

-- XXX merge `sample'` and `sample`

-- | Run a sensor forever, invoking the callback with the previous callback
-- result (if any) and each 'Fresh' output.
--
-- After every 'Fresh' output, aggregates that the output does not depend on
-- and that were spawned before the current global tick are killed. Between
-- steps the loop blocks until something is written to the global queue. A
-- 'Restart' exception is printed and the loop continues with the previous
-- callback result.
sample' :: (MonadSensor m) => (Maybe c -> a -> m c) -> Sensor m () a -> m ()
sample' f s = do
  SensorE {liveAggregatesM, startAggregateT, globalQueueT, globalTickM} <- askS
  outputT <- newTVarIO Nothing
  -- Background thread servicing aggregate start requests.
  _ <- forkIO $ forever do
    startAggregate =<< atomically do readTQueue startAggregateT
  h <-
    reactInit
      ( ( arrM $ \v -> do
            deadAggregates <- atomically do
              case v of
                Input _ -> error "Input"
                Stale _ -> do
                  writeTVar outputT Nothing
                  pure M.empty
                Fresh v -> do
                  writeTVar outputT (Just v.unwrap)
                  liveAggregates <- readTMVar liveAggregatesM
                  globalTick <- takeTMVar globalTickM
                  putTMVar globalTickM (globalTick + 1)
                  pure
                    ( M.filterWithKey
                        ( \key liveAggregate ->
                            not (S.member key v.dependsOn)
                              && liveAggregate.spawnedAt < globalTick
                        )
                        liveAggregates
                    )
            mapM_
              ( \liveAggregate -> do
                  liftIO (killThread (liveAggregate.threadId))
              )
              deadAggregates
        )
          <<< s.unSensor
          <<< arr (Input . Value S.empty M.empty)
      )
  let loop c = do
        -- NOTE: the lambda body extends as far right as possible, so
        -- `>>= loop` is part of it and the recursive call runs inside `mask`.
        mask $ \restore ->
          catch
            ( restore do
                react h
                c' <- maybe (pure c) (fmap Just . f c) =<< atomically do readTVar outputT
                _ <- atomically do readTChan globalQueueT
                pure c'
            )
            ( \(Restart s e) -> do
                liftIO (print (s, e))
                pure c
            )
            >>= loop
  loop Nothing

-- | Run a 'SensorT' computation with a freshly created 'SensorE'.
runSensorT :: (MonadIO m) => SensorT m a -> m a
runSensorT m = do
  runReaderT m.unSensorT =<< sensorE

-- | Create an empty sensor environment; the calling thread is recorded as the
-- main thread.
sensorE :: (MonadIO m) => m (SensorE (t m))
sensorE = do
  liveAggregatesM <- newTMVarIO M.empty
  startAggregateT <- newTQueueIO
  globalQueueT <- newTChanIO
  globalTickM <- newTMVarIO 0
  mainThread <- myThreadId
  pure SensorE {..}

-- | Step a sensor until at least @n@ 'Fresh' outputs have been collected and
-- return them in the order they were produced.
--
-- The sensor is always stepped at least once. Aggregates are garbage-collected
-- after each 'Fresh' output in the same way as in @sample'@.
sample :: (MonadSensor m) => Int -> Sensor m () a -> m [a]
sample n s = do
  SensorE {liveAggregatesM, globalQueueT, globalTickM, startAggregateT} <- askS
  outputsT <- newTVarIO []
  -- Background thread servicing aggregate start requests.
  _ <- forkIO $ forever do
    startAggregate =<< atomically do readTQueue startAggregateT
  h <-
    reactInit
      ( ( arrM $ \v -> do
            case v of
              Input _ -> error "Input"
              Stale _ -> pure ()
              Fresh v -> do
                deadAggregates <- atomically do
                  -- Outputs are accumulated newest-first.
                  writeTVar outputsT . (v.unwrap :) =<< readTVar outputsT
                  liveAggregates <- readTMVar liveAggregatesM
                  globalTick <- takeTMVar globalTickM
                  putTMVar globalTickM (globalTick + 1)
                  pure
                    ( M.filterWithKey
                        ( \key liveAggregate ->
                            not (S.member key v.dependsOn)
                              && liveAggregate.spawnedAt < globalTick
                        )
                        liveAggregates
                    )
                mapM_ (liftIO . killThread . (.threadId)) deadAggregates
        )
          <<< s.unSensor
          <<< arr (Input . Value S.empty M.empty)
      )
  let loop = do
        react h
        xs <- atomically do readTVar outputsT
        if length xs >= n
          then pure (reverse xs)
          else do
            -- Wait for a signal on the global queue before stepping again.
            _ <- atomically do readTChan globalQueueT
            loop
  loop

-- | Reader transformer providing the shared 'SensorE'.
newtype SensorT m a = SensorT {unSensorT :: ReaderT (SensorE (SensorT m)) m a}

deriving instance (Functor m) => Functor (SensorT m)

deriving instance (Applicative m) => Applicative (SensorT m)

deriving instance (Monad m) => Monad (SensorT m)

deriving instance (MonadIO m) => MonadIO (SensorT m)

instance MonadTrans SensorT where
  lift = SensorT . lift

deriving instance (MonadUnliftIO m) => MonadUnliftIO (SensorT m)

deriving instance (MonadFail m) => MonadFail (SensorT m)

deriving instance (Monad m) => MonadReader (SensorE (SensorT m)) (SensorT m)

-- | Shared state of a sensor network: the live aggregates keyed by name, the
-- queue of aggregate start requests, the global signal channel, the global
-- tick and the thread that created the environment.
data SensorE m = SensorE
  { liveAggregatesM :: TMVar (Map String (LiveAggregate m Dynamic)),
    startAggregateT :: TQueue (AnyAggregate m),
    globalQueueT :: TChan (),
    globalTickM :: TMVar Int,
    mainThread :: ThreadId
  }

-- | Monads that can provide a 'SensorE'.
class (MonadUnliftIO m) => MonadSensor m where
  askS :: m (SensorE m)

instance (MonadUnliftIO m) => MonadSensor (SensorT m) where
  askS = ask

-- | An aggregate key with its key and value types hidden.
data AnyAggregate m where
  AnyAggregate :: forall m s a. (Aggregate m s a) => s -> AnyAggregate m

-- | Bookkeeping for a running aggregate: its latest value (if any), its
-- thread and the global tick at which it was spawned.
data LiveAggregate m a = LiveAggregate
  { valueT :: TVar (Maybe (Value a)),
    threadId :: ThreadId,
    spawnedAt :: Int
  }

-- | Start the given aggregate in a new thread unless one with the same key
-- (@show s@) is already registered.
--
-- The live-aggregate map is taken for the duration of the call and put back
-- at the end. When the aggregate's thread terminates with an exception it is
-- removed from the map; unless the exception is 'ThreadKilled', a 'Restart' is
-- additionally thrown to the main thread.
startAggregate :: (MonadSensor m) => AnyAggregate m -> m ()
startAggregate (AnyAggregate s) = do
  let self = show s
  SensorE {liveAggregatesM, globalQueueT, globalTickM, mainThread} <- askS
  liveAggregates <- atomically do takeTMVar liveAggregatesM
  if M.member self liveAggregates
    then atomically do putTMVar liveAggregatesM liveAggregates
    else do
      readAggregatesM <- newTMVarIO S.empty
      currentTickM <- newTMVarIO 0
      localQueueT <- atomically do dupTChan globalQueueT
      threadId <-
        forkFinally
          (runReaderT (aggregate s).unAggregateT AggregateE {..})
          ( either
              ( \e ->
                  if
                    | fromException e == Just ThreadKilled -> do
                        dropAggregate self
                    | otherwise -> do
                        dropAggregate self
                        liftIO (E.throwTo mainThread (Restart self e))
              )
              pure
          )
      valueT <- newTVarIO Nothing
      atomically do
        spawnedAt <- readTMVar globalTickM
        putTMVar liveAggregatesM (M.insert self (LiveAggregate {..}) liveAggregates)

-- | Remove the aggregate with the given key from the live-aggregate map.
dropAggregate :: (MonadSensor m) => String -> m ()
dropAggregate self = do
  SensorE {liveAggregatesM} <- askS
  atomically do
    liveAggregates <- takeTMVar liveAggregatesM
    putTMVar liveAggregatesM (M.delete self liveAggregates)

-- XXX we should depend on `dunai`

-- | A monadic stream function: given an input, produce (in @m@) an output and
-- the stream function to use for the next input.
data MSF m a b = MSF {unMSF :: a -> m (b, MSF m a b)}

instance (Monad m) => Category (MSF m) where
  id = go
    where
      go = MSF $ \a -> return (a, go)
  sf2 . sf1 = MSF $ \a -> do
    (b, sf1') <- unMSF sf1 a
    (c, sf2') <- unMSF sf2 b
    let sf' = sf2' . sf1'
    -- Force the output to weak head normal form before returning it.
    c `seq` return (c, sf')

instance (Monad m) => Arrow (MSF m) where
  arr f = arrM (return . f)
  first =
    morphGS $ \f (a, c) -> do
      (b, msf') <- f a
      return ((b, c), msf')

instance (Monad m) => Functor (MSF m a) where
  fmap f msf = msf >>> arr f

instance (Functor m, Monad m) => Applicative (MSF m a) where
  pure = arr . const
  fs <*> bs = (fs &&& bs) >>> arr (uncurry ($))

-- | Transform a stream function by transforming its step function; the
-- transformation is re-applied to every continuation.
morphGS :: (Monad m2) => (forall c. (a1 -> m1 (b1, c)) -> (a2 -> m2 (b2, c))) -> MSF m1 a1 b1 -> MSF m2 a2 b2
morphGS morph msf = MSF $ \a2 -> do
  (b2, msf') <- morph (unMSF msf) a2
  return (b2, morphGS morph msf')

-- | Well-formed feedback loop: the second output component of each step
-- becomes the second input component of the next step, starting from @c@.
feedback :: (Monad m) => c -> MSF m (a, c) (b, c) -> MSF m a b
feedback c sf = MSF $ \a -> do
  ((b', c'), sf') <- unMSF sf (a, c)
  return (b', feedback c' sf')

-- | Feed a list of inputs through a stream function and collect the outputs.
embed :: (Monad m) => MSF m a b -> [a] -> m [b]
embed _ [] = return []
embed sf (a : as) = do
  (b, sf') <- unMSF sf a
  bs <- embed sf' as
  return (b : bs)

-- | Step a stream function forever.
reactimate :: (Monad m) => MSF m () () -> m ()
reactimate sf = do
  (_, sf') <- unMSF sf ()
  reactimate sf'

-- | Lift a monadic function to a stateless stream function.
--
-- The identity stream function is the base that 'morphGS' transforms; without
-- that argument the expression is a function between stream functions and
-- does not match the declared type.
arrM :: (Monad m) => (a -> m b) -> MSF m a b
arrM f = morphGS (\i a -> i a >>= \(_, c) -> f a >>= \b -> return (b, c)) C.id

-- | Mutable handle holding the current state of a stream function.
type ReactHandle m = IORef (MSF m () ())

-- | Create a handle holding the given stream function.
reactInit :: (MonadIO m) => MSF m () () -> m (ReactHandle m)
reactInit = liftIO . newIORef

-- | Step the stream function stored in the handle once and store its
-- continuation back.
react :: (MonadIO m) => ReactHandle m -> m ()
react handle = do
  msf <- liftIO $ readIORef handle
  (_, msf') <- unMSF msf ()
  liftIO $ writeIORef handle msf'

instance (Monad m) => ArrowChoice (MSF m) where
  left :: MSF m a b -> MSF m (Either a c) (Either b c)
  left sf = MSF f
    where
      -- 'Left' inputs step the wrapped stream function; 'Right' inputs pass
      -- through and leave it untouched.
      f (Left a) = do
        (b, sf') <- unMSF sf a
        return (Left b, left sf')
      f (Right c) = return (Right c, left sf)