Skip to content

Commit dfd4c5d

Browse files
feat: Support the Count aggregation query (#673)
* feat: Support the Count aggregation query * Fix docs build * Add test coverage for calling count from Query. * Fix the test. * Add aggregation doc and update docstrings. * Add the aggregation.rst file * Test that the aggregation alias is unique Test in transaction * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Type annotation fix * Remove unneeded variable Refactor system test suite to fallback to default creds Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 24c3848 commit dfd4c5d

27 files changed

+2055
-182
lines changed

docs/aggregation.rst

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
Aggregation
2+
~~~~~~~~~~~
3+
4+
.. automodule:: google.cloud.firestore_v1.aggregation
5+
:members:
6+
:show-inheritance:
7+
8+
.. automodule:: google.cloud.firestore_v1.base_aggregation
9+
:members:
10+
:show-inheritance:
11+
12+
.. automodule:: google.cloud.firestore_v1.async_aggregation
13+
:members:
14+
:show-inheritance:

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ API Reference
1010

1111
client
1212
collection
13+
aggregation
1314
document
1415
field_path
1516
query
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
# Copyright 2023 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Classes for representing aggregation queries for the Google Cloud Firestore API.
16+
17+
A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
18+
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
19+
a more common way to create an aggregation query than direct usage of the constructor.
20+
"""
21+
from __future__ import annotations
22+
23+
from google.api_core import exceptions
24+
from google.api_core import gapic_v1
25+
from google.api_core import retry as retries
26+
27+
28+
from google.cloud.firestore_v1.base_aggregation import (
29+
AggregationResult,
30+
BaseAggregationQuery,
31+
_query_response_to_result,
32+
)
33+
34+
from typing import Generator, Union, List, Any
35+
36+
37+
class AggregationQuery(BaseAggregationQuery):
38+
"""Represents an aggregation query to the Firestore API."""
39+
40+
def __init__(
41+
self,
42+
nested_query,
43+
) -> None:
44+
super(AggregationQuery, self).__init__(nested_query)
45+
46+
def get(
47+
self,
48+
transaction=None,
49+
retry: Union[
50+
retries.Retry, None, gapic_v1.method._MethodDefault
51+
] = gapic_v1.method.DEFAULT,
52+
timeout: float | None = None,
53+
) -> List[AggregationResult]:
54+
"""Runs the aggregation query.
55+
56+
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
57+
58+
Args:
59+
transaction
60+
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
61+
An existing transaction that this query will run in.
62+
If a ``transaction`` is used and it already has write operations
63+
added, this method cannot be used (i.e. read-after-write is not
64+
allowed).
65+
retry (google.api_core.retry.Retry): Designation of what errors, if any,
66+
should be retried. Defaults to a system-specified policy.
67+
timeout (float): The timeout for this request. Defaults to a
68+
system-specified value.
69+
70+
Returns:
71+
list: The aggregation query results
72+
73+
"""
74+
result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
75+
return list(result) # type: ignore
76+
77+
def _get_stream_iterator(self, transaction, retry, timeout):
78+
"""Helper method for :meth:`stream`."""
79+
request, kwargs = self._prep_stream(
80+
transaction,
81+
retry,
82+
timeout,
83+
)
84+
85+
return self._client._firestore_api.run_aggregation_query(
86+
request=request,
87+
metadata=self._client._rpc_metadata,
88+
**kwargs,
89+
)
90+
91+
def _retry_query_after_exception(self, exc, retry, transaction):
92+
"""Helper method for :meth:`stream`."""
93+
if transaction is None: # no snapshot-based retry inside transaction
94+
if retry is gapic_v1.method.DEFAULT:
95+
transport = self._client._firestore_api._transport
96+
gapic_callable = transport.run_aggregation_query
97+
retry = gapic_callable._retry
98+
return retry._predicate(exc)
99+
100+
return False
101+
102+
def stream(
103+
self,
104+
transaction=None,
105+
retry: Union[
106+
retries.Retry, None, gapic_v1.method._MethodDefault
107+
] = gapic_v1.method.DEFAULT,
108+
timeout: float | None = None,
109+
) -> Union[Generator[List[AggregationResult], Any, None]]:
110+
"""Runs the aggregation query.
111+
112+
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
113+
consumes each document returned in the stream of ``RunAggregationQueryResponse``
114+
messages.
115+
116+
If a ``transaction`` is used and it already has write operations
117+
added, this method cannot be used (i.e. read-after-write is not
118+
allowed).
119+
120+
Args:
121+
transaction
122+
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
123+
An existing transaction that this query will run in.
124+
retry (google.api_core.retry.Retry): Designation of what errors, if any,
125+
should be retried. Defaults to a system-specified policy.
126+
timeout (float): The timeout for this request. Defaults to a
127+
system-specified value.
128+
129+
Yields:
130+
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
131+
The result of aggregations of this query
132+
"""
133+
134+
response_iterator = self._get_stream_iterator(
135+
transaction,
136+
retry,
137+
timeout,
138+
)
139+
while True:
140+
try:
141+
response = next(response_iterator, None)
142+
except exceptions.GoogleAPICallError as exc:
143+
if self._retry_query_after_exception(exc, retry, transaction):
144+
response_iterator = self._get_stream_iterator(
145+
transaction,
146+
retry,
147+
timeout,
148+
)
149+
continue
150+
else:
151+
raise
152+
153+
if response is None: # EOI
154+
break
155+
result = _query_response_to_result(response)
156+
yield result
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Copyright 2023 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Classes for representing Async aggregation queries for the Google Cloud Firestore API.
16+
17+
A :class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery` can be created directly from
18+
a :class:`~google.cloud.firestore_v1.async_collection.AsyncCollection` and that can be
19+
a more common way to create an aggregation query than direct usage of the constructor.
20+
"""
21+
from __future__ import annotations
22+
23+
from google.api_core import gapic_v1
24+
from google.api_core import retry as retries
25+
26+
from typing import List, Union, AsyncGenerator
27+
28+
29+
from google.cloud.firestore_v1.base_aggregation import (
30+
AggregationResult,
31+
_query_response_to_result,
32+
BaseAggregationQuery,
33+
)
34+
35+
36+
class AsyncAggregationQuery(BaseAggregationQuery):
37+
"""Represents an aggregation query to the Firestore API."""
38+
39+
def __init__(
40+
self,
41+
nested_query,
42+
) -> None:
43+
super(AsyncAggregationQuery, self).__init__(nested_query)
44+
45+
async def get(
46+
self,
47+
transaction=None,
48+
retry: Union[
49+
retries.Retry, None, gapic_v1.method._MethodDefault
50+
] = gapic_v1.method.DEFAULT,
51+
timeout: float | None = None,
52+
) -> List[AggregationResult]:
53+
"""Runs the aggregation query.
54+
55+
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
56+
57+
Args:
58+
transaction
59+
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
60+
An existing transaction that this query will run in.
61+
If a ``transaction`` is used and it already has write operations
62+
added, this method cannot be used (i.e. read-after-write is not
63+
allowed).
64+
retry (google.api_core.retry.Retry): Designation of what errors, if any,
65+
should be retried. Defaults to a system-specified policy.
66+
timeout (float): The timeout for this request. Defaults to a
67+
system-specified value.
68+
69+
Returns:
70+
list: The aggregation query results
71+
72+
"""
73+
stream_result = self.stream(
74+
transaction=transaction, retry=retry, timeout=timeout
75+
)
76+
result = [aggregation async for aggregation in stream_result]
77+
return result # type: ignore
78+
79+
async def stream(
80+
self,
81+
transaction=None,
82+
retry: Union[
83+
retries.Retry, None, gapic_v1.method._MethodDefault
84+
] = gapic_v1.method.DEFAULT,
85+
timeout: float | None = None,
86+
) -> Union[AsyncGenerator[List[AggregationResult], None]]:
87+
"""Runs the aggregation query.
88+
89+
This sends a ``RunAggregationQuery`` RPC and then returns an iterator which
90+
consumes each document returned in the stream of ``RunAggregationQueryResponse``
91+
messages.
92+
93+
If a ``transaction`` is used and it already has write operations
94+
added, this method cannot be used (i.e. read-after-write is not
95+
allowed).
96+
97+
Args:
98+
transaction
99+
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
100+
An existing transaction that this query will run in.
101+
retry (google.api_core.retry.Retry): Designation of what errors, if any,
102+
should be retried. Defaults to a system-specified policy.
103+
timeout (float): The timeout for this request. Defaults to a
104+
system-specified value.
105+
106+
Yields:
107+
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
108+
The result of aggregations of this query
109+
"""
110+
request, kwargs = self._prep_stream(
111+
transaction,
112+
retry,
113+
timeout,
114+
)
115+
116+
response_iterator = await self._client._firestore_api.run_aggregation_query(
117+
request=request,
118+
metadata=self._client._rpc_metadata,
119+
**kwargs,
120+
)
121+
122+
async for response in response_iterator:
123+
result = _query_response_to_result(response)
124+
yield result

google/cloud/firestore_v1/async_collection.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121
BaseCollectionReference,
2222
_item_to_document_ref,
2323
)
24-
from google.cloud.firestore_v1 import (
25-
async_query,
26-
async_document,
27-
)
24+
from google.cloud.firestore_v1 import async_query, async_document, async_aggregation
2825

