{-| This module has functions to send commands LISTEN and NOTIFY to the database server. It also has a function to wait for and handle notifications on a database connection. For more information check the [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-notify.html). -}moduleHasql.Notifications(notifyPool ,notify ,listen ,unlisten ,waitForNotifications ,PgIdentifier ,toPgIdentifier ,fromPgIdentifier )whereimportHasql.Pool(Pool,UsageError,use)importHasql.Session(sql,run,statement)importqualifiedHasql.SessionasSimportqualifiedHasql.StatementasHSTimportHasql.Connection(Connection,withLibPQConnection)importqualifiedHasql.DecodersasHDimportqualifiedHasql.EncodersasHEimportqualifiedDatabase.PostgreSQL.LibPQasPQimportData.Text(Text)importqualifiedData.TextasTimportqualifiedData.Text.EncodingasTimportData.ByteString.Char8(ByteString)importData.Functor.Contravariant(contramap)importControl.Monad(void,forever)importControl.Concurrent(threadWaitRead)importControl.Exception(Exception,throw)-- | A wrapped text that represents a properly escaped and quoted PostgreSQL identifiernewtypePgIdentifier =PgIdentifier Textderiving(Int -> PgIdentifier -> ShowS [PgIdentifier] -> ShowS PgIdentifier -> String forall a. (Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a showList :: [PgIdentifier] -> ShowS $cshowList :: [PgIdentifier] -> ShowS show :: PgIdentifier -> String $cshow :: PgIdentifier -> String showsPrec :: Int -> PgIdentifier -> ShowS $cshowsPrec :: Int -> PgIdentifier -> ShowS Show)-- | Uncatchable exceptions thrown and never caught.newtypeFatalError =FatalError {FatalError -> String fatalErrorMessage ::String}deriving(Int -> FatalError -> ShowS [FatalError] -> ShowS FatalError -> String forall a. (Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a showList :: [FatalError] -> ShowS $cshowList :: [FatalError] -> ShowS show :: FatalError -> String $cshow :: FatalError -> String showsPrec :: Int -> FatalError -> ShowS $cshowsPrec :: Int -> FatalError -> ShowS Show)instanceExceptionFatalError -- | Given a PgIdentifier returns the wrapped textfromPgIdentifier ::PgIdentifier ->TextfromPgIdentifier :: PgIdentifier -> Text fromPgIdentifier (PgIdentifier Text bs )=Text bs -- | Given a text returns a properly quoted and escaped PgIdentifiertoPgIdentifier ::Text->PgIdentifier toPgIdentifier :: Text -> PgIdentifier toPgIdentifier Text x =Text -> PgIdentifier PgIdentifier forall a b. (a -> b) -> a -> b $Text "\""forall a. Semigroup a => a -> a -> a <>Text -> Text strictlyReplaceQuotes Text x forall a. Semigroup a => a -> a -> a <>Text "\""wherestrictlyReplaceQuotes ::Text->TextstrictlyReplaceQuotes :: Text -> Text strictlyReplaceQuotes =Text -> Text -> Text -> Text T.replaceText "\""(Text "\"\""::Text)-- | Given a Hasql Pool, a channel and a message sends a notify command to the databasenotifyPool ::Pool-- ^ Pool from which the connection will be used to issue a NOTIFY command.->Text-- ^ Channel where to send the notification->Text-- ^ Payload to be sent with the notification->IO(EitherUsageError())notifyPool :: Pool -> Text -> Text -> IO (Either UsageError ()) notifyPool Pool pool Text channel Text mesg =forall a. Pool -> Session a -> IO (Either UsageError a) usePool pool (forall params result. params -> Statement params result -> Session result statement(Text channel ,Text mesg )Statement (Text, Text) () callStatement )wherecallStatement :: Statement (Text, Text) () callStatement =forall a b. ByteString -> Params a -> Result b -> Bool -> Statement a b HST.Statement(ByteString "SELECT pg_notify"forall a. Semigroup a => a -> a -> a <>ByteString "(1,ドル 2ドル)")Params (Text, Text) encoder Result () HD.noResultBool Falseencoder :: Params (Text, Text) encoder =forall (f :: * -> *) a' a. Contravariant f => (a' -> a) -> f a -> f a' contramapforall a b. (a, b) -> a fst(forall a. NullableOrNot Value a -> Params a HE.paramforall a b. (a -> b) -> a -> b $forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a HE.nonNullableValue Text HE.text)forall a. Semigroup a => a -> a -> a <>forall (f :: * -> *) a' a. Contravariant f => (a' -> a) -> f a -> f a' contramapforall a b. (a, b) -> b snd(forall a. NullableOrNot Value a -> Params a HE.paramforall a b. (a -> b) -> a -> b $forall (encoder :: * -> *) a. encoder a -> NullableOrNot encoder a HE.nonNullableValue Text HE.text)-- | Given a Hasql Connection, a channel and a message sends a notify command to the databasenotify ::Connection-- ^ Connection to be used to send the NOTIFY command->PgIdentifier -- ^ Channel where to send the notification->Text-- ^ Payload to be sent with the notification->IO(EitherS.QueryError())notify :: Connection -> PgIdentifier -> Text -> IO (Either QueryError ()) notify Connection con PgIdentifier channel Text mesg =forall a. Session a -> Connection -> IO (Either QueryError a) run(ByteString -> Session () sqlforall a b. (a -> b) -> a -> b $Text -> ByteString T.encodeUtf8(Text "NOTIFY "forall a. Semigroup a => a -> a -> a <>PgIdentifier -> Text fromPgIdentifier PgIdentifier channel forall a. Semigroup a => a -> a -> a <>Text ", '"forall a. Semigroup a => a -> a -> a <>Text mesg forall a. Semigroup a => a -> a -> a <>Text "'"))Connection con {-| Given a Hasql Connection and a channel sends a listen command to the database. Once the connection sends the LISTEN command the server register its interest in the channel. Hence it's important to keep track of which connection was used to open the listen command. Example of listening and waiting for a notification: @ import System.Exit (die) import Hasql.Connection import Hasql.Notifications main :: IO () main = do dbOrError <- acquire "postgres://localhost/db_name" case dbOrError of Right db -> do let channelToListen = toPgIdentifier "sample-channel" listen db channelToListen waitForNotifications (\channel _ -> print $ "Just got notification on channel " <> channel) db _ -> die "Could not open database connection" @ -}listen ::Connection-- ^ Connection to be used to send the LISTEN command->PgIdentifier -- ^ Channel this connection will be registered to listen to->IO()listen :: Connection -> PgIdentifier -> IO () listen Connection con PgIdentifier channel =forall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $forall a. Connection -> (Connection -> IO a) -> IO a withLibPQConnectionConnection con Connection -> IO () execListen whereexecListen :: Connection -> IO () execListen Connection pqCon =forall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $Connection -> ByteString -> IO (Maybe Result) PQ.execConnection pqCon forall a b. (a -> b) -> a -> b $Text -> ByteString T.encodeUtf8forall a b. (a -> b) -> a -> b $Text "LISTEN "forall a. Semigroup a => a -> a -> a <>PgIdentifier -> Text fromPgIdentifier PgIdentifier channel -- | Given a Hasql Connection and a channel sends a unlisten command to the databaseunlisten ::Connection-- ^ Connection currently registerd by a previous 'listen' call->PgIdentifier -- ^ Channel this connection will be deregistered from->IO()unlisten :: Connection -> PgIdentifier -> IO () unlisten Connection con PgIdentifier channel =forall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $forall a. Connection -> (Connection -> IO a) -> IO a withLibPQConnectionConnection con Connection -> IO () execListen whereexecListen :: Connection -> IO () execListen Connection pqCon =forall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $Connection -> ByteString -> IO (Maybe Result) PQ.execConnection pqCon forall a b. (a -> b) -> a -> b $Text -> ByteString T.encodeUtf8forall a b. (a -> b) -> a -> b $Text "UNLISTEN "forall a. Semigroup a => a -> a -> a <>PgIdentifier -> Text fromPgIdentifier PgIdentifier channel {-| Given a function that handles notifications and a Hasql connection it will listen on the database connection and call the handler everytime a message arrives. The message handler passed as first argument needs two parameters channel and payload. See an example of handling notification on a separate thread: @ import Control.Concurrent.Async (async) import Control.Monad (void) import System.Exit (die) import Hasql.Connection import Hasql.Notifications notificationHandler :: ByteString -> ByteString -> IO() notificationHandler channel payload = void $ async do print $ "Handle payload " <> payload <> " in its own thread" main :: IO () main = do dbOrError <- acquire "postgres://localhost/db_name" case dbOrError of Right db -> do let channelToListen = toPgIdentifier "sample-channel" listen db channelToListen waitForNotifications notificationHandler db _ -> die "Could not open database connection" @ -}waitForNotifications ::(ByteString->ByteString->IO())-- ^ Callback function to handle incoming notifications->Connection-- ^ Connection where we will listen to->IO()waitForNotifications :: (ByteString -> ByteString -> IO ()) -> Connection -> IO () waitForNotifications ByteString -> ByteString -> IO () sendNotification Connection con =forall a. Connection -> (Connection -> IO a) -> IO a withLibPQConnectionConnection con forall a b. (a -> b) -> a -> b $forall (f :: * -> *) a. Functor f => f a -> f () voidforall b c a. (b -> c) -> (a -> b) -> a -> c .forall (f :: * -> *) a b. Applicative f => f a -> f b foreverforall b c a. (b -> c) -> (a -> b) -> a -> c .Connection -> IO () pqFetch wherepqFetch :: Connection -> IO () pqFetch Connection pqCon =doMaybe Notify mNotification <-Connection -> IO (Maybe Notify) PQ.notifiesConnection pqCon caseMaybe Notify mNotification ofMaybe Notify Nothing->doMaybe Fd mfd <-Connection -> IO (Maybe Fd) PQ.socketConnection pqCon caseMaybe Fd mfd ofMaybe Fd Nothing->forall a. String -> a panic String "Error checking for PostgreSQL notifications"JustFd fd ->doforall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $Fd -> IO () threadWaitReadFd fd forall (f :: * -> *) a. Functor f => f a -> f () voidforall a b. (a -> b) -> a -> b $Connection -> IO Bool PQ.consumeInputConnection pqCon JustNotify notification ->ByteString -> ByteString -> IO () sendNotification (Notify -> ByteString PQ.notifyRelnameNotify notification )(Notify -> ByteString PQ.notifyExtraNotify notification )panic ::String->a panic :: forall a. String -> a panic String a =forall a e. Exception e => e -> a throw(String -> FatalError FatalError String a )