Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
cloud.google.com/go/iam v1.1.8
cloud.google.com/go/kms v1.17.1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.12.5
go.einride.tech/aip v0.67.1
go.opencensus.io v0.24.0
Expand Down
1 change: 1 addition & 0 deletions pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.5 h1:8gw9KZK8TiVKB6q3zHY3SBzLnrGp6HQjyfYBYGmXdxA=
Expand Down
2 changes: 1 addition & 1 deletion pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOpt
maxMessages = 0
maxBytes = 0
}
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.clientID, maxMessages, maxBytes, po.maxExtensionPeriod)
}
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
// the first keepAlive halfway towards the minimum ack deadline.
Expand Down
3 changes: 2 additions & 1 deletion pubsub/pullstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type pullStream struct {
// for testing
type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error)

func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName, clientID string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream {
ctx = withSubscriptionKey(ctx, subName)
hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))}
ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
Expand All @@ -62,6 +62,7 @@ func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName
}
err = spc.Send(&pb.StreamingPullRequest{
Subscription: subName,
ClientId: clientID,
StreamAckDeadlineSeconds: streamAckDeadline,
MaxOutstandingMessages: int64(maxOutstandingMessages),
MaxOutstandingBytes: int64(maxOutstandingBytes),
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pullstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestPullStreamGet(t *testing.T) {
test.errors = test.errors[1:]
return &testStreamingPullClient{sendError: err}, nil
}
ps := newPullStream(context.Background(), streamingPull, "", 100, 1000, 0)
ps := newPullStream(context.Background(), streamingPull, "", "", 100, 1000, 0)
_, err := ps.get(nil)
if got := status.Code(err); got != test.wantCode {
t.Errorf("%s: got %s, want %s", test.desc, got, test.wantCode)
Expand Down
13 changes: 11 additions & 2 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"cloud.google.com/go/internal/optional"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/internal/scheduler"
"github.com/google/uuid"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand All @@ -50,6 +51,11 @@ type Subscription struct {

mu sync.Mutex
receiveActive bool

// clientID to be used across all streaming pull connections that are created.
// This indicates to the server that any guarantees made for a stream that
// disconnected will be made for the stream that is created to replace it.
clientID string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there use cases for making this visible to users?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java the field is marked private, so until that changes, I'm incline to leave as-is.

}

// Subscription creates a reference to a subscription.
Expand All @@ -60,8 +66,9 @@ func (c *Client) Subscription(id string) *Subscription {
// SubscriptionInProject creates a reference to a subscription in a given project.
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
return &Subscription{
c: c,
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
c: c,
name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
clientID: uuid.NewString(),
}
}

Expand Down Expand Up @@ -1280,6 +1287,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
maxOutstandingMessages: maxCount,
maxOutstandingBytes: maxBytes,
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
clientID: s.clientID,
}
fc := newSubscriptionFlowController(FlowControlSettings{
MaxOutstandingMessages: maxCount,
Expand Down Expand Up @@ -1446,4 +1454,5 @@ type pullOptions struct {
maxOutstandingMessages int
maxOutstandingBytes int
useLegacyFlowControl bool
clientID string
}