2926
from google.cloud.firestore_v1.document import DocumentReference
3027

@@ -72,6 +69,14 @@ def _query(self) -> async_query.AsyncQuery:
7269
"""
7370
return async_query.AsyncQuery(self)
7471

72+
def _aggregation_query(self) -> async_aggregation.AsyncAggregationQuery:
73+
"""AsyncAggregationQuery factory.
74+
75+
Returns:
76+
:class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery
77+
"""
78+
return async_aggregation.AsyncAggregationQuery(self._query())
79+
7580
async def _chunkify(self, chunk_size: int):
7681
async for page in self._query()._chunkify(chunk_size):
7782
yield page

google/cloud/firestore_v1/async_query.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
1919
a more common way to create a query than direct usage of the constructor.
2020
"""
21+
from __future__ import annotations
2122

2223
from google.api_core import gapic_v1
2324
from google.api_core import retry as retries
@@ -39,6 +40,8 @@
3940
# Types needed only for Type Hints
4041
from google.cloud.firestore_v1.transaction import Transaction
4142

43+
from google.cloud.firestore_v1.async_aggregation import AsyncAggregationQuery
44+
4245

4346
class AsyncQuery(BaseQuery):
4447
"""Represents a query to the Firestore API.
@@ -213,6 +216,21 @@ async def get(
213216

214217
return result
215218

219+
def count(
220+
self, alias: str | None = None
221+
) -> Type["firestore_v1.async_aggregation.AsyncAggregationQuery"]:
222+
"""Adds a count over the nested query.
223+
224+
Args:
225+
alias
226+
(Optional[str]): The alias for the count
227+
228+
Returns:
229+
:class:`~google.cloud.firestore_v1.async_aggregation.AsyncAggregationQuery`:
230+
An instance of an AsyncAggregationQuery object
231+
"""
232+
return AsyncAggregationQuery(self).count(alias=alias)
233+
216234
async def stream(
217235
self,
218236
transaction=None,

0 commit comments

Comments
 (0)