Skip to content

Commit 43cc5bc

Browse files
authored
feat(firestore): adds snapshot reads impl. (#6718)
* feat(firestore): adds snapshot reads impl.
1 parent f968297 commit 43cc5bc

14 files changed

+428
-45
lines changed

firestore/client.go

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"google.golang.org/grpc/codes"
3737
"google.golang.org/grpc/metadata"
3838
"google.golang.org/grpc/status"
39+
"google.golang.org/protobuf/types/known/timestamppb"
3940
)
4041

4142
// resourcePrefixHeader is the name of the metadata header used to indicate
@@ -53,9 +54,10 @@ const DetectProjectID = "*detect-project-id*"
5354

5455
// A Client provides access to the Firestore service.
5556
type Client struct {
56-
c *vkit.Client
57-
projectID string
58-
databaseID string // A client is tied to a single database.
57+
c *vkit.Client
58+
projectID string
59+
databaseID string // A client is tied to a single database.
60+
readSettings *readSettings // readSettings allows setting a snapshot time to read the database
5961
}
6062

6163
// NewClient creates a new Firestore client that uses the given project.
@@ -94,9 +96,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
9496
}
9597
vc.SetGoogleClientInfo("gccl", internal.Version)
9698
c := &Client{
97-
c: vc,
98-
projectID: projectID,
99-
databaseID: "(default)", // always "(default)", for now
99+
c: vc,
100+
projectID: projectID,
101+
databaseID: "(default)", // always "(default)", for now
102+
readSettings: &readSettings{},
100103
}
101104
return c, nil
102105
}
@@ -199,10 +202,10 @@ func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*Docum
199202
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.GetAll")
200203
defer func() { trace.EndSpan(ctx, err) }()
201204

202-
return c.getAll(ctx, docRefs, nil)
205+
return c.getAll(ctx, docRefs, nil, nil)
203206
}
204207

205-
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) (_ []*DocumentSnapshot, err error) {
208+
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte, rs *readSettings) (_ []*DocumentSnapshot, err error) {
206209
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.BatchGetDocuments")
207210
defer func() { trace.EndSpan(ctx, err) }()
208211

@@ -219,9 +222,18 @@ func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte)
219222
Database: c.path(),
220223
Documents: docNames,
221224
}
225+
226+
// Note that transaction ID and other consistency selectors are mutually exclusive.
227+
// We respect the transaction first, any read options passed by the caller second,
228+
// and any read options stored in the client third.
229+
if rt, hasOpts := parseReadTime(c, rs); hasOpts {
230+
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_ReadTime{ReadTime: rt}
231+
}
232+
222233
if tid != nil {
223-
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{tid}
234+
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{Transaction: tid}
224235
}
236+
225237
streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
226238
if err != nil {
227239
return nil, err
@@ -306,6 +318,15 @@ func (c *Client) BulkWriter(ctx context.Context) *BulkWriter {
306318
return bw
307319
}
308320

321+
// WithReadOptions specifies constraints for accessing documents from the database,
322+
// e.g. at what time snapshot to read the documents.
323+
func (c *Client) WithReadOptions(opts ...ReadOption) *Client {
324+
for _, ro := range opts {
325+
ro.apply(c.readSettings)
326+
}
327+
return c
328+
}
329+
309330
// commit calls the Commit RPC outside of a transaction.
310331
func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) {
311332
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit")
@@ -381,3 +402,35 @@ func (ec emulatorCreds) GetRequestMetadata(ctx context.Context, uri ...string) (
381402
func (ec emulatorCreds) RequireTransportSecurity() bool {
382403
return false
383404
}
405+
406+
// ReadTime specifies a time-specific snapshot of the database to read.
407+
func ReadTime(t time.Time) ReadOption {
408+
return readTime(t)
409+
}
410+
411+
type readTime time.Time
412+
413+
func (rt readTime) apply(rs *readSettings) {
414+
rs.readTime = time.Time(rt)
415+
}
416+
417+
// ReadOption interface allows for abstraction of computing read time settings.
418+
type ReadOption interface {
419+
apply(*readSettings)
420+
}
421+
422+
// readSettings contains the ReadOptions for a read operation
423+
type readSettings struct {
424+
readTime time.Time
425+
}
426+
427+
// parseReadTime ensures that fallback order of read options is respected.
428+
func parseReadTime(c *Client, rs *readSettings) (*timestamppb.Timestamp, bool) {
429+
if rs != nil && !rs.readTime.IsZero() {
430+
return &timestamppb.Timestamp{Seconds: int64(rs.readTime.Unix())}, true
431+
}
432+
if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
433+
return &timestamppb.Timestamp{Seconds: int64(c.readSettings.readTime.Unix())}, true
434+
}
435+
return nil, false
436+
}

firestore/client_test.go

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,19 @@ package firestore
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
tspb "github.com/golang/protobuf/ptypes/timestamp"
2223
pb "google.golang.org/genproto/googleapis/firestore/v1"
2324
"google.golang.org/grpc/codes"
2425
"google.golang.org/grpc/status"
26+
"google.golang.org/protobuf/types/known/timestamppb"
2527
)
2628

2729
var testClient = &Client{
28-
projectID: "projectID",
29-
databaseID: "(default)",
30+
projectID: "projectID",
31+
databaseID: "(default)",
32+
readSettings: &readSettings{},
3033
}
3134

3235
func TestClientCollectionAndDoc(t *testing.T) {
@@ -45,16 +48,18 @@ func TestClientCollectionAndDoc(t *testing.T) {
4548
path: "projects/projectID/databases/(default)/documents/X",
4649
parentPath: db + "/documents",
4750
},
51+
readSettings: &readSettings{},
4852
}
4953
if !testEqual(coll1, wantc1) {
5054
t.Fatalf("got\n%+v\nwant\n%+v", coll1, wantc1)
5155
}
5256
doc1 := testClient.Doc("X/a")
5357
wantd1 := &DocumentRef{
54-
Parent: coll1,
55-
ID: "a",
56-
Path: "projects/projectID/databases/(default)/documents/X/a",
57-
shortPath: "X/a",
58+
Parent: coll1,
59+
ID: "a",
60+
Path: "projects/projectID/databases/(default)/documents/X/a",
61+
shortPath: "X/a",
62+
readSettings: &readSettings{},
5863
}
5964

6065
if !testEqual(doc1, wantd1) {
@@ -309,3 +314,44 @@ func TestGetAllErrors(t *testing.T) {
309314
t.Error("got nil, want error")
310315
}
311316
}
317+
318+
func TestClient_WithReadOptions(t *testing.T) {
319+
ctx := context.Background()
320+
c, srv, cleanup := newMock(t)
321+
defer cleanup()
322+
323+
const dbPath = "projects/projectID/databases/(default)"
324+
const docPath = dbPath + "/documents/C/a"
325+
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)
326+
327+
dr := &DocumentRef{
328+
Parent: &CollectionRef{
329+
c: c,
330+
},
331+
ID: "123",
332+
Path: docPath,
333+
}
334+
335+
srv.addRPC(&pb.BatchGetDocumentsRequest{
336+
Database: dbPath,
337+
Documents: []string{docPath},
338+
ConsistencySelector: &pb.BatchGetDocumentsRequest_ReadTime{
339+
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
340+
},
341+
}, []interface{}{
342+
&pb.BatchGetDocumentsResponse{
343+
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
344+
Result: &pb.BatchGetDocumentsResponse_Found{
345+
Found: &pb.Document{},
346+
},
347+
},
348+
})
349+
350+
_, err := c.WithReadOptions(ReadTime(tm)).GetAll(ctx, []*DocumentRef{
351+
dr,
352+
})
353+
354+
if err != nil {
355+
t.Fatal(err)
356+
}
357+
}

firestore/collref.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type CollectionRef struct {
4949

5050
// Use the methods of Query on a CollectionRef to create and run queries.
5151
Query
52+
53+
// readSettings specifies constraints for reading documents in the collection
54+
// e.g. read time
55+
readSettings *readSettings
5256
}
5357

5458
func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
@@ -64,6 +68,7 @@ func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
6468
path: dbPath + "/documents/" + id,
6569
parentPath: dbPath + "/documents",
6670
},
71+
readSettings: &readSettings{},
6772
}
6873
}
6974

@@ -82,6 +87,7 @@ func newCollRefWithParent(c *Client, parent *DocumentRef, id string) *Collection
8287
path: parent.Path + "/" + id,
8388
parentPath: parent.Path,
8489
},
90+
readSettings: &readSettings{},
8591
}
8692
}
8793

@@ -121,7 +127,7 @@ func (c *CollectionRef) Add(ctx context.Context, data interface{}) (*DocumentRef
121127
// missing documents. A missing document is a document that does not exist but has
122128
// sub-documents.
123129
func (c *CollectionRef) DocumentRefs(ctx context.Context) *DocumentRefIterator {
124-
return newDocumentRefIterator(ctx, c, nil)
130+
return newDocumentRefIterator(ctx, c, nil, c.readSettings)
125131
}
126132

127133
const alphanum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
@@ -136,3 +142,12 @@ func uniqueID() string {
136142
}
137143
return string(b)
138144
}
145+
146+
// WithReadOptions specifies constraints for accessing documents from the database,
147+
// e.g. at what time snapshot to read the documents.
148+
func (c *CollectionRef) WithReadOptions(opts ...ReadOption) *CollectionRef {
149+
for _, ro := range opts {
150+
ro.apply(c.readSettings)
151+
}
152+
return c
153+
}

firestore/collref_test.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,22 @@ package firestore
1717
import (
1818
"context"
1919
"testing"
20+
"time"
2021

2122
"github.com/golang/protobuf/proto"
2223
pb "google.golang.org/genproto/googleapis/firestore/v1"
24+
"google.golang.org/protobuf/types/known/timestamppb"
2325
)
2426

2527
func TestDoc(t *testing.T) {
2628
coll := testClient.Collection("C")
2729
got := coll.Doc("d")
2830
want := &DocumentRef{
29-
Parent: coll,
30-
ID: "d",
31-
Path: "projects/projectID/databases/(default)/documents/C/d",
32-
shortPath: "C/d",
31+
Parent: coll,
32+
ID: "d",
33+
Path: "projects/projectID/databases/(default)/documents/C/d",
34+
shortPath: "C/d",
35+
readSettings: &readSettings{},
3336
}
3437
if !testEqual(got, want) {
3538
t.Errorf("got %+v, want %+v", got, want)
@@ -98,3 +101,35 @@ func TestNilErrors(t *testing.T) {
98101
t.Fatalf("got <%v>, want <%v>", err, errNilDocRef)
99102
}
100103
}
104+
105+
func TestCollRef_WithReadOptions(t *testing.T) {
106+
ctx := context.Background()
107+
c, srv, cleanup := newMock(t)
108+
defer cleanup()
109+
110+
const dbPath = "projects/projectID/databases/(default)"
111+
const docPath = dbPath + "/documents/C/a"
112+
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)
113+
114+
srv.addRPC(&pb.ListDocumentsRequest{
115+
Parent: dbPath,
116+
CollectionId: "myCollection",
117+
ShowMissing: true,
118+
ConsistencySelector: &pb.ListDocumentsRequest_ReadTime{
119+
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
120+
},
121+
}, []interface{}{
122+
&pb.ListDocumentsResponse{
123+
Documents: []*pb.Document{
124+
{
125+
Name: docPath,
126+
},
127+
},
128+
},
129+
})
130+
131+
_, err := c.Collection("myCollection").WithReadOptions(ReadTime(tm)).DocumentRefs(ctx).GetAll()
132+
if err == nil {
133+
t.Fatal(err)
134+
}
135+
}

firestore/docref.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,18 @@ type DocumentRef struct {
4747

4848
// The ID of the document: the last component of the resource path.
4949
ID string
50+
51+
// The options (only read time currently supported) for reading this document
52+
readSettings *readSettings
5053
}
5154

5255
func newDocRef(parent *CollectionRef, id string) *DocumentRef {
5356
return &DocumentRef{
54-
Parent: parent,
55-
ID: id,
56-
Path: parent.Path + "/" + id,
57-
shortPath: parent.selfPath + "/" + id,
57+
Parent: parent,
58+
ID: id,
59+
Path: parent.Path + "/" + id,
60+
shortPath: parent.selfPath + "/" + id,
61+
readSettings: &readSettings{},
5862
}
5963
}
6064

@@ -77,7 +81,8 @@ func (d *DocumentRef) Get(ctx context.Context) (_ *DocumentSnapshot, err error)
7781
if d == nil {
7882
return nil, errNilDocRef
7983
}
80-
docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil)
84+
85+
docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil, d.readSettings)
8186
if err != nil {
8287
return nil, err
8388
}
@@ -803,7 +808,7 @@ type DocumentSnapshotIterator struct {
803808
// Next is not expected to return iterator.Done unless it is called after Stop.
804809
// Rarely, networking issues may also cause iterator.Done to be returned.
805810
func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
806-
btree, _, readTime, err := it.ws.nextSnapshot()
811+
btree, _, rt, err := it.ws.nextSnapshot()
807812
if err != nil {
808813
if err == io.EOF {
809814
err = iterator.Done
@@ -812,7 +817,7 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
812817
return nil, err
813818
}
814819
if btree.Len() == 0 { // document deleted
815-
return &DocumentSnapshot{Ref: it.docref, ReadTime: readTime}, nil
820+
return &DocumentSnapshot{Ref: it.docref, ReadTime: rt}, nil
816821
}
817822
snap, _ := btree.At(0)
818823
return snap.(*DocumentSnapshot), nil
@@ -824,3 +829,12 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
824829
func (it *DocumentSnapshotIterator) Stop() {
825830
it.ws.stop()
826831
}
832+
833+
// WithReadOptions specifies constraints for accessing documents from the database,
834+
// e.g. at what time snapshot to read the documents.
835+
func (d *DocumentRef) WithReadOptions(opts ...ReadOption) *DocumentRef {
836+
for _, ro := range opts {
837+
ro.apply(d.readSettings)
838+
}
839+
return d
840+
}

0 commit comments

Comments
 (0)