Browse Source

Implement the manage thread

  Write stream values to sample vars in the manager state
Trevor Elliott 9 years ago
parent
commit
93b182992e
2 changed files with 164 additions and 43 deletions
  1. 136 43
      src/Gidl/Backend/Rpc.hs
  2. 28 0
      support/rpc/Base.hs.template

+ 136 - 43
src/Gidl/Backend/Rpc.hs

@@ -10,6 +10,7 @@ import Gidl.Backend.Haskell.Interface (interfaceModule,ifModuleName)
 import Gidl.Backend.Haskell.Types (typeModule,isUserDefined,typeModuleName)
 import Gidl.Interface
            (Interface(..),InterfaceEnv(..),MethodName,Method(..),Perm(..))
+import Gidl.Schema (Schema(..),producerSchema,consumerSchema)
 import Gidl.Types (Type,TypeEnv(..))
 
 import Data.Char (isSpace)
@@ -21,7 +22,7 @@ import Ivory.Artifact.Template (artifactCabalFileTemplate)
 import Text.PrettyPrint.Mainland
            (Doc,prettyLazyText,text,empty,(<+>),(</>),(<>),char,line,parens
            ,punctuate,stack,sep,tuple,dot,spread,cat,string,indent,hang,nest
-           ,(<+/>),align,comma)
+           ,(<+/>),align,comma,Pretty(..),braces)
 
 
 -- External Interface ----------------------------------------------------------
@@ -81,6 +82,10 @@ strToNs str =
 allMethods :: Interface -> [(MethodName,Method)]
 allMethods (Interface _ ps ms) = concatMap allMethods ps ++ ms
 
+isStream :: Method -> Bool
+isStream StreamMethod{} = True
+isStream _              = False
+
 
 -- Server Generation -----------------------------------------------------------
 
@@ -95,20 +100,26 @@ rpcModule typeEnv ns iface =
 
 
 genServer :: TypeEnv -> [String] -> Interface -> String -> Doc
-genServer typeEnv ns iface ifaceMod =
-  stack [ text "{-# LANGUAGE OverloadedStrings #-}"
-        , moduleHeader     ns ifaceMod
-        , line
-        , importTypes      ns typeEnv
-        , importInterface  ns ifaceMod
-        , line
-        , text "import" <+> (ppModName (ns ++ ["Rpc","Base"]))
-        , line
-        , webServerImports
-        , line
-        , line
-        , runServer typeEnv iface
-        ]
+genServer typeEnv ns iface ifaceMod = stack $
+  [ text "{-# LANGUAGE RecordWildCards #-}" | useManager ] ++
+  [ text "{-# LANGUAGE OverloadedStrings #-}"
+  , moduleHeader     ns ifaceMod
+  , line
+  , importTypes      ns typeEnv
+  , importInterface  ns ifaceMod
+  , line
+  , text "import" <+> (ppModName (ns ++ ["Rpc","Base"]))
+  , line
+  , webServerImports
+  , line
+  , line
+  , managerDefs
+  , runServer useManager typeEnv iface input output
+  ]
+  where
+  (useManager,managerDefs) = managerDef iface output
+
+  (input,output) = queueTypes iface
 
 
 moduleHeader :: [String] -> String -> Doc
@@ -145,52 +156,63 @@ webServerImports  =
         ]
 
 
-runServer :: TypeEnv -> Interface -> Doc
-runServer typeEnv iface = runServerSig </> runServerDef typeEnv iface
-
+type InputQueue  = Doc
+type OutputQueue = Doc
 
-runServerSig :: Doc
-runServerSig  =
-  text "rpcServer" <+> text "::"
-                   <+> hang 2 (arrow [ chan, chan, text "Config", text "IO ()" ])
+queueTypes :: Interface -> (InputQueue,OutputQueue)
+queueTypes iface = (input,output)
   where
-  chan = text "TChan" <+> text "S.ByteString"
+  Schema prodName _ = producerSchema iface
+  Schema consName _ = consumerSchema iface
+
+  prod = ifModuleName iface ++ prodName
+  cons = ifModuleName iface ++ consName
+
+  input  = text "TQueue" <+> text prod
+  output = text "TQueue" <+> text cons
+
+
+runServer :: Bool -> TypeEnv -> Interface -> InputQueue -> OutputQueue -> Doc
+runServer useMgr typeEnv iface input output =
+  runServerSig iface input output </> runServerDef useMgr typeEnv iface input output
+
 
+runServerSig :: Interface -> InputQueue -> OutputQueue -> Doc
+runServerSig iface input output =
+  text "rpcServer ::" <+> hang 2 (arrow [ input, output
+                                        , text "Config"
+                                        , text "IO ()" ])
 
 -- | Generate a definition for the server.
-runServerDef :: TypeEnv -> Interface -> Doc
-runServerDef typeEnv iface = hang 2 (text "rpcServer" <+> body)
+runServerDef :: Bool -> TypeEnv -> Interface -> InputQueue -> OutputQueue -> Doc
+runServerDef useMgr typeEnv iface input output =
+  hang 2 (text "rpcServer" <+> body)
   where
-  body = arg "input"  $ \ input  ->
-         arg "output" $ \ output ->
-         arg "cfg"    $ \ cfg    ->
-           char '=' </>
-           nest 2 (text "do" <+> align (stack (stmts cfg)))
+  body = text "input output cfg" <+> char '=' </>
+         nest 2 (text "do" <+> align (stack stmts))
              </> text "where"
              </> routesDef
-             </> managerDef input output
 
