Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .obelisk/impl/github.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"owner": "obsidiansystems",
"repo": "obelisk",
"branch": "develop",
"rev": "86a9584c6d7648bd5324ef57d62421fed1bf1978",
"sha256": "1lbii87j5530ncm6brfbzkd3wg16mgxazsf3l56zzv4c8cydilmh"
"branch": "cg-fullroute",
"rev": "0d98be9c5b49bda50fdc172a6b62477d80fc20ac",
"sha256": "16zd733m3cbq8228412bqyhivyblwpj24xfdygd7k25ml5f0f41k"
}
5 changes: 5 additions & 0 deletions aeson-orphans/src/Rhyolite/Aeson/Orphans.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE StandaloneDeriving #-}
Expand All @@ -15,9 +16,13 @@ import qualified Data.ByteString.Lazy as LBS
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Monoid hiding (First (..))
import Data.Ord
import Data.Semigroup
import Data.Text.Encoding (decodeUtf8, encodeUtf8)

deriving newtype instance ToJSON a => ToJSON (Down a)
deriving newtype instance FromJSON a => FromJSON (Down a)

instance ToJSON ByteString where
toJSON = toJSON . decodeUtf8 . B64.encode

Expand Down
7 changes: 4 additions & 3 deletions backend-db/Network/PushNotification/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}

