Skip to content

Commit 651ca9c

Browse files
feat: default enable multiplex session for all operations unless explicitly set to false (#1394)
* feat: enable multiplex session for all operations unless explicitly set to false * fix tests * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * rename job name * fux emulator systest * update python version for emulator tests * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix test * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix test * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix systests * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * skip dbapi test which depends on session delete * revert timestamp changes * revert timestamp changes * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * more fixes * fix regular session systests * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * expect precommit token only when session is multiplexed. * pin emulator version to make multiplex session with emulator pass --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent f81fbd5 commit 651ca9c

20 files changed

+771
-428
lines changed

.github/workflows/integration-tests-against-emulator-with-multiplexed-session.yaml renamed to .github/workflows/integration-tests-against-emulator-with-regular-session.yaml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ on:
33
branches:
44
- main
55
pull_request:
6-
name: Run Spanner integration tests against emulator with multiplexed sessions
6+
name: Run Spanner integration tests against emulator with regular sessions
77
jobs:
88
system-tests:
99
runs-on: ubuntu-latest
@@ -21,7 +21,7 @@ jobs:
2121
- name: Setup Python
2222
uses: actions/setup-python@v5
2323
with:
24-
python-version: 3.8
24+
python-version: 3.12
2525
- name: Install nox
2626
run: python -m pip install nox
2727
- name: Run system tests
@@ -30,5 +30,6 @@ jobs:
3030
SPANNER_EMULATOR_HOST: localhost:9010
3131
GOOGLE_CLOUD_PROJECT: emulator-test-project
3232
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
33-
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
34-
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
33+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: false
34+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: false
35+
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW: false

.github/workflows/integration-tests-against-emulator.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010

1111
services:
1212
emulator:
13-
image: gcr.io/cloud-spanner-emulator/emulator:latest
13+
image: gcr.io/cloud-spanner-emulator/emulator:1.5.37
1414
ports:
1515
- 9010:9010
1616
- 9020:9020
@@ -21,7 +21,7 @@ jobs:
2121
- name: Setup Python
2222
uses: actions/setup-python@v5
2323
with:
24-
python-version: 3.8
24+
python-version: 3.12
2525
- name: Install nox
2626
run: python -m pip install nox
2727
- name: Run system tests

.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg renamed to .kokoro/presubmit/integration-regular-sessions-enabled.cfg

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,15 @@ env_vars: {
88

99
env_vars: {
1010
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
11-
value: "true"
11+
value: "false"
1212
}
1313

1414
env_vars: {
1515
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
16-
value: "true"
16+
value: "false"
17+
}
18+
19+
env_vars: {
20+
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
21+
value: "false"
1722
}

google/cloud/spanner_v1/database.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,14 @@ def session(self, labels=None, database_role=None):
848848
# If role is specified in param, then that role is used
849849
# instead.
850850
role = database_role or self._database_role
851-
return Session(self, labels=labels, database_role=role)
851+
is_multiplexed = False
852+
if self.sessions_manager._use_multiplexed(
853+
transaction_type=TransactionType.READ_ONLY
854+
):
855+
is_multiplexed = True
856+
return Session(
857+
self, labels=labels, database_role=role, is_multiplexed=is_multiplexed
858+
)
852859

853860
def snapshot(self, **kw):
854861
"""Return an object which wraps a snapshot.

google/cloud/spanner_v1/database_sessions_manager.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,15 +230,13 @@ def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
230230
"""Returns whether to use multiplexed sessions for the given transaction type.
231231
232232
Multiplexed sessions are enabled for read-only transactions if:
233-
* _ENV_VAR_MULTIPLEXED is set to true.
233+
* _ENV_VAR_MULTIPLEXED != 'false'.
234234
235235
Multiplexed sessions are enabled for partitioned transactions if:
236-
* _ENV_VAR_MULTIPLEXED is set to true; and
237-
* _ENV_VAR_MULTIPLEXED_PARTITIONED is set to true.
236+
* _ENV_VAR_MULTIPLEXED_PARTITIONED != 'false'.
238237
239238
Multiplexed sessions are enabled for read/write transactions if:
240-
* _ENV_VAR_MULTIPLEXED is set to true; and
241-
* _ENV_VAR_MULTIPLEXED_READ_WRITE is set to true.
239+
* _ENV_VAR_MULTIPLEXED_READ_WRITE != 'false'.
242240
243241
:type transaction_type: :class:`TransactionType`
244242
:param transaction_type: the type of transaction
@@ -254,30 +252,26 @@ def _use_multiplexed(cls, transaction_type: TransactionType) -> bool:
254252
return cls._getenv(cls._ENV_VAR_MULTIPLEXED)
255253

256254
elif transaction_type is TransactionType.PARTITIONED:
257-
return cls._getenv(cls._ENV_VAR_MULTIPLEXED) and cls._getenv(
258-
cls._ENV_VAR_MULTIPLEXED_PARTITIONED
259-
)
255+
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_PARTITIONED)
260256

261257
elif transaction_type is TransactionType.READ_WRITE:
262-
return cls._getenv(cls._ENV_VAR_MULTIPLEXED) and cls._getenv(
263-
cls._ENV_VAR_MULTIPLEXED_READ_WRITE
264-
)
258+
return cls._getenv(cls._ENV_VAR_MULTIPLEXED_READ_WRITE)
265259

266260
raise ValueError(f"Transaction type {transaction_type} is not supported.")
267261

268262
@classmethod
269263
def _getenv(cls, env_var_name: str) -> bool:
270264
"""Returns the value of the given environment variable as a boolean.
271265
272-
True values are '1' and 'true' (case-insensitive).
273-
All other values are considered false.
266+
True unless explicitly 'false' (case-insensitive).
267+
All other values (including unset) are considered true.
274268
275269
:type env_var_name: str
276270
:param env_var_name: the name of the boolean environment variable
277271
278272
:rtype: bool
279-
:returns: True if the environment variable is set to a true value, False otherwise.
273+
:returns: True unless the environment variable is set to 'false', False otherwise.
280274
"""
281275

282-
env_var_value = getenv(env_var_name, "").lower().strip()
283-
return env_var_value in ["1", "true"]
276+
env_var_value = getenv(env_var_name, "true").lower().strip()
277+
return env_var_value != "false"

google/cloud/spanner_v1/session.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,13 @@ def delete(self):
275275
current_span, "Deleting Session failed due to unset session_id"
276276
)
277277
raise ValueError("Session ID not set by back-end")
278-
278+
if self._is_multiplexed:
279+
add_span_event(
280+
current_span,
281+
"Skipped deleting Multiplexed Session",
282+
{"session.id": self._session_id},
283+
)
284+
return
279285
add_span_event(
280286
current_span, "Deleting Session", {"session.id": self._session_id}
281287
)

