optimize qp counts with count scans

benjaminpkane committed Mar 4, 2025
1 parent 3a92b96 commit 8e580a9
Showing 9 changed files with 104 additions and 62 deletions.
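In short: when query performance mode is enabled, the sidebar's count aggregations drop the field-existence $match and the $project stages, leaving a bare $count that MongoDB can often answer from an index (a count scan) instead of fetching documents. A rough sketch of the two pipeline shapes for a simple scalar field with no grouping context (the field name below is a hypothetical example, not taken from this commit):

    # Default pipeline shape produced by Count.to_mongo(): per-document
    # stages force MongoDB to read documents before counting
    default_pipeline = [
        {"$project": {"uniqueness": True}},
        {"$match": {"$expr": {"$gt": ["$uniqueness", None]}}},
        {"$count": "count"},
    ]

    # Pipeline shape with _optimize=True: no per-document stages, so an
    # index on the queried field can satisfy the count directly
    optimized_pipeline = [
        {"$count": "count"},
    ]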

2 changes: 2 additions & 0 deletions app/packages/state/src/recoil/aggregations.ts
@@ -13,6 +13,7 @@ import {
groupStatistics,
} from "./groups";
import { sidebarSampleId } from "./modal";
import { queryPerformance } from "./queryPerformance";
import { RelayEnvironmentKey } from "./relay";
import * as schemaAtoms from "./schema";
import * as selectors from "./selectors";
@@ -85,6 +86,7 @@ export const aggregationQuery = graphQLSelectorFamily<
slices: mixed ? get(groupSlices) : get(currentSlices(modal)),
slice: get(groupSlice),
view: customView ? customView : !root ? get(viewAtoms.view) : [],
queryPerformance: get(queryPerformance),
};

return {
11 changes: 8 additions & 3 deletions app/schema.graphql
@@ -32,6 +32,7 @@ input AggregationForm {
slices: [String!]
view: BSONArray!
viewName: String = null
queryPerformance: Boolean = false
}

union AggregationResponses =
@@ -238,7 +239,7 @@ type Dataset {
appConfig: DatasetAppConfig
info: JSON
estimatedFrameCount: Int
estimatedSampleCount: Int!
estimatedSampleCount: Int
frameIndexes: [Index!]
sampleIndexes: [Index!]
stages(slug: String = null, view: BSONArray = null): BSONArray
@@ -613,7 +614,11 @@ type Query {
viewName: String = null
form: ExtendedViewForm = null
): [AggregationResponses!]!
dataset(name: String!, savedViewSlug: String, view: BSONArray = null): Dataset
dataset(
name: String!
savedViewSlug: String
view: BSONArray = null
): Dataset!
datasets(
search: String
first: Int = 200
@@ -795,7 +800,7 @@ type StringAggregation implements Aggregation {
path: String!
count: Int!
exists: Int!
values: [StringAggregationValue!]!
values: [StringAggregationValue!]
}

type StringAggregationValue {
26 changes: 19 additions & 7 deletions fiftyone/core/aggregations.py
@@ -5,6 +5,7 @@
| `voxel51.com <https://voxel51.com/>`_
|
"""

from collections import defaultdict, OrderedDict
from copy import deepcopy
from datetime import date, datetime
@@ -537,9 +538,15 @@ class Count(Aggregation):
"""

def __init__(
self, field_or_expr=None, expr=None, safe=False, _unwind=True
self,
field_or_expr=None,
expr=None,
safe=False,
_optimize=False,
_unwind=True,
):
super().__init__(field_or_expr, expr=expr, safe=safe)
self._optimize = _optimize
self._unwind = _unwind

def _kwargs(self):
@@ -575,10 +582,14 @@ def to_mongo(self, sample_collection, context=None):
safe=self._safe,
unwind=self._unwind,
context=context,
optimize=self._optimize,
)

if not sample_collection._contains_videos() or path != "frames":
pipeline.append({"$match": {"$expr": {"$gt": ["$" + path, None]}}})
if not self._optimize:
if not sample_collection._contains_videos() or path != "frames":
pipeline.append(
{"$match": {"$expr": {"$gt": ["$" + path, None]}}}
)

pipeline.append({"$count": "count"})

@@ -795,9 +806,9 @@ def to_mongo(self, sample_collection, context=None):
pipeline += [
{
"$match": {
"$expr": {"$and": exprs}
if len(exprs) > 1
else exprs[0]
"$expr": (
{"$and": exprs} if len(exprs) > 1 else exprs[0]
)
}
}
]
@@ -2992,6 +3003,7 @@ def _parse_field_and_expr(
unwind=True,
allow_missing=False,
context=None,
optimize=False,
):
# unwind can be {True, False, -1}
auto_unwind = unwind != False
@@ -3098,7 +3110,7 @@
{"$replaceRoot": {"newRoot": "$frames"}},
]
)
elif not context:
elif not context and not optimize:
pipeline.append({"$project": {path: True}})
elif unwind_list_fields:
pipeline.append({"$project": {path: True}})
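A minimal usage sketch of the new private flag, assuming a loaded dataset (the dataset name and field path are placeholders; _optimize is an internal parameter that the server sets when query performance mode is on):

    import fiftyone as fo
    import fiftyone.core.aggregations as foa

    dataset = fo.load_dataset("quickstart")  # hypothetical dataset name

    # Default: the pipeline projects the field and filters out samples
    # where it is missing before counting
    print(dataset.aggregate(foa.Count("uniqueness")))

    # Optimized: skips the existence $match and the $project so the count
    # can be served by an index scan; missing values may be included
    print(dataset.aggregate(foa.Count("uniqueness", _optimize=True)))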
45 changes: 27 additions & 18 deletions fiftyone/server/aggregations.py
@@ -42,6 +42,7 @@ class AggregationForm:
slices: t.Optional[t.List[str]]
view: BSONArray
view_name: t.Optional[str] = None
query_performance: t.Optional[bool] = False


@gql.interface
@@ -92,7 +93,7 @@ class StringAggregationValue:

@gql.type
class StringAggregation(Aggregation):
values: t.List[StringAggregationValue]
values: t.Optional[t.List[StringAggregationValue]] = None


AggregateResult = gql.union(
@@ -141,7 +142,10 @@ async def aggregate_resolver(
)

aggregations, deserializers = zip(
*[_resolve_path_aggregation(path, view) for path in form.paths]
*[
_resolve_path_aggregation(path, view, form.query_performance)
for path in form.paths
]
)
counts = [len(a) for a in aggregations]
flattened = [item for sublist in aggregations for item in sublist]
@@ -209,10 +213,12 @@ async def _load_view(form: AggregationForm, slices: t.List[str]):


def _resolve_path_aggregation(
path: str, view: foc.SampleCollection
path: str, view: foc.SampleCollection, query_performance: bool
) -> AggregateResult:
aggregations: t.List[foa.Aggregation] = [
foa.Count(path if path and path != "" else None)
foa.Count(
path if path and path != "" else None, _optimize=query_performance
)
]
field = view.get_field(path)

@@ -225,23 +231,26 @@
else RootAggregation
)

if meets_type(field, fof.BooleanField):
aggregations.append(foa.CountValues(path))

elif meets_type(field, (fof.DateField, fof.DateTimeField, fof.IntField)):
aggregations.append(foa.Bounds(path))
if not query_performance:
if meets_type(field, fof.BooleanField):
aggregations.append(foa.CountValues(path))

elif meets_type(field, fof.FloatField):
aggregations.append(
foa.Bounds(
path,
safe=True,
_count_nonfinites=True,
elif meets_type(
field, (fof.DateField, fof.DateTimeField, fof.IntField)
):
aggregations.append(foa.Bounds(path))

elif meets_type(field, fof.FloatField):
aggregations.append(
foa.Bounds(
path,
safe=True,
_count_nonfinites=True,
)
)
)

elif meets_type(field, (fof.ObjectIdField, fof.StringField)):
aggregations.append(foa.CountValues(path, _first=LIST_LIMIT))
elif meets_type(field, (fof.ObjectIdField, fof.StringField)):
aggregations.append(foa.CountValues(path, _first=LIST_LIMIT))

data = {"path": path}

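To make the resolver's branching concrete, here is a simplified, hypothetical mirror of what _resolve_path_aggregation() now builds for a string field path; LIST_LIMIT's value and the path are assumptions for illustration only:

    import fiftyone.core.aggregations as foa

    LIST_LIMIT = 200  # placeholder; the server defines its own limit

    def sidebar_aggregations(path: str, query_performance: bool):
        # The count is always requested; _optimize mirrors query performance mode
        aggregations = [foa.Count(path or None, _optimize=query_performance)]
        if not query_performance:
            # Extras such as CountValues/Bounds are only added when query
            # performance mode is off
            aggregations.append(foa.CountValues(path, _first=LIST_LIMIT))
        return aggregations

    print(sidebar_aggregations("label.label", query_performance=True))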
65 changes: 39 additions & 26 deletions tests/unittests/server_aggregations_tests.py
@@ -94,34 +94,29 @@ async def test_group_mode_sidebar_counts(self, dataset: fo.Dataset):
}
"""

result = await execute(
schema,
query,
{
"form": {
"index": 0,
"dataset": dataset.name,
"extended_stages": {},
"filters": {
"label.label": {
"values": [
"default",
],
"exclude": False,
"isMatching": False,
}
},
"group_id": None,
"hidden_labels": [],
"paths": ["label.label"],
"mixed": True,
"sample_ids": [],
"slice": "default",
"slices": None,
"view": [],
form = {
"index": 0,
"dataset": dataset.name,
"extended_stages": {},
"filters": {
"label.label": {
"values": [
"default",
],
"exclude": False,
"isMatching": False,
}
},
)
"group_id": None,
"hidden_labels": [],
"paths": ["label.label"],
"mixed": True,
"sample_ids": [],
"slice": "default",
"slices": None,
"view": [],
}
result = await execute(schema, query, {"form": form})

# ensure only "default" count is returned, "other" should be omitted
self.assertEqual(
@@ -138,6 +133,24 @@ async def test_group_mode_sidebar_counts(self, dataset: fo.Dataset):
},
)

form["query_performance"] = True
result = await execute(schema, query, {"form": form})

# when query performance is on, omit values
self.assertEqual(
result.data,
{
"aggregations": [
{
"path": "label.label",
"count": 1,
"exists": 1,
"values": None,
},
],
},
)


class TestGroupModeHistogramCounts(unittest.IsolatedAsyncioTestCase):
@drop_async_dataset