{-# OPTIONS_GHC -fno-warn-orphans #-}
module Network.PushNotification.Worker where

Expand All @@ -23,7 +24,6 @@ import Rhyolite.Backend.DB
import Rhyolite.Backend.Schema
import Rhyolite.Backend.Schema.TH
import Rhyolite.Concurrent
import Rhyolite.Request.TH
import Rhyolite.Schema
import GHC.Generics

Expand All @@ -49,11 +49,12 @@ makeDefaultKeyIdInt64 ''ApplePushMessage 'ApplePushMessageKey

data AndroidPushMessage = AndroidPushMessage
{ _androidPushMessage_payload :: Json FcmPayload }
deriving (Generic)

instance ToJSON AndroidPushMessage
instance FromJSON AndroidPushMessage
instance HasId AndroidPushMessage

makeJson ''AndroidPushMessage

mkRhyolitePersist (Just "migrateAndroidPushMessage") [groundhog|
- entity: AndroidPushMessage
|]
Expand Down
10 changes: 8 additions & 2 deletions backend-db/Rhyolite/Backend/Schema.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import Database.PostgreSQL.Simple.ToField (ToField, toField, Action)
import Database.PostgreSQL.Simple.Types (Binary (..), Identifier (..))

import Rhyolite.Schema (Json (..), SchemaName(..), LargeObjectId(..), HasId (..), Id (..))
import Rhyolite.Backend.Schema.TH (makePersistFieldNewtype)
import Rhyolite.Backend.Schema.Class (DerivedEntity, DerivedEntityHead, DefaultKeyId, toIdData, fromIdData)

instance ToField SchemaName where
Expand Down Expand Up @@ -125,4 +124,11 @@ toShowUniverse = toField . T.pack . show

instance Exception VisibleUniverseFailure

makePersistFieldNewtype ''SchemaName
instance PersistField SchemaName where
persistName _ = "SchemaName"
toPersistValues (SchemaName x) = toPersistValues x
fromPersistValues pv = do
(x, pv') <- fromPersistValues pv
return (SchemaName x, pv')
dbType p (SchemaName x) = dbType p x

10 changes: 9 additions & 1 deletion backend-db/Rhyolite/Backend/Schema/TH.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@

{-# OPTIONS_GHC -fno-warn-orphans -fno-warn-redundant-constraints #-}

module Rhyolite.Backend.Schema.TH where
module Rhyolite.Backend.Schema.TH
( deriveNewtypePersistBackend
, makeDefaultKeyIdInt64
, makeDefaultKeyIdSimple
, mkRhyolitePersist
, makePersistFieldNewtype
, module Rhyolite.Backend.Schema
) where

import Control.Lens ((%~), _head)
import Control.Monad
Expand All @@ -29,6 +36,7 @@ import Database.Groundhog.TH.Settings (PersistDefinitions(..))
import Language.Haskell.TH

import Rhyolite.TH (conName)
import Rhyolite.Backend.Schema -- Not needed for this module, but without it, the generated code fails to compile in a way which is confusing, so we re-export it.
import Rhyolite.Backend.Schema.Class

deriveNewtypePersistBackend :: (TypeQ -> TypeQ) -> (TypeQ -> TypeQ) -> Name -> Name -> DecsQ
Expand Down
78 changes: 65 additions & 13 deletions backend-db/Rhyolite/Backend/TaskWorker.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
module Rhyolite.Backend.TaskWorker where
Expand All @@ -8,19 +10,36 @@ import Rhyolite.Backend.Schema.Task
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Thread.Delay
import Control.Exception.Lifted (bracket)
import Control.Exception.Lifted (bracket, catch, throw, Exception, SomeException(..))
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Logger
import Control.Monad.Trans.Control
import Data.Functor.Identity
import Data.Pool
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time
import Database.Groundhog.Core
import Database.Groundhog.Expression
import Database.Groundhog.Postgresql
import qualified Database.PostgreSQL.Simple.Types as PG
import Rhyolite.Backend.DB
import Rhyolite.Backend.DB.PsqlSimple (PostgresRaw, executeQ)

withSavepoint
:: (PostgresRaw m, MonadBaseControl IO m, Exception e)
=> String -> m a -> m (Either e a)
withSavepoint name action = do
let savePt = PG.Identifier $ T.pack name
[executeQ|SAVEPOINT ?savePt|]
result <- catch (Right <$> action) $ \e -> return (Left e)
case result of
Left _ -> do
[executeQ|ROLLBACK TO SAVEPOINT ?savePt|]
Right _ -> do
[executeQ|RELEASE SAVEPOINT ?savePt|]
return result

--TODO: Ensure Tasks are always properly indexed
--TODO: Use Notifications to start the worker promptly
Expand Down Expand Up @@ -64,20 +83,52 @@ taskWorker input pk ready f go db workerName = do
checkedOutValue <- runDb (Identity db) $ do
qe <- project1 (pk, input) $ isFieldNothing (f ~> Task_resultSelector) &&. isFieldNothing (f ~> Task_checkedOutBySelector) &&. ready
forM qe $ \(taskId, a) -> do
update [f ~> Task_checkedOutBySelector =. Just workerName] $ pk ==. taskId
(,) taskId <$> go a
now <- getTime
update
[ f ~> Task_checkedOutBySelector =. Just workerName
, f ~> Task_checkedOutAtSelector =. Just now]
$ pk ==. taskId
result <- withSavepoint "Rhyolite.Backend.TaskWorker[1]" $ (,) taskId <$> go a
case result of
Right _ -> pure ()
Left (e :: SomeException) -> update
[ f ~> Task_failedSelector =. (Just . T.pack $ "Step 1:" <> show e)
]
$ pk ==. taskId
return result

case checkedOutValue of
Nothing -> pure False
Just (taskId, action) -> do
followup <- action
Rhyolite.Backend.DB.runDb (Identity db) $ do
b <- followup
update
[ f ~> Task_resultSelector =. Just b
, f ~> Task_checkedOutBySelector =. (Nothing :: Maybe Text)
]
(pk ==. taskId)
pure True
Just (Left bad) -> throw bad
Just (Right (taskId, action)) -> do
followupOrError <- catch (Right <$> action) $ return . Left
finallError <- case followupOrError of
Left (e :: SomeException) -> Rhyolite.Backend.DB.runDb (Identity db) $ do
update
[ f ~> Task_failedSelector =. (Just . T.pack $ "Step 2:" <> show e)
]
(pk ==. taskId)
return $ Just e
Right followup -> Rhyolite.Backend.DB.runDb (Identity db) $ do
bOrError <- withSavepoint "Rhyolite.Backend.TaskWorker[1]" followup
case bOrError of
Left (e :: SomeException) -> do
update
[ f ~> Task_failedSelector =. (Just . T.pack $ "Step 3:" <> show e)
]
(pk ==. taskId)
return $ Just e
Right b -> do
update
[ f ~> Task_resultSelector =. Just b
, f ~> Task_checkedOutBySelector =. (Nothing :: Maybe Text)
, f ~> Task_checkedOutAtSelector =. (Nothing :: Maybe UTCTime)
]
(pk ==. taskId)
return Nothing
case finallError of
Just e -> throw e
Nothing -> pure True

-- | Run a worker thread
-- The worker will wake up whenever the timer expires or the wakeup action is called
Expand All @@ -104,3 +155,4 @@ withWorker d work child = do
Right False -> sleep nextStartVar
go nextStartVar
bracket (liftIO $ async $ go initialStartVar) (liftIO . cancel) $ \_ -> child wakeup

Loading