google/cloud/spanner_v1/snapshot.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ def _restart_on_unavailable(
133133
# Update the transaction from the response.
134134
if transaction is not None:
135135
transaction._update_for_result_set_pb(item)
136+
if item.precommit_token is not None and transaction is not None:
137+
transaction._update_for_precommit_token_pb(item.precommit_token)
136138

137139
if item.resume_token:
138140
resume_token = item.resume_token
@@ -1013,9 +1015,6 @@ def _update_for_result_set_pb(
10131015
if result_set_pb.metadata and result_set_pb.metadata.transaction:
10141016
self._update_for_transaction_pb(result_set_pb.metadata.transaction)
10151017

1016-
if result_set_pb.precommit_token:
1017-
self._update_for_precommit_token_pb(result_set_pb.precommit_token)
1018-
10191018
def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
10201019
"""Updates the snapshot for the given transaction.
10211020
@@ -1031,7 +1030,7 @@ def _update_for_transaction_pb(self, transaction_pb: Transaction) -> None:
10311030
self._transaction_id = transaction_pb.id
10321031

10331032
if transaction_pb.precommit_token:
1034-
self._update_for_precommit_token_pb(transaction_pb.precommit_token)
1033+
self._update_for_precommit_token_pb_unsafe(transaction_pb.precommit_token)
10351034

10361035
def _update_for_precommit_token_pb(
10371036
self, precommit_token_pb: MultiplexedSessionPrecommitToken
@@ -1044,10 +1043,22 @@ def _update_for_precommit_token_pb(
10441043
# Because multiple threads can be used to perform operations within a
10451044
# transaction, we need to use a lock when updating the precommit token.
10461045
with self._lock:
1047-
if self._precommit_token is None or (
1048-
precommit_token_pb.seq_num > self._precommit_token.seq_num
1049-
):
1050-
self._precommit_token = precommit_token_pb
1046+
self._update_for_precommit_token_pb_unsafe(precommit_token_pb)
1047+
1048+
def _update_for_precommit_token_pb_unsafe(
1049+
self, precommit_token_pb: MultiplexedSessionPrecommitToken
1050+
) -> None:
1051+
"""Updates the snapshot for the given multiplexed session precommit token.
1052+
This method is unsafe because it does not acquire a lock before updating
1053+
the precommit token. It should only be used when the caller has already
1054+
acquired the lock.
1055+
:type precommit_token_pb: :class:`~google.cloud.spanner_v1.MultiplexedSessionPrecommitToken`
1056+
:param precommit_token_pb: The multiplexed session precommit token to update the snapshot with.
1057+
"""
1058+
if self._precommit_token is None or (
1059+
precommit_token_pb.seq_num > self._precommit_token.seq_num
1060+
):
1061+
self._precommit_token = precommit_token_pb
10511062

10521063

10531064
class Snapshot(_SnapshotBase):

google/cloud/spanner_v1/transaction.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,18 @@ def commit(
285285

286286
def wrapped_method(*args, **kwargs):
287287
attempt.increment()
288+
commit_request_args = {
289+
"mutations": mutations,
290+
**common_commit_request_args,
291+
}
292+
# Check if session is multiplexed (safely handle mock sessions)
293+
is_multiplexed = getattr(self._session, "is_multiplexed", False)
294+
if is_multiplexed and self._precommit_token is not None:
295+
commit_request_args["precommit_token"] = self._precommit_token
296+
288297
commit_method = functools.partial(
289298
api.commit,
290-
request=CommitRequest(
291-
mutations=mutations,
292-
precommit_token=self._precommit_token,
293-
**common_commit_request_args,
294-
),
299+
request=CommitRequest(**commit_request_args),
295300
metadata=database.metadata_with_request_id(
296301
nth_request,
297302
attempt.value,
@@ -516,6 +521,9 @@ def wrapped_method(*args, **kwargs):
516521
if is_inline_begin:
517522
self._lock.release()
518523

524+
if result_set_pb.precommit_token is not None:
525+
self._update_for_precommit_token_pb(result_set_pb.precommit_token)
526+
519527
return result_set_pb.stats.row_count_exact
520528

521529
def batch_update(
@@ -660,6 +668,14 @@ def wrapped_method(*args, **kwargs):
660668
if is_inline_begin:
661669
self._lock.release()
662670

671+
if (
672+
len(response_pb.result_sets) > 0
673+
and response_pb.result_sets[0].precommit_token
674+
):
675+
self._update_for_precommit_token_pb(
676+
response_pb.result_sets[0].precommit_token
677+
)
678+
663679
row_counts = [
664680
result_set.stats.row_count_exact for result_set in response_pb.result_sets
665681
]
@@ -736,9 +752,6 @@ def _update_for_execute_batch_dml_response_pb(
736752
:type response_pb: :class:`~google.cloud.spanner_v1.types.ExecuteBatchDmlResponse`
737753
:param response_pb: The execute batch DML response to update the transaction with.
738754
"""
739-
if response_pb.precommit_token:
740-
self._update_for_precommit_token_pb(response_pb.precommit_token)
741-
742755
# Only the first result set contains the result set metadata.
743756
if len(response_pb.result_sets) > 0:
744757
self._update_for_result_set_pb(response_pb.result_sets[0])

tests/_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def is_multiplexed_enabled(transaction_type: TransactionType) -> bool:
4343
env_var_read_write = "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_FOR_RW"
4444

4545
def _getenv(val: str) -> bool:
46-
return getenv(val, "false").lower() == "true"
46+
return getenv(val, "true").lower().strip() != "false"
4747

4848
if transaction_type is TransactionType.READ_ONLY:
4949
return _getenv(env_var)

tests/mockserver_tests/mock_server_test_base.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
SpannerServicer,
4242
start_mock_server,
4343
)
44+
from tests._helpers import is_multiplexed_enabled
4445

4546

4647
# Creates an aborted status with the smallest possible retry delay.
@@ -228,3 +229,109 @@ def database(self) -> Database:
228229
enable_interceptors_in_tests=True,
229230
)
230231
return self._database
232+
233+
def assert_requests_sequence(
234+
self,
235+
requests,
236+
expected_types,
237+
transaction_type,
238+
allow_multiple_batch_create=True,
239+
):
240+
"""Assert that the requests sequence matches the expected types, accounting for multiplexed sessions and retries.
241+
242+
Args:
243+
requests: List of requests from spanner_service.requests
244+
expected_types: List of expected request types (excluding session creation requests)
245+
transaction_type: TransactionType enum value to check multiplexed session status
246+
allow_multiple_batch_create: If True, skip all leading BatchCreateSessionsRequest and one optional CreateSessionRequest
247+
"""
248+
from google.cloud.spanner_v1 import (
249+
BatchCreateSessionsRequest,
250+
CreateSessionRequest,
251+
)
252+
253+
mux_enabled = is_multiplexed_enabled(transaction_type)
254+
idx = 0
255+
# Skip all leading BatchCreateSessionsRequest (for retries)
256+
if allow_multiple_batch_create:
257+
while idx < len(requests) and isinstance(
258+
requests[idx], BatchCreateSessionsRequest
259+
):
260+
idx += 1
261+
# For multiplexed, optionally skip a CreateSessionRequest
262+
if (
263+
mux_enabled
264+
and idx < len(requests)
265+
and isinstance(requests[idx], CreateSessionRequest)
266+
):
267+
idx += 1
268+
else:
269+
if mux_enabled:
270+
self.assertTrue(
271+
isinstance(requests[idx], BatchCreateSessionsRequest),
272+
f"Expected BatchCreateSessionsRequest at index {idx}, got {type(requests[idx])}",
273+
)
274+
idx += 1
275+
self.assertTrue(
276+
isinstance(requests[idx], CreateSessionRequest),
277+
f"Expected CreateSessionRequest at index {idx}, got {type(requests[idx])}",
278+
)
279+
idx += 1
280+
else:
281+
self.assertTrue(
282+
isinstance(requests[idx], BatchCreateSessionsRequest),
283+
f"Expected BatchCreateSessionsRequest at index {idx}, got {type(requests[idx])}",
284+
)
285+
idx += 1
286+
# Check the rest of the expected request types
287+
for expected_type in expected_types:
288+
self.assertTrue(
289+
isinstance(requests[idx], expected_type),
290+
f"Expected {expected_type} at index {idx}, got {type(requests[idx])}",
291+
)
292+
idx += 1
293+
self.assertEqual(
294+
idx, len(requests), f"Expected {idx} requests, got {len(requests)}"
295+
)
296+
297+
def adjust_request_id_sequence(self, expected_segments, requests, transaction_type):
298+
"""Adjust expected request ID sequence numbers based on actual session creation requests.
299+
300+
Args:
301+
expected_segments: List of expected (method, (sequence_numbers)) tuples
302+
requests: List of actual requests from spanner_service.requests
303+
transaction_type: TransactionType enum value to check multiplexed session status
304+
305+
Returns:
306+
List of adjusted expected segments with corrected sequence numbers
307+
"""
308+
from google.cloud.spanner_v1 import (
309+
BatchCreateSessionsRequest,
310+
CreateSessionRequest,
311+
ExecuteSqlRequest,
312+
BeginTransactionRequest,
313+
)
314+
315+
# Count session creation requests that come before the first non-session request
316+
session_requests_before = 0
317+
for req in requests:
318+
if isinstance(req, (BatchCreateSessionsRequest, CreateSessionRequest)):
319+
session_requests_before += 1
320+
elif isinstance(req, (ExecuteSqlRequest, BeginTransactionRequest)):
321+
break
322+
323+
# For multiplexed sessions, we expect 2 session requests (BatchCreateSessions + CreateSession)
324+
# For non-multiplexed, we expect 1 session request (BatchCreateSessions)
325+
mux_enabled = is_multiplexed_enabled(transaction_type)
326+
expected_session_requests = 2 if mux_enabled else 1
327+
extra_session_requests = session_requests_before - expected_session_requests
328+
329+
# Adjust sequence numbers based on extra session requests
330+
adjusted_segments = []
331+
for method, seq_nums in expected_segments:
332+
# Adjust the sequence number (5th element in the tuple)
333+
adjusted_seq_nums = list(seq_nums)
334+
adjusted_seq_nums[4] += extra_session_requests
335+
adjusted_segments.append((method, tuple(adjusted_seq_nums)))
336+
337+
return adjusted_segments

0 commit comments

Comments
 (0)