|
1 | 1 | from .suggested_videos_events_kafka import SuggestedVideosConsumer |
2 | 2 | from dse_graph import DseGraph |
3 | 3 | from gremlin_python.process.graph_traversal import __ |
4 | | -from gremlin_python.process.traversal import gte, neq, within, Scope, Operator, Order, Column |
| 4 | +from gremlin_python.process.traversal import gte, neq, without, Scope, Operator, Order, Column |
5 | 5 | import logging |
6 | 6 |
|
7 | 7 | class VideoPreview(): |
@@ -37,7 +37,6 @@ class SuggestedVideosService(object): |
37 | 37 | def __init__(self, session): |
38 | 38 | self.session = session |
39 | 39 | self.graph = DseGraph.traversal_source(session=self.session, graph_name='killrvideo_video_recommendations') |
40 | | - logging.debug('Graph traversal source: ' + str(self.graph) + ' verts' + str(self.graph.V())) |
41 | 40 | self.suggested_videos_consumer = SuggestedVideosConsumer(self) |
42 | 41 |
|
43 | 42 |
|
@@ -118,24 +117,19 @@ def get_suggested_for_user(self, user_id, page_size, paging_state): |
118 | 117 | # - then grab properties of the video and the user who uploaded each video using project() |
119 | 118 |
|
120 | 119 | traversal = self.graph.V().has('user', 'userId', user_id).as_('^user') \ |
121 | | - .map(__.out('rated').dedup().fold()).as_('^watchedVideos') \ |
122 | | - .select('^user') \ |
123 | | - .outE('rated').has('rating', gte(MIN_RATING)).inV() \ |
124 | | - .inE('rated').has('rating', gte(MIN_RATING)) \ |
| 120 | + .outE('rated').sideEffect(__.inV().aggregate('^watchedVideos')) \ |
| 121 | + .has('rating', gte(MIN_RATING).inV().inE('rated').has('rating'), gte(MIN_RATING)) \ |
125 | 122 | .sample(NUM_RATINGS_TO_SAMPLE).by('rating').outV() \ |
126 | 123 | .where(neq('^user')) \ |
127 | 124 | .local(__.outE('rated').has('rating', gte(MIN_RATING)).limit(LOCAL_USER_RATINGS_TO_SAMPLE)) \ |
128 | 125 | .sack(Operator.assign).by('rating').inV() \ |
129 | | - .filter(__.in_('uploaded').hasLabel('user')) \ |
| 126 | + .where(without('^watchedVideos')) \ |
130 | 127 | .group().by().by(__.sack().sum()) \ |
131 | 128 | .order(Scope.local).by(Column.values, Order.decr) \ |
132 | 129 | .limit(Scope.local, NUM_RECOMMENDATIONS).select(Column.keys).unfold() \ |
133 | 130 | .project('video_id', 'added_date', 'name', 'preview_image_location', 'user_id') \ |
134 | 131 | .by('videoId').by('added_date').by('name').by('preview_image_location').by(__.in_('uploaded').values('userId')) |
135 | 132 |
|
136 | | - # TODO: this step needs to be reinserted after .sack and before .filter |
137 | | - #.not_(__.where(within('^watchedVideos'))) \ |
138 | | - |
139 | 133 | logging.debug('Traversal: ' + str(traversal.bytecode)) |
140 | 134 |
|
141 | 135 | results = traversal.toList() |
|
0 commit comments