-  stmts cfg = [ text "_ <- forkIO manager"
-              , text "runServer" <+> cfg <+> text "routes"
-              ]
+  stmts = [ text "state <- mkState"                           | useMgr ]
+       ++ [ defOutput                                                  ]
+       ++ [ text "_ <- forkIO (manager state output output')" | useMgr ]
+       ++ [ text "runServer cfg" <+> text "routes"                     ]
+
+  defOutput
+    | useMgr    = text "output' <- newTQueue"
+    | otherwise = text "let output' = output"
 
   routesDef = nest 3 $
     nest 2 (text "routes" <+> char '=' <+/> align (routes typeEnv iface))
 
-  managerDef input output =
-    nest 2 (text "manager" <+> char '=' <+/> align (text "..."))
-
 
 -- | Define one route for each interface member
 routes :: TypeEnv -> Interface -> Doc
-routes types iface =
-  text "route" <+> methods
-
+routes types iface = text "route" <+> align methods
   where
-
-  methods = align (char '['
+  methods = char '['
          <> stack (punctuate comma (concatMap (mkRoute types) (allMethods iface)))
-         <> char ']')
+         <> char ']'
 
 mkRoute :: TypeEnv -> (MethodName,Method) -> [Doc]
 mkRoute types (name,method) =
@@ -213,6 +235,77 @@ writeMethod :: TypeEnv -> Type -> Doc
 writeMethod types _ = text "writeBS \"write\""
 
 
+-- The stream manager ----------------------------------------------------------
+
+-- | Define everything associated with the manager, but only if there are stream
+-- values to manage.
+managerDef :: Interface -> OutputQueue -> (Bool,Doc)
+managerDef iface output
+  | null streams = (False,empty)
+  | otherwise    = (True,stack defs </> empty)
+  where
+
+  streams = [ (name,ty) | (name,StreamMethod _ ty) <- allMethods iface ]
+
+  (stateType,stateDecl) = stateDef streams
+
+  defs = [ stateDecl
+         , empty
+         , mkStateDef streams
+         , empty
+         , text "manager ::" <+> arrow [ stateType, output, output, text "IO ()" ]
+         , nest 2 $ text "manager state output filtered = forever $"
+                </> text "do" <+> align stmts
+         ]
+
+  stmts = text "msg <- atomically (readTQueue output)"
+      </> nest 2 (text "case msg of" </> stack (map mkCase streams ++ [defCase]))
+
+  -- name the producer constructor for a stream element
+  Schema prodSuffix _ = producerSchema iface
+  prodName ty = text (typeModuleName ty ++ prodSuffix)
+
+  -- update the state for this stream element
+  mkCase (n,ty) = prodName ty <+> text "x -> atomically (writeTSampleVar"
+                              <+> parens (fieldName n <+> text "state")
+                              <+> text "x)"
+
+  defCase = text "notStream -> atomically (writeTQueue filtered notStream)"
+
+
+-- | Generate the data type used to hold the streaming values, or nothing if
+-- there aren't any present in the interface.
+stateDef :: [(MethodName,Type)] -> (Doc,Doc)
+stateDef streams = (text "State",def)
+  where
+
+  def = nest 2 (text "data State = State" <+> braces fields)
+
+  fields = align (stack (punctuate comma (map mkField streams)))
+
+  mkField (name,ty) =
+    fieldName name
+      <+> text "::"
+      <+> text "TSampleVar"
+      <+> text (typeModuleName ty)
+
+
+mkStateDef :: [(MethodName,Type)] -> Doc
+mkStateDef streams = stack
+  [ text "mkState :: IO State"
+  , nest 2 (text "mkState  =" </> nest 3 (text "do" <+> align (stack stmts)))
+  ]
+  where
+  stmts = [ fieldName n <+> text "<- newTSampleVarIO" | (n,_) <- streams ]
+       ++ [ text "return State { .. }" ]
+
+
+-- | Given the name of a stream in the interface, produce the selector for the
+-- state data type.
+fieldName :: MethodName -> Doc
+fieldName name = text "stream_" <> text name
+
+
 -- Pretty-printing Helpers -----------------------------------------------------
 
 arg :: String -> (Doc -> Doc) -> Doc

+ 28 - 0
support/rpc/Base.hs.template

@@ -5,6 +5,8 @@
 
 module $module_path$.Base where
 
+import           Control.Concurrent.STM
+                     (atomically,TQueue,writeTQueue,STM,retry,TVar,newTVar)
 import           Snap.Core (Snap,route)
 import qualified Snap.Http.Server as HTTP
 import           Snap.Util.FileServe (serveDirectory)
@@ -39,3 +41,29 @@ runServer Config { .. } serveRpc =
        case cfgStaticDir of
          Just path -> route [ ("", serveDirectory path) ]
          Nothing   -> return ()
+
+
+data Request req resp = ReadRequest req (resp -> IO ())
+                      | WriteRequest req
+
+
+-- Sample Variables ------------------------------------------------------------
+
+-- | A TVar that blocks when it's empty, but allows writes even when it's full.
+newtype SampleTVar a = SampleVar (TVar (Maybe a))
+
+newTSampleVar :: STM (TSampleTVar a)
+newTSampleVar  = TSampleVar `fmap` newTVar Nothing
+
+newTSampleVarIO :: IO (TSampleTVar a)
+newTSampleVarIO  = atomically (TSampleVar `fmap` newTVar Nothing)
+
+writeTSampleVar :: SampleVar a -> a -> STM ()
+writeTSampleVar (TSampleVar tv) a = writeTVar tv (Just a)
+
+readTSampleVar :: TSampleVar a -> STM ()
+readTSampleVar (TSampleVar tv) =
+  do mb <- readTVar tv
+     case mb of
+       Just a  -> return a
+       Nothing -> retry