Skip to content

Commit c4210b2

Browse files
feat: Add support for Directed Reads (#1000)
* changes * changes * docs * docs * linting * feat(spanner): remove client side validations for directed read options * feat(spanner): update the auto_failover_disabled field * feat(spanner): update unit tests * feat(spanner): update test * feat(spanner): update documentation * feat(spanner): add system test to validate exception in case of RW transaction * feat(spanner): update unit test * feat(spanner): add dro for batchsnapshot and update system tests * feat(spanner): fix unit tests for batchsnapshot * feat(spanner): add unit tests for partition read and query * feat(spanner): lint fixes * feat(spanner): code refactor remove TransactionType * feat(spanner): comment refactor * feat(spanner): remove comments --------- Co-authored-by: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Co-authored-by: Sri Harsha CH <sriharshach@google.com>
1 parent 07a0202 commit c4210b2

File tree

14 files changed

+564
-8
lines changed

14 files changed

+564
-8
lines changed

google/cloud/spanner_v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from .types.spanner import CommitRequest
4141
from .types.spanner import CreateSessionRequest
4242
from .types.spanner import DeleteSessionRequest
43+
from .types.spanner import DirectedReadOptions
4344
from .types.spanner import ExecuteBatchDmlRequest
4445
from .types.spanner import ExecuteBatchDmlResponse
4546
from .types.spanner import ExecuteSqlRequest
@@ -108,6 +109,7 @@
108109
"CommitResponse",
109110
"CreateSessionRequest",
110111
"DeleteSessionRequest",
112+
"DirectedReadOptions",
111113
"ExecuteBatchDmlRequest",
112114
"ExecuteBatchDmlResponse",
113115
"ExecuteSqlRequest",

google/cloud/spanner_v1/client.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ class Client(ClientWithProject):
120120
disable leader aware routing. Disabling leader aware routing would
121121
route all requests in RW/PDML transactions to the closest region.
122122
123+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
124+
or :class:`dict`
125+
:param directed_read_options: (Optional) Client options used to set the directed_read_options
126+
for all ReadRequests and ExecuteSqlRequests that indicate which replicas
127+
or regions should be used for non-transactional reads or queries.
128+
123129
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
124130
and ``admin`` are :data:`True`
125131
"""
@@ -139,6 +145,7 @@ def __init__(
139145
client_options=None,
140146
query_options=None,
141147
route_to_leader_enabled=True,
148+
directed_read_options=None,
142149
):
143150
self._emulator_host = _get_spanner_emulator_host()
144151

@@ -179,6 +186,7 @@ def __init__(
179186
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)
180187

181188
self._route_to_leader_enabled = route_to_leader_enabled
189+
self._directed_read_options = directed_read_options
182190

183191
@property
184192
def credentials(self):
@@ -260,6 +268,17 @@ def route_to_leader_enabled(self):
260268
"""
261269
return self._route_to_leader_enabled
262270

271+
@property
272+
def directed_read_options(self):
273+
"""Getter for directed_read_options.
274+
275+
:rtype:
276+
:class:`~google.cloud.spanner_v1.DirectedReadOptions`
277+
or :class:`dict`
278+
:returns: The directed_read_options for the client.
279+
"""
280+
return self._directed_read_options
281+
263282
def copy(self):
264283
"""Make a copy of this client.
265284
@@ -383,3 +402,14 @@ def list_instances(self, filter_="", page_size=None):
383402
request=request, metadata=metadata
384403
)
385404
return page_iter
405+
406+
@directed_read_options.setter
407+
def directed_read_options(self, directed_read_options):
408+
"""Sets directed_read_options for the client
409+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
410+
or :class:`dict`
411+
:param directed_read_options: Client options used to set the directed_read_options
412+
for all ReadRequests and ExecuteSqlRequests that indicate which replicas
413+
or regions should be used for non-transactional reads or queries.
414+
"""
415+
self._directed_read_options = directed_read_options

google/cloud/spanner_v1/database.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def __init__(
167167
self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled
168168
self._enable_drop_protection = enable_drop_protection
169169
self._reconciling = False
170+
self._directed_read_options = self._instance._client.directed_read_options
170171

171172
if pool is None:
172173
pool = BurstyPool(database_role=database_role)
@@ -1226,6 +1227,7 @@ def generate_read_batches(
12261227
partition_size_bytes=None,
12271228
max_partitions=None,
12281229
data_boost_enabled=False,
1230+
directed_read_options=None,
12291231
*,
12301232
retry=gapic_v1.method.DEFAULT,
12311233
timeout=gapic_v1.method.DEFAULT,
@@ -1265,6 +1267,12 @@ def generate_read_batches(
12651267
(Optional) If this is for a partitioned read and this field is
12661268
set ``true``, the request will be executed via offline access.
12671269
1270+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
1271+
or :class:`dict`
1272+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
1273+
for ReadRequests that indicates which replicas
1274+
or regions should be used for non-transactional reads.
1275+
12681276
:type retry: :class:`~google.api_core.retry.Retry`
12691277
:param retry: (Optional) The retry settings for this request.
12701278
@@ -1293,6 +1301,7 @@ def generate_read_batches(
12931301
"keyset": keyset._to_dict(),
12941302
"index": index,
12951303
"data_boost_enabled": data_boost_enabled,
1304+
"directed_read_options": directed_read_options,
12961305
}
12971306
for partition in partitions:
12981307
yield {"partition": partition, "read": read_info.copy()}
@@ -1337,6 +1346,7 @@ def generate_query_batches(
13371346
max_partitions=None,
13381347
query_options=None,
13391348
data_boost_enabled=False,
1349+
directed_read_options=None,
13401350
*,
13411351
retry=gapic_v1.method.DEFAULT,
13421352
timeout=gapic_v1.method.DEFAULT,
@@ -1388,6 +1398,12 @@ def generate_query_batches(
13881398
(Optional) If this is for a partitioned query and this field is
13891399
set ``true``, the request will be executed via offline access.
13901400
1401+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
1402+
or :class:`dict`
1403+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
1404+
for ExecuteSqlRequests that indicates which replicas
1405+
or regions should be used for non-transactional queries.
1406+
13911407
:type retry: :class:`~google.api_core.retry.Retry`
13921408
:param retry: (Optional) The retry settings for this request.
13931409
@@ -1412,6 +1428,7 @@ def generate_query_batches(
14121428
query_info = {
14131429
"sql": sql,
14141430
"data_boost_enabled": data_boost_enabled,
1431+
"directed_read_options": directed_read_options,
14151432
}
14161433
if params:
14171434
query_info["params"] = params

google/cloud/spanner_v1/snapshot.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ def read(
173173
partition=None,
174174
request_options=None,
175175
data_boost_enabled=False,
176+
directed_read_options=None,
176177
*,
177178
retry=gapic_v1.method.DEFAULT,
178179
timeout=gapic_v1.method.DEFAULT,
@@ -224,6 +225,12 @@ def read(
224225
``partition_token``, the API will return an
225226
``INVALID_ARGUMENT`` error.
226227
228+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
229+
or :class:`dict`
230+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
231+
for ReadRequests that indicates which replicas
232+
or regions should be used for non-transactional reads.
233+
227234
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
228235
:returns: a result set instance which can be used to consume rows.
229236
@@ -253,6 +260,11 @@ def read(
253260
if self._read_only:
254261
# Transaction tags are not supported for read only transactions.
255262
request_options.transaction_tag = None
263+
if (
264+
directed_read_options is None
265+
and database._directed_read_options is not None
266+
):
267+
directed_read_options = database._directed_read_options
256268
elif self.transaction_tag is not None:
257269
request_options.transaction_tag = self.transaction_tag
258270

@@ -266,6 +278,7 @@ def read(
266278
partition_token=partition,
267279
request_options=request_options,
268280
data_boost_enabled=data_boost_enabled,
281+
directed_read_options=directed_read_options,
269282
)
270283
restart = functools.partial(
271284
api.streaming_read,
@@ -322,6 +335,7 @@ def execute_sql(
322335
retry=gapic_v1.method.DEFAULT,
323336
timeout=gapic_v1.method.DEFAULT,
324337
data_boost_enabled=False,
338+
directed_read_options=None,
325339
):
326340
"""Perform an ``ExecuteStreamingSql`` API request.
327341
@@ -379,6 +393,12 @@ def execute_sql(
379393
``partition_token``, the API will return an
380394
``INVALID_ARGUMENT`` error.
381395
396+
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
397+
or :class:`dict`
398+
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
399+
for ExecuteSqlRequests that indicates which replicas
400+
or regions should be used for non-transactional queries.
401+
382402
:raises ValueError:
383403
for reuse of single-use snapshots, or if a transaction ID is
384404
already pending for multiple-use snapshots.
@@ -419,6 +439,11 @@ def execute_sql(
419439
if self._read_only:
420440
# Transaction tags are not supported for read only transactions.
421441
request_options.transaction_tag = None
442+
if (
443+
directed_read_options is None
444+
and database._directed_read_options is not None
445+
):
446+
directed_read_options = database._directed_read_options
422447
elif self.transaction_tag is not None:
423448
request_options.transaction_tag = self.transaction_tag
424449

@@ -433,6 +458,7 @@ def execute_sql(
433458
query_options=query_options,
434459
request_options=request_options,
435460
data_boost_enabled=data_boost_enabled,
461+
directed_read_options=directed_read_options,
436462
)
437463
restart = functools.partial(
438464
api.execute_streaming_sql,

samples/samples/snippets.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from google.cloud import spanner
3232
from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin
3333
from google.cloud.spanner_v1 import param_types
34+
from google.cloud.spanner_v1 import DirectedReadOptions
3435
from google.type import expr_pb2
3536
from google.iam.v1 import policy_pb2
3637
from google.cloud.spanner_v1.data_types import JsonObject
@@ -2723,6 +2724,78 @@ def drop_sequence(instance_id, database_id):
27232724

27242725
# [END spanner_drop_sequence]
27252726

2727+
2728+
def directed_read_options(
2729+
instance_id,
2730+
database_id,
2731+
):
2732+
"""
2733+
Shows how to run an execute sql request with directed read options.
2734+
Only one of exclude_replicas or include_replicas can be set.
2735+
Each accepts a list of replica_selections, each of which contains a location and a type
2736+
* `location` - The location must be one of the regions within the
2737+
multi-region configuration of your database.
2738+
* `type_` - The type of the replica
2739+
Some examples of using replica_selections are:
2740+
* `location:us-east1` --> The "us-east1" replica(s) of any available type
2741+
will be used to process the request.
2742+
* `type:READ_ONLY` --> The "READ_ONLY" type replica(s) in nearest
2743+
available location will be used to process the
2744+
request.
2745+
* `location:us-east1 type:READ_ONLY` --> The "READ_ONLY" type replica(s)
2746+
in location "us-east1" will be used to process
2747+
the request.
2748+
include_replicas also contains an auto_failover_disabled option. When it is set,
2749+
Spanner will not route requests to a replica outside the
2750+
include_replicas list, even when all the specified replicas are unavailable
2751+
or unhealthy. The default value is `false`.
2752+
"""
2753+
# [START spanner_directed_read]
2754+
# instance_id = "your-spanner-instance"
2755+
# database_id = "your-spanner-db-id"
2756+
2757+
directed_read_options_for_client = {
2758+
"exclude_replicas": {
2759+
"replica_selections": [
2760+
{
2761+
"location": "us-east4",
2762+
},
2763+
],
2764+
},
2765+
}
2766+
2767+
# directed_read_options can be set at client level and will be used in all
2768+
# read-only transaction requests
2769+
spanner_client = spanner.Client(
2770+
directed_read_options=directed_read_options_for_client
2771+
)
2772+
instance = spanner_client.instance(instance_id)
2773+
database = instance.database(database_id)
2774+
2775+
directed_read_options_for_request = {
2776+
"include_replicas": {
2777+
"replica_selections": [
2778+
{
2779+
"type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY,
2780+
},
2781+
],
2782+
"auto_failover_disabled": True,
2783+
},
2784+
}
2785+
2786+
with database.snapshot() as snapshot:
2787+
# Read rows while passing directed_read_options directly to the query.
2788+
# These will override the options passed at Client level.
2789+
results = snapshot.execute_sql(
2790+
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums",
2791+
directed_read_options=directed_read_options_for_request,
2792+
)
2793+
2794+
for row in results:
2795+
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
2796+
# [END spanner_directed_read]
2797+
2798+
27262799
if __name__ == "__main__": # noqa: C901
27272800
parser = argparse.ArgumentParser(
27282801
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
@@ -2862,6 +2935,7 @@ def drop_sequence(instance_id, database_id):
28622935
"--database_role", default="new_parent"
28632936
)
28642937
enable_fine_grained_access_parser.add_argument("--title", default="condition title")
2938+
subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__)
28652939

28662940
args = parser.parse_args()
28672941

@@ -2993,3 +3067,5 @@ def drop_sequence(instance_id, database_id):
29933067
args.database_role,
29943068
args.title,
29953069
)
3070+
elif args.command == "directed_read_options":
3071+
directed_read_options(args.instance_id, args.database_id)

samples/samples/snippets_test.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -852,3 +852,10 @@ def test_drop_sequence(capsys, instance_id, bit_reverse_sequence_database):
852852
"Altered Customers table to drop DEFAULT from CustomerId column and dropped the Seq sequence on database"
853853
in out
854854
)
855+
856+
857+
@pytest.mark.dependency(depends=["insert_data"])
858+
def test_directed_read_options(capsys, instance_id, sample_database):
859+
snippets.directed_read_options(instance_id, sample_database.database_id)
860+
out, _ = capsys.readouterr()
861+
assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out

0 commit comments

Comments
 (0)