diff --git a/cabal-override.project b/cabal-override.project
index c611607ab..f9d8d6772 100644
--- a/cabal-override.project
+++ b/cabal-override.project
@@ -53,7 +53,7 @@ constraints: any.Diff ==0.4.1,
              any.blaze-builder ==0.4.2.3,
              any.blaze-html ==0.9.2.0,
              any.blaze-markup ==0.8.3.0,
-             any.bloodhound ==0.19.1.0,
+             any.bloodhound ==0.26.0.0,
              any.boring ==0.2.2,
              any.bsb-http-chunked ==0.0.0.4,
              any.bugzilla-redhat ==1.0.1.1,
@@ -360,7 +360,7 @@ constraints: any.Diff ==0.4.1,
              any.yaml ==0.11.11.2,
              any.zigzag ==0.0.1.0,
              any.zlib ==0.6.3.0
-index-state: hackage.haskell.org 2025-01-06T16:40:37Z
+index-state: hackage.haskell.org 2025-10-19T21:24:34Z
 package proto3-suite
   flags: -swagger -large-records
diff --git a/monocle.cabal b/monocle.cabal
index 5d2cfef03..38a4a0795 100644
--- a/monocle.cabal
+++ b/monocle.cabal
@@ -125,7 +125,7 @@ library
     , blaze-markup >= 0.8.2.8
     , blaze-html >= 0.9.1.2
     , binary >= 0.8
-    , bloodhound ^>= 0.19
+    , bloodhound ^>= 0.26
    , bugzilla-redhat ^>= 1.0
    , byteslice >= 0.2
    , bytestring >= 0.10
diff --git a/nix/default.nix b/nix/default.nix
index 1f751ae0f..d3738259d 100644
--- a/nix/default.nix
+++ b/nix/default.nix
@@ -118,11 +118,18 @@ let
       serialise = pkgs.haskell.lib.doJailbreak (pkgs.haskell.lib.dontCheck hpPrev.serialise);

-      # upgrade to bloodhound 0.20 needs some work
-      bloodhound = pkgs.haskell.lib.overrideCabal hpPrev.bloodhound {
-        version = "0.19.1.0";
-        sha256 = "sha256-QEN1wOLLUEsDKAbgz8ex0wfK/duNytvRYclwkBj/1G0=";
-      };
+      bloodhound = let
+        src = pkgs.fetchFromGitHub {
+          owner = "bitemyapp";
+          repo = "bloodhound";
+          rev = "601301754bc0c8d8002b1d3d7016530b357ee0c8"; # v0.26.0.0
+          sha256 = "sha256-Q5oVzHMp9OBCd2S+HlSDsSFGcqX7dFNVF/dpHKEspIc=";
+        };
+        pkg = hpPrev.callCabal2nix "bloodhound" src { };
+      in pkgs.lib.pipe pkg [
+        pkgs.haskell.lib.compose.doJailbreak
+        pkgs.haskell.lib.compose.dontCheck
+      ];

       # relax bound for doctest, ghc-prim, primitive, template-haskell, text and transformers
       proto3-wire = pkgs.haskell.lib.doJailbreak hpPrev.proto3-wire;
diff --git a/src/Database/Bloodhound/Raw.hs b/src/Database/Bloodhound/Raw.hs
index 04658929e..d703f7ac8 100644
--- a/src/Database/Bloodhound/Raw.hs
+++ b/src/Database/Bloodhound/Raw.hs
@@ -13,54 +13,31 @@ module Database.Bloodhound.Raw (
   aggWithDocValues,
   mkAgg,
   mkTermsCompositeAgg,
-) where
+)
+where

 import Control.Monad.Catch (MonadThrow, throwM)
 import Data.Aeson
 import Data.Aeson qualified as Aeson
 import Data.Aeson.Casing.Internal qualified as AesonCasing
-import Data.Aeson.KeyMap qualified as KM
-import Data.Aeson.Types qualified as Aeson
-import Data.Text qualified as Text
+import Data.Map qualified as Map
+import Data.Text qualified as T
 import Data.Vector qualified as V
 import Database.Bloodhound qualified as BH
+import Database.Bloodhound.Common.Requests qualified as Query
 import Json.Extras qualified as Json
 import Monocle.Prelude
 import Network.HTTP.Client qualified as HTTP
 import Network.HTTP.Types.Method qualified as HTTP
+import Network.HTTP.Types.Status qualified as HTTP

 data ScrollRequest = NoScroll | GetScroll ByteString

-type QS = [(ByteString, Maybe ByteString)]
-
-dispatch ::
-  BH.MonadBH m =>
-  HTTP.Method ->
-  Text ->
-  LByteString ->
-  QS ->
-  m BH.Reply
-dispatch method url body qs = do
-  initReq <- liftIO $ HTTP.parseRequest (from url)
-  let request =
-        initReq
-          { HTTP.method = method
-          , HTTP.requestHeaders =
-              ("Content-Type", "application/json") : HTTP.requestHeaders initReq
-          , HTTP.requestBody = HTTP.RequestBodyLBS body
-          }
-  manager <- BH.bhManager <$> BH.getBHEnv
-  liftIO $ HTTP.httpLbs (setQs request) manager
-  where
-    setQs = case qs of
-      [] -> id
-      xs -> HTTP.setQueryString xs
-
 -- | Utility function to advance in scroll result. We can use the BH library
 -- because we no longer need to support a custom raw body once we have a scroll.
-advance :: (MonadBH m, MonadThrow m, FromJSON resp) => BH.ScrollId -> m (BH.SearchResult resp)
+advance :: (MonadBH m, FromJSON resp) => BH.ScrollId -> m (BH.SearchResult resp)
 advance scroll = do
-  resp <- BH.advanceScroll scroll 60
+  resp <- BH.tryEsError $ BH.advanceScroll scroll 60
   case resp of
     Left err -> throwEsError "advance" err
     Right x -> pure x
@@ -69,22 +46,14 @@ throwEsError :: MonadThrow m => LByteString -> BH.EsError -> m a
 throwEsError resp err = throwM $ BH.EsProtocolException err.errorMessage resp

 settings :: (MonadBH m, ToJSON body) => BH.IndexName -> body -> m ()
-settings (BH.IndexName index) body = do
-  BH.Server s <- BH.bhServer <$> BH.getBHEnv
-  let url = Text.intercalate "/" [s, index, "_settings"]
+settings index body = do
+  let endpoint = BH.mkEndpoint [BH.unIndexName index, "_settings"]
       method = HTTP.methodPut
-  resp <- dispatch method url (Aeson.encode body) []
-  case HTTP.responseBody resp of
-    "{\"acknowledged\":true}" -> pure ()
+  resp <- BH.performBHRequest @_ @BH.StatusIndependant $ BH.mkFullRequest method endpoint (Aeson.encode body)
+  case resp of
+    BH.Acknowledged True -> pure ()
     _ -> error $ "Settings apply failed: " <> show resp

-search' :: (MonadBH m, ToJSON body) => BH.IndexName -> body -> QS -> m BH.Reply
-search' (BH.IndexName index) body qs = do
-  BH.Server s <- BH.bhServer <$> BH.getBHEnv
-  let url = Text.intercalate "/" [s, index, "_search"]
-      method = HTTP.methodPost
-  dispatch method url (Aeson.encode body) qs
-
 -- | Manual aeson casing implementation to create the search _source attribute
 --
 -- >>> aesonCasing "echangeCommitCount"
@@ -93,34 +62,26 @@ aesonCasing :: String -> String
 aesonCasing = AesonCasing.snakeCase . AesonCasing.dropFPrefix

 search ::
-  forall resp m body.
-  (MonadBH m, MonadThrow m) =>
-  (Aeson.ToJSON body, FromJSONField resp) =>
+  forall resp m.
+  MonadBH m =>
+  FromJSONField resp =>
   BH.IndexName ->
-  body ->
+  BH.Search ->
   ScrollRequest ->
   m (BH.SearchResult resp)
-search index body scrollRequest = do
-  rawResp <- search' index newBody qs
-  resp <- BH.parseEsResponse rawResp
+search index payload scrollRequest = do
+  let query = (Query.searchByIndex index payload {BH.source = Just fields}) {BH.bhRequestQueryStrings = qs}
+  resp <- BH.tryEsError $ BH.performBHRequest query
   case resp of
     Left err -> throwEsError "search" err
     Right x -> pure x
   where
-    newBody = case (fields, toJSON body) of
-      -- The results has fields, and the body is an object
-      (xs@(_ : _), Aeson.Object obj) -> Aeson.Object $ addSourceFields xs obj
-      -- Otherwise we don't change the body
-      (_, bodyValue) -> bodyValue
-
-    addSourceFields xs = KM.insert "_source" (Aeson.Array $ fromList $ map toSourceElem xs)
-
-    toSourceElem :: String -> Value
-    toSourceElem = Aeson.String . from . aesonCasing
+    toSourceElem :: String -> String
+    toSourceElem = from . aesonCasing

     -- The fields of the result data types.
-    fields :: [String]
-    fields = selectors (Proxy :: Proxy (Rep resp))
+    fields :: BH.Source
+    fields = BH.SourcePatterns $ BH.PopPatterns $ BH.Pattern . T.pack . toSourceElem <$> selectors (Proxy :: Proxy (Rep resp))

     qs = case scrollRequest of
       NoScroll -> []
@@ -129,77 +90,74 @@ search index body scrollRequest = do
 -- | A special purpose search implementation that uses the faster json-syntax
 searchHit ::
   MonadBH m =>
-  Aeson.ToJSON body =>
+  (Json.Value -> Either Text a) ->
   BH.IndexName ->
-  body ->
-  m [Json.Value]
-searchHit index body = do
-  rawResp <- search' index body []
-  case decodeHits (Json.decodeThrow $ HTTP.responseBody rawResp) of
-    Just xs -> pure xs
-    Nothing -> error $ "Could not find hits in " <> show rawResp
-  where
-    decodeHits :: Json.Value -> Maybe [Json.Value]
-    decodeHits value = do
-      hits <- Json.getAttr "hits" =<< Json.getAttr "hits" value
-      fmap getSource <$> Json.getArray hits
-    getSource value = case Json.getAttr "_source" value of
-      Nothing -> error $ "No source found in: " <> show value
-      Just v -> v
-
-aggWithDocValues :: [(Text, Value)] -> Maybe BH.Query -> Value
-aggWithDocValues agg = mkAgg agg (Just dv)
-  where
-    dv =
-      Aeson.Array
-        ( V.fromList
-            [ Aeson.object
-                [ "field" .= Aeson.String "created_at"
-                , "format" .= Aeson.String "date_time"
-                ]
-            ]
-        )
+  BH.Search ->
+  m [a]
+searchHit parseHit index payload = do
+  let query = Query.searchByIndex @Value index payload
+  resp <-
+    BH.tryEsError
+      $ BH.performBHRequest @_ @BH.StatusIndependant
+        query
+          { BH.bhRequestParser = \(BH.BHResponse rawResp) ->
+              let
+                decodeHits :: Json.Value -> Maybe [Json.Value]
+                decodeHits value = do
+                  hits <- Json.getAttr "hits" =<< Json.getAttr "hits" value
+                  fmap getSource <$> Json.getArray hits
+                getSource value = case Json.getAttr "_source" value of
+                  Nothing -> error $ "No source found in: " <> show value
+                  Just v -> v
+               in
+                case decodeHits (Json.decodeThrow $ HTTP.responseBody rawResp) of
+                  Just xs -> pure $ first (BH.EsError $ Just $ HTTP.statusCode $ HTTP.responseStatus rawResp) $ traverse parseHit xs
+                  Nothing -> error $ "Could not find hits in " <> show rawResp
+          }
+  case resp of
+    Right xs -> pure xs
+    Left e -> throwEsError "Could not find hits" e

-toPair :: (Text, Value) -> Aeson.Pair
-toPair (k, v) = (from k, v)
+aggWithDocValues :: BH.Aggregations -> Maybe BH.Query -> BH.Search
+aggWithDocValues agg = mkAgg agg (Just [BH.DocvalueFieldNameAndFormat (BH.FieldName "created_at") "date_time"])

-mkAgg :: [(Text, Value)] -> Maybe Value -> Maybe BH.Query -> Value
+mkAgg :: BH.Aggregations -> Maybe [BH.DocvalueField] -> Maybe BH.Query -> BH.Search
 mkAgg agg docvalues query =
-  Aeson.object
-    $ [ "aggregations" .= Aeson.object (toPair <$> agg)
-      , "size" .= Aeson.Number 0
-      ]
-    <> case docvalues of
-      Just dv -> ["docvalue_fields" .= dv]
-      Nothing -> []
-    <> case query of
-      Just q -> ["query" .= Aeson.toJSON q]
-      Nothing -> []
-
-mkTermsCompositeAgg :: Text -> Maybe Value -> (Text, Value)
+  BH.Search
+    { trackSortScores = False
+    , suggestBody = Nothing
+    , sortBody = Nothing
+    , searchType = BH.SearchTypeQueryThenFetch
+    , searchAfterKey = Nothing
+    , scriptFields = Nothing
+    , docvalueFields = docvalues
+    , queryBody = query
+    , pointInTime = Nothing
+    , highlight = Nothing
+    , filterBody = Nothing
+    , aggBody = Just agg
+    , source = Nothing
+    , size = BH.Size 0
+    , fields = Nothing
+    , from = BH.From 0
+    }
+
+mkTermsCompositeAgg :: Text -> Maybe Value -> BH.Aggregations
 mkTermsCompositeAgg term afterM =
-  ( "agg1"
-  , Aeson.object
-      [ "composite"
-          .= Aeson.object
-            ( [ "sources" .= [agg]
-              , "size" .= Aeson.Number 1024
-              ]
-                <> after
-            )
-      ]
-  )
-  where
-    after = case afterM of
-      Just v -> ["after" .= Aeson.object ["agg" .= v]]
-      Nothing -> []
-    agg =
-      Aeson.object
-        [ "agg"
-            .= Aeson.object
-              [ "terms" .= Aeson.object ["field" .= term]
-              ]
-        ]
+  Map.singleton
+    "agg1"
+    $ BH.CompositeAgg
+    $ BH.CompositeAggregation
+      { compositeAggregationSize = Just 1024
+      , compositeAggregationSources =
+          [ BH.CompositeAggregationSource
+              "agg"
+              $ BH.CompositeTermsAgg
+              $ BH.mkTermsAggregation
+              $ BH.FieldName term
+          ]
+      , compositeAggregationAfter = (\v -> Aeson.object ["agg" .= v]) <$> afterM
+      }

 -- Make Value a type parameter for TermsCompositeAggResult
 newtype TermsCompositeAggKey = TermsCompositeAggKey
diff --git a/src/Monocle/Backend/Index.hs b/src/Monocle/Backend/Index.hs
index c250d957e..60515ecff 100644
--- a/src/Monocle/Backend/Index.hs
+++ b/src/Monocle/Backend/Index.hs
@@ -1,4 +1,5 @@
 -- witch instance for CrawlerPB
+{-# LANGUAGE QuasiQuotes #-}
 {-# OPTIONS_GHC -Wno-orphans #-}

 -- | Index management functions such as document mapping and ingest
@@ -248,14 +249,15 @@ instance ToJSON ChangesIndexMapping where
 createIndex :: (IndexEffects es, Retry :> es, ToJSON mapping) => BH.IndexName -> mapping -> Eff es ()
 createIndex indexName mapping = do
-  recoverAll retryPolicy $ const $ esCreateIndex indexSettings indexName
-  -- print respCI
-  esPutMapping indexName mapping
-  -- print respPM
-  res <- esIndexExists indexName
-  case res of
-    True -> pure ()
-    False -> logWarn "Fail to create index" ["name" .= indexName]
+  alreadyExists <- esIndexExists indexName
+  unless alreadyExists $ do
+    recoverAll retryPolicy $ const $ esCreateIndex indexSettings indexName
+    -- print respCI
+    esPutMapping indexName mapping
+    -- print respPM
+    res <- esIndexExists indexName
+    unless res
+      $ logWarn "Failed to create index" ["name" .= indexName]
  where
   indexSettings = BH.IndexSettings (BH.ShardCount 1) (BH.ReplicaCount 0) BH.defaultIndexMappingsLimits
   retryPolicy = exponentialBackoff 500_000 <> limitRetries 7
@@ -264,7 +266,7 @@ configVersion :: ConfigVersion
 configVersion = ConfigVersion 7

 configIndex :: BH.IndexName
-configIndex = BH.IndexName "monocle.config"
+configIndex = [BH.qqIndexName|monocle.config|]

 configDoc :: BH.DocId
 configDoc = BH.DocId "config"
@@ -477,13 +479,13 @@ ensureIndexCrawlerMetadata = do
   QueryWorkspace config <- getQueryTarget
   traverse_ initCrawlerMetadata config.crawlers

-withRefresh :: HasCallStack => MonoQuery :> es => IndexEffects es => Eff es BH.Reply -> Eff es ()
+withRefresh :: HasCallStack => Show e => Show a => MonoQuery :> es => IndexEffects es => Eff es (Either e a) -> Eff es ()
 withRefresh action = do
   index <- getIndexName
   resp <- action
-  unless (BH.isSuccess resp) (error $ "Unable to add or update: " <> show resp)
+  unless (isRight resp) (error $ "Unable to add or update: " <> show resp)
   refreshResp <- esRefreshIndex index
-  unless (BH.isSuccess refreshResp) (error $ "Unable to refresh index: " <> show resp)
+  unless (isRight refreshResp) (error $ "Unable to refresh index: " <> show refreshResp)

 ensureIndex :: (E.Fail :> es, LoggerEffect :> es, MonoQuery :> es, Error ElasticError :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
 ensureIndex = do
@@ -604,13 +606,14 @@ runAddDocsBulkOPs ::
   -- | The docs payload
   [(Value, BH.DocId)] ->
   Eff es ()
-runAddDocsBulkOPs bulkOp docs = do
-  index <- getIndexName
-  let stream = V.fromList $ fmap (bulkOp index) docs
-  _ <- esBulk stream
-  -- Bulk loads require an index refresh before new data is loaded.
-  _ <- esRefreshIndex index
-  pure ()
+runAddDocsBulkOPs bulkOp docs =
+  unless (null docs) $ do
+    index <- getIndexName
+    let stream = V.fromList $ fmap (bulkOp index) docs
+    _ <- esBulk stream
+    -- Bulk loads require an index refresh before the new data becomes searchable.
+    _ <- esRefreshIndex index
+    pure ()

 indexDocs :: MonoQuery :> es => IndexEffects es => [(Value, BH.DocId)] -> Eff es ()
 indexDocs = runAddDocsBulkOPs toBulkIndex
@@ -679,8 +682,8 @@ indexEvents events = indexDocs (fmap toDoc events)
 statusCheck :: (Int -> c) -> HTTP.Response body -> c
 statusCheck prd = prd . NHTS.statusCode . HTTP.responseStatus

-isNotFound :: BH.Reply -> Bool
-isNotFound = statusCheck (== 404)
+isNotFound :: BH.BHResponse parsingContext a -> Bool
+isNotFound (BH.BHResponse r) = statusCheck (== 404) r

 checkDocExists :: MonoQuery :> es => IndexEffects es => BH.DocId -> Eff es Bool
 checkDocExists docId = do
@@ -690,16 +693,9 @@ getDocumentById' :: IndexEffects es => FromJSON a => BH.IndexName -> BH.DocId -> Eff es (Maybe a)
 getDocumentById' index docId = do
   resp <- esGetDocument index docId
-  if isNotFound resp
-    then pure Nothing
-    else do
-      parsed <- BH.parseEsResponse resp
-      case parsed of
-        Right cm -> pure . getHit $ BH.foundResult cm
-        Left _ -> error "Unable to get parse result"
- where
-  getHit (Just (BH.EsResultFound _ cm)) = Just cm
-  getHit Nothing = Nothing
+  case resp of
+    Left _ -> pure Nothing
+    Right x -> pure $ BH._source <$> BH.foundResult x

 getDocumentById :: MonoQuery :> es => IndexEffects es => FromJSON a => BH.DocId -> Eff es (Maybe a)
 getDocumentById docId = do
diff --git a/src/Monocle/Backend/Queries.hs b/src/Monocle/Backend/Queries.hs
index a06f7bf9d..3783acfbd 100644
--- a/src/Monocle/Backend/Queries.hs
+++ b/src/Monocle/Backend/Queries.hs
@@ -49,14 +49,14 @@ measureQueryM :: a -> b -> b
 measureQueryM _ = id

 -- | Call the search endpoint
-doScrollSearchBH :: (QEffects es, ToJSON body, FromJSONField resp) => BHR.ScrollRequest -> body -> Eff es (BH.SearchResult resp)
-doScrollSearchBH scrollRequest body = do
-  measureQueryM body do
+doScrollSearchBH :: (QEffects es, FromJSONField resp) => BHR.ScrollRequest -> BH.Search -> Eff es (BH.SearchResult resp)
+doScrollSearchBH scrollRequest payload = do
+  measureQueryM payload do
     index <- getIndexName
-    esSearch index body scrollRequest
+    esSearch index payload scrollRequest

 -- | A search without scroll
-doSearchBH :: (QEffects es, ToJSON body, FromJSONField resp) => body -> Eff es (BH.SearchResult resp)
+doSearchBH :: (QEffects es, FromJSONField resp) => BH.Search -> Eff es (BH.SearchResult resp)
 doSearchBH = doScrollSearchBH BHR.NoScroll

 doAdvanceScrollBH :: (QEffects es, FromJSON resp) => BH.ScrollId -> Eff es (BH.SearchResult resp)
@@ -64,29 +64,26 @@ doAdvanceScrollBH scroll = do
   measureQueryM (Aeson.object ["scrolling" .= ("advancing..." :: Text)]) do
     esAdvance scroll

-doSearchHitBH :: (QEffects es, ToJSON body) => body -> Eff es [Json.Value]
-doSearchHitBH body = do
-  measureQueryM body do
+doSearchHitBH :: QEffects es => (Json.Value -> Either Text a) -> BH.Search -> Eff es [a]
+doSearchHitBH parseHit payload = do
+  measureQueryM payload do
     index <- getIndexName
-    esSearchHit index body
+    esSearchHit parseHit index payload

 -- | Call the count endpoint
 doCountBH :: QEffects es => BH.Query -> Eff es Count
-doCountBH body = do
-  measureQueryM body do
+doCountBH payload = do
+  measureQueryM payload do
     index <- getIndexName
-    resp <- esCountByIndex index (BH.CountQuery body)
-    case resp of
-      Left e -> error $ show e
-      Right x -> pure $ naturalToCount (BH.crCount x)
+    either (const 0) (naturalToCount . BH.crCount) <$> esCountByIndex index (BH.CountQuery payload)

 -- | Call _delete_by_query endpoint
 doDeleteByQueryBH :: QEffects es => BH.Query -> Eff es ()
-doDeleteByQueryBH body = do
-  measureQueryM body do
+doDeleteByQueryBH payload = do
+  measureQueryM payload do
     index <- getIndexName
     -- TODO: BH does not return parsed response - keep as is or if not enough move it to BHR.
-    void $ esDeleteByQuery index body
+    void $ esDeleteByQuery index payload
     void $ esRefreshIndex index

-------------------------------------------------------------------------------
@@ -159,10 +156,11 @@ doSearch orderM limit = do
         SearchPB.Order_DirectionDESC -> BH.Descending

 -- | Get search results hits, as fast as possible
-doFastSearch :: QEffects es => Word32 -> Eff es [Json.Value]
-doFastSearch limit = do
+doFastSearch :: QEffects es => (Json.Value -> Either Text a) -> Word32 -> Eff es [a]
+doFastSearch parseHit limit = do
   query <- getQueryBH
   doSearchHitBH
+    parseHit
     (BH.mkSearch query Nothing)
       { BH.size = BH.Size $ fromInteger $ toInteger $ max 50 limit
       }
@@ -182,8 +180,8 @@ deleteDocs = do
   void $ doDeleteByQueryBH query

 -- | Get aggregation results
-doAggregation :: QEffects es => ToJSON body => body -> Eff es BH.AggregationResults
-doAggregation body = toAggRes <$> doSearchBH body
+doAggregation :: QEffects es => BH.Search -> Eff es BH.AggregationResults
+doAggregation payload = toAggRes <$> doSearchBH payload

 toAggRes :: BH.SearchResult Value -> BH.AggregationResults
 toAggRes res = fromMaybe (error "oops") (BH.aggregations res)
@@ -194,7 +192,7 @@ aggSearch query aggs = do
   let totalHits = BH.value $ BH.hitsTotal $ BH.searchHits resp
   pure $ AggregationResultsWTH (toAggRes resp) totalHits

-queryAggValue :: QEffects es => Value -> Eff es Double
+queryAggValue :: QEffects es => BH.Search -> Eff es Double
 queryAggValue search = getAggValue "agg1" <$> doAggregation search
  where
   getAggValue :: Text -> BH.AggregationResults -> Double
@@ -206,8 +204,8 @@ parseAggregationResults key res = getExn do
   value <- Map.lookup (from key) res `orDie` ("No value found for: " <> from key)
   Aeson.parseEither Aeson.parseJSON value

-queryAggResult :: QEffects es => FromJSON a => Value -> Eff es a
-queryAggResult body = parseAggregationResults "agg1" <$> doAggregation body
+queryAggResult :: QEffects es => FromJSON a => BH.Search -> Eff es a
+queryAggResult payload = parseAggregationResults "agg1" <$> doAggregation payload

 -- | Run a Terms composite aggregation (composite agg result is paginated)
 -- | Composite aggregations are advised when dealing with high cardinality terms
@@ -216,7 +214,7 @@ doTermsCompositeAgg term = getPages Nothing
  where
   getPages :: Maybe Value -> Stream (Of BHR.TermsCompositeAggBucket) (Eff es) ()
   getPages afterM = do
-    ret <- lift $ queryAggResult $ BHR.mkAgg [BHR.mkTermsCompositeAgg term afterM] Nothing Nothing
+    ret <- lift $ queryAggResult $ BHR.mkAgg (BHR.mkTermsCompositeAgg term afterM) Nothing Nothing
     Streaming.each $ getBuckets ret
     case getAfterValue ret of
       Just afterValue -> getPages (Just afterValue)
@@ -470,8 +468,7 @@ firstEventOnChanges = do
   (minDate, _) <- getQueryBound

   -- Collect all the events
-  resultJson <- doFastSearch 10000
-  let result = mapMaybe decodeJsonChangeEvent resultJson
+  result <- catMaybes <$> doFastSearch (Right . decodeJsonChangeEvent) 10000

   -- Group by change_id
   let changeMap :: [NonEmpty JsonChangeEvent]
@@ -549,19 +546,17 @@ getProjectAgg query = do
   pure $ unEPBuckets (parseAggregationResults "agg" res)
  where
   agg =
-    [
-      ( "agg"
-      , Aeson.object
-          [ "terms" .= field "type"
-          , "aggs"
-              .= Aeson.object
-                [ "project"
-                    .= Aeson.object
-                      ["terms" .= field "repository_fullname"]
-                ]
-          ]
-      )
-    ]
+    Map.singleton
+      "agg"
+      $ BH.TermsAgg
+        (BH.mkTermsAggregation $ BH.FieldName "type")
+          { BH.termAggs =
+              Just
+                $ Map.singleton "project"
+                $ BH.TermsAgg
+                $ BH.mkTermsAggregation
+                $ BH.FieldName "repository_fullname"
+          }

 -- | TopTerm agg query utils
 getSimpleTR :: BH.TermsResult -> TermResult
@@ -597,7 +592,7 @@ getTermsAgg query onTerm maxBuckets = do
     aggs =
       BH.mkAggregations "singleTermAgg"
         $ BH.TermsAgg
-        $ (BH.mkTermsAggregation onTerm)
+        $ (BH.mkTermsAggregation $ BH.FieldName onTerm)
           { BH.termSize = maxBuckets
           }
     unfilteredR search' = maybe [] BH.buckets (BH.toTerms "singleTermAgg" search')
@@ -612,12 +607,36 @@ instance FromJSON CountValue where
   parseJSON _ = mzero

 getCardinalityAgg :: QEffects es => BH.FieldName -> Maybe Int -> Eff es Count
-getCardinalityAgg (BH.FieldName fieldName) threshold = do
+getCardinalityAgg fieldName threshold = do
   bhQuery <- getQueryBH
-  let cardinality = Aeson.object ["field" .= fieldName, "precision_threshold" .= threshold]
-      agg = Aeson.object ["agg1" .= Aeson.object ["cardinality" .= cardinality]]
-      search = Aeson.object ["aggregations" .= agg, "size" .= (0 :: Word), "query" .= bhQuery]
+  let search =
+        BH.Search
+          { trackSortScores = False
+          , suggestBody = Nothing
+          , sortBody = Nothing
+          , searchType = BH.SearchTypeQueryThenFetch
+          , searchAfterKey = Nothing
+          , scriptFields = Nothing
+          , docvalueFields = Nothing
+          , queryBody = bhQuery
+          , pointInTime = Nothing
+          , highlight = Nothing
+          , filterBody = Nothing
+          , aggBody =
+              Just
+                $ Map.singleton "agg1"
+                $ BH.CardinalityAgg
+                  BH.CardinalityAggregation
+                    { cardinalityField = fieldName
+                    , cardinalityPrecisionThreshold = BH.PrecisionThreshold <$> threshold
+                    , cardinalityMissing = Nothing
+                    }
+          , source = Nothing
+          , size = BH.Size 0
+          , fields = Nothing
+          , from = BH.From 0
+          }
   unCountValue . parseAggregationResults "agg1" <$> doAggregation search

 countAuthors :: QEffects es => Eff es Count
@@ -882,40 +901,49 @@ getChangesTops limit = do
     , termsCountIntTotalHits = total
     }

-searchBody :: QEffects es => QueryFlavor -> Value -> Eff es Value
+searchBody :: QEffects es => QueryFlavor -> BH.Aggregation -> Eff es BH.Search
 searchBody qf agg = withFlavor qf do
   queryBH <- getQueryBH
   pure
-    $ Aeson.object
-      [ "aggregations" .= Aeson.object ["agg1" .= agg]
-      , "size" .= (0 :: Word)
-      , "docvalue_fields"
-          .= [ Aeson.object
-                [ "field" .= ("created_at" :: Text)
-                , "format" .= ("date_time" :: Text)
-                ]
-             ]
-      , "query" .= fromMaybe (error "need query") queryBH
-      ]
+    $ BH.Search
+      { trackSortScores = False
+      , suggestBody = Nothing
+      , sortBody = Nothing
+      , searchType = BH.SearchTypeQueryThenFetch
+      , searchAfterKey = Nothing
+      , scriptFields = Nothing
+      , docvalueFields = Just [BH.DocvalueFieldNameAndFormat (BH.FieldName "created_at") "date_time"]
+      , queryBody = Just $ fromMaybe (error "need query") queryBH
+      , pointInTime = Nothing
+      , highlight = Nothing
+      , filterBody = Nothing
+      , aggBody = Just $ Map.singleton "agg1" agg
+      , source = Nothing
+      , size = BH.Size 0
+      , fields = Nothing
+      , from = BH.From 0
+      }

 averageDuration :: QEffects es => QueryFlavor -> Eff es Double
 averageDuration qf = queryAggValue =<< searchBody qf avg
  where
-  avg = Aeson.object ["avg" .= Aeson.object ["field" .= ("duration" :: Text)]]
+  avg =
+    BH.AvgAgg
+      $ BH.AvgAggregation (BH.FieldName "duration") Nothing

 medianDeviationDuration :: QEffects es => QueryFlavor -> Eff es Double
 medianDeviationDuration qf = queryAggValue =<< searchBody qf deviation
  where
   deviation =
-    Aeson.object
-      [ "median_absolute_deviation"
-          .= Aeson.object ["field" .= ("duration" :: Text)]
-      ]
+    BH.MedianAbsoluteDeviationAgg
+      $ BH.MedianAbsoluteDeviationAggregation (BH.FieldName "duration") Nothing

 changeMergedAvgCommits :: QEffects es => QueryFlavor -> Eff es Double
 changeMergedAvgCommits qf = queryAggValue =<< searchBody qf avg
  where
-  avg = Aeson.object ["avg" .= Aeson.object ["field" .= ("commit_count" :: Text)]]
+  avg =
+    BH.AvgAgg
+      $ BH.AvgAggregation (BH.FieldName "commit_count") Nothing

 withDocTypes :: QEffects es => [EDocType] -> QueryFlavor -> Eff es a -> Eff es a
 withDocTypes docTypes flavor qm =
@@ -1232,29 +1260,59 @@ countHisto rf intervalM = fmap toHisto <$> getCountHisto
   queryBH <- getQueryBH
   (minDate, maxDate, interval) <- queryToHistoBounds intervalM

-  let bound =
-        Aeson.object
-          [ "min" .= dateInterval interval minDate
-          , "max" .= dateInterval interval maxDate
-          ]
-      date_histo =
-        Aeson.object
-          [ "field" .= rangeField rf
-          , "calendar_interval" .= into @Text interval
-          , "format" .= getFormat (from interval)
-          , "min_doc_count" .= (0 :: Word)
-          , "extended_bounds" .= bound
-          ]
-      agg =
-        Aeson.object
-          [ "agg1" .= Aeson.object ["date_histogram" .= date_histo]
-          ]
+  let agg =
+        Map.singleton
+          "agg1"
+          $ BH.DateHistogramAgg
+            BH.DateHistogramAggregation
+              { datePreZone = Nothing
+              , datePreOffset = Nothing
+              , datePostZone = Nothing
+              , datePostOffset = Nothing
+              , dateInterval = Nothing
+              , dateFormat = Just $ getFormat (from interval)
+              , dateField = maybe (error "need date field") BH.FieldName $ rangeField rf
+              , dateAggs = Nothing
+              , dateCalendarInterval =
+                  Just
+                    $ case interval of
+                      Q.Hour -> BH.Hour
+                      Q.Day -> BH.Day
+                      Q.Week -> BH.Week
+                      Q.Month -> BH.Month
+                      Q.Year -> BH.Year
+              , dateFixedInterval = Nothing
+              , dateTimeZone = Nothing
+              , dateOffset = Nothing
+              , dateKeyed = Nothing
+              , dateMissing = Nothing
+              , dateMinDocCount = Just 0
+              , dateExtendedBounds =
+                  Just
+                    BH.ExtendedBounds
+                      { extendedBoundsMin = toJSON $ dateInterval interval minDate
+                      , extendedBoundsMax = toJSON $ dateInterval interval maxDate
+                      }
+              }
       search =
-        Aeson.object
-          [ "aggregations" .= agg
-          , "size" .= (0 :: Word)
-          , "query" .= fromMaybe (error "need query") queryBH
-          ]
+        BH.Search
+          { trackSortScores = False
+          , suggestBody = Nothing
+          , sortBody = Nothing
+          , searchType = BH.SearchTypeQueryThenFetch
+          , searchAfterKey = Nothing
+          , scriptFields = Nothing
+          , docvalueFields = Nothing
+          , queryBody = Just $ fromMaybe (error "need query") queryBH
+          , pointInTime = Nothing
+          , highlight = Nothing
+          , filterBody = Nothing
+          , aggBody = Just agg
+          , source = Nothing
+          , size = BH.Size 0
+          , fields = Nothing
+          , from = BH.From 0
+          }

   hBuckets . parseAggregationResults "agg1" <$> doAggregation search
@@ -1284,44 +1342,66 @@ authorCntHisto aField changeEvent intervalM = withDocType changeEvent qf getAuth
   queryBH <- getQueryBH
   (minDate, maxDate, interval) <- queryToHistoBounds intervalM

-  let bound =
-        Aeson.object
-          [ "min" .= dateInterval interval minDate
-          , "max" .= dateInterval interval maxDate
-          ]
-      date_histo =
-        Aeson.object
-          [ "field" .= rangeField (qfRange qf)
-          , "calendar_interval" .= into @Text interval
-          , "format" .= getFormat (from interval)
-          , "min_doc_count" .= (0 :: Word)
-          , "extended_bounds" .= bound
-          ]
-      author_agg =
-        Aeson.object
-          [ "authors"
-              .= Aeson.object
-                [ "terms"
-                    .= Aeson.object
-                      [ "field" .= aField
-                      , "size" .= (10000 :: Word)
-                      ]
-                ]
-          ]
+  let author_agg =
+        Map.singleton
+          "authors"
+          $ BH.TermsAgg
+            (BH.mkTermsAggregation $ BH.FieldName aField)
+              { BH.termSize = Just 10000
+              }
       agg =
-        Aeson.object
-          [ "agg1"
-              .= Aeson.object
-                [ "date_histogram" .= date_histo
-                , "aggs" .= author_agg
-                ]
-          ]
+        Map.singleton
+          "agg1"
+          $ BH.DateHistogramAgg
+            BH.DateHistogramAggregation
+              { datePreZone = Nothing
+              , datePreOffset = Nothing
+              , datePostZone = Nothing
+              , datePostOffset = Nothing
+              , dateInterval = Nothing
+              , dateFormat = Just $ getFormat (from interval)
+              , dateField = maybe (error "need date field") BH.FieldName $ rangeField (qfRange qf)
+              , dateAggs = Just author_agg
+              , dateCalendarInterval =
+                  Just
+                    $ case interval of
+                      Q.Hour -> BH.Hour
+                      Q.Day -> BH.Day
+                      Q.Week -> BH.Week
+                      Q.Month -> BH.Month
+                      Q.Year -> BH.Year
+              , dateFixedInterval = Nothing
+              , dateTimeZone = Nothing
+              , dateOffset = Nothing
+              , dateKeyed = Nothing
+              , dateMissing = Nothing
+              , dateMinDocCount = Just 0
+              , dateExtendedBounds =
+                  Just
+                    BH.ExtendedBounds
+                      { extendedBoundsMin = toJSON $ dateInterval interval minDate
+                      , extendedBoundsMax = toJSON $ dateInterval interval maxDate
+                      }
+              }
       search =
-        Aeson.object
-          [ "aggregations" .= agg
-          , "size" .= (0 :: Word)
-          , "query" .= fromMaybe (error "need query") queryBH
-          ]
+        BH.Search
+          { trackSortScores = False
+          , suggestBody = Nothing
+          , sortBody = Nothing
+          , searchType = BH.SearchTypeQueryThenFetch
+          , searchAfterKey = Nothing
+          , scriptFields = Nothing
+          , docvalueFields = Nothing
+          , queryBody = Just $ fromMaybe (error "need query") queryBH
+          , pointInTime = Nothing
+          , highlight = Nothing
+          , filterBody = Nothing
+          , aggBody = Just agg
+          , source = Nothing
+          , size = BH.Size 0
+          , fields = Nothing
+          , from = BH.From 0
+          }

   hBuckets . parseAggregationResults "agg1" <$> doAggregation search
diff --git a/src/Monocle/Effects.hs b/src/Monocle/Effects.hs
index 8087e162a..7f66f269b 100644
--- a/src/Monocle/Effects.hs
+++ b/src/Monocle/Effects.hs
@@ -62,6 +62,7 @@ import Control.Exception (finally, throwIO)
 import Control.Exception.Base (ErrorCall (ErrorCall))
 import Control.Monad.Catch (catches)
 import Data.Text qualified as T
+import Json.Extras qualified as Json
 import Monocle.Client qualified
 import Monocle.Config qualified
 import Network.HTTP.Client (HttpException (..))
@@ -87,7 +88,6 @@ import Servant qualified
 import Data.Vector qualified as V
 import Database.Bloodhound qualified as BH
 import Database.Bloodhound.Raw qualified as BHR
-import Json.Extras qualified as Json

 -- for MonoQuery
@@ -397,15 +397,37 @@ runBHIOSafe ::
   BH.BH IO a ->
   Eff es a
 runBHIOSafe call bodyJSON act = do
+  res <- runBHIOUnsafe call bodyJSON act
+  case res of
+    Right x -> pure x
+    Left (BH.EsError s e) -> E.throwError $ toErr "unhandled invalid status" (encode (s, e))
+ where
+  toErr msg = ElasticError call msg body
+  body = decodeUtf8 $ encode bodyJSON
+
+-- | Like 'runBHIOSafe', but ES-level errors are returned in a 'Left' 'BH.EsError' instead of
+-- being thrown through the Error effect, so the caller can handle expected failures (e.g. 404).
+-- EsProtocolExceptions (which contain json decoding errors) are still rethrown as 'ElasticError'.
+runBHIOUnsafe ::
+  HasCallStack =>
+  (ToJSON body, Error ElasticError :> es, ElasticEffect :> es) =>
+  -- | The action name to be recorded in the error
+  Text ->
+  -- | A copy of the body to be recorded in the error
+  body ->
+  -- | The bloodhound action
+  BH.BH IO a ->
+  Eff es (Either BH.EsError a)
+runBHIOUnsafe call bodyJSON act = do
   ElasticEffect env <- getStaticRep
-  eRes <- unsafeEff_ ((Right <$> BH.runBH env act) `catches` [errorHandler, esHandler])
+  eRes <- unsafeEff_ ((Right <$> BH.runBH env (BH.tryEsError act)) `catches` [errorHandler, esHandler])
   case eRes of
-    Right x -> pure x
+    Right x -> pure $ join x
     Left e -> E.throwError e
  where
-  toErr msg err = pure $ Left $ ElasticError call msg body err
-  errorHandler = Handler $ \(ErrorCall err) -> toErr (from err) "error called"
-  esHandler = Handler $ \(BH.EsProtocolException msg resp) -> toErr msg resp
+  toErr msg = ElasticError call msg body
+  errorHandler = Handler $ \(ErrorCall err) -> pure $ Left $ toErr (from err) "error called"
+  esHandler = Handler $ \(BH.EsProtocolException msg resp) -> pure $ Left $ toErr msg resp
   body = decodeUtf8 $ encode bodyJSON

 -- | Safely remove (Error ElasticError) from the list of effects.
@@ -433,35 +455,35 @@ dieOnEsError act =
           ]
       Right x -> pure x

-esSearch :: (Error ElasticError :> es, ElasticEffect :> es, ToJSON body, FromJSONField resp) => BH.IndexName -> body -> BHR.ScrollRequest -> Eff es (BH.SearchResult resp)
-esSearch iname body scrollReq = do
-  runBHIOSafe "esSearch" body $ BHR.search iname body scrollReq
+esSearch :: (Error ElasticError :> es, ElasticEffect :> es, FromJSONField resp) => BH.IndexName -> BH.Search -> BHR.ScrollRequest -> Eff es (BH.SearchResult resp)
+esSearch iname payload scrollReq = do
+  runBHIOSafe "esSearch" payload $ BHR.search iname payload scrollReq

 -- | This is similar to esScanSearch, but 'searchByIndex' respects search size and sort order
-esSearchByIndex :: (Error ElasticError :> es, ElasticEffect :> es, ToJSON body, FromJSONField resp) => BH.IndexName -> body -> Eff es [BH.Hit resp]
-esSearchByIndex iname body = BH.hits . BH.searchHits <$> esSearch iname body BHR.NoScroll
+esSearchByIndex :: (Error ElasticError :> es, ElasticEffect :> es, FromJSONField resp) => BH.IndexName -> BH.Search -> Eff es [BH.Hit resp]
+esSearchByIndex iname payload = BH.hits . BH.searchHits <$> esSearch iname payload BHR.NoScroll

 esAdvance :: (Error ElasticError :> es, ElasticEffect :> es, FromJSON resp) => BH.ScrollId -> Eff es (BH.SearchResult resp)
 esAdvance scroll = do
   runBHIOSafe "esAdvance" scroll $ BHR.advance scroll

-esGetDocument :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.DocId -> Eff es (HTTP.Response LByteString)
+esGetDocument :: (Error ElasticError :> es, ElasticEffect :> es) => FromJSON a => BH.IndexName -> BH.DocId -> Eff es (Either BH.EsError (BH.EsResult a))
 esGetDocument iname doc = do
-  runBHIOSafe "esGetDocument" doc $ BH.getDocument iname doc
+  runBHIOUnsafe "esGetDocument" doc $ BH.getDocument iname doc

 esCountByIndex :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.CountQuery -> Eff es (Either BH.EsError BH.CountResponse)
 esCountByIndex iname q = do
-  runBHIOSafe "esCountByIndex" q $ BH.countByIndex iname q
+  runBHIOUnsafe "esCountByIndex" q $ BH.countByIndex iname q

-esSearchHit :: (Error ElasticError :> es, ElasticEffect :> es) => ToJSON body => BH.IndexName -> body -> Eff es [Json.Value]
-esSearchHit iname body = do
-  runBHIOSafe "esSearchHit" body $ BHR.searchHit iname body
+esSearchHit :: (Error ElasticError :> es, ElasticEffect :> es) => (Json.Value -> Either Text a) -> BH.IndexName -> BH.Search -> Eff es [a]
+esSearchHit parseHit iname payload = do
+  runBHIOSafe "esSearchHit" payload $ BHR.searchHit parseHit iname payload

 esScanSearch :: (Error ElasticError :> es, ElasticEffect :> es) => FromJSON body => BH.IndexName -> BH.Search -> Eff es [BH.Hit body]
 esScanSearch iname search = do
   runBHIOSafe "esScanSearch" search $ BH.scanSearch iname search

-esDeleteByQuery :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.Query -> Eff es BH.Reply
+esDeleteByQuery :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.Query -> Eff es BH.DeletedDocuments
 esDeleteByQuery iname q = do
   runBHIOSafe "esDeleteByQuery" q $ BH.deleteByQuery iname q

@@ -470,42 +492,42 @@ esCreateIndex is iname = do
   -- TODO: check for error
   void $ runBHIOSafe "esCreateIndex" iname $ BH.createIndex is iname

-esIndexDocument :: (ToJSON body, Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.IndexDocumentSettings -> body -> BH.DocId -> Eff es (HTTP.Response LByteString)
+esIndexDocument :: (ToJSON body, Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.IndexDocumentSettings -> body -> BH.DocId -> Eff es (Either BH.EsError BH.IndexedDocument)
 esIndexDocument indexName docSettings body docId = do
-  runBHIOSafe "esIndexDocument" body $ BH.indexDocument indexName docSettings body docId
+  runBHIOUnsafe "esIndexDocument" body $ BH.indexDocument indexName docSettings body docId

 esPutMapping :: (Error ElasticError :> es, ElasticEffect :> es) => ToJSON mapping => BH.IndexName -> mapping -> Eff es ()
 esPutMapping iname mapping = do
   -- TODO: check for error
-  void $ runBHIOSafe "esPutMapping" mapping $ BH.putMapping iname mapping
+  void $ runBHIOUnsafe "esPutMapping" mapping $ BH.putMapping @Value iname mapping

 esIndexExists :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> Eff es Bool
 esIndexExists iname = do
   runBHIOSafe "esIndexExists" iname $ BH.indexExists iname

-esDeleteIndex :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> Eff es (HTTP.Response LByteString)
+esDeleteIndex :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> Eff es (Either BH.EsError BH.Acknowledged)
 esDeleteIndex iname = do
-  runBHIOSafe "esDeleteIndex" iname $ BH.deleteIndex iname
+  runBHIOUnsafe "esDeleteIndex" iname $ BH.deleteIndex iname

 esSettings :: (Error ElasticError :> es, ElasticEffect :> es) => ToJSON body => BH.IndexName -> body -> Eff es ()
 esSettings iname body = do
   runBHIOSafe "esSettings" body $ BHR.settings iname body

-esRefreshIndex :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> Eff es (HTTP.Response LByteString)
+esRefreshIndex :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> Eff es (Either BH.EsError BH.ShardResult)
 esRefreshIndex iname = do
-  runBHIOSafe "esRefreshIndex" iname $ BH.refreshIndex iname
+  runBHIOUnsafe "esRefreshIndex" iname $ BH.refreshIndex iname

 esDocumentExists :: (Error ElasticError :> es, ElasticEffect :> es) => BH.IndexName -> BH.DocId -> Eff es Bool
 esDocumentExists iname doc = do
   runBHIOSafe "esDocumentExists" doc $ BH.documentExists iname doc

-esBulk :: (Error ElasticError :> es, ElasticEffect :> es) => V.Vector BulkOperation -> Eff es BH.Reply
+esBulk :: (Error ElasticError :> es, ElasticEffect :> es) => V.Vector BulkOperation -> Eff es (Either BH.EsError BH.BulkResponse)
 esBulk ops = do
-  runBHIOSafe "esBulk" ([] :: [Bool]) $ BH.bulk ops
+  runBHIOUnsafe "esBulk" ([] :: [Bool]) $ BH.bulk ops

-esUpdateDocument :: (Error ElasticError :> es, ElasticEffect :> es) => ToJSON a => BH.IndexName -> BH.IndexDocumentSettings -> a -> DocId -> Eff es BH.Reply
+esUpdateDocument :: (Error ElasticError :> es, ElasticEffect :> es) => ToJSON a => BH.IndexName -> BH.IndexDocumentSettings -> a -> DocId -> Eff es (Either BH.EsError BH.IndexedDocument)
 esUpdateDocument iname ids body doc = do
-  runBHIOSafe "esUpdateDocument" body $ BH.updateDocument iname ids body doc
+  runBHIOUnsafe "esUpdateDocument" body $ BH.updateDocument iname ids body doc

 ------------------------------------------------------------------
diff --git a/src/Monocle/Env.hs b/src/Monocle/Env.hs
index 54e3f0a36..b63309795 100644
--- a/src/Monocle/Env.hs
+++ b/src/Monocle/Env.hs
@@ -1,3 +1,5 @@
+{-# LANGUAGE QuasiQuotes #-}
+
 -- | The library environment and logging functions
 module Monocle.Env where

@@ -57,13 +59,17 @@ envToIndexName :: QueryTarget -> BH.IndexName
 envToIndexName target = do
   case target of
     QueryWorkspace ws -> indexName ws
-    QueryConfig _ -> BH.IndexName "monocle.config"
+    QueryConfig _ -> [BH.qqIndexName|monocle.config|]
  where
   indexName :: Config.Index -> BH.IndexName
   indexName Config.Index {..} = tenantIndexName name

 tenantIndexName :: Config.IndexName -> BH.IndexName
-tenantIndexName indexName = BH.IndexName $ indexNamePrefix <> Config.getIndexName indexName
+tenantIndexName indexName =
+  let rawIndex = indexNamePrefix <> Config.getIndexName indexName
+   in case BH.mkIndexName rawIndex of
+        Left e -> error $ "Cannot build tenantIndexName (" <> show rawIndex <> "): " <> show e
+        Right x -> x

 -- | 'mkQuery' creates a Q.Query from a BH.Query
 mkQuery :: [BH.Query] -> Q.Query
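
A possible follow-up refactor: mkAgg (Raw.hs), getCardinalityAgg, searchBody, countHisto and authorCntHisto now each spell out the full sixteen-field BH.Search record, differing only in queryBody, aggBody and docvalueFields. A minimal sketch of a shared base builder, assuming bloodhound 0.26 still exports mkAggregateSearch (its constructor for size-0, aggregation-only searches) and that its defaults match these literals; the helper name mkAggSearch is hypothetical and not part of this patch:

-- | Base for the aggregation-only searches in this patch (with BH imported
-- qualified, as in Raw.hs). Assumption to verify before swapping it in:
-- BH.mkAggregateSearch still defaults to Size 0 / From 0 /
-- SearchTypeQueryThenFetch, exactly as the record literals above do.
mkAggSearch :: Maybe BH.Query -> BH.Aggregations -> Maybe [BH.DocvalueField] -> BH.Search
mkAggSearch query aggs docvalues =
  -- Only the docvalue fields vary between call sites, so they are the one
  -- field overridden on top of the library default.
  (BH.mkAggregateSearch query aggs) {BH.docvalueFields = docvalues}

searchBody would then reduce to pure (mkAggSearch queryBH (Map.singleton "agg1" agg) (Just [BH.DocvalueFieldNameAndFormat (BH.FieldName "created_at") "date_time"])), while the cardinality and histogram helpers would pass Nothing for the docvalue fields.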