Skip to content

Commit 757f1ca

Browse files
feat(spanner): set client wide ReadOptions, ApplyOptions, and TransactionOptions (#6486)
* feat(spanner): set client wide ReadOptions, ApplyOptions, and TransactionOptions * Add missing unit tests Co-authored-by: rahul2393 <rahulyadavsep92@gmail.com>
1 parent b3ec895 commit 757f1ca

File tree

5 files changed

+513
-22
lines changed

5 files changed

+513
-22
lines changed

spanner/batch.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,13 @@ func (t *BatchReadOnlyTransaction) PartitionRead(ctx context.Context, table stri
104104
// can be configured using PartitionOptions. Pass a ReadOptions to modify the
105105
// read operation.
106106
func (t *BatchReadOnlyTransaction) PartitionReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opt PartitionOptions, readOptions ReadOptions) ([]*Partition, error) {
107-
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, readOptions)
107+
return t.PartitionReadUsingIndexWithOptions(ctx, table, "", keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro.merge(readOptions))
108108
}
109109

110110
// PartitionReadUsingIndex returns a list of Partitions that can be used to read
111111
// rows from the database using an index.
112112
func (t *BatchReadOnlyTransaction) PartitionReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string, opt PartitionOptions) ([]*Partition, error) {
113-
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, ReadOptions{})
113+
return t.PartitionReadUsingIndexWithOptions(ctx, table, index, keys, columns, opt, t.ReadOnlyTransaction.txReadOnly.ro)
114114
}
115115

116116
// PartitionReadUsingIndexWithOptions returns a list of Partitions that can be

spanner/batch_test.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ import (
2222
"testing"
2323
"time"
2424

25-
. "cloud.google.com/go/spanner/internal/testutil"
2625
sppb "google.golang.org/genproto/googleapis/spanner/v1"
26+
27+
. "cloud.google.com/go/spanner/internal/testutil"
2728
)
2829

2930
func TestPartitionRoundTrip(t *testing.T) {
@@ -120,6 +121,77 @@ func TestPartitionQuery_QueryOptions(t *testing.T) {
120121
}
121122
}
122123

124+
func TestPartitionQuery_ReadOptions(t *testing.T) {
125+
testcases := []ReadOptionsTestCase{
126+
{
127+
name: "Client level",
128+
client: &ReadOptions{Index: "testIndex", Limit: 100, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
129+
// Index and Limit are always ignored
130+
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
131+
},
132+
{
133+
name: "Read level",
134+
client: &ReadOptions{},
135+
read: &ReadOptions{Index: "testIndex", Limit: 100, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
136+
// Index and Limit are always ignored
137+
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "testRequestTag"},
138+
},
139+
{
140+
name: "Read level has precedence than client level",
141+
client: &ReadOptions{Index: "clientIndex", Limit: 10, Priority: sppb.RequestOptions_PRIORITY_LOW, RequestTag: "clientRequestTag"},
142+
read: &ReadOptions{Index: "readIndex", Limit: 20, Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "readRequestTag"},
143+
// Index and Limit are always ignored
144+
want: &ReadOptions{Index: "", Limit: 0, Priority: sppb.RequestOptions_PRIORITY_MEDIUM, RequestTag: "readRequestTag"},
145+
},
146+
}
147+
148+
for _, tt := range testcases {
149+
t.Run(tt.name, func(t *testing.T) {
150+
ctx := context.Background()
151+
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ReadOptions: *tt.client})
152+
defer teardown()
153+
154+
var (
155+
err error
156+
txn *BatchReadOnlyTransaction
157+
ps []*Partition
158+
)
159+
160+
if txn, err = client.BatchReadOnlyTransaction(ctx, StrongRead()); err != nil {
161+
t.Fatal(err)
162+
}
163+
defer txn.Cleanup(ctx)
164+
165+
if tt.read == nil {
166+
ps, err = txn.PartitionRead(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{0, 3})
167+
} else {
168+
ps, err = txn.PartitionReadWithOptions(ctx, "Albums", KeySets(Key{"foo"}), []string{"SingerId", "AlbumId", "AlbumTitle"}, PartitionOptions{0, 3}, *tt.read)
169+
}
170+
if err != nil {
171+
t.Fatal(err)
172+
}
173+
174+
for _, p := range ps {
175+
req := p.rreq
176+
if got, want := req.Index, tt.want.Index; got != want {
177+
t.Fatalf("Incorrect index: got %v, want %v", got, want)
178+
}
179+
if got, want := req.Limit, int64(tt.want.Limit); got != want {
180+
t.Fatalf("Incorrect limit: got %v, want %v", got, want)
181+
}
182+
183+
ro := req.RequestOptions
184+
if got, want := ro.Priority, tt.want.Priority; got != want {
185+
t.Fatalf("Incorrect priority: got %v, want %v", got, want)
186+
}
187+
if got, want := ro.RequestTag, tt.want.RequestTag; got != want {
188+
t.Fatalf("Incorrect request tag: got %v, want %v", got, want)
189+
}
190+
}
191+
})
192+
}
193+
}
194+
123195
func TestPartitionQuery_Parallel(t *testing.T) {
124196
ctx := context.Background()
125197
server, client, teardown := setupMockedTestServer(t)

spanner/client.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ import (
2525
"time"
2626

2727
"cloud.google.com/go/internal/trace"
28-
vkit "cloud.google.com/go/spanner/apiv1"
29-
"cloud.google.com/go/spanner/internal"
3028
"google.golang.org/api/option"
3129
"google.golang.org/api/option/internaloption"
3230
gtransport "google.golang.org/api/transport/grpc"
@@ -35,6 +33,9 @@ import (
3533
"google.golang.org/grpc/codes"
3634
"google.golang.org/grpc/metadata"
3735

36+
vkit "cloud.google.com/go/spanner/apiv1"
37+
"cloud.google.com/go/spanner/internal"
38+
3839
// Install google-c2p resolver, which is required for direct path.
3940
_ "google.golang.org/grpc/xds/googledirectpath"
4041
// Install RLS load balancer policy, which is needed for gRPC RLS.
@@ -86,6 +87,9 @@ type Client struct {
8687
idleSessions *sessionPool
8788
logger *log.Logger
8889
qo QueryOptions
90+
ro ReadOptions
91+
ao []ApplyOption
92+
txo TransactionOptions
8993
ct *commonTags
9094
}
9195

@@ -117,6 +121,15 @@ type ClientConfig struct {
117121
// QueryOptions is the configuration for executing a sql query.
118122
QueryOptions QueryOptions
119123

124+
// ReadOptions is the configuration for reading rows from a database
125+
ReadOptions ReadOptions
126+
127+
// ApplyOptions is the configuration for applying
128+
ApplyOptions []ApplyOption
129+
130+
// TransactionOptions is the configuration for a transaction.
131+
TransactionOptions TransactionOptions
132+
120133
// CallOptions is the configuration for providing custom retry settings that
121134
// override the default values.
122135
CallOptions *vkit.CallOptions
@@ -211,6 +224,9 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
211224
idleSessions: sp,
212225
logger: config.logger,
213226
qo: getQueryOptions(config.QueryOptions),
227+
ro: config.ReadOptions,
228+
ao: config.ApplyOptions,
229+
txo: config.TransactionOptions,
214230
ct: getCommonTags(sc),
215231
}
216232
return c, nil
@@ -273,6 +289,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
273289
t.txReadOnly.sp = c.idleSessions
274290
t.txReadOnly.txReadEnv = t
275291
t.txReadOnly.qo = c.qo
292+
t.txReadOnly.ro = c.ro
276293
t.txReadOnly.replaceSessionFunc = func(ctx context.Context) error {
277294
if t.sh == nil {
278295
return spannerErrorf(codes.InvalidArgument, "missing session handle on transaction")
@@ -309,6 +326,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
309326
t.txReadOnly.sp = c.idleSessions
310327
t.txReadOnly.txReadEnv = t
311328
t.txReadOnly.qo = c.qo
329+
t.txReadOnly.ro = c.ro
312330
t.ct = c.ct
313331
return t
314332
}
@@ -378,6 +396,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
378396
t.txReadOnly.sh = sh
379397
t.txReadOnly.txReadEnv = t
380398
t.txReadOnly.qo = c.qo
399+
t.txReadOnly.ro = c.ro
381400
t.ct = c.ct
382401
return t, nil
383402
}
@@ -406,6 +425,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
406425
t.txReadOnly.sh = sh
407426
t.txReadOnly.txReadEnv = t
408427
t.txReadOnly.qo = c.qo
428+
t.txReadOnly.ro = c.ro
409429
t.ct = c.ct
410430
return t
411431
}
@@ -491,7 +511,8 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
491511
t.txReadOnly.sh = sh
492512
t.txReadOnly.txReadEnv = t
493513
t.txReadOnly.qo = c.qo
494-
t.txOpts = options
514+
t.txReadOnly.ro = c.ro
515+
t.txOpts = c.txo.merge(options)
495516
t.ct = c.ct
496517

497518
trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
@@ -555,6 +576,11 @@ func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
555576
// Apply applies a list of mutations atomically to the database.
556577
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
557578
ao := &applyOption{}
579+
580+
for _, opt := range c.ao {
581+
opt(ao)
582+
}
583+
558584
for _, opt := range opts {
559585
opt(ao)
560586
}

0 commit comments

Comments
 (0)