{-# LANGUAGE Arrows #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE NoFieldSelectors #-}

-- | Generation-stamped sensor networks built on monadic stream functions.
--
-- A 'SensorT' wraps an 'MSF' whose inputs and outputs carry the generation
-- ('Gen') in which they were produced. 'reactimateS' runs such a network in
-- a loop, and 'sensor' feeds it from background threads that publish values
-- through a shared thread pool.
module Data.Sensor
  ( Sensor,
    reactimateS,
    sensor,
    -- concatS,
    withDefaultS,
    feedbackS,
    arrS,
    -- sample,
  )
where

import Control.Concurrent.STM.TQueue (flushTQueue)
import Control.Arrow
import Control.Category (Category (..))
import Control.DeepSeq
import Control.Monad
import Control.Monad.Reader
import Data.Dynamic
import Data.List
import Data.Maybe
import Data.MonadicStreamFunction
import Data.MonadicStreamFunction.InternalCore
import Data.Set qualified as S
import System.Environment
import System.IO.Unsafe (unsafePerformIO)
import Text.Printf
import UnliftIO
import UnliftIO.Concurrent
import Prelude hiding (id, (.))

-- | A stream function over generation-stamped values ('V'), running in a
-- reader over the shared 'Config'.
newtype SensorT m a b = SensorT'
  { unSensorT :: MSF (ReaderT Config m) (V a) (V b)
  }

-- | Wrap a raw stream function as a 'SensorT'. Currently just the
-- constructor; a disabled alternative body is kept in the comment below.
sensorT :: (MonadIO m) => MSF (ReaderT Config m) (V a) (V b) -> SensorT m a b
sensorT = SensorT'

{- Disabled alternative body for 'sensorT': it replays the previous output
   whenever the input is stale and a previous output exists.

  SensorT' . go Nothing
  where
    go b' sf = MSF $ \a -> do
      isStale <- stale a
      if isStale && isJust b'
        then do
          _ <- (.unMSF) sf a
          pure (fromJust b', go b' sf)
        else do
          (b, sf') <- (.unMSF) sf a
          pure (b, go (Just b) sf')-}

-- | Whether a value's generation is older than the current generation
-- stored in the shared state.
stale :: (MonadIO m) => V a -> ReaderT Config m Bool
stale (V gen _) = do
  stateM <- asks (.stateM)
  state <- atomically (readTMVar stateM)
  pure (gen < state.currentGen)

instance (MonadIO m) => Category (SensorT m) where
  id = sensorT id
  sf2 . sf1 = SensorT' (sf2.unSensorT . sf1.unSensorT)

instance (MonadIO m) => Functor (SensorT m a) where
  f `fmap` sf = SensorT' (fmap f `fmap` sf.unSensorT)

instance (MonadIO m) => Applicative (SensorT m a) where
  -- A constant output. Its generation is read from the shared state on the
  -- first step and reused on every later step.
  pure b = sensorT $ go Nothing
    where
      go gen' = MSF $ \_ -> do
        gen <- case gen' of
          Just cached -> pure cached
          Nothing -> do
            stateM <- asks (.stateM)
            state <- atomically (readTMVar stateM)
            pure state.currentGen
        pure (V gen b, go (Just gen))

  -- The combined value carries the newer of the two generations.
  sff <*> sfx =
    SensorT' $
      (\(V gen1 f) (V gen2 x) -> V (genMax gen1 gen2) (f x))
        <$> sff.unSensorT
        <*> sfx.unSensorT

instance (MonadIO m) => Arrow (SensorT m) where
  arr f = sensorT (arr (fmap f))

  -- Move the generation stamp onto the first component, run the wrapped
  -- stream function on it, then stamp the resulting pair with the
  -- generation it returns.
  first = SensorT' . (\sf -> arr f >>> first sf >>> arr g) . (.unSensorT)
    where
      f :: V (a, c) -> (V a, c)
      f (V gen (a, c)) = (V gen a, c)
      g :: (V a, c) -> V (a, c)
      g (V gen a, c) = (V gen (a, c))

-- first :: a b c -> a (b, d) (c, d)

-- | 'SensorT' specialised to 'IO'.
type Sensor = SensorT IO

-- | Environment shared by every stream function in a network.
data Config = Config
  { -- | The mutable network state; taking the 'TMVar' acts as a lock.
    stateM :: TMVar State,
    -- | The log sink, held in a 'TMVar' and taken for each message by 'log_'.
    logger :: TMVar (String -> ReaderT Config IO ())
  }

-- | Ids of the sensors that were sampled since the last 'prune'.
data Journal = Journal {unJournal :: S.Set String}

instance Semigroup Journal where
  (Journal xs) <> (Journal ys) = Journal (xs <> ys)

instance Monoid Journal where
  mempty = Journal mempty
  mconcat = Journal . mconcat . map (.unJournal)

-- | Mutable state of a running network.
data State = State
  { threadPool :: ThreadPool,
    -- | Written to by sensor threads when they replace an existing value.
    inputQueue :: TQueue (),
    currentGen :: Gen,
    journal :: Journal
  }

-- | The sensor threads currently registered with the network.
data ThreadPool = ThreadPool
  { threads :: [Thread]
  }

-- | A registered sensor thread and the slot holding its latest value.
data Thread = Thread
  { id :: String,
    threadId :: ThreadId,
    -- | 'Nothing' until the thread yields its first value.
    valueT :: TVar (Maybe (V Dynamic))
  }

-- | Generation counter; incremented once per step of 'reactimateS'.
newtype Gen = Gen Int
  deriving stock (Eq, Ord, Show)

-- | The next generation.
genInc :: Gen -> Gen
genInc (Gen n) = Gen (n + 1)

-- | The newer of two generations.
genMax :: Gen -> Gen -> Gen
genMax (Gen n) (Gen m) = Gen (max n m)

-- | A value stamped with the generation in which it was produced.
data V a = V {gen :: Gen, unV :: a}
  deriving stock (Show, Functor)

{-
withDefaultS :: (Monad m) => b -> MSF m a b -> MSF m (Maybe a) b
withDefaultS def sf = do
  msf $ \case
    V gen Nothing -> pure (V gen def, withDefault def sf)
    V gen (Just x) -> do
      (r, sf') <- (.unMSF) sf (V gen x)
      pure (r, withDefault def sf')-}

-- | Run a network on a background thread and return its first @n@ outputs.
--
-- Blocks until at least @n@ outputs have been queued. The background
-- thread is killed afterwards, including when the wait is interrupted by
-- an exception, so it is not leaked.
sample :: Show a => Int -> SensorT IO () a -> IO [a]
sample n sf = do
  qT <- newTQueueIO
  tId <- forkIO $ reactimateS (sf >>> arrS (atomically . writeTQueue qT))
  atomically
    ( do
        -- If fewer than n items are queued, checkSTM retries the whole
        -- transaction, which also undoes the flush.
        xs <- flushTQueue qT
        checkSTM (length xs >= n)
        pure (take n xs)
    )
    `finally` killThread tId

-- | Run a sensor network forever.
--
-- Each step increments the current generation, waits for input if any
-- sensor threads are registered, runs the network, fully evaluates its
-- output and logs it. If an exception (synchronous or asynchronous)
-- escapes the loop, all registered sensor threads are killed and the
-- exception is rethrown.
reactimateS :: (Show b, NFData b) => SensorT IO () b -> IO ()
reactimateS sf = do
  let threadPool = ThreadPool []
  inputQueue <- liftIO newTQueueIO
  let currentGen = Gen 0
  logger <- newTMVarIO (liftIO . putStrLn)
  let journal = Journal S.empty
  stateM <- newTMVarIO State {..}
  catchSyncOrAsync
    ( do
        let -- Bump the generation under the state lock and pass the input
            -- through unchanged.
            incCurrentGen = SensorT' $ arrM $ \x -> do
              bracketOnError
                (atomically (takeTMVar stateM))
                (atomically . putTMVar stateM)
                ( \state -> do
                    let state' = state {currentGen = genInc state.currentGen}
                    atomically (putTMVar stateM state')
                )
              pure x
            -- Produce a unit input stamped with the current generation.
            -- With no sensor threads there is nothing to wait for;
            -- otherwise block until a thread signals a new value.
            inputs = SensorT' $ constM $ do
              state <- atomically (readTMVar stateM)
              let gen = state.currentGen
              log_ (printf "[gen] %s" (show gen))
              atomically do
                V gen <$> case state.threadPool.threads of
                  [] -> pure ()
                  _ -> readTQueue state.inputQueue
        reactimateS'
          (Config {..})
          ( incCurrentGen
              >>> inputs
              >>> sf
              >>> ( SensorT' $ arrM $ \(V gen x) -> do
                      -- Reduce the output to normal form here so that any
                      -- exception hidden inside it is raised in this loop.
                      -- (The previous `deepseq x `seq` ...` only forced the
                      -- partial application, not the value itself.)
                      x' <- evaluate (force x)
                      log_ (printf "[output] %s" (show x'))
                      pure (V gen ())
                  )
          )
    )
    ( \(e :: SomeException) -> do
        bracketOnError
          (atomically (takeTMVar stateM))
          (atomically . putTMVar stateM)
          ( \state -> do
              mapM_ (\thread -> killThread thread.threadId) state.threadPool.threads
              let state' = state {threadPool = state.threadPool {threads = []}}
              atomically (putTMVar stateM state')
          )
        throwIO e
    )

-- | The stepping loop behind 'reactimateS': run one step with a unit input
-- stamped with the current generation, prune unused sensor threads, then
-- recurse on the continuation.
reactimateS' :: Config -> SensorT IO () () -> IO ()
reactimateS' r sf = do
  (_, sf') <-
    runReaderT
      ( do
          stateM <- asks (.stateM)
          state <- atomically (readTMVar stateM)
          let gen = state.currentGen
          sf.unSensorT.unMSF (V gen ())
      )
      r
  runReaderT prune r
  reactimateS' r (SensorT' sf')

-- | Kill every pooled thread whose id is not recorded in the journal, then
-- reset the journal for the next step.
prune :: ReaderT Config IO ()
prune = do
  stateM <- asks (.stateM)
  bracketOnError
    (atomically (takeTMVar stateM))
    (atomically . putTMVar stateM)
    ( \state -> do
        let Journal liveThreadIds = state.journal
        mapM_
          (\thread -> killThread thread.threadId)
          ( filter
              (\thread -> not (S.member thread.id liveThreadIds))
              state.threadPool.threads
          )
        atomically (putTMVar stateM state {journal = Journal S.empty})
    )

-- | Emit a debug message through the configured logger. Does nothing
-- unless 'env_SENSOR_DEBUG' is set. The logger 'TMVar' is held while the
-- message is written.
log_ :: String -> ReaderT Config IO ()
log_ s = when env_SENSOR_DEBUG do
  logger <- asks (.logger)
  bracket
    (atomically (takeTMVar logger))
    (atomically . putTMVar logger)
    (\emit -> emit s)

-- | Whether the @SENSOR_DEBUG@ environment variable is set.
--
-- Looked up via 'unsafePerformIO'; NOINLINE keeps this a single shared
-- top-level value rather than letting the lookup be duplicated at use sites.
env_SENSOR_DEBUG :: Bool
env_SENSOR_DEBUG = unsafePerformIO (isJust <$> lookupEnv "SENSOR_DEBUG")
{-# NOINLINE env_SENSOR_DEBUG #-}

-- | A sensor backed by a background thread.
--
-- The first argument derives the thread's id from the input. The second
-- builds the thread's body from the input and a callback that publishes
-- a new value. On each step the thread with that id is looked up in the
-- pool (and spawned if absent), the step blocks until the thread has
-- published at least one value, and the id is recorded in the journal so
-- that 'prune' keeps the thread alive.
sensor ::
  (Typeable b, Show b) =>
  (a -> String) ->
  ( a ->
    (b -> ReaderT Config IO ()) ->
    ReaderT Config IO ()
  ) ->
  SensorT IO a b
sensor mkId mkF =
  SensorT' $ arrM $ \(V _ input) -> do
    let id = mkId input
        f = mkF input
    stateM <- asks (.stateM)
    state <- atomically (readTMVar stateM)
    valueT <-
      case find (\thread -> thread.id == id) state.threadPool.threads of
        Nothing -> spawn id f
        Just thread -> pure thread.valueT
    -- Block until the thread has published its first value.
    value <- atomically do
      maybeValue <- readTVar valueT
      case maybeValue of
        Nothing -> retrySTM
        Just value -> pure (flip fromDyn (error "fromDyn") <$> value)
    -- Record the id in the journal so prune does not kill this thread.
    atomically do
      state' <- takeTMVar stateM
      putTMVar stateM state' {journal = Journal (S.singleton id) <> state'.journal}
    pure value
  where
    -- Fork the sensor thread and register it in the pool while holding the
    -- state lock. If the thread ends with an exception, the exception is
    -- logged and the thread is removed from the pool.
    spawn ::
      (Typeable a, Show a) =>
      String ->
      ( (a -> ReaderT Config IO ()) ->
        ReaderT Config IO ()
      ) ->
      (ReaderT Config IO (TVar (Maybe (V Dynamic))))
    spawn id f = do
      stateM <- asks (.stateM)
      bracketOnError
        (atomically (takeTMVar stateM))
        (atomically . putTMVar stateM)
        ( \state -> do
            log_ (printf "[spawn] %s" id)
            valueT <- newTVarIO Nothing
            threadId <-
              forkFinally
                (f (yield id valueT))
                ( \case
                    (Left (e :: SomeException)) -> do
                      log_ (printf "[error] %s %s" id (show e))
                      despawn id
                    Right _ -> pure ()
                )
            let state' =
                  state
                    { threadPool =
                        state.threadPool
                          { threads = (Thread {..}) : state.threadPool.threads
                          }
                    }
            atomically (putTMVar stateM state')
            pure valueT
        )

    -- Publish a new value stamped with the current generation. The input
    -- queue is only signalled when an earlier value is being replaced.
    yield :: (Typeable a, Show a) => String -> TVar (Maybe (V Dynamic)) -> a -> ReaderT Config IO ()
    yield id valueT value = do
      stateM <- asks (.stateM)
      state <- atomically (readTMVar stateM)
      let gen = state.currentGen
      log_ (printf "[yield] %s %s" id (show (V gen value)))
      atomically do
        value' <- readTVar valueT
        writeTVar valueT (Just (toDyn <$> V gen value))
        when (isJust value') do
          writeTQueue state.inputQueue ()

    -- Remove the thread with the given id from the pool.
    despawn :: String -> ReaderT Config IO ()
    despawn id = do
      stateM <- asks (.stateM)
      bracketOnError
        (atomically $ takeTMVar stateM)
        (atomically . putTMVar stateM)
        ( \state -> do
            log_ (printf "[despawn] %s" id)
            atomically . putTMVar stateM $
              state
                { threadPool =
                    state.threadPool
                      { threads =
                          filter (\thread -> thread.id /= id) state.threadPool.threads
                      }
                }
        )

-- | Run several sensors on the same input and collect their outputs in
-- order.
--
-- The result carries the newest generation among the outputs. With an
-- empty list the input's generation is passed through, since there are no
-- output generations to take a maximum over.
concatS :: (Monad m) => [SensorT m a b] -> SensorT m a [b]
concatS = SensorT' . go . map (.unSensorT)
  where
    go :: (Monad n) => [MSF n (V x) (V y)] -> MSF n (V x) (V [y])
    go sfs = MSF $ \input -> do
      (vs, sfs') <- unzip <$> mapM (\sf -> sf.unMSF input) sfs
      pure (collect input.gen vs, go sfs')

    collect :: Gen -> [V y] -> V [y]
    collect inputGen [] = V inputGen []
    collect _ vs = V (maximum (map (.gen) vs)) (map (.unV) vs)

-- | Run a sensor only on 'Just' inputs. On 'Nothing' the default value is
-- emitted with the input's generation and the wrapped sensor is not
-- stepped.
withDefaultS :: (Monad m) => b -> SensorT m a b -> SensorT m (Maybe a) b
withDefaultS def = SensorT' . go . (.unSensorT)
  where
    go sf = MSF $ \case
      V gen (Just a) -> do
        (b, sf') <- sf.unMSF (V gen a)
        pure (b, go sf')
      V gen Nothing -> pure (V gen def, go sf)

-- | Feed part of a sensor's output back into its next input, starting
-- from the given initial value.
feedbackS :: (Monad m) => c -> SensorT m (a, c) (b, c) -> SensorT m a b
feedbackS c = SensorT' . (\sf -> feedback c (arr g >>> sf >>> arr f)) . (.unSensorT)
  where
    f :: V (a, c) -> (V a, c)
    f (V gen (a, c)) = (V gen a, c)
    g :: (V a, c) -> V (a, c)
    g (V gen a, c) = (V gen (a, c))

-- | Lift an effectful function into a sensor. The output keeps the
-- generation of the input.
arrS :: (MonadIO m) => (a -> m b) -> SensorT m a b
arrS f = sensorT go
  where
    go = MSF $ \(V gen a) -> lift (((,go) . V gen) <$> f a)