1
+ import * as os from 'node:os'
2
+
3
+ import * as core from '@contentlayer/core'
4
+ import type { HasConsole } from "@contentlayer/utils/effect" ;
5
+ import { Chunk , OT , pipe , T } from "@contentlayer/utils/effect" ;
6
+ import type * as notion from '@notionhq/client' ;
7
+ import type { PageObjectResponse } from '@notionhq/client/build/src/api-endpoints.js' ;
8
+
9
+ import { UnknownNotionError } from '../errors.js' ;
10
+ import type * as LocalSchema from '../schema/defs/index.js'
11
+ import { makeCacheItem } from './page.js' ;
12
+
13
+ type Page = PageObjectResponse
14
+
15
+ export const fetchAllDocuments = ( {
16
+ client,
17
+ schemaDef,
18
+ databaseTypeDefs,
19
+ options
20
+ } : {
21
+ client : notion . Client ,
22
+ databaseTypeDefs : LocalSchema . DatabaseTypeDef [ ] ,
23
+ schemaDef : core . SchemaDef ,
24
+ options : core . PluginOptions
25
+ } ) : T . Effect < OT . HasTracer & HasConsole , core . SourceFetchDataError , core . DataCache . Cache > => pipe (
26
+ T . gen ( function * ( $ ) {
27
+ const pages : Page [ ] = [ ] ;
28
+
29
+ for ( const databaseDef of databaseTypeDefs ) {
30
+ const result = yield * $ ( fetchDatabasePages ( { client, databaseDef } ) ) ; ;
31
+ pages . push ( ...result ) ;
32
+ }
33
+
34
+
35
+ const documentEntriesWithDocumentTypeDef = Object . values ( schemaDef . documentTypeDefMap ) . flatMap (
36
+ ( documentTypeDef ) => pages . map ( ( page ) => ( { page, documentTypeDef } ) )
37
+ ) ;
38
+
39
+ const concurrencyLimit = os . cpus ( ) . length
40
+
41
+ const documents = yield * $ (
42
+ pipe (
43
+ documentEntriesWithDocumentTypeDef ,
44
+ T . forEachParN ( concurrencyLimit , ( { page, documentTypeDef } ) =>
45
+ makeCacheItem ( {
46
+ page,
47
+ documentTypeDef,
48
+ options
49
+ } )
50
+ ) ,
51
+ OT . withSpan ( '@contentlayer/source-notion/fetchData:makeCacheItems' , {
52
+ attributes : { count : documentEntriesWithDocumentTypeDef . length } ,
53
+ } ) ,
54
+ )
55
+ )
56
+
57
+ const cacheItemsMap = Object . fromEntries ( Chunk . map_ ( documents , ( _ ) => [ _ . document . _id , _ ] ) )
58
+
59
+ return { cacheItemsMap }
60
+ } ) ,
61
+ OT . withSpan ( '@contentlayer/source-notion/fetchData:fetchAllDocuments' , {
62
+ attributes : { schemadef : JSON . stringify ( schemaDef ) } ,
63
+ } ) ,
64
+ T . mapError ( ( error ) => new core . SourceFetchDataError ( { error, alreadyHandled : false } ) )
65
+ )
66
+
67
+ const fetchDatabasePages = ( {
68
+ client,
69
+ databaseDef
70
+ } : {
71
+ client : notion . Client ,
72
+ databaseDef : LocalSchema . DatabaseTypeDef
73
+ } ) : T . Effect < OT . HasTracer , UnknownNotionError , Page [ ] > => pipe (
74
+ T . tryPromise ( ( ) => client . databases . query ( { database_id : databaseDef . databaseId } ) . then ( res => res . results as Page [ ] ) ) ,
75
+ OT . withSpan ( '@contentlayer/source-contentlayer/fetchData:getAllEntries' ) ,
76
+ T . mapError ( ( error ) => new UnknownNotionError ( { error } ) ) ,
77
+ )
0 commit comments