@@ -202,6 +202,23 @@ func newTopic(c *Client, name string) *Topic {
202
202
}
203
203
}
204
204
205
+ // TopicState denotes the possible states for a topic.
206
+ type TopicState int
207
+
208
+ const (
209
+ // TopicStateUnspecified is the default value. This value is unused.
210
+ TopicStateUnspecified = iota
211
+
212
+ // TopicStateActive means the topic does not have any persistent errors.
213
+ TopicStateActive
214
+
215
+ // TopicStateIngestionResourceError means ingestion from the data source
216
+ // has encountered a permanent error.
217
+ // See the more detailed error state in the corresponding ingestion
218
+ // source configuration.
219
+ TopicStateIngestionResourceError
220
+ )
221
+
205
222
// TopicConfig describes the configuration of a topic.
206
223
type TopicConfig struct {
207
224
// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
@@ -232,6 +249,13 @@ type TopicConfig struct {
232
249
//
233
250
// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
234
251
RetentionDuration optional.Duration
252
+
253
+ // State is an output-only field indicating the state of the topic.
254
+ State TopicState
255
+
256
+ // IngestionDataSourceSettings are settings for ingestion from a
257
+ // data source into this topic.
258
+ IngestionDataSourceSettings * IngestionDataSourceSettings
235
259
}
236
260
237
261
// String returns the printable globally unique name for the topic config.
@@ -260,11 +284,12 @@ func (tc *TopicConfig) toProto() *pb.Topic {
260
284
retDur = durationpb .New (optional .ToDuration (tc .RetentionDuration ))
261
285
}
262
286
pbt := & pb.Topic {
263
- Labels : tc .Labels ,
264
- MessageStoragePolicy : messageStoragePolicyToProto (& tc .MessageStoragePolicy ),
265
- KmsKeyName : tc .KMSKeyName ,
266
- SchemaSettings : schemaSettingsToProto (tc .SchemaSettings ),
267
- MessageRetentionDuration : retDur ,
287
+ Labels : tc .Labels ,
288
+ MessageStoragePolicy : messageStoragePolicyToProto (& tc .MessageStoragePolicy ),
289
+ KmsKeyName : tc .KMSKeyName ,
290
+ SchemaSettings : schemaSettingsToProto (tc .SchemaSettings ),
291
+ MessageRetentionDuration : retDur ,
292
+ IngestionDataSourceSettings : tc .IngestionDataSourceSettings .toProto (),
268
293
}
269
294
return pbt
270
295
}
@@ -296,15 +321,23 @@ type TopicConfigToUpdate struct {
296
321
//
297
322
// Use the zero value &SchemaSettings{} to remove the schema from the topic.
298
323
SchemaSettings * SchemaSettings
324
+
325
+ // IngestionDataSourceSettings are settings for ingestion from a
326
+ // data source into this topic.
327
+ //
328
+ // Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
329
+ IngestionDataSourceSettings * IngestionDataSourceSettings
299
330
}
300
331
301
332
func protoToTopicConfig (pbt * pb.Topic ) TopicConfig {
302
333
tc := TopicConfig {
303
- name : pbt .Name ,
304
- Labels : pbt .Labels ,
305
- MessageStoragePolicy : protoToMessageStoragePolicy (pbt .MessageStoragePolicy ),
306
- KMSKeyName : pbt .KmsKeyName ,
307
- SchemaSettings : protoToSchemaSettings (pbt .SchemaSettings ),
334
+ name : pbt .Name ,
335
+ Labels : pbt .Labels ,
336
+ MessageStoragePolicy : protoToMessageStoragePolicy (pbt .MessageStoragePolicy ),
337
+ KMSKeyName : pbt .KmsKeyName ,
338
+ SchemaSettings : protoToSchemaSettings (pbt .SchemaSettings ),
339
+ State : TopicState (pbt .State ),
340
+ IngestionDataSourceSettings : protoToIngestionDataSourceSettings (pbt .IngestionDataSourceSettings ),
308
341
}
309
342
if pbt .GetMessageRetentionDuration () != nil {
310
343
tc .RetentionDuration = pbt .GetMessageRetentionDuration ().AsDuration ()
@@ -364,6 +397,122 @@ func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePo
364
397
return & pb.MessageStoragePolicy {AllowedPersistenceRegions : msp .AllowedPersistenceRegions }
365
398
}
366
399
400
+ // IngestionDataSourceSettings enables ingestion from a data source into this topic.
401
+ type IngestionDataSourceSettings struct {
402
+ Source IngestionDataSource
403
+ }
404
+
405
+ // IngestionDataSource is the kind of ingestion source to be used.
406
+ type IngestionDataSource interface {
407
+ isIngestionDataSource () bool
408
+ }
409
+
410
+ // AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
411
+ type AWSKinesisState int
412
+
413
+ const (
414
+ // AWSKinesisStateUnspecified is the default value. This value is unused.
415
+ AWSKinesisStateUnspecified = iota
416
+
417
+ // AWSKinesisStateActive means ingestion is active.
418
+ AWSKinesisStateActive
419
+
420
+ // AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis.
421
+ // This can happen if:
422
+ // - The provided `aws_role_arn` does not exist or does not have the
423
+ // appropriate permissions attached.
424
+ // - The provided `aws_role_arn` is not set up properly for Identity
425
+ // Federation using `gcp_service_account`.
426
+ // - The Pub/Sub SA is not granted the
427
+ // `iam.serviceAccounts.getOpenIdToken` permission on
428
+ // `gcp_service_account`.
429
+ AWSKinesisStatePermissionDenied
430
+
431
+ // AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic.
432
+ // This can happen due to Pub/Sub SA has not been granted the appropriate publish
433
+ // permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher
434
+ AWSKinesisStatePublishPermissionDenied
435
+
436
+ // AWSKinesisStateStreamNotFound means the Kinesis stream does not exist.
437
+ AWSKinesisStateStreamNotFound
438
+
439
+ // AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist.
440
+ AWSKinesisStateConsumerNotFound
441
+ )
442
+
443
+ // IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
444
+ type IngestionDataSourceAWSKinesis struct {
445
+ // State is an output-only field indicating the state of the kinesis connection.
446
+ State AWSKinesisState
447
+
448
+ // StreamARN is the Kinesis stream ARN to ingest data from.
449
+ StreamARN string
450
+
451
+ // ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced
452
+ // Fan-Out mode. The consumer must be already created and ready to be used.
453
+ ConsumerARN string
454
+
455
+ // AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication
456
+ // with Kinesis. Check the Pub/Sub docs for how to set up this role and the
457
+ // required permissions that need to be attached to it.
458
+ AWSRoleARN string
459
+
460
+ // GCPServiceAccount is the GCP service account to be used for Federated Identity
461
+ // authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for
462
+ // the provided role). The `aws_role_arn` must be set up with
463
+ // `accounts.google.com:sub` equals to this service account number.
464
+ GCPServiceAccount string
465
+ }
466
+
467
+ var _ IngestionDataSource = (* IngestionDataSourceAWSKinesis )(nil )
468
+
469
+ func (i * IngestionDataSourceAWSKinesis ) isIngestionDataSource () bool {
470
+ return true
471
+ }
472
+
473
+ func protoToIngestionDataSourceSettings (pbs * pb.IngestionDataSourceSettings ) * IngestionDataSourceSettings {
474
+ if pbs == nil {
475
+ return nil
476
+ }
477
+
478
+ s := & IngestionDataSourceSettings {}
479
+ if k := pbs .GetAwsKinesis (); k != nil {
480
+ s .Source = & IngestionDataSourceAWSKinesis {
481
+ State : AWSKinesisState (k .State ),
482
+ StreamARN : k .GetStreamArn (),
483
+ ConsumerARN : k .GetConsumerArn (),
484
+ AWSRoleARN : k .GetAwsRoleArn (),
485
+ GCPServiceAccount : k .GetGcpServiceAccount (),
486
+ }
487
+ }
488
+ return s
489
+ }
490
+
491
+ func (i * IngestionDataSourceSettings ) toProto () * pb.IngestionDataSourceSettings {
492
+ if i == nil {
493
+ return nil
494
+ }
495
+ // An empty/zero-valued config is treated the same as nil and clearing this setting.
496
+ if (IngestionDataSourceSettings {}) == * i {
497
+ return nil
498
+ }
499
+ pbs := & pb.IngestionDataSourceSettings {}
500
+ if out := i .Source ; out != nil {
501
+ if k , ok := out .(* IngestionDataSourceAWSKinesis ); ok {
502
+ pbs .Source = & pb.IngestionDataSourceSettings_AwsKinesis_ {
503
+ AwsKinesis : & pb.IngestionDataSourceSettings_AwsKinesis {
504
+ State : pb .IngestionDataSourceSettings_AwsKinesis_State (k .State ),
505
+ StreamArn : k .StreamARN ,
506
+ ConsumerArn : k .ConsumerARN ,
507
+ AwsRoleArn : k .AWSRoleARN ,
508
+ GcpServiceAccount : k .GCPServiceAccount ,
509
+ },
510
+ }
511
+ }
512
+ }
513
+ return pbs
514
+ }
515
+
367
516
// Config returns the TopicConfig for the topic.
368
517
func (t * Topic ) Config (ctx context.Context ) (TopicConfig , error ) {
369
518
pbt , err := t .c .pubc .GetTopic (ctx , & pb.GetTopicRequest {Topic : t .name })
@@ -437,6 +586,10 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
437
586
pt .SchemaSettings = nil
438
587
}
439
588
}
589
+ if cfg .IngestionDataSourceSettings != nil {
590
+ pt .IngestionDataSourceSettings = cfg .IngestionDataSourceSettings .toProto ()
591
+ paths = append (paths , "ingestion_data_source_settings" )
592
+ }
440
593
return & pb.UpdateTopicRequest {
441
594
Topic : pt ,
442
595
UpdateMask : & fmpb.FieldMask {Paths : paths },
0 commit comments