Skip to content

Commit e2b7ae6

Browse files
nginsberg-googlerahul2393gcf-owl-bot[bot]
authored
feat(spanner): support max_commit_delay in Spanner transactions (#2854)
* add commit delay options * feat: add max_commit_delay options for commit requests * Point stuff to devel * Make samples work with devel. Add a typo. * Cleanup stuff * Remove duplicate definitions. * Fix merge conflict. * Remove SpannerSample pointing to devel. * Add some parans * Add a unit test to ensure unset commit delay is properly propagated * Add an integration test. * change maxCommitDelayInMillis of type Int to maxCommitDelay of type Duration. * Format * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: rahul yadav <rahulyadavsep92@gmail.com> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 741e4cf commit e2b7ae6

File tree

5 files changed

+190
-3
lines changed

5 files changed

+190
-3
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.spanner.v1.DirectedReadOptions;
2121
import com.google.spanner.v1.RequestOptions.Priority;
2222
import java.io.Serializable;
23+
import java.time.Duration;
2324
import java.util.Objects;
2425

2526
/** Specifies options for various spanner operations */
@@ -140,6 +141,11 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
140141
return new PriorityOption(priority);
141142
}
142143

144+
public static ReadQueryUpdateTransactionOption maxCommitDelay(Duration maxCommitDelay) {
145+
Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive");
146+
return new MaxCommitDelayOption(maxCommitDelay);
147+
}
148+
143149
/**
144150
* Specifying this will cause the reads, queries, updates and writes operations statistics
145151
* collection to be grouped by tag.
@@ -247,6 +253,21 @@ void appendToOptions(Options options) {
247253

248254
static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();
249255

256+
/** Option to request {@link MaxCommitDelayOption} for read/write transactions. */
257+
static final class MaxCommitDelayOption extends InternalOption
258+
implements ReadQueryUpdateTransactionOption {
259+
final Duration maxCommitDelay;
260+
261+
MaxCommitDelayOption(Duration maxCommitDelay) {
262+
this.maxCommitDelay = maxCommitDelay;
263+
}
264+
265+
@Override
266+
void appendToOptions(Options options) {
267+
options.maxCommitDelay = maxCommitDelay;
268+
}
269+
}
270+
250271
/** Option to request Optimistic Concurrency Control for read/write transactions. */
251272
static final class OptimisticLockOption extends InternalOption implements TransactionOption {
252273
@Override
@@ -354,6 +375,9 @@ void appendToOptions(Options options) {
354375
}
355376

356377
private boolean withCommitStats;
378+
379+
private Duration maxCommitDelay;
380+
357381
private Long limit;
358382
private Integer prefetchChunks;
359383
private Integer bufferRows;
@@ -375,6 +399,14 @@ boolean withCommitStats() {
375399
return withCommitStats;
376400
}
377401

402+
boolean hasMaxCommitDelay() {
403+
return maxCommitDelay != null;
404+
}
405+
406+
Duration maxCommitDelay() {
407+
return maxCommitDelay;
408+
}
409+
378410
boolean hasLimit() {
379411
return limit != null;
380412
}
@@ -481,6 +513,9 @@ public String toString() {
481513
if (withCommitStats) {
482514
b.append("withCommitStats: ").append(withCommitStats).append(' ');
483515
}
516+
if (maxCommitDelay != null) {
517+
b.append("maxCommitDelay: ").append(maxCommitDelay).append(' ');
518+
}
484519
if (limit != null) {
485520
b.append("limit: ").append(limit).append(' ');
486521
}
@@ -533,6 +568,7 @@ public boolean equals(Object o) {
533568

534569
Options that = (Options) o;
535570
return Objects.equals(withCommitStats, that.withCommitStats)
571+
&& Objects.equals(maxCommitDelay, that.maxCommitDelay)
536572
&& (!hasLimit() && !that.hasLimit()
537573
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
538574
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
@@ -562,6 +598,9 @@ public int hashCode() {
562598
if (withCommitStats) {
563599
result = 31 * result + 1231;
564600
}
601+
if (maxCommitDelay != null) {
602+
result = 31 * result + maxCommitDelay.hashCode();
603+
}
565604
if (limit != null) {
566605
result = 31 * result + limit.hashCode();
567606
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.google.common.collect.Lists;
3636
import com.google.common.util.concurrent.MoreExecutors;
3737
import com.google.protobuf.ByteString;
38+
import com.google.protobuf.Duration;
3839
import com.google.protobuf.Empty;
3940
import com.google.spanner.v1.BatchWriteRequest;
4041
import com.google.spanner.v1.BatchWriteResponse;
@@ -60,6 +61,7 @@
6061
* users need not be aware of the actual session management, pooling and handling.
6162
*/
6263
class SessionImpl implements Session {
64+
6365
private static final Tracer tracer = Tracing.getTracer();
6466

6567
/** Keep track of running transactions on this session per thread. */
@@ -86,8 +88,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
8688
* only have one such transaction active at a time.
8789
*/
8890
interface SessionTransaction {
91+
8992
/** Invalidates the transaction, generally because a new one has been started on the session. */
9093
void invalidate();
94+
9195
/** Registers the current span on the transaction. */
9296
void setSpan(Span span);
9397
}
@@ -176,16 +180,24 @@ public CommitResponse writeAtLeastOnceWithOptions(
176180
setActive(null);
177181
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
178182
Mutation.toProto(mutations, mutationsProto);
183+
Options options = Options.fromTransactionOptions(transactionOptions);
179184
final CommitRequest.Builder requestBuilder =
180185
CommitRequest.newBuilder()
181186
.setSession(name)
182-
.setReturnCommitStats(
183-
Options.fromTransactionOptions(transactionOptions).withCommitStats())
187+
.setReturnCommitStats(options.withCommitStats())
184188
.addAllMutations(mutationsProto)
185189
.setSingleUseTransaction(
186190
TransactionOptions.newBuilder()
187191
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
192+
if (options.hasMaxCommitDelay()) {
193+
requestBuilder.setMaxCommitDelay(
194+
Duration.newBuilder()
195+
.setSeconds(options.maxCommitDelay().getSeconds())
196+
.setNanos(options.maxCommitDelay().getNano())
197+
.build());
198+
}
188199
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);
200+
189201
if (commitRequestOptions != null) {
190202
requestBuilder.setRequestOptions(commitRequestOptions);
191203
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171

7272
/** Default implementation of {@link TransactionRunner}. */
7373
class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
74+
7475
private static final Tracer tracer = Tracing.getTracer();
7576
private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName());
7677
/**
@@ -84,6 +85,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
8485

8586
@VisibleForTesting
8687
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
88+
8789
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
8890

8991
private Clock clock = new Clock();
@@ -131,6 +133,7 @@ static Builder newBuilder() {
131133
*/
132134
private class TransactionContextAsyncResultSetImpl extends ForwardingAsyncResultSet
133135
implements ListenableAsyncResultSet {
136+
134137
private TransactionContextAsyncResultSetImpl(ListenableAsyncResultSet delegate) {
135138
super(delegate);
136139
}
@@ -339,6 +342,13 @@ ApiFuture<CommitResponse> commitAsync() {
339342
}
340343
builder.setRequestOptions(requestOptionsBuilder.build());
341344
}
345+
if (options.hasMaxCommitDelay()) {
346+
builder.setMaxCommitDelay(
347+
com.google.protobuf.Duration.newBuilder()
348+
.setSeconds(options.maxCommitDelay().getSeconds())
349+
.setNanos(options.maxCommitDelay().getNano())
350+
.build());
351+
}
342352
synchronized (lock) {
343353
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
344354
finishOps = SettableApiFuture.create();
@@ -354,6 +364,7 @@ ApiFuture<CommitResponse> commitAsync() {
354364
}
355365

356366
private final class CommitRunnable implements Runnable {
367+
357368
private final SettableApiFuture<CommitResponse> res;
358369
private final ApiFuture<Void> prev;
359370
private final CommitRequest.Builder requestBuilder;
@@ -575,7 +586,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
575586

576587
@Nullable
577588
String getTransactionTag() {
578-
if (this.options.hasTag()) return this.options.tag();
589+
if (this.options.hasTag()) {
590+
return this.options.tag();
591+
}
579592
return null;
580593
}
581594

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133

134134
@RunWith(JUnit4.class)
135135
public class DatabaseClientImplTest {
136+
136137
private static final String TEST_PROJECT = "my-project";
137138
private static final String TEST_INSTANCE = "my-instance";
138139
private static final String TEST_DATABASE = "my-database";
@@ -3635,6 +3636,112 @@ public void testAsyncTransactionManagerCommitWithPriority() {
36353636
assertEquals(Priority.PRIORITY_HIGH, request.getRequestOptions().getPriority());
36363637
}
36373638

3639+
@Test
3640+
public void testCommitWithoutMaxCommitDelay() {
3641+
DatabaseClient client =
3642+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
3643+
TransactionRunner runner = client.readWriteTransaction();
3644+
runner.run(
3645+
transaction -> {
3646+
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
3647+
return null;
3648+
});
3649+
3650+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
3651+
assertThat(requests).hasSize(1);
3652+
CommitRequest request = requests.get(0);
3653+
assertFalse(request.hasMaxCommitDelay());
3654+
}
3655+
3656+
@Test
3657+
public void testCommitWithMaxCommitDelay() {
3658+
DatabaseClient client =
3659+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
3660+
TransactionRunner runner =
3661+
client.readWriteTransaction(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
3662+
runner.run(
3663+
transaction -> {
3664+
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
3665+
return null;
3666+
});
3667+
3668+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
3669+
assertThat(requests).hasSize(1);
3670+
CommitRequest request = requests.get(0);
3671+
assertNotNull(request.getMaxCommitDelay());
3672+
assertEquals(
3673+
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
3674+
request.getMaxCommitDelay());
3675+
}
3676+
3677+
@Test
3678+
public void testTransactionManagerCommitWithMaxCommitDelay() {
3679+
DatabaseClient client =
3680+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
3681+
TransactionManager manager =
3682+
client.transactionManager(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
3683+
TransactionContext transaction = manager.begin();
3684+
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
3685+
manager.commit();
3686+
3687+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
3688+
assertThat(requests).hasSize(1);
3689+
CommitRequest request = requests.get(0);
3690+
assertNotNull(request.getMaxCommitDelay());
3691+
assertEquals(
3692+
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
3693+
request.getMaxCommitDelay());
3694+
}
3695+
3696+
@Test
3697+
public void testAsyncRunnerCommitWithMaxCommitDelay() {
3698+
DatabaseClient client =
3699+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
3700+
AsyncRunner runner = client.runAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
3701+
get(
3702+
runner.runAsync(
3703+
txn -> {
3704+
txn.buffer(Mutation.delete("TEST", KeySet.all()));
3705+
return ApiFutures.immediateFuture(null);
3706+
},
3707+
executor));
3708+
3709+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
3710+
assertThat(requests).hasSize(1);
3711+
CommitRequest request = requests.get(0);
3712+
assertNotNull(request.getMaxCommitDelay());
3713+
assertEquals(
3714+
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
3715+
request.getMaxCommitDelay());
3716+
}
3717+
3718+
@Test
3719+
public void testAsyncTransactionManagerCommitWithMaxCommitDelay() {
3720+
DatabaseClient client =
3721+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
3722+
try (AsyncTransactionManager manager =
3723+
client.transactionManagerAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)))) {
3724+
TransactionContextFuture transaction = manager.beginAsync();
3725+
get(
3726+
transaction
3727+
.then(
3728+
(txn, input) -> {
3729+
txn.buffer(Mutation.delete("TEST", KeySet.all()));
3730+
return ApiFutures.immediateFuture(null);
3731+
},
3732+
executor)
3733+
.commitAsync());
3734+
}
3735+
3736+
List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
3737+
assertThat(requests).hasSize(1);
3738+
CommitRequest request = requests.get(0);
3739+
assertNotNull(request.getMaxCommitDelay());
3740+
assertEquals(
3741+
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
3742+
request.getMaxCommitDelay());
3743+
}
3744+
36383745
@Test
36393746
public void singleUseNoAction_ClearsCheckedOutSession() {
36403747
DatabaseClientImpl client =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,22 @@ public void batchWriteAtLeastOnce() {
304304
}
305305
}
306306

307+
@Test
308+
public void testWriteWithMaxCommitDelay() {
309+
CommitResponse response =
310+
client.writeWithOptions(
311+
Collections.singletonList(
312+
Mutation.newInsertOrUpdateBuilder("T")
313+
.set("K")
314+
.to(lastKey = uniqueString())
315+
.set("StringValue")
316+
.to("v1")
317+
.build()),
318+
Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
319+
assertNotNull(response);
320+
assertNotNull(response.getCommitTimestamp());
321+
}
322+
307323
@Test
308324
public void testWriteReturnsCommitStats() {
309325
assumeFalse("Emulator does not return commit statistics", isUsingEmulator());

0 commit comments

Comments
 (0)