Skip to content

Commit 9bba269

Browse files
authored
feat(pubsub): support kinesis ingestion admin (#9458)
* feat(pubsub): support kinesis ingestion admin * add comments to kinesis struct fields * reword comments * capitalize Kinesis in comments
1 parent 5ca0271 commit 9bba269

File tree

3 files changed

+233
-10
lines changed

3 files changed

+233
-10
lines changed

pubsub/pstest/fake.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
320320
if err := checkTopicMessageRetention(t.MessageRetentionDuration); err != nil {
321321
return nil, err
322322
}
323+
// Take any ingestion setting to mean the topic is active.
324+
if t.IngestionDataSourceSettings != nil {
325+
t.State = pb.Topic_ACTIVE
326+
}
323327
top := newTopic(t)
324328
s.topics[t.Name] = top
325329
return top.proto, nil
@@ -384,6 +388,15 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
384388
t.proto.SchemaSettings = &pb.SchemaSettings{}
385389
}
386390
t.proto.SchemaSettings.LastRevisionId = req.Topic.SchemaSettings.LastRevisionId
391+
case "ingestion_data_source_settings":
392+
if t.proto.IngestionDataSourceSettings == nil {
393+
t.proto.IngestionDataSourceSettings = &pb.IngestionDataSourceSettings{}
394+
}
395+
t.proto.IngestionDataSourceSettings = req.Topic.IngestionDataSourceSettings
396+
// Take any ingestion setting to mean the topic is active.
397+
if t.proto.IngestionDataSourceSettings != nil {
398+
t.proto.State = pb.Topic_ACTIVE
399+
}
387400
default:
388401
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
389402
}

pubsub/topic.go

Lines changed: 163 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,23 @@ func newTopic(c *Client, name string) *Topic {
202202
}
203203
}
204204

205+
// TopicState denotes the possible states for a topic.
206+
type TopicState int
207+
208+
const (
209+
// TopicStateUnspecified is the default value. This value is unused.
210+
TopicStateUnspecified = iota
211+
212+
// TopicStateActive means the topic does not have any persistent errors.
213+
TopicStateActive
214+
215+
// TopicStateIngestionResourceError means ingestion from the data source
216+
// has encountered a permanent error.
217+
// See the more detailed error state in the corresponding ingestion
218+
// source configuration.
219+
TopicStateIngestionResourceError
220+
)
221+
205222
// TopicConfig describes the configuration of a topic.
206223
type TopicConfig struct {
207224
// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
@@ -232,6 +249,13 @@ type TopicConfig struct {
232249
//
233250
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
234251
RetentionDuration optional.Duration
252+
253+
// State is an output-only field indicating the state of the topic.
254+
State TopicState
255+
256+
// IngestionDataSourceSettings are settings for ingestion from a
257+
// data source into this topic.
258+
IngestionDataSourceSettings *IngestionDataSourceSettings
235259
}
236260

237261
// String returns the printable globally unique name for the topic config.
@@ -260,11 +284,12 @@ func (tc *TopicConfig) toProto() *pb.Topic {
260284
retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
261285
}
262286
pbt := &pb.Topic{
263-
Labels: tc.Labels,
264-
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
265-
KmsKeyName: tc.KMSKeyName,
266-
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
267-
MessageRetentionDuration: retDur,
287+
Labels: tc.Labels,
288+
MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
289+
KmsKeyName: tc.KMSKeyName,
290+
SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
291+
MessageRetentionDuration: retDur,
292+
IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(),
268293
}
269294
return pbt
270295
}
@@ -296,15 +321,23 @@ type TopicConfigToUpdate struct {
296321
//
297322
// Use the zero value &SchemaSettings{} to remove the schema from the topic.
298323
SchemaSettings *SchemaSettings
324+
325+
// IngestionDataSourceSettings are settings for ingestion from a
326+
// data source into this topic.
327+
//
328+
// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
329+
IngestionDataSourceSettings *IngestionDataSourceSettings
299330
}
300331

301332
func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
302333
tc := TopicConfig{
303-
name: pbt.Name,
304-
Labels: pbt.Labels,
305-
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
306-
KMSKeyName: pbt.KmsKeyName,
307-
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
334+
name: pbt.Name,
335+
Labels: pbt.Labels,
336+
MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
337+
KMSKeyName: pbt.KmsKeyName,
338+
SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
339+
State: TopicState(pbt.State),
340+
IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings),
308341
}
309342
if pbt.GetMessageRetentionDuration() != nil {
310343
tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
@@ -364,6 +397,122 @@ func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePo
364397
return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
365398
}
366399

400+
// IngestionDataSourceSettings enables ingestion from a data source into this topic.
401+
type IngestionDataSourceSettings struct {
402+
Source IngestionDataSource
403+
}
404+
405+
// IngestionDataSource is the kind of ingestion source to be used.
406+
type IngestionDataSource interface {
407+
isIngestionDataSource() bool
408+
}
409+
410+
// AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
411+
type AWSKinesisState int
412+
413+
const (
414+
// AWSKinesisStateUnspecified is the default value. This value is unused.
415+
AWSKinesisStateUnspecified = iota
416+
417+
// AWSKinesisStateActive means ingestion is active.
418+
AWSKinesisStateActive
419+
420+
// AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis.
421+
// This can happen if:
422+
// - The provided `aws_role_arn` does not exist or does not have the
423+
// appropriate permissions attached.
424+
// - The provided `aws_role_arn` is not set up properly for Identity
425+
// Federation using `gcp_service_account`.
426+
// - The Pub/Sub SA is not granted the
427+
// `iam.serviceAccounts.getOpenIdToken` permission on
428+
// `gcp_service_account`.
429+
AWSKinesisStatePermissionDenied
430+
431+
// AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic.
432+
// This can happen due to Pub/Sub SA has not been granted the appropriate publish
433+
// permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher
434+
AWSKinesisStatePublishPermissionDenied
435+
436+
// AWSKinesisStateStreamNotFound means the Kinesis stream does not exist.
437+
AWSKinesisStateStreamNotFound
438+
439+
// AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist.
440+
AWSKinesisStateConsumerNotFound
441+
)
442+
443+
// IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
444+
type IngestionDataSourceAWSKinesis struct {
445+
// State is an output-only field indicating the state of the kinesis connection.
446+
State AWSKinesisState
447+
448+
// StreamARN is the Kinesis stream ARN to ingest data from.
449+
StreamARN string
450+
451+
// ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced
452+
// Fan-Out mode. The consumer must be already created and ready to be used.
453+
ConsumerARN string
454+
455+
// AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication
456+
// with Kinesis. Check the Pub/Sub docs for how to set up this role and the
457+
// required permissions that need to be attached to it.
458+
AWSRoleARN string
459+
460+
// GCPServiceAccount is the GCP service account to be used for Federated Identity
461+
// authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for
462+
// the provided role). The `aws_role_arn` must be set up with
463+
// `accounts.google.com:sub` equals to this service account number.
464+
GCPServiceAccount string
465+
}
466+
467+
var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil)
468+
469+
func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
470+
return true
471+
}
472+
473+
func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
474+
if pbs == nil {
475+
return nil
476+
}
477+
478+
s := &IngestionDataSourceSettings{}
479+
if k := pbs.GetAwsKinesis(); k != nil {
480+
s.Source = &IngestionDataSourceAWSKinesis{
481+
State: AWSKinesisState(k.State),
482+
StreamARN: k.GetStreamArn(),
483+
ConsumerARN: k.GetConsumerArn(),
484+
AWSRoleARN: k.GetAwsRoleArn(),
485+
GCPServiceAccount: k.GetGcpServiceAccount(),
486+
}
487+
}
488+
return s
489+
}
490+
491+
func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings {
492+
if i == nil {
493+
return nil
494+
}
495+
// An empty/zero-valued config is treated the same as nil and clearing this setting.
496+
if (IngestionDataSourceSettings{}) == *i {
497+
return nil
498+
}
499+
pbs := &pb.IngestionDataSourceSettings{}
500+
if out := i.Source; out != nil {
501+
if k, ok := out.(*IngestionDataSourceAWSKinesis); ok {
502+
pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
503+
AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{
504+
State: pb.IngestionDataSourceSettings_AwsKinesis_State(k.State),
505+
StreamArn: k.StreamARN,
506+
ConsumerArn: k.ConsumerARN,
507+
AwsRoleArn: k.AWSRoleARN,
508+
GcpServiceAccount: k.GCPServiceAccount,
509+
},
510+
}
511+
}
512+
}
513+
return pbs
514+
}
515+
367516
// Config returns the TopicConfig for the topic.
368517
func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
369518
pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
@@ -437,6 +586,10 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
437586
pt.SchemaSettings = nil
438587
}
439588
}
589+
if cfg.IngestionDataSourceSettings != nil {
590+
pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto()
591+
paths = append(paths, "ingestion_data_source_settings")
592+
}
440593
return &pb.UpdateTopicRequest{
441594
Topic: pt,
442595
UpdateMask: &fmpb.FieldMask{Paths: paths},

pubsub/topic_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,63 @@ func TestCreateTopicWithConfig(t *testing.T) {
108108
}
109109
}
110110

111+
func TestTopic_IngestionKinesis(t *testing.T) {
112+
c, srv := newFake(t)
113+
defer c.Close()
114+
defer srv.Close()
115+
116+
id := "test-topic-kinesis"
117+
want := TopicConfig{
118+
IngestionDataSourceSettings: &IngestionDataSourceSettings{
119+
Source: &IngestionDataSourceAWSKinesis{
120+
StreamARN: "fake-stream-arn",
121+
ConsumerARN: "fake-consumer-arn",
122+
AWSRoleARN: "fake-aws-role-arn",
123+
GCPServiceAccount: "fake-gcp-sa",
124+
},
125+
},
126+
}
127+
128+
topic := mustCreateTopicWithConfig(t, c, id, &want)
129+
got, err := topic.Config(context.Background())
130+
if err != nil {
131+
t.Fatalf("error getting topic config: %v", err)
132+
}
133+
want.State = TopicStateActive
134+
opt := cmpopts.IgnoreUnexported(TopicConfig{})
135+
if !testutil.Equal(got, want, opt) {
136+
t.Errorf("got %v, want %v", got, want)
137+
}
138+
139+
// Update ingestion settings.
140+
ctx := context.Background()
141+
settings := &IngestionDataSourceSettings{
142+
Source: &IngestionDataSourceAWSKinesis{
143+
StreamARN: "fake-stream-arn-2",
144+
ConsumerARN: "fake-consumer-arn-2",
145+
AWSRoleARN: "aws-role-arn-2",
146+
GCPServiceAccount: "gcp-service-account-2",
147+
},
148+
}
149+
config2, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
if !testutil.Equal(config2.IngestionDataSourceSettings, settings, opt) {
154+
t.Errorf("\ngot %+v\nwant %+v", config2.IngestionDataSourceSettings, settings)
155+
}
156+
157+
// Clear schema settings.
158+
settings = &IngestionDataSourceSettings{}
159+
config3, err := topic.Update(ctx, TopicConfigToUpdate{IngestionDataSourceSettings: settings})
160+
if err != nil {
161+
t.Fatal(err)
162+
}
163+
if config3.IngestionDataSourceSettings != nil {
164+
t.Errorf("got: %+v, want nil", config3.IngestionDataSourceSettings)
165+
}
166+
}
167+
111168
func TestListTopics(t *testing.T) {
112169
ctx := context.Background()
113170
c, srv := newFake(t)

0 commit comments

Comments
 (0)