Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_aws_msk_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAwsMskIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// AWS MSK ingestion settings.
String clusterArn = "cluster-arn";
String mskTopic = "msk-topic";
String awsRoleArn = "aws-role-arn";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAwsMskIngestionExample(
projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
}

public static void createTopicWithAwsMskIngestionExample(
String projectId,
String topicId,
String clusterArn,
String mskTopic,
String awsRoleArn,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AwsMsk awsMsk =
IngestionDataSourceSettings.AwsMsk.newBuilder()
.setClusterArn(clusterArn)
.setTopic(mskTopic)
.setAwsRoleArn(awsRoleArn)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_aws_msk_ingestion]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_azure_event_hubs_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAzureEventHubsIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Azure Event Hubs ingestion settings.
String resourceGroup = "resource-group";
String namespace = "namespace";
String eventHub = "event-hub";
String clientId = "client-id";
String tenantId = "tenant-id";
String subscriptionId = "subscription-id";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAzureEventHubsIngestionExample(
projectId,
topicId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount);
}

public static void createTopicWithAzureEventHubsIngestionExample(
String projectId,
String topicId,
String resourceGroup,
String namespace,
String eventHub,
String clientId,
String tenantId,
String subscriptionId,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AzureEventHubs azureEventHubs =
IngestionDataSourceSettings.AzureEventHubs.newBuilder()
.setResourceGroup(resourceGroup)
.setNamespace(namespace)
.setEventHub(eventHub)
.setClientId(clientId)
.setTenantId(tenantId)
.setSubscriptionId(subscriptionId)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAzureEventHubs(azureEventHubs).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println(
"Created topic with Azure Event Hubs ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_confluent_cloud_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Confluent Cloud ingestion settings.
String bootstrapServer = "bootstrap-server";
String clusterId = "cluster-id";
String confluentTopic = "confluent-topic";
String identityPoolId = "identity-pool-id";
String gcpServiceAccount = "gcp-service-account";

createTopicWithConfluentCloudIngestionExample(
projectId,
topicId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount);
}

public static void createTopicWithConfluentCloudIngestionExample(
String projectId,
String topicId,
String bootstrapServer,
String clusterId,
String confluentTopic,
String identityPoolId,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.ConfluentCloud confluentCloud =
IngestionDataSourceSettings.ConfluentCloud.newBuilder()
.setBootstrapServer(bootstrapServer)
.setClusterId(clusterId)
.setTopic(confluentTopic)
.setIdentityPoolId(identityPoolId)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println(
"Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
104 changes: 102 additions & 2 deletions samples/snippets/src/test/java/pubsub/AdminIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class AdminIT {
private static final String kinesisIngestionTopicId = "kinesis-ingestion-topic-" + _suffix;
private static final String cloudStorageIngestionTopicId =
"cloud-storage-ingestion-topic-" + _suffix;
private static final String awsMskIngestionTopicId = "aws-msk-ingestion-topic-" + _suffix;
private static final String confluentCloudIngestionTopicId =
"confluent-cloud-ingestion-topic-" + _suffix;
private static final String azureEventHubsIngestionTopicId =
"azure-event-hubs-ingestion-topic-" + _suffix;
private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix;
private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix;
private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix;
Expand All @@ -66,6 +71,9 @@ public class AdminIT {
"java_samples_data_set" + _suffix.replace("-", "_");
private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix;
private static final String bigqueryTableId = "java_samples_table_" + _suffix;
private static final String gcpServiceAccount =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
// AWS Kinesis ingestion settings.
private static final String streamArn =
"arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name";
private static final String consumerArn =
Expand All @@ -75,20 +83,41 @@ public class AdminIT {
"arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/"
+ "consumer/consumer-2:2222222222";
private static final String awsRoleArn = "arn:aws:iam::111111111111:role/fake-role-name";
private static final String gcpServiceAccount =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
// GCS ingestion settings.
private static final String cloudStorageBucket = "pubsub-cloud-storage-bucket";
private static final String cloudStorageInputFormat = "text";
private static final String cloudStorageTextDelimiter = ",";
private static final String cloudStorageMatchGlob = "**.txt";
private static final String cloudStorageMinimumObjectCreateTime = "1970-01-01T00:00:01Z";
private static final String cloudStorageMinimumObjectCreateTimeSeconds = "seconds: 1";
// AWS MSK ingestion settings.
String clusterArn =
"arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1";
String mskTopic = "fake-msk-topic-name";
// Confluent Cloud ingestion settings.
String bootstrapServer = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092";
String clusterId = "fake-cluster-id";
String confluentTopic = "fake-confluent-topic-name";
String identityPoolId = "fake-pool-id";
// Azure Event Hubs ingestion settings.
String resourceGroup = "fake-resource-group";
String namespace = "fake-namespace";
String eventHub = "fake-event-hub";
String clientId = "11111111-1111-1111-1111-111111111111";
String tenantId = "22222222-2222-2222-2222-222222222222";
String subscriptionId = "33333333-3333-3333-3333-333333333333";

private static final TopicName topicName = TopicName.of(projectId, topicId);
private static final TopicName kinesisIngestionTopicName =
TopicName.of(projectId, kinesisIngestionTopicId);
private static final TopicName cloudStorageIngestionTopicName =
TopicName.of(projectId, cloudStorageIngestionTopicId);
private static final TopicName awsMskIngestionTopicName =
TopicName.of(projectId, awsMskIngestionTopicId);
private static final TopicName confluentCloudIngestionTopicName =
TopicName.of(projectId, confluentCloudIngestionTopicId);
private static final TopicName azureEventHubsIngestionTopicName =
TopicName.of(projectId, azureEventHubsIngestionTopicId);
private static final SubscriptionName pullSubscriptionName =
SubscriptionName.of(projectId, pullSubscriptionId);
private static final SubscriptionName pushSubscriptionName =
Expand Down Expand Up @@ -361,5 +390,76 @@ public void testAdmin() throws Exception {
// Test delete Cloud Storage ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, cloudStorageIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with AWS MSK ingestion settings.
CreateTopicWithAwsMskIngestionExample.createTopicWithAwsMskIngestionExample(
projectId,
awsMskIngestionTopicId,
clusterArn,
mskTopic,
awsRoleArn,
gcpServiceAccount);
assertThat(bout.toString())
.contains("google.pubsub.v1.Topic.name=" + awsMskIngestionTopicName.toString());
assertThat(bout.toString()).contains(clusterArn);
assertThat(bout.toString()).contains(mskTopic);
assertThat(bout.toString()).contains(awsRoleArn);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete AWS MSK ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, awsMskIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with Confluent Cloud ingestion settings.
CreateTopicWithConfluentCloudIngestionExample.createTopicWithConfluentCloudIngestionExample(
projectId,
confluentCloudIngestionTopicId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount);
assertThat(bout.toString())
.contains("google.pubsub.v1.Topic.name=" + confluentCloudIngestionTopicName.toString());
assertThat(bout.toString()).contains(bootstrapServer);
assertThat(bout.toString()).contains(clusterId);
assertThat(bout.toString()).contains(confluentTopic);
assertThat(bout.toString()).contains(identityPoolId);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete Confluent Cloud ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, confluentCloudIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with Azure Event Hubs ingestion settings.
CreateTopicWithAzureEventHubsIngestionExample.createTopicWithAzureEventHubsIngestionExample(
projectId,
azureEventHubsIngestionTopicId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount);
assertThat(bout.toString()).contains(
"google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName.toString());
assertThat(bout.toString()).contains(resourceGroup);
assertThat(bout.toString()).contains(namespace);
assertThat(bout.toString()).contains(eventHub);
assertThat(bout.toString()).contains(clientId);
assertThat(bout.toString()).contains(tenantId);
assertThat(bout.toString()).contains(subscriptionId);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete Azure Event Hubs ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, azureEventHubsIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");
}
}
Loading