
java-pubsub's Introduction

Google Cloud Pub/Sub Client for Java

Java idiomatic client for Cloud Pub/Sub.

Quickstart

If you are using Maven with BOM, add this to your pom.xml file:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>26.44.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

</dependencies>

If you are using Maven without the BOM, add this to your dependencies:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.132.0</version>
</dependency>

If you are using Gradle 5.x or later, add this to your dependencies:

implementation platform('com.google.cloud:libraries-bom:26.44.0')

implementation 'com.google.cloud:google-cloud-pubsub'

If you are using Gradle without BOM, add this to your dependencies:

implementation 'com.google.cloud:google-cloud-pubsub:1.132.0'

If you are using SBT, add this to your dependencies:

libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.132.0"

Authentication

See the Authentication section in the base directory's README.

Authorization

The client application making API calls must be granted authorization scopes required for the desired Cloud Pub/Sub APIs, and the authenticated principal must have the IAM role(s) required to access GCP resources using the Cloud Pub/Sub API calls.

Getting Started

Prerequisites

You will need a Google Cloud Platform Console project with the Cloud Pub/Sub API enabled, and you will need to enable billing to use Google Cloud Pub/Sub. Follow these instructions to get your project set up. You will also need to set up the local development environment by installing the Google Cloud Command Line Interface and running the following commands on the command line: gcloud auth login and gcloud config set project [YOUR PROJECT ID].

Installation and setup

You'll need to obtain the google-cloud-pubsub library. See the Quickstart section to add google-cloud-pubsub as a dependency in your code.

About Cloud Pub/Sub

Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows developers to communicate between independently written applications.

See the Cloud Pub/Sub client library docs to learn how to use this Cloud Pub/Sub Client Library.
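
To make the decoupling described above concrete, here is a minimal in-process sketch of a topic that fans each message out to every registered receiver. It is a toy model only: `InMemoryTopic` and its methods are hypothetical names introduced for illustration, delivery is synchronous, and nothing here uses the Cloud Pub/Sub client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy, in-process model of a topic; hypothetical, not part of google-cloud-pubsub. */
final class InMemoryTopic {
  private final List<Consumer<String>> subscribers = new ArrayList<>();

  /** Registers a receiver; every message published afterwards is delivered to it. */
  void subscribe(Consumer<String> receiver) {
    subscribers.add(receiver);
  }

  /** Delivers the message to every receiver and returns how many received it. */
  int publish(String message) {
    for (Consumer<String> receiver : subscribers) {
      receiver.accept(message);
    }
    return subscribers.size();
  }

  public static void main(String[] args) {
    InMemoryTopic topic = new InMemoryTopic();
    List<String> billing = new ArrayList<>();
    List<String> audit = new ArrayList<>();
    // The publisher below knows nothing about these two receivers.
    topic.subscribe(billing::add);
    topic.subscribe(audit::add);
    topic.publish("order-created");
    System.out.println(billing + " " + audit); // prints "[order-created] [order-created]"
  }
}
```

The publisher only holds a reference to the topic, so receivers can be added without changing the publishing code, which is the property the paragraph above describes.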

Creating a topic

With Pub/Sub you can create topics. A topic is a named resource to which messages are sent by publishers. Add the following imports at the top of your file:

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.TopicName;

Then, to create the topic, use the following code:

TopicName topic = TopicName.of("test-project", "test-topic");
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
  topicAdminClient.createTopic(topic);
}

Publishing messages

With Pub/Sub you can publish messages to a topic. Add the following imports at the top of your file:

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;

Then, to publish messages asynchronously, use the following code:

// `topic` is the TopicName created in the previous section.
Publisher publisher = null;
try {
  publisher = Publisher.newBuilder(topic).build();
  ByteString data = ByteString.copyFromUtf8("my-message");
  PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
  // publish() returns immediately; the future completes with the message ID.
  ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
  ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<String>() {
    @Override
    public void onSuccess(String messageId) {
      System.out.println("published with message id: " + messageId);
    }

    @Override
    public void onFailure(Throwable t) {
      System.out.println("failed to publish: " + t);
    }
  }, MoreExecutors.directExecutor());
  //...
} finally {
  if (publisher != null) {
    // Send any outstanding messages, then wait for the publisher to terminate.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

Creating a subscription

With Pub/Sub you can create subscriptions. A subscription represents the stream of messages from a single, specific topic. Add the following imports at the top of your file:

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

Then, to create the subscription, use the following code:

TopicName topic = TopicName.of("test-project", "test-topic");
SubscriptionName subscription = SubscriptionName.of("test-project", "test-subscription");

try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
  subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
}

Pulling messages

With Pub/Sub you can pull messages from a subscription. Add the following imports at the top of your file:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

Then, to pull messages asynchronously, use the following code:

SubscriptionName subscription = SubscriptionName.of("test-project", "test-subscription");

MessageReceiver receiver =
  new MessageReceiver() {
    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      System.out.println("got message: " + message.getData().toStringUtf8());
      consumer.ack();
    }
  };

Subscriber subscriber = null;
try {
  subscriber = Subscriber.newBuilder(subscription.toString(), receiver).build();
  subscriber.addListener(
    new Subscriber.Listener() {
      @Override
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
        System.err.println(failure);
      }
    },
    MoreExecutors.directExecutor());
  subscriber.startAsync().awaitRunning();
  //...
} finally {
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Complete source code

In CreateTopicAndPublishMessages.java and CreateSubscriptionAndConsumeMessages.java we put together all the code shown above into two programs. The programs assume that you are running on Compute Engine, App Engine Flexible or from your own desktop.

Samples

Samples are in the samples/ directory.

Sample Source Code Try it
Commit Avro Schema Example source code Open in Cloud Shell
Commit Proto Schema Example source code Open in Cloud Shell
Create Avro Schema Example source code Open in Cloud Shell
Create Big Query Subscription Example source code Open in Cloud Shell
Create Cloud Storage Subscription Example source code Open in Cloud Shell
Create Proto Schema Example source code Open in Cloud Shell
Create Pull Subscription Example source code Open in Cloud Shell
Create Push Subscription Example source code Open in Cloud Shell
Create Subscription With Dead Letter Policy Example source code Open in Cloud Shell
Create Subscription With Exactly Once Delivery source code Open in Cloud Shell
Create Subscription With Filtering source code Open in Cloud Shell
Create Subscription With Ordering source code Open in Cloud Shell
Create Topic Example source code Open in Cloud Shell
Create Topic With Kinesis Ingestion Example source code Open in Cloud Shell
Create Topic With Schema Example source code Open in Cloud Shell
Create Topic With Schema Revisions Example source code Open in Cloud Shell
Create Unwrapped Push Subscription Example source code Open in Cloud Shell
Delete Schema Example source code Open in Cloud Shell
Delete Schema Revision Example source code Open in Cloud Shell
Delete Subscription Example source code Open in Cloud Shell
Delete Topic Example source code Open in Cloud Shell
Detach Subscription Example source code Open in Cloud Shell
Get Schema Example source code Open in Cloud Shell
Get Schema Revision Example source code Open in Cloud Shell
Get Subscription Policy Example source code Open in Cloud Shell
Get Topic Policy Example source code Open in Cloud Shell
List Schema Revisions Example source code Open in Cloud Shell
List Schemas Example source code Open in Cloud Shell
List Subscriptions In Project Example source code Open in Cloud Shell
List Subscriptions In Topic Example source code Open in Cloud Shell
List Topics Example source code Open in Cloud Shell
Optimistic Subscribe Example source code Open in Cloud Shell
Publish Avro Records Example source code Open in Cloud Shell
Publish Protobuf Messages Example source code Open in Cloud Shell
Publish With Batch Settings Example source code Open in Cloud Shell
Publish With Concurrency Control Example source code Open in Cloud Shell
Publish With Custom Attributes Example source code Open in Cloud Shell
Publish With Error Handler Example source code Open in Cloud Shell
Publish With Flow Control Example source code Open in Cloud Shell
Publish With Grpc Compression Example source code Open in Cloud Shell
Publish With Ordering Keys source code Open in Cloud Shell
Publish With Retry Settings Example source code Open in Cloud Shell
Publisher Example source code Open in Cloud Shell
Receive Messages With Delivery Attempts Example source code Open in Cloud Shell
Remove Dead Letter Policy Example source code Open in Cloud Shell
Resume Publish With Ordering Keys source code Open in Cloud Shell
Rollback Schema Example source code Open in Cloud Shell
Set Subscription Policy Example source code Open in Cloud Shell
Set Topic Policy Example source code Open in Cloud Shell
Subscribe Async Example source code Open in Cloud Shell
Subscribe Sync Example source code Open in Cloud Shell
Subscribe Sync With Lease Example source code Open in Cloud Shell
Subscribe With Avro Schema Example source code Open in Cloud Shell
Subscribe With Avro Schema Revisions Example source code Open in Cloud Shell
Subscribe With Concurrency Control Example source code Open in Cloud Shell
Subscribe With Custom Attributes Example source code Open in Cloud Shell
Subscribe With Error Listener Example source code Open in Cloud Shell
Subscribe With Exactly Once Consumer With Response Example source code Open in Cloud Shell
Subscribe With Flow Control Settings Example source code Open in Cloud Shell
Subscribe With Proto Schema Example source code Open in Cloud Shell
Test Subscription Permissions Example source code Open in Cloud Shell
Test Topic Permissions Example source code Open in Cloud Shell
Update Dead Letter Policy Example source code Open in Cloud Shell
Update Push Configuration Example source code Open in Cloud Shell
Update Topic Schema Example source code Open in Cloud Shell
Update Topic Type Example source code Open in Cloud Shell
Use Pub Sub Emulator Example source code Open in Cloud Shell
State source code Open in Cloud Shell
State Proto source code Open in Cloud Shell

Troubleshooting

To get help, follow the instructions in the shared Troubleshooting document.

Transport

Cloud Pub/Sub uses both gRPC and HTTP/JSON for the transport layer.

Supported Java Versions

Java 8 or above is required for using this client.

Google's Java client libraries, Google Cloud Client Libraries and Google Cloud API Libraries, follow the Oracle Java SE support roadmap (see the Oracle Java SE Product Releases section).

For new development

In general, new feature development occurs with support for the lowest Java LTS version covered by Oracle's Premier Support (which typically lasts 5 years from initial General Availability). If the minimum required JVM for a given library is changed, it is accompanied by a semver major release.

Java 11 and (in September 2021) Java 17 are the best choices for new development.

Keeping production systems current

Google tests its client libraries with all current LTS versions covered by Oracle's Extended Support (which typically lasts 8 years from initial General Availability).

Legacy support

Google's client libraries support legacy versions of Java runtimes with long-term stable libraries that don't receive feature updates. This support is on a best-efforts basis, as it may not be possible to backport all patches.

Google provides updates on a best efforts basis to apps that continue to use Java 7, though apps might need to upgrade to current versions of the library that supports their JVM.

Where to find specific information

The latest versions and the supported Java versions are identified on the individual GitHub repository github.com/GoogleAPIs/java-SERVICENAME and on google-cloud-java.

Versioning

This library follows Semantic Versioning.

Contributing

Contributions to this library are always welcome and highly encouraged.

See CONTRIBUTING for more information on how to get started.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.

License

Apache 2.0 - See LICENSE for more information.

CI Status

Java Version Status
Java 8 Kokoro CI
Java 8 OSX Kokoro CI
Java 8 Windows Kokoro CI
Java 11 Kokoro CI

Java is a registered trademark of Oracle and/or its affiliates.

java-pubsub's People

Contributors

alicejli, andreamlin, anguillanneuf, chingor13, cloud-java-bot, dependabot[bot], diegomarquezp, dpcollins-google, garrettjonesgoogle, gcf-owl-bot[bot], hannahrogers-google, jesselovelace, kamalaboulhosn, kolea2, lqiu96, maitrimangal, michaelpri10, mmicatka, mpeddada1, neenu1995, neozwu, pongad, release-please[bot], renovate-bot, sduskis, stephaniewang526, suztomo, vam-google, yihanzhen, yoshi-automation

java-pubsub's Issues

Subscriber maxAckExtensionPeriod at zero does not prevent ack deadline extension

Subscriber.Builder.setMaxAckExtensionPeriod() javadoc says "A zero duration effectively disables auto deadline extensions." But empirically, one 60 second extension is always done.
I made a small command line example -- it creates a subscription with default expiration of 30s, publishes a message and waits for it to be delivered and redelivered.

Expected behavior: ack deadline never extended, message redelivered after ~30s.
Actual behavior: ack deadline extended once with log message "Sending 1 receipts", and message is redelivered after 60 seconds+.

Some technical details:
MessageDispatcher seems to add each incoming message to its list of ack deadlines (pendingReceipts) regardless of expiration time. When deadlines get extended, they are extended by messageDeadlineSeconds, which starts at 60 seconds and is not controllable by the user application.

Perhaps the reasons for this are historical -- extending ack deadlines happens in two different places: extendDeadlines(), which seems to honor the user-set maxAckExtensionPeriod, and processOutstandingAckOperations(), which talks about "receipts" but actually sends ack deadline modification requests.

This issue is somewhat related to #20, but might end up being just a documentation question if this behavior is intentional.
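
As a point of comparison, the behavior the javadoc describes could be modeled as below. This is only a sketch of the expected policy, not the client library's MessageDispatcher code: `AckExtensionPolicy`, `nextExtensionSeconds`, and the `stepSeconds` parameter are hypothetical names introduced for illustration.

```java
import java.time.Duration;

/** Hypothetical model of the expected policy; not the client library's implementation. */
final class AckExtensionPolicy {
  /**
   * Returns how many seconds to extend the ack deadline by, or 0 when no
   * extension should be sent because the maximum extension period is used up.
   */
  static long nextExtensionSeconds(Duration held, Duration maxAckExtensionPeriod, long stepSeconds) {
    Duration remaining = maxAckExtensionPeriod.minus(held);
    if (remaining.isZero() || remaining.isNegative()) {
      return 0; // a zero maximum therefore never extends
    }
    // Never extend past the configured maximum.
    return Math.min(stepSeconds, remaining.getSeconds());
  }

  public static void main(String[] args) {
    // With a zero maximum, no extension is requested, even for a brand-new message.
    System.out.println(nextExtensionSeconds(Duration.ZERO, Duration.ZERO, 60)); // prints 0
    // With a 60-minute maximum, a message held for 10 seconds gets a full 60-second step.
    System.out.println(nextExtensionSeconds(Duration.ofSeconds(10), Duration.ofMinutes(60), 60)); // prints 60
  }
}
```

Under this model the first receipt-time modification would also be skipped when the maximum is zero, which is the behavior the issue expected to see.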

Environment details

Java version: 8
pubsub version(s): 1.104.1

Code example

Subscriber.newBuilder(...)
    .setMaxAckExtensionPeriod(Duration.ZERO)
    .build();

Output at FINER level shows:

FINER: Sending 1 receipts

You can run the whole example (create topic/subscription with 30s ack expiration, publish a message, receive a message without acking or nacking and wait for it to be redelivered):
https://github.com/elefeint/examples/tree/master/pubsub-clientlibrary-ackextension

Remove experimental notice from DLQ related fields

DLQ is now GA, so all experimental notices should be removed. This should largely be picked up by regenerating the underlying client, but please do a pass through in case experimental notices were added anywhere else.

pubsub: expire old processing time measurements

The pubsub client currently measures the time between reception and ack of messages (processing time) in a percentile distribution. The 99th percentile of this distribution is used as the ack deadline for messages. This has the positive effect of reducing the number of modacks (used to send the ack deadline); a single modack with the right time for a message is better than many modacks with an incorrect, arbitrarily low time.

Unfortunately, since we use the 99th percentile, we effectively always take the ceiling (excluding extreme outliers, the remaining 1%). Furthermore, since we don't expire data, a very small number of high values at some point in time will dominate any low values thereafter, until the total number of data points is high enough to push the high values into the top 1%.

A simple solution to this downside is to expire recordings after a period of time. I think that 6 hours is a good starting place for how much data to hold on to.
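
The proposal can be sketched as a distribution that drops samples older than a fixed window before computing the percentile. This is an illustration under stated assumptions, not the client's actual distribution class: `ExpiringDistribution`, its methods, the nearest-rank percentile rule, and the fallback value are all hypothetical choices made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: keeps processing-time samples only for a fixed window. */
final class ExpiringDistribution {
  private static final class Sample {
    final long recordedAtMillis;
    final int seconds;

    Sample(long recordedAtMillis, int seconds) {
      this.recordedAtMillis = recordedAtMillis;
      this.seconds = seconds;
    }
  }

  private final long windowMillis;
  private final Deque<Sample> samples = new ArrayDeque<>();

  ExpiringDistribution(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  /** Records one processing time; assumes calls arrive in non-decreasing time order. */
  void record(long nowMillis, int seconds) {
    samples.addLast(new Sample(nowMillis, seconds));
  }

  /** Returns the nearest-rank percentile of unexpired samples, or the fallback if none remain. */
  int percentile(long nowMillis, double percentile, int fallbackSeconds) {
    // Expire samples that are older than the window.
    while (!samples.isEmpty() && nowMillis - samples.peekFirst().recordedAtMillis > windowMillis) {
      samples.removeFirst();
    }
    if (samples.isEmpty()) {
      return fallbackSeconds;
    }
    List<Integer> sorted = new ArrayList<>();
    for (Sample sample : samples) {
      sorted.add(sample.seconds);
    }
    Collections.sort(sorted);
    int rank = (int) Math.ceil(percentile / 100.0 * sorted.size());
    return sorted.get(Math.max(rank, 1) - 1);
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    ExpiringDistribution distribution = new ExpiringDistribution(6 * hour);
    distribution.record(0, 600); // one slow outlier
    for (int i = 0; i < 10; i++) {
      distribution.record(hour, 5); // typical fast messages, recorded an hour later
    }
    System.out.println(distribution.percentile(2 * hour, 99, 10)); // prints 600
    System.out.println(distribution.percentile(7 * hour, 99, 10)); // prints 5
  }
}
```

In the example the single 600-second outlier dominates the 99th percentile until it ages out of the 6-hour window, after which the deadline drops back to the typical 5 seconds.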

com.google.pubsub.v1.ProjectTopicName is deprecated

Using version v1.102.1, I see some deprecated classes.

In the generated documentation I don't see any explanation to fix deprecated API usages. Besides, this project README is using deprecated code as examples:

https://github.com/googleapis/java-pubsub#creating-a-topic

The recent changelog mentions nothing:

https://github.com/googleapis/java-pubsub/blob/master/CHANGELOG.md

and the releases page brings nothing:

https://github.com/googleapis/java-pubsub/releases

Any guidance or an up-to-date example in the README and a mention in the changelog would be really helpful.

Enable publish to multiple topics using one Publisher

Is your feature request related to a problem? Please describe.
We have to create a Publisher for each topic that we want to publish to. This is annoying because we create one topic per long-running operation that we are working on (to send reply messages for sub-tasks).

Over the course of a minute, each of our worker instances may need to publish to up to 600 different topics. This means that we have to maintain a cache of Publisher objects (one per topic) which we constantly thrash (since we don't want to have hundreds of these in memory). It also means that we have to jump through hoops to use the same ExecutorService so that we don't continuously create/destroy threads (and so that we don't have hundreds of threads running at a time just for publishing messages).

Describe the solution you'd like
The Publisher.Builder interface should allow creation of a Publisher without specifying the target topic and there should be a publish method which accepts a topic name. If the Publisher is created without a topic, then it would be reasonable for the publish method which does not accept a topic to throw an exception (since forcing someone to pass the topic would be an API break).

Describe alternatives you've considered

  1. Use FixedExecutorProvider and a cache of Publisher instances (what we do now)
  2. Abandon the java-pubsub client entirely and just use the REST API to publish
  3. Re-architect our application to use fewer topics (it is not possible to entirely eliminate dynamic topics, but we could potentially reduce the worst case from hundreds to tens)
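
For readers unfamiliar with alternative (1), a bounded cache of per-topic publishers could look roughly like the sketch below. It is not the reporter's code: `TopicPublisher` is a hypothetical stand-in for the library's Publisher, `PublisherCache` is an illustrative name, and least-recently-used eviction is an assumed policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a per-topic publisher; not the library's Publisher class. */
interface TopicPublisher {
  void publish(String message);

  void shutdown();
}

/** Sketch of a bounded, least-recently-used cache of per-topic publishers. */
final class PublisherCache {
  private final Function<String, TopicPublisher> factory;
  private final Map<String, TopicPublisher> cache;

  PublisherCache(final int maxEntries, Function<String, TopicPublisher> factory) {
    this.factory = factory;
    // accessOrder=true keeps entries ordered from least to most recently used.
    this.cache =
        new LinkedHashMap<String, TopicPublisher>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, TopicPublisher> eldest) {
            if (size() > maxEntries) {
              eldest.getValue().shutdown(); // release the evicted publisher's resources
              return true;
            }
            return false;
          }
        };
  }

  /** Publishes to the topic, creating a publisher (and possibly evicting one) as needed. */
  synchronized void publish(String topic, String message) {
    TopicPublisher publisher = cache.get(topic);
    if (publisher == null) {
      publisher = factory.apply(topic);
      cache.put(topic, publisher);
    }
    publisher.publish(message);
  }

  synchronized int size() {
    return cache.size();
  }
}
```

With hundreds of distinct topics per minute and a small bound, a cache like this evicts and recreates publishers constantly, which is the thrashing the request describes.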

Additional context
This is especially irritating with the release of gax-1.53.1 (pulled in when I update to google-cloud-storage-1.103.1 or >=google-cloud-core-1.92.3) since alternative (1) above no longer works. See googleapis/gax-java#870

Allow the Pubsub Publisher to work (in low-volume cases), with fewer than 20 threads

See discussion with Kir Titievsky, Product Manager for Google PubSub, at GoogleGroup

My application publishes a low volume of messages (1 every few seconds at most), with no other use of PubSub. It does not subscribe. We are using simple Java code as in this sample. The Publisher object is retained for the lifetime of the process, as recommended here.

On first use, PubSub creates 60 threads that stay live permanently, like these:

"grpc-default-worker-ELG-1-1 Id=115 RUNNABLE (in native)...
"grpc-default-worker-ELG-1-10 Id=160 RUNNABLE (in native)":...
....
"Gax-16 Id=141 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334": ...
"Gax-17 Id=142 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@24f8d334":....

Sixty is a very high default.

Moreover, if I setExecutorThreadCount to 4 (code following), I still get an extra 26 permanent threads. Setting it to 1 or 2 also gets about 20 extra threads.

ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
Publisher.Builder builder = Publisher.newBuilder(ProjectTopicName.of(proj, topic)).setExecutorProvider(executorProvider);

Our legacy application is already thread-heavy; and it cannot tolerate more than one or two extra threads, even if these are inactive. Unfortunately, the legacy threading model cannot be changed.

Yet it does seem that a low-volume Publisher really needs only one thread.

Please offer a way to configure PubSub down to 1 or 2 threads.

Need a method that will interrupt or terminate the com.google.cloud.pubsub.v1.Publisher immediately

Currently in PubSub there is only Publisher.shutdown(), which sends any pending requests in the buffer, then clears the resources and exits.

/**
 * Schedules immediate publishing of any outstanding messages and waits until all are processed.
 *
 * <p>Sends remaining outstanding messages and prevents future calls to publish. This method
 * should be invoked prior to deleting the {@link Publisher} object in order to ensure that no
 * pending messages are lost.
 */
public void shutdown() throws Exception {
  if (shutdown.getAndSet(true)) {
    throw new IllegalStateException("Cannot shut down a publisher already shut-down.");
  }
  if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
    currentAlarmFuture.cancel(false);
  }
  publishAllOutstanding();
  messagesWaiter.waitNoMessages();
  for (AutoCloseable closeable : closeables) {
    closeable.close();
  }
  publisherStub.shutdown();
}

Because of this, shutting down the publisher takes a while when a large number of requests are pending in the buffer; there can be "n" batches waiting to be sent. This is easy to simulate with simple Publisher code that calls publisher.publish() subject to the flow controller settings (since these are not honoured by the API, assume the throttling is done in client-side code).

// Create publisher with default instance that picks up
Publisher publisher = Publisher.newBuilder(topic)
    .setBatchingSettings(batchingSettings)
    .setRetrySettings(retrySettings)
    .setExecutorProvider(executorProvider)
    .build();

PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
    .setData(data) // bytes to be sent
    .build();
/*
 * code to throttle when the limit is exceeding - this block will prevent adding more event to the API
 */
…

ApiFuture<String> future = publisher.publish(pubsubMessage);
RecoveryHandlingCallback callback = new RecoveryHandlingCallback(pos, event, sequence_no, future, receiptCallback, dataSize);

ApiFutures.addCallback(future, callback, MoreExecutors.directExecutor());

if (publisher != null) {
  publisher.shutdown();
}

Let us assume MaxOutstandingRequestBytes was 100 MB (assume the user had enough network bandwidth and machine capabilities, as mentioned here - https://cloud.google.com/pubsub/docs/troubleshooting#client-libraries-issue-tracker-java) and the user has a very high retry setting to avoid crashes due to network glitches etc. The shutdown won't return until it is done with the whole lot of pending requests, either successfully or with an exception.

For a service like Publisher it is always good to have two ways of shutting down

  1. A graceful shutdown(), which tries to clear the existing buffer and then returns (Publisher has this already)
  2. A shutdownNow(), which terminates processing immediately, cleanly releases all dependent resources, and returns. This is required when a PubSub Publisher is used to build an application that works on data pipelining and the application demands an immediate shutdown of all its components.
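
The two modes requested here mirror the distinction java.util.concurrent.ExecutorService already makes between shutdown() and shutdownNow(). The sketch below demonstrates that distinction with a plain executor and empty tasks standing in for queued publish batches; it does not touch Publisher, and `ShutdownModes` and its method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrates graceful vs. immediate shutdown on a plain executor (not on Publisher). */
final class ShutdownModes {
  /** Graceful: stop accepting work, but run everything already queued. Returns tasks completed. */
  static int drainGracefully(int pending) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < pending; i++) {
      executor.execute(completed::incrementAndGet);
    }
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return completed.get();
  }

  /** Immediate: interrupt the running task and hand back the queued ones. Returns tasks dropped. */
  static int abortWithPending(int pending) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch neverReleased = new CountDownLatch(1);
    executor.execute(
        () -> {
          started.countDown();
          try {
            neverReleased.await(); // simulates a send that is stuck retrying
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() interrupts the running task
          }
        });
    started.await(); // make sure the worker is busy before queueing more work
    for (int i = 0; i < pending; i++) {
      executor.execute(() -> {});
    }
    List<Runnable> dropped = executor.shutdownNow();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return dropped.size();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(drainGracefully(3)); // prints 3
    System.out.println(abortWithPending(3)); // prints 3
  }
}
```

Note that shutdownNow() returns the tasks that never started, so the caller can decide what to do with unsent work; a Publisher equivalent would need a similar answer for messages still in the buffer.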

java-pubsub: add support for filtering

Add an additional field for subscriptions that allows configuring a filter. Filters control what messages are sent to subscribers based on the message attributes.

As an example, if a message filter is the string attributes.event_type = "1", then messages with attributes of event_type=1 will be delivered, while everything else will be filtered out.
There will be more complicated filters to handle prefixes and non-exact matches, but any integration tests can just test the exact match case.

If empty, no messages are filtered out. Filtering can be configured only on subscription creation, not on updates.
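
The exact-match case from the example above can be sketched as follows. This is an illustration of the described semantics only, not the service's filter grammar or the client API: `ExactMatchFilter`, `parse`, and `matches` are hypothetical names, and only the `attributes.KEY = "VALUE"` form and the empty filter are handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch of exact-match attribute filtering; not the Pub/Sub filter syntax. */
final class ExactMatchFilter {
  // Accepts only the form: attributes.KEY = "VALUE"
  private static final Pattern EXACT =
      Pattern.compile("\\s*attributes\\.(\\w+)\\s*=\\s*\"([^\"]*)\"\\s*");

  private final String key; // null means "no filter": every message matches
  private final String value;

  private ExactMatchFilter(String key, String value) {
    this.key = key;
    this.value = value;
  }

  /** Parses an exact-match filter; an empty filter matches every message. */
  static ExactMatchFilter parse(String filter) {
    if (filter == null || filter.trim().isEmpty()) {
      return new ExactMatchFilter(null, null);
    }
    Matcher matcher = EXACT.matcher(filter);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("unsupported filter: " + filter);
    }
    return new ExactMatchFilter(matcher.group(1), matcher.group(2));
  }

  /** Returns true if a message with these attributes should be delivered. */
  boolean matches(Map<String, String> attributes) {
    return key == null || value.equals(attributes.get(key));
  }
}
```

An integration test for the exact-match case could follow the same shape: publish one message with event_type=1 and one without, then check that only the first is delivered.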

Reference implementation in Go will be tracked under googleapis/google-cloud-go#1943

[PubSub] Caused by: java.lang.ClassNotFoundException: io.grpc.internal.BaseDnsNameResolverProvider

Upgrading to version 1.105.1 from version 1.104.1 for package com.google.cloud:google-cloud-pubsub, when executing the following piece of code:

try {
    subscriber.stopAsync().awaitTerminated();
} catch (IllegalStateException e) {
    log.error("Error while terminating subscriber", e);
}

I get the following exception:

java.lang.IllegalStateException: Expected the service InnerService [FAILED] to be TERMINATED, but the service has FAILED
	at com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:379) ~[guava-29.0-jre.jar:na]
	at com.google.common.util.concurrent.AbstractService.awaitTerminated(AbstractService.java:336) ~[guava-29.0-jre.jar:na]
	at com.google.api.core.AbstractApiService.awaitTerminated(AbstractApiService.java:104) ~[api-common-1.9.0.jar:na]
	at co.x.api.usage_handler.UsageHandlerService.onDestroy(UsageHandlerService.java:198) ~[main/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeDestroyMethods(InitDestroyAnnotationBeanPostProcessor.java:347) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeDestruction(InitDestroyAnnotationBeanPostProcessor.java:177) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DisposableBeanAdapter.destroy(DisposableBeanAdapter.java:242) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroyBean(DefaultSingletonBeanRegistry.java:579) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingleton(DefaultSingletonBeanRegistry.java:551) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingleton(DefaultListableBeanFactory.java:1091) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.destroySingletons(DefaultSingletonBeanRegistry.java:512) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.destroySingletons(DefaultListableBeanFactory.java:1084) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.destroyBeans(AbstractApplicationContext.java:1060) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1029) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext$1.run(AbstractApplicationContext.java:948) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
Caused by: java.lang.NoClassDefFoundError: io/grpc/internal/BaseDnsNameResolverProvider
	at java.base/java.lang.ClassLoader.defineClass1(Native Method) ~[na:na]
	at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017) ~[na:na]
	at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:151) ~[na:na]
	at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:821) ~[na:na]
	at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:719) ~[na:na]
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:642) ~[na:na]
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:600) ~[na:na]
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
	at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
	at java.base/java.lang.Class.forName(Class.java:427) ~[na:na]
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1211) ~[na:na]
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1222) ~[na:na]
	at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1266) ~[na:na]
	at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1301) ~[na:na]
	at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1386) ~[na:na]
	at io.grpc.ServiceProviders.loadAll(ServiceProviders.java:67) ~[grpc-api-1.29.0.jar:1.29.0]
	at io.grpc.NameResolverRegistry.getDefaultRegistry(NameResolverRegistry.java:101) ~[grpc-api-1.29.0.jar:1.29.0]
	at io.grpc.internal.AbstractManagedChannelImplBuilder.<init>(AbstractManagedChannelImplBuilder.java:107) ~[grpc-core-1.29.0.jar:1.29.0]
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<init>(NettyChannelBuilder.java:136) ~[grpc-netty-shaded-1.28.1.jar:1.28.1]
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.<init>(NettyChannelBuilder.java:131) ~[grpc-netty-shaded-1.28.1.jar:1.28.1]
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder.forAddress(NettyChannelBuilder.java:117) ~[grpc-netty-shaded-1.28.1.jar:1.28.1]
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelProvider.builderForAddress(NettyChannelProvider.java:37) ~[grpc-netty-shaded-1.28.1.jar:1.28.1]
	at io.grpc.netty.shaded.io.grpc.netty.NettyChannelProvider.builderForAddress(NettyChannelProvider.java:23) ~[grpc-netty-shaded-1.28.1.jar:1.28.1]
	at io.grpc.ManagedChannelBuilder.forAddress(ManagedChannelBuilder.java:39) ~[grpc-api-1.29.0.jar:1.29.0]
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:270) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1500(InstantiatingGrpcChannelProvider.java:71) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:202) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:209) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:192) ~[gax-grpc-1.56.0.jar:1.56.0]
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:155) ~[gax-1.56.0.jar:1.56.0]
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263) ~[google-cloud-pubsub-1.105.1.jar:1.105.1]
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:271) ~[google-cloud-pubsub-1.105.1.jar:1.105.1]
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148) ~[api-common-1.9.0.jar:na]
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:249) ~[guava-29.0-jre.jar:na]
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120) ~[api-common-1.9.0.jar:na]
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:263) ~[google-cloud-pubsub-1.105.1.jar:1.105.1]
	at co.x.api.usage_handler.UsageHandlerService.subscribe(UsageHandlerService.java:96) ~[main/:na]
	at co.x.api.usage_handler.Application.run(Application.java:32) ~[main/:na]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) ~[spring-boot-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779) ~[spring-boot-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.3.0.RELEASE.jar:2.3.0.RELEASE]
	at co.x.api.usage_handler.Application.main(Application.java:27) ~[main/:na]
Caused by: java.lang.ClassNotFoundException: io.grpc.internal.BaseDnsNameResolverProvider
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602) ~[na:na]
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
	... 44 common frames omitted

The root cause is:

Caused by: java.lang.ClassNotFoundException: io.grpc.internal.BaseDnsNameResolverProvider

In the classpath I have protobuf, guava, lettuce and spring-boot-starter (all up-to-date). Java 14 is used.

A workaround is to downgrade to version 1.104.1.
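
The frames in the trace above show grpc-api and grpc-core at 1.29.0 next to grpc-netty-shaded at 1.28.1, which suggests (as an inference from the jar names, not a confirmed diagnosis) that mismatched gRPC artifacts ended up on the classpath. A minimal sketch for checking which jar each class is actually loaded from, using only JDK calls; `JarOrigin` and `describe` are hypothetical names introduced for illustration:

```java
import java.security.CodeSource;

public class JarOrigin {

    /** Reports where a class is loaded from, without running its static initializers. */
    static String describe(String className) {
        try {
            Class<?> type = Class.forName(className, false, JarOrigin.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader have no code source.
            if (source == null || source.getLocation() == null) {
                return "bootstrap or unknown";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            // Covers both a missing class and a class whose superclass is missing.
            return "not found";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above.
        String[] names = {
            "io.grpc.ManagedChannelBuilder",
            "io.grpc.internal.BaseDnsNameResolverProvider",
            "io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder"
        };
        for (String name : names) {
            System.out.println(name + " -> " + describe(name));
        }
    }
}
```

Running this inside the affected application prints the jar path (and therefore the version) behind each class; `mvn dependency:tree` gives the same information from the build side.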

Upgrade to netty-shaded version 1.28.1 causes failure when using topic-admin-client

Environment details

  1. PubSub
  2. macOS 10.15.4
  3. Java version: Zulu OpenJDK 11.0.3
  4. Bug introduced in com.google.cloud:google-cloud-pubsub:1.104.1
  5. Run against pubsub emulator locally

The bug is caused by changing io.grpc:grpc-netty-shaded:jar from version 1.27.2 to 1.28.1

Steps to reproduce

  1. Create a maven project with a single dependency on com.google.cloud:google-cloud-pubsub:1.104.1 or higher
  2. Run the code example included below
  3. Observe the exception in the NettyClientStream static initializer: Exception in thread "grpc-default-executor-0" java.lang.NoSuchFieldError: NETTY_SHADED

Workaround

Either use pubsub 1.104.0 or older, or downgrade io.grpc:grpc-netty-shaded to version 1.27.2

Code example

import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ListTopicsRequest;
import com.google.pubsub.v1.ProjectName;
import io.grpc.ManagedChannelBuilder;

import java.io.IOException;

public class Test {

    public static void main(String[] args) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create(TopicAdminSettings.newBuilder()
                .setTransportChannelProvider(
                        FixedTransportChannelProvider.create(
                                GrpcTransportChannel.create(
                                        ManagedChannelBuilder
                                                .forAddress("localhost", 8538)
                                                .usePlaintext()
                                                .build()
                                )
                        )
                )
                .setCredentialsProvider(NoCredentialsProvider.create())
                .build())) {

            topicAdminClient.listTopics(ListTopicsRequest.newBuilder()
                    .setProject(ProjectName.of("test").toString())
                    .setPageSize(25)
                    .build());
        }
    }
}

Stack trace

Exception in thread "grpc-default-executor-0" java.lang.NoSuchFieldError: NETTY_SHADED
	at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream.<clinit>(NettyClientStream.java:59)
	at io.grpc.netty.shaded.io.grpc.netty.NettyClientTransport.newStream(NettyClientTransport.java:177)
	at io.grpc.internal.CallCredentialsApplyingTransportFactory$CallCredentialsApplyingTransport.newStream(CallCredentialsApplyingTransportFactory.java:117)
	at io.grpc.internal.ForwardingConnectionClientTransport.newStream(ForwardingConnectionClientTransport.java:49)
	at io.grpc.internal.InternalSubchannel$CallTracingTransport.newStream(InternalSubchannel.java:635)
	at io.grpc.internal.DelayedClientTransport$PendingStream.createRealStream(DelayedClientTransport.java:353)
	at io.grpc.internal.DelayedClientTransport$PendingStream.access$300(DelayedClientTransport.java:341)
	at io.grpc.internal.DelayedClientTransport$5.run(DelayedClientTransport.java:300)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

PubSub: In case of network issues, consumes a lot of CPU

Environment details

macOS 10.14.6, CentOS 7
Java 11.0.4 (corretto)
google-cloud-pubsub:1.93.0

Steps to reproduce

  1. Write a simple publisher like the one below
  2. Run it; even 1 request per second is enough
  3. Emulate network issues (disconnect from the network, or block packets with iptables)
  4. Observe significantly increased CPU load

Code example

package com.pubsub.test

import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.batching.FlowController
import com.google.api.gax.retrying.RetrySettings
import com.google.cloud.pubsub.v1.Publisher
import com.google.common.util.concurrent.MoreExecutors
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import org.threeten.bp.Duration
import java.lang.Exception
import java.lang.Thread.sleep

fun main() {
    val publisher = Publisher.newBuilder("projects/project!/topics/topic!")
        .setRetrySettings(
            RetrySettings
                .newBuilder()
                .setMaxAttempts(0)
                .setTotalTimeout(Duration.ofSeconds(20))
                .setInitialRpcTimeout(Duration.ofSeconds(20))
                .setMaxRpcTimeout(Duration.ofSeconds(20))
                .build()
        )
//        .setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(1)))
        .setBatchingSettings(
            BatchingSettings
                .newBuilder()
                .setDelayThreshold(Duration.ofSeconds(1))
                .setElementCountThreshold(1000)
                .setRequestByteThreshold(10_000_000)
                .setFlowControlSettings(
                    FlowControlSettings
                        .newBuilder()
                        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
                        .setMaxOutstandingElementCount(200_000)
                        .setMaxOutstandingRequestBytes(100_000_000)
                        .build()
                )
                .build()
        )
        .build()

    repeat(1000) {
        val request = publisher.publish(
            PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8("Hello, World!"))
                .build()
        )

        request.addListener(Runnable {
            try {
                println("${Thread.currentThread().name}: Done: ${request.get()}")
            } catch (e: Exception) {
                println("${Thread.currentThread().name}: Done exceptionally: ${e.message}")
            }
        }, MoreExecutors.directExecutor())

        sleep(1000)
    }
}

Any additional information below

Profiling in YourKit, CPU load:

Screen Shot 2019-10-04 at 4 33 27 PM

The network was disconnected at the 20-second mark. As the profile shows, the client works without issues for one minute and then starts to consume CPU.

This is the sampling profile:

Publisher Sampler.html.zip

Setting a smaller executor (setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(1)))) helps:

Screen Shot 2019-10-04 at 4 40 00 PM

Pubsub: "Uncaught exception in the SynchronizationContext" when using example code from the "Getting Started Guide"

Environment details

  1. OS type and version: macOS 10.13.6
  2. Java version: 1.8.0_131
  3. google-cloud-java version(s): google-cloud-core:1.91.3
  4. google-cloud-pubsub:1.99.0

Steps to reproduce

  1. Use the example here to publish a message to Pubsub.
  2. Build and Run.

Stack trace

Nov 14, 2019 12:04:06 PM io.grpc.internal.ManagedChannelImpl$1 uncaughtException
    SEVERE: [Channel<313>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic!
    java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@1453b889 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@6f1d6611[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
        at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
        at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
        at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:297)
        at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:802)
        at io.grpc.internal.ManagedChannelImpl.access$4300(ManagedChannelImpl.java:106)
        at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1115)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:507)
        at io.grpc.netty.shaded.io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:43)
        at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:835)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:479)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:697)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:542)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:263)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242)
        at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475)
        at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224)
        at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

Any additional information below

Calling messageIdFuture.get() before shutting down the publisher resolves the issue. I am creating a PR.
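
The trace shows a task being handed to a ScheduledThreadPoolExecutor that is already in the Terminated state. The ordering problem can be illustrated with plain JDK executors: a minimal sketch, not the gRPC internals, in which `ShutdownOrderDemo` and its method names are hypothetical and the scheduled task stands in for a pending publish.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownOrderDemo {

    /** Shuts the executor down first, then submits work: the task is rejected. */
    static boolean submitAfterShutdownIsRejected() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.shutdown();
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    /** Waits for the pending result first, then shuts down: nothing is rejected. */
    static String waitThenShutdown() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            // Stand-in for the future returned by a publish call.
            Future<String> pending =
                executor.schedule(() -> "message-id-1", 50, TimeUnit.MILLISECONDS);
            return pending.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
        System.out.println("result before shutdown: " + waitThenShutdown());
    }
}
```

The second method mirrors the reported fix: block on the outstanding future, and only then shut down the resource that still has to deliver its result.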

Pub/Sub API docs needs cleanup

I am fine with minimal information here, but I think it's important that the information is accurate.
The Pub/Sub API reference

states that TopicAdminClient is used to send messages to a topic and that
SubscriptionAdminClient is used to pull messages. Neither is true.

how to make MDC work with MessageReceiver

Hi,
In the "main" thread I set up some values in the MDC.
The Pub/Sub API spawns a number of threads.
How do I make those threads "aware" of the thread-local MDC map?
Moreover, I would like to add the messageId to the MDC itself.
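
One common approach is to capture the context on the thread that builds the receiver and restore it inside the callback. The sketch below uses a plain ThreadLocal map as a stand-in for the MDC and a BiConsumer of (messageId, payload) as a stand-in for the receiver, so it runs with the JDK alone; `ContextPropagationDemo`, `CONTEXT`, `withContext` and `runOnWorker` are hypothetical names. With SLF4J, the corresponding calls would be MDC.getCopyOfContextMap() and MDC.setContextMap(), and the message ID would come from PubsubMessage.getMessageId().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

public class ContextPropagationDemo {

    // Stand-in for the MDC: a per-thread map of diagnostic values.
    static final ThreadLocal<Map<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    /** Wraps a handler so it sees the creating thread's context plus the message ID. */
    static BiConsumer<String, String> withContext(BiConsumer<String, String> delegate) {
        // Captured once, on the thread that builds the receiver.
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return (messageId, payload) -> {
            Map<String, String> previous = CONTEXT.get();
            Map<String, String> scoped = new HashMap<>(captured);
            scoped.put("messageId", messageId);
            CONTEXT.set(scoped);
            try {
                delegate.accept(messageId, payload);
            } finally {
                // Restore, so pooled worker threads do not leak values between messages.
                CONTEXT.set(previous);
            }
        };
    }

    /** Sets a value on the calling thread and reads it back from a worker thread. */
    static String runOnWorker(String app, String messageId) {
        CONTEXT.get().put("app", app);
        String[] seen = new String[1];
        BiConsumer<String, String> receiver = withContext((id, payload) ->
            seen[0] = CONTEXT.get().get("app") + "/" + CONTEXT.get().get("messageId"));
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> receiver.accept(messageId, "payload")).get(5, TimeUnit.SECONDS);
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnWorker("usage-handler", "42"));
    }
}
```

The wrapper copies the map rather than sharing it, so values added for one message are not visible to the next message handled on the same pooled thread.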

PubSub: Deadlocks with nested Subscriber

Can I "nest" subscribers?
Let's say I have an application with a "main" subscriber that is always listening, and in its receiver I create a new subscriber (for another subscription) and call awaitTerminated with a timeout.
Not always, but sometimes the application remains "stuck"; when this happens, it is inside awaitTerminated.
Is there a way to debug this problem?
BTW: it gets stuck more often when I run the code on Kubernetes than on my development PC.
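
A generic first step for debugging a hang like this is to capture a thread dump at the moment the application is stuck, which shows what every thread is waiting on. The following is a minimal JDK-only sketch; `ThreadDump` and `capture` are hypothetical names, and it does not by itself identify the cause of the hang.

```java
import java.util.Map;

public class ThreadDump {

    /** Returns the name, state and stack of every live thread as text. */
    static String capture() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append("\" ")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(capture());
    }
}
```

Logging the result of capture() when awaitTerminated times out, or running jstack against the process from outside, shows which executor threads are blocked and on what.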

PubSub causes RejectedExecutionException when shutting down publisher

[ERROR] 2020-01-27 14:31:15.723 [grpc-default-worker-ELG-1-21] ManagedChannelImpl - [Channel<7>: (pubsub.googleapis.com:443)] Uncaught exception in the SynchronizationContext. Panic!
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@24b41f37[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@9b0ab7c[Wrapped task = io.grpc.internal.DelayedClientTransport$5@6cc4f311]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@43ed7709[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 3]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:705) ~[?:?]
at io.grpc.internal.DelayedClientTransport.reprocess(DelayedClientTransport.java:297) ~[grpc-core-1.26.0.jar:1.26.0]
at io.grpc.internal.ManagedChannelImpl.updateSubchannelPicker(ManagedChannelImpl.java:818) ~[grpc-core-1.26.0.jar:1.26.0]
at io.grpc.internal.ManagedChannelImpl.access$4400(ManagedChannelImpl.java:106) ~[grpc-core-1.26.0.jar:1.26.0]
at io.grpc.internal.ManagedChannelImpl$LbHelperImpl$1UpdateBalancingState.run(ManagedChannelImpl.java:1132) ~[grpc-core-1.26.0.jar:1.26.0]
at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95) [grpc-api-1.26.0.jar:1.26.0]
at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127) [grpc-api-1.26.0.jar:1.26.0]
at io.grpc.internal.InternalSubchannel$TransportListener.transportReady(InternalSubchannel.java:510) [grpc-core-1.26.0.jar:1.26.0]
at io.grpc.netty.shaded.io.grpc.netty.ClientTransportLifecycleManager.notifyReady(ClientTransportLifecycleManager.java:43) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$FrameListener.onSettingsRead(NettyClientHandler.java:835) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:479) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$PrefaceFrameListener.onSettingsRead(DefaultHttp2ConnectionDecoder.java:697) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onSettingsRead(Http2InboundFrameLogger.java:93) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readSettingsFrame(DefaultHttp2FrameReader.java:542) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:263) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$PrefaceDecoder.decode(Http2ConnectionHandler.java:242) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1224) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1271) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:483) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:383) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [grpc-netty-shaded-1.25.0.jar:1.25.0]
at java.lang.Thread.run(Thread.java:834) [?:?]
[ERROR] 2020-01-27 14:31:15.723 [grpc-default-worker-

While using the code from
https://github.com/googleapis/java-pubsub#creating-a-topic

our service crashed with the exception above.
According to
https://stackoverflow.com/questions/45069436/gcloud-pubsub-java-implementation-java-util-concurrent-rejectedexecutionexcept

the following finally block is missing from the example:

finally {
    messageIdFuture.get(); // This resolves the issue.
    // Wait on any pending requests
    if (publisher != null) {
        publisher.shutdown();
        //publisher.awaitTermination(1, TimeUnit.SECONDS);
    }
}

Cloud Pub/Sub API ignores FlowControlSettings in BatchingSettings

GAX introduced flow control settings in [1] (see setFlowControlSettings). This makes it seem like flow control is available in the APIs that support BatchingSettings, such as Pub/Sub's Publisher [2]. There are two issues here:

  1. FlowControlSettings settings in GAX have no way of communicating to the user whether a particular client implements them. This is actually true of all batching settings. This is a recipe for confusion.
  2. In the particular case of the Pub/Sub client, publisher flow control is a desirable feature in itself. We need to either decide to implement it and turn this bug into a feature request, or offer explicit failure and documentation if we decide not to.

[1] http://googleapis.github.io/gax-java/1.4.1/apidocs/com/google/api/gax/batching/BatchingSettings.Builder.html#setFlowControlSettings-com.google.api.gax.batching.FlowControlSettings-
[2] https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L124

got more than one input Future failure. Logging failures after the first\ncom.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9999934704

I have 10 million records to publish to a Pub/Sub topic during a batch process, reading them one by one from an input file that contains the objects as a JSON array. With the following code, I get the exception mentioned above. Can you please check whether the settings below are appropriate for this volume of data?

OS: macOS
Java: 1.8
pubsub: 1.103.1
gax-grpc: 1.45.0
Network: 100 Mbps connection via Wi-Fi

The following is dummy code without any additional business logic:

	ProjectTopicName topicName = ProjectTopicName.of(pubsubProjectId, pubsubTopicName);
	Publisher publisher = null;
	try (JsonParser parser = getJsonParser(filePath)) {
		BatchingSettings batchingSettings =
				BatchingSettings.newBuilder()
						.setElementCountThreshold(1000L)
						.setRequestByteThreshold(10000L)
						.setDelayThreshold(Duration.ofMillis(10L))
						.setFlowControlSettings(
								FlowControlSettings.newBuilder()
										.setLimitExceededBehavior(LimitExceededBehavior.Block)
										.setMaxOutstandingElementCount(100L)
										.build())
						.build();
		RetrySettings retrySettings =
				RetrySettings.newBuilder()
						.setInitialRetryDelay(Duration.ofMillis(1))
						.setRetryDelayMultiplier(2.0)
						.setMaxRetryDelay(Duration.ofSeconds(10))
						.setTotalTimeout(Duration.ofSeconds(10))
						.setInitialRpcTimeout(Duration.ofSeconds(10))
						.setMaxRpcTimeout(Duration.ofSeconds(100))
						.build();

		publisher = Publisher.newBuilder(topicName)
				.setCredentialsProvider(() -> GoogleCredentials.fromStream(new FileInputStream(pubsubKeyPath)))
				.setBatchingSettings(batchingSettings)
				.setRetrySettings(retrySettings)
				.build();
		int count=0;
		int totalMessageCount=0;
		List<ApiFuture<String>> apiFutureList = new ArrayList<>();
		 while (parser.nextToken() == JsonToken.START_OBJECT) {
			if(count != 1000) {
			ByteString data = ByteString.copyFromUtf8(mapper.writeValueAsString(mapper.readValue(parser, Test.class)));
			PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
			ApiFuture<String> apiFuture = publisher.publish(pubsubMessage);
			apiFutureList.add(apiFuture);
			}else 
			{
				System.out.println("Count reaches 1000");
				List<String> messageIdList = ApiFutures.allAsList(apiFutureList).get();
				totalMessageCount += messageIdList.size();
				System.out.println("total message Count so far: "+ totalMessageCount);
				apiFutureList = new ArrayList<>();
				ByteString data = ByteString.copyFromUtf8(mapper.writeValueAsString(mapper.readValue(parser, Test.class)));
				PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
				ApiFuture<String> apiFuture = publisher.publish(pubsubMessage);
				apiFutureList.add(apiFuture);
				count = 0;
			}
			count++;
		 }
		 if(apiFutureList.size()!=0) {
			 System.out.println("Remaining count : "+ count);
			 List<String> messageIdList = ApiFutures.allAsList(apiFutureList).get();
			 totalMessageCount += messageIdList.size();
			 System.out.println("total message Count final : " + totalMessageCount);
		 }
		 
	} finally {
		if (null != publisher) {
			publisher.shutdown();
			publisher.awaitTermination(1, TimeUnit.MINUTES);
		}
		log.info("shutting down publisher");
	}
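
The `9999934704` in the exception reads most naturally as a duration in nanoseconds, roughly 10 seconds, which lines up with the 10-second `setTotalTimeout` and `setInitialRpcTimeout` values in the snippet above. The sketch below is a rough model, not the gax retry algorithm itself. It assumes that every attempt runs for its full RPC timeout, that the RPC timeout stays constant (the snippet sets no RPC timeout multiplier), and that the total timeout caps the whole sequence. `RetryBudgetSketch`, `attemptsThatFit`, and `nanosToSeconds` are names made up for this illustration.

```java
import java.time.Duration;

class RetryBudgetSketch {

    /**
     * Counts how many attempts fit into totalTimeout under the simplifying
     * assumption that each attempt uses its whole RPC timeout before failing.
     */
    static int attemptsThatFit(Duration totalTimeout, Duration rpcTimeout,
                               Duration initialRetryDelay, double retryDelayMultiplier,
                               Duration maxRetryDelay) {
        long totalMs = totalTimeout.toMillis();
        long rpcMs = rpcTimeout.toMillis();
        if (rpcMs <= 0) {
            throw new IllegalArgumentException("rpcTimeout must be positive");
        }
        long delayMs = initialRetryDelay.toMillis();
        long maxDelayMs = maxRetryDelay.toMillis();
        long elapsedMs = 0;
        int attempts = 0;
        while (elapsedMs < totalMs) {
            attempts++;
            // An attempt can never run past the remaining overall budget.
            elapsedMs += Math.min(rpcMs, totalMs - elapsedMs);
            if (elapsedMs >= totalMs) {
                break;
            }
            // Wait before the next attempt, then grow the delay up to its cap.
            elapsedMs += delayMs;
            delayMs = Math.min((long) (delayMs * retryDelayMultiplier), maxDelayMs);
        }
        return attempts;
    }

    /** Converts a nanosecond count, such as the one in the error message, to seconds. */
    static double nanosToSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }
}
```

Under this model, a 10-second total timeout combined with a 10-second RPC timeout leaves room for a single attempt, so one slow publish call would exhaust the whole budget with no retry. Raising the total timeout well above the per-attempt timeout is one setting worth experimenting with.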

Removing dead letter policy on a subscription doesn't clean up all its fields

Removing dead letter policy on a subscription using the client libraries doesn't clean up all the fields. Field max_delivery_attempts stays and takes a default value of 5.

This can be reproduced using the sample snippet here.

Before the update:

google.pubsub.v1.Subscription.dead_letter_policy=dead_letter_topic: "projects/.../topics/may-dlq"
max_delivery_attempts: 10

After the update:

google.pubsub.v1.Subscription.dead_letter_policy=max_delivery_attempts: 5

Google PubSub async rate limitation doesn't work as expected

Environment details

  1. PubSub
  2. OS type and version: Ubuntu LTS 16.04
  3. Java version: Java 11.0.5
  4. pubsub version(s): '1.92.0'

Steps to reproduce

We're using PubSub in prod and seeing a problem: there are more VMs handling PubSub messages than we would expect.

I've run simple tests using PubSub overnight, and it appears that the rate-limiting mechanism does not work as smoothly as we expected.

Here is the test:

Publish a number of messages to a topic with a pull subscription. In the experiment, there were about 2.7k messages (publishing started at approximately 9pm).
Configure one async client using the StreamingPull connection and FlowControl set to 2.
Simulate that handling of every incoming message takes 5 seconds via moving the execution into a timer and acknowledging the message only when the timer finishes.
Expected results: Messages from PubSub are consumed with the same speed, getting 2 messages at a time every 5 seconds. A small timeout between acking a message and a new message pulled due to all the network and processing expenses is expected.

Actual result: PubSub starts throttling, or something like this, with a huge timeout. No message arrives at that time. The timeout depends on amount of unacked messages in subscription.

It doesn't seem clear from the FlowControl docs.

Code example

// Here is the code of consumer (client)

var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
    var flowSettings = FlowControlSettings.newBuilder()
      .setMaxOutstandingElementCount(concurrentFlowsNumber)
      .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
      .build();

    var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
      .setCredentialsProvider(() -> serviceAccountCredentials)
      .setFlowControlSettings(flowSettings)
      .build();

    subscriber.addListener(
      new Subscriber.Listener() {
        @Override
        public void failed(ApiService.State from, Throwable failure) {
          logger.error(failure);
        }
      },
      MoreExecutors.directExecutor());

    var apiService = subscriber.startAsync();
    apiService.addListener(new ApiService.Listener() {
      @Override
      public void running() {
         logger.info("Pubsub started");
      }

      @Override
      public void failed(ApiService.State from, Throwable failure) {
        logger.error("Pubsub failed on step: {}", from);
      }
    }, Runnable::run);

// And the message handler is:

private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
    new Timer().schedule(new TimerTask() {
      @Override
      public void run() {
               consumer.ack();
      }
    }, (long) 3000 + rand.nextInt(5000));
  }
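
As a rough yardstick for the expected results described above, the drain time can be estimated with simple arithmetic. This is only a back-of-the-envelope sketch: it assumes exactly 2,700 messages, a flow-control limit of 2, and no network or scheduling overhead. Note that the handler above schedules its timer for `3000 + rand.nextInt(5000)` milliseconds, so each message actually takes between 3 and just under 8 seconds rather than a fixed 5. `DrainTimeSketch` and `drainSeconds` are hypothetical names.

```java
class DrainTimeSketch {

    /**
     * Estimates how long it takes to drain a backlog when at most
     * maxOutstanding messages are processed concurrently and each
     * message takes secondsPerMessage to handle.
     */
    static long drainSeconds(long messages, long maxOutstanding, long secondsPerMessage) {
        if (maxOutstanding <= 0) {
            throw new IllegalArgumentException("maxOutstanding must be positive");
        }
        // Number of "rounds" needed, rounding up for a partially filled last round.
        long rounds = (messages + maxOutstanding - 1) / maxOutstanding;
        return rounds * secondsPerMessage;
    }

    public static void main(String[] args) {
        System.out.println("5 s per message: " + drainSeconds(2700, 2, 5) + " s");
        System.out.println("3 s per message: " + drainSeconds(2700, 2, 3) + " s");
        System.out.println("8 s per message: " + drainSeconds(2700, 2, 8) + " s");
    }
}
```

With these assumptions the backlog should drain in somewhere between about 1.1 and 3 hours, which gives a baseline to compare the overnight observations against.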

External references such as API reference guides

Message flow config:
https://cloud.google.com/pubsub/docs/pull#config

Any additional information below

In prod, we have about 3 VMs that process messages from the same Pub/Sub subscription. Messages are ingested into the topic at a rate of about 20-40 messages per minute. Processing takes 3 to 6 seconds, and the message is acked afterwards.

Thanks!

How to properly decode com.google.protobuf.ByteString data message

I'm exporting audit logs to Pub/Sub and using the Pub/Sub client to pull them from a subscription. I can retrieve the audit logs with this client. However, some audit logs do not appear to be encoded correctly by the API: when I call PubsubMessage.getData().toStringUtf8(), part of the audit log is not decoded properly. Please see the attached text file.

As you can see in the attached text file, the "fingerprint" part is not decoded correctly.

Now my question is: how can I decode the PubsubMessage data properly?

Note: the same thing happens when I view the audit log in the Google Cloud Platform Logging console.

Thanks,
samplelog.txt
samplelog
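
The attachment is not reproduced here, so the cause cannot be confirmed. One possibility, stated purely as an assumption, is that part of the payload is not valid UTF-8 text, in which case no string decoding will make it readable. The sketch below uses only the JDK to check whether a byte array is well-formed UTF-8 and falls back to Base64 for display when it is not. `Utf8CheckSketch` and its methods are hypothetical names; protobuf's `ByteString` also offers an `isValidUtf8()` method that could serve the same purpose on the message data directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class Utf8CheckSketch {

    /** Returns true only if the bytes form a well-formed UTF-8 sequence. */
    static boolean isValidUtf8(byte[] bytes) {
        try {
            StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(bytes));
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    /** Decodes valid UTF-8 as text; otherwise renders the raw bytes as Base64. */
    static String toDisplayString(byte[] bytes) {
        return isValidUtf8(bytes)
                ? new String(bytes, StandardCharsets.UTF_8)
                : "base64:" + Base64.getEncoder().encodeToString(bytes);
    }
}
```

Since the same rendering shows up in the Logging console, the bytes may already be in this form before they reach Pub/Sub, so a check like this is a diagnostic aid rather than a fix.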

Synthesis failed for java-pubsub

Hello! Autosynth couldn't regenerate java-pubsub. 💔

Here's the output from running synth.py:

Cloning into 'working_repo'...
Switched to branch 'autosynth'
Traceback (most recent call last):
  File "/home/kbuilder/.pyenv/versions/3.6.1/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/kbuilder/.pyenv/versions/3.6.1/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmpfs/src/git/autosynth/autosynth/synth.py", line 256, in <module>
    main()
  File "/tmpfs/src/git/autosynth/autosynth/synth.py", line 196, in main
    last_synth_commit_hash = get_last_metadata_commit(args.metadata_path)
  File "/tmpfs/src/git/autosynth/autosynth/synth.py", line 149, in get_last_metadata_commit
    text=True,
  File "/home/kbuilder/.pyenv/versions/3.6.1/lib/python3.6/subprocess.py", line 403, in run
    with Popen(*popenargs, **kwargs) as process:
TypeError: __init__() got an unexpected keyword argument 'text'

Google internal developers can see the full log here.

bring back polling pull

Streaming pull, while very fast, has the problem that very long-running messages (> 30 min) may be delivered more than once.

Pub/Sub: messages stuck in buffer, preventing proper load balancing

Repro:

  1. Publish 60 messages, with numbers 1 through 60 as message content. Observe the total backlog for a subscription reach 60 messages and 23 bytes.
  2. Start an instance of a subscriber client, with a single-threaded executor and FlowControl set to 1 message per buffer (see code below). The subscriber takes 10 seconds to process each message.
  3. Observe that the subscriber processes messages, one at a time, every 10 seconds (see log output below)
  4. The bug: Start two new instances of the same subscriber client, roughly a minute later. Observe: they process no messages. Expected behavior: the two subscribers immediately start processing messages.
  5. Stop the first subscriber client. Observe: the next two subscriber clients start processing messages.

The hypothesis here is that the entire backlog is stuck in the gRPC and other buffers, between the server and the client. So the server thinks the messages are out and being processed, while the client code can't really see the messages. When new clients connect, the server does not have anything to send them. Killing the original client effectively "nacks" the messages in the buffer, by sending a stream close signal to the server. This allows the server to start sending messages to the other clients.

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import org.threeten.bp.ZonedDateTime;
import org.threeten.bp.format.DateTimeFormatter;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Sub{

    private static AtomicInteger messageCounter = new AtomicInteger(0);
    private static String appInstanceId = ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HHmmss"));

    static class MessageHandler implements MessageReceiver {

        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            try {
                TimeUnit.SECONDS.sleep(10);
                System.out.println(
                        ZonedDateTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + ",\t"
                        + "App instance id" + appInstanceId
                        + ",\tProcessing id: " + messageCounter.incrementAndGet()
                        + ",\tmessage content:" + message.getData().toStringUtf8()
                );
                consumer.ack();
            }
            catch (InterruptedException e){
                consumer.nack();
            }
        }
    }

    /** Receive messages over a subscription. */
    public static void main(String... args) throws Exception {
        // set subscriber id, eg. my-sub
        String projectId = args[0];
        String subscriptionId = args[1];
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of( projectId, subscriptionId);
        Subscriber subscriber = null;
        try {
            // we create a single threaded subscriber with the most restrictive flow control setting:
            subscriber =
                    Subscriber.newBuilder(subscriptionName, new MessageHandler())
                            .setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1L).build())
                            .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build())
                            .build();
            subscriber.startAsync().awaitTerminated();
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }
}
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-pubsub</artifactId>
        <version>0.42.1-beta</version>
    </dependency>
Logs
# start first client at 17:00:34
17:00:47,	App instance id170034,	Processing id: 1,	message content:1
17:00:57,	App instance id170034,	Processing id: 2,	message content:5
17:01:07,	App instance id170034,	Processing id: 3,	message content:9
17:01:17,	App instance id170034,	Processing id: 4,	message content:4
17:01:27,	App instance id170034,	Processing id: 5,	message content:8
17:01:37,	App instance id170034,	Processing id: 6,	message content:3
17:01:47,	App instance id170034,	Processing id: 7,	message content:7
17:01:57,	App instance id170034,	Processing id: 8,	message content:2
17:02:07,	App instance id170034,	Processing id: 9,	message content:6
17:02:17,	App instance id170034,	Processing id: 10,	message content:13
17:02:27,	App instance id170034,	Processing id: 11,	message content:17
17:02:37,	App instance id170034,	Processing id: 12,	message content:12
17:02:47,	App instance id170034,	Processing id: 13,	message content:16
17:02:57,	App instance id170034,	Processing id: 14,	message content:11
17:03:07,	App instance id170034,	Processing id: 15,	message content:15
17:03:17,	App instance id170034,	Processing id: 16,	message content:10
17:03:27,	App instance id170034,	Processing id: 17,	message content:14
17:03:37,	App instance id170034,	Processing id: 18,	message content:22

# start second and third clients around here: they generate no logs

17:03:47,	App instance id170034,	Processing id: 19,	message content:26
17:03:57,	App instance id170034,	Processing id: 20,	message content:21
17:04:07,	App instance id170034,	Processing id: 21,	message content:25
17:04:17,	App instance id170034,	Processing id: 22,	message content:19
# kill first client

Process finished with exit code 130 (interrupted by signal 2: SIGINT)
# second client logs (note they start "10 seconds" after the first client is killed)
17:04:35,	App instance id170328,	Processing id: 1,	message content:39
17:04:45,	App instance id170328,	Processing id: 2,	message content:47
17:04:55,	App instance id170328,	Processing id: 3,	message content:20
17:05:05,	App instance id170328,	Processing id: 4,	message content:40
17:05:15,	App instance id170328,	Processing id: 5,	message content:38
17:05:25,	App instance id170328,	Processing id: 6,	message content:46
17:05:35,	App instance id170328,	Processing id: 7,	message content:27
17:05:45,	App instance id170328,	Processing id: 8,	message content:42
17:05:55,	App instance id170328,	Processing id: 9,	message content:55
17:06:05,	App instance id170328,	Processing id: 10,	message content:48
17:06:15,	App instance id170328,	Processing id: 11,	message content:56
17:06:25,	App instance id170328,	Processing id: 12,	message content:54
17:06:35,	App instance id170328,	Processing id: 13,	message content:49
17:06:45,	App instance id170328,	Processing id: 14,	message content:57
17:07:39,	App instance id170328,	Processing id: 15,	message content:18
17:07:49,	App instance id170328,	Processing id: 16,	message content:23
17:08:19,	App instance id170328,	Processing id: 17,	message content:30
17:08:29,	App instance id170328,	Processing id: 18,	message content:34
17:09:39,	App instance id170328,	Processing id: 19,	message content:32

# third client logs (note they start "10 seconds" after the first client is killed)
17:04:35,	App instance id170328,	Processing id: 1,	message content:39
17:04:45,	App instance id170328,	Processing id: 2,	message content:47
17:04:55,	App instance id170328,	Processing id: 3,	message content:20
17:05:05,	App instance id170328,	Processing id: 4,	message content:40
17:05:15,	App instance id170328,	Processing id: 5,	message content:38
17:05:25,	App instance id170328,	Processing id: 6,	message content:46
17:05:35,	App instance id170328,	Processing id: 7,	message content:27
17:05:45,	App instance id170328,	Processing id: 8,	message content:42
17:05:55,	App instance id170328,	Processing id: 9,	message content:55
17:06:05,	App instance id170328,	Processing id: 10,	message content:48
17:06:15,	App instance id170328,	Processing id: 11,	message content:56
17:06:25,	App instance id170328,	Processing id: 12,	message content:54
17:06:35,	App instance id170328,	Processing id: 13,	message content:49
17:06:45,	App instance id170328,	Processing id: 14,	message content:57
17:07:39,	App instance id170328,	Processing id: 15,	message content:18
17:07:49,	App instance id170328,	Processing id: 16,	message content:23
17:08:19,	App instance id170328,	Processing id: 17,	message content:30
17:08:29,	App instance id170328,	Processing id: 18,	message content:34
17:09:39,	App instance id170328,	Processing id: 19,	message content:32

PubSub : add the concept of message abandonment

PubsubMessage / AckReplyConsumer needs the concept of "abandoning" a message.

Abandoning a message would mean that the message ack deadline would no longer be auto-extended.

public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
	try {

		// message processing

		consumer.ack();
	} catch (Throwable t) {
		// In an effort to prevent a tight message delivery / processing loop,
		// we want to abandon the message (stop extending the ack deadline and
		// let the message expire "naturally")
		
		consumer.abandon();
	}
}

Local PubSub Helper

I would like a class like the LocalDatastoreHelper to start the PubSub Emulator directly from my code.

I would like to contribute to this if you plan to keep supporting this kind of feature.

Thanks.

Make google-cloud-pubsub's Subscriber more unit test-friendly

While building unit tests for the Spring Integration Pub/Sub channel adapters, we needed to simulate the behaviour of a Subscriber receiving a message. The recommended way to do testing in google-cloud-pubsub seems to be to spin up a local emulator, according to these instructions, but the Cloud SDK requirement makes this approach prohibitive for us.

I looked for a way to inject mocks into a Subscriber, but nothing seems to have the desired effect of triggering a received message.
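
Until the Subscriber itself is easier to mock, one workaround is to unit-test the receiver callback on its own by calling it directly with a recording fake. The sketch below illustrates the pattern using JDK types only. `AckReply` and `Receiver` are hypothetical stand-ins that mirror the shape of `AckReplyConsumer` (`ack()`/`nack()`) and `MessageReceiver` (`receiveMessage`), with a plain `String` in place of `PubsubMessage`. This does not exercise the Subscriber's own delivery path.

```java
import java.util.ArrayList;
import java.util.List;

class ReceiverTestSketch {

    // Hypothetical stand-in for AckReplyConsumer.
    interface AckReply {
        void ack();
        void nack();
    }

    // Hypothetical stand-in for MessageReceiver, using String as the payload type.
    interface Receiver {
        void receiveMessage(String payload, AckReply reply);
    }

    /** Fake reply object that records how the receiver responded. */
    static final class RecordingAckReply implements AckReply {
        int acks;
        int nacks;

        @Override
        public void ack() {
            acks++;
        }

        @Override
        public void nack() {
            nacks++;
        }
    }

    /** Example receiver under test: stores non-empty payloads and acks them. */
    static Receiver collectingReceiver(List<String> sink) {
        return (payload, reply) -> {
            if (payload == null || payload.isEmpty()) {
                reply.nack();
                return;
            }
            sink.add(payload);
            reply.ack();
        };
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        RecordingAckReply reply = new RecordingAckReply();
        collectingReceiver(sink).receiveMessage("hello", reply);
        System.out.println("received=" + sink + ", acks=" + reply.acks + ", nacks=" + reply.nacks);
    }
}
```

With the real library, the same idea would mean passing a test implementation of `AckReplyConsumer` to the receiver, which avoids needing the emulator for callback logic.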

Add PubSub Push message Proto definition

Is your feature request related to a problem? Please describe.

There is no PubSub proto message for the incoming push subscription messages described here. As a result, anyone who would like to receive a Protobuf equivalent of the push message has to create their own proto wrapper around the specified JSON representation.

Describe the solution you'd like
It'd be great to have an appropriate Protobuf message equivalent for the defined JSON representation:

   {
     "message": {
       "attributes": {
         "key": "value"
       },
       "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
       "messageId": "136969346945"
     },
     "subscription": "projects/myproject/subscriptions/mysubscription"
   }

Describe alternatives you've considered
I'm currently using my own Proto definition:

syntax = "proto3";

package google.pubsub.v1;

option java_multiple_files = true;
option java_outer_classname = "PubsubPushProto";
option java_package = "com.google.pubsub.v1";

import "google/pubsub/v1/pubsub.proto";

message PubsubPushNotification {

    // Pubsub message.
    PubsubMessage message = 1;

    // The name of the Pubsub subscription that pushed the current notification.
    // Format is `projects/{project}/subscriptions/{subscription}`.
    string subscription = 2;
}

Additional context
I'm currently using such a Protobuf as an input parameter for Spring's RestController POST handler. The conversion is done with the help of com.google.protobuf.util.JsonFormat.Parser class.
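
For readers unfamiliar with the push format: the `data` field in the JSON representation above is Base64-encoded. A minimal JDK-only sketch of decoding it follows; `PushDataDecodeSketch` and `decodeData` are hypothetical names, and the payload is assumed to be UTF-8 text, as it is in the sample.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class PushDataDecodeSketch {

    /** Decodes the Base64 "data" field of a push message into UTF-8 text. */
    static String decodeData(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String data = "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==";
        // Prints: Hello Cloud Pub/Sub! Here is my message!
        System.out.println(decodeData(data));
    }
}
```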

sync pull causes DeadlineExceededException

When doing a synchronous pull on a subscription that has no messages, the call throws DeadlineExceededException. Usually, when a pull times out, it would just return null. Also, the pull wait interval is usually configurable; how can it be configured here?

Caused by: com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 4.978775875s. [buffered_nanos=291856840, buffered_nanos=331252614, remote_addr=pubsub.googleapis.com/172.217.12.138:443]
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:51)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1039)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1165)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:749)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:522)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:497)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
	at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
	at io.grpc.internal.ClientCallImpl$1CloseInContext.runInContext(ClientCallImpl.java:416)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	... 6 more
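
The behaviour asked for above, where a timed-out pull yields an empty result instead of an exception, can be approximated on the caller's side. The sketch below shows the general pattern with JDK futures only; it is not the client library's API. `PullOrEmptySketch` and `getOrEmpty` are hypothetical names. With the real client, the analogous step would presumably be catching the `DeadlineExceededException` shown in the stack trace and returning an empty list.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PullOrEmptySketch {

    /**
     * Waits up to the given timeout for a pending pull and treats a timeout
     * as "no messages" rather than as an error.
     */
    static <T> List<T> getOrEmpty(Future<List<T>> pending, long timeout, TimeUnit unit) {
        try {
            return pending.get(timeout, unit);
        } catch (TimeoutException e) {
            // Nothing arrived in time: cancel the request and report an empty batch.
            pending.cancel(true);
            return Collections.emptyList();
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        } catch (ExecutionException e) {
            // Real failures are still surfaced to the caller.
            throw new IllegalStateException("pull failed", e.getCause());
        }
    }
}
```

The caller picks the wait interval through the `timeout` argument, which covers the "configurable wait time" part of the question for this pattern.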

PubSub: Subscriber doesn't close connections after stopAsync

Environment details

  1. PubSub: google-cloud-pubsub-1.98.0
  2. Ubuntu 19.04, x64
  3. Java version: zulu8.38.0.13-ca-jdk8.0.212-linux_x64
  4. google-cloud-java version(s): google-cloud-core-1.91.3.jar, google-cloud-core-grpc-1.91.3.jar, google-cloud-pubsub-1.98.0.jar

Steps to reproduce

  1. Configure Subscriber
  2. Start Subscriber, receive several messages, send ack.
  3. Stop Subscriber, await termination.
  4. Don't stop the JVM; check connections using netstat, for example: netstat -natp | grep java | grep 443
  5. There is at least one established connection.
  6. Also logs contain messages like following:

io.grpc.internal.AbstractClientStream$TransportState inboundDataReceived
INFO: Received data on closed stream

  7. There are several runnable threads "grpc-default-worker-ELG-..."

Code example

// configure subscriber (from samples)
subscriber.startAsync().awaitRunning();
Thread.sleep(30000); //some messages are received
subscriber.stopAsync().awaitTerminated();
// do other job, check connections using netstat
Thread.sleep(Long.MAX_VALUE);

It looks like the Subscriber instance doesn't shut down the field com.google.cloud.pubsub.v1.Subscriber#subStub. This field isn't added to backgroundResources, and neither shutdown() nor shutdownNow() is called on it. That's why the gRPC channel isn't closed.
When I call subStub.shutdownNow() using Java reflection, the problem disappears.
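
For reference, the reflection workaround described above has roughly the following shape. This sketch runs against hypothetical stand-in classes (`FakeSubscriber`, `FakeStub`) rather than the real `Subscriber`, so it only demonstrates the mechanism; the field name `subStub` is taken from the report. Depending on a private field is fragile and may break with any library upgrade.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;

class ReflectionShutdownSketch {

    // Hypothetical stand-in for the stub object held by the subscriber.
    static final class FakeStub {
        private boolean shutDown;

        public void shutdownNow() {
            shutDown = true;
        }

        public boolean isShutdown() {
            return shutDown;
        }
    }

    // Hypothetical stand-in for a class that keeps the stub in a private field.
    static final class FakeSubscriber {
        private final FakeStub subStub = new FakeStub();

        FakeStub stubForInspection() {
            return subStub;
        }
    }

    /** Reads a private field from owner and calls its no-arg shutdownNow() method. */
    static void shutdownPrivateField(Object owner, String fieldName) {
        try {
            Field field = owner.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            Object target = field.get(owner);
            Method shutdownNow = target.getClass().getMethod("shutdownNow");
            shutdownNow.setAccessible(true);
            shutdownNow.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not shut down field " + fieldName, e);
        }
    }
}
```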

com.google.api.core.ApiFuture<String> should also return timestamp and not just message id

I would like to know the server-generated timestamp assigned when a message is published, both for one of my audit use cases and for logging. Currently we only receive the message ID. I would like publish to return an object containing message metadata, such as the timestamp and message ID (more fields could be added in the future), rather than just a string.

public com.google.api.core.ApiFuture<String> publish(PubsubMessage message) should be
public com.google.api.core.ApiFuture<PubsubMessage> publish(PubsubMessage message), where the returned PubsubMessage has the server timestamp populated.

Occasional NotFoundException when attempting to pull messages from subscription immediately after creation.

I am utilizing this library as part of an effort to build out a test automation framework. The basic aim is to create a wrapper class around the already existing Subscriber functionality to abstract out the lower level details and make creating, polling, and deleting subscriptions as easy as possible. A plausible test scenario is as follows:

  • Create a pull subscription on a preexisting topic and project
  • Create an instance of the subscriber and start
  • Trigger some workflow/task that should publish to subscribed topic
  • Pull messages from the subscriber and validate the response
  • Delete the subscription and cleanup

Java version: 1.8.0_201
google-cloud-pubsub version: 1.102.1

With my implementation, I am able to create the pull subscription, start the subscription, receive x incoming messages, stop, and delete the subscription. However, it seems like something is off because occasionally I will receive a NotFoundException due to the resource (projects/{project}/subscriptions/{subscription}) not being found. My immediate thought was that the resource is sometimes unavailable because I am not giving it any time in between creating and starting the subscription, and thus it's possible the subscription had not completed the creation process before I started polling. However, it seems like this issue is linked more to the repetition of the workflow, rather than the time between calls. In other words, I am more likely to see this error every other execution, as opposed to it being related to the amount of time I wait after creating the subscription to begin polling. I'm not a java expert and have referenced a lot of examples to implement my wrapper class so I'm afraid that I'm doing things against the general paradigms that this library was built upon.

Code example

Things to note about my naive PubSubSubscriber implementation

  • One instance of PubSubSubscriber per subscription. Allow for multiple instances to be running asynchronously
  • Each instance of PubSubSubscriber has its own Subscriber attribute, and a queue (I'm using LinkedBlockingDeque). I've read that it's important to use a queue that is thread safe but I'm not exactly sure how much this matters if I'm creating a new instance of it per instance of the PubSubSubscriber. Is it simply incorrect to be creating an instance of LinkedBlockingDeque per PubSubSubscriber if the queue is already designed to be thread-safe?
  • I have implemented a static method in this class called createPullSubscription(). Since it is static, it doesn't require an instance to be created in order to call this method; does that cause issues if I create an instance after creating the pull subscription so that I can subscribe and pull from the newly created subscription?
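
On the thread-safety question in the list above: even with one queue per PubSubSubscriber instance, the queue is written by whichever thread the Subscriber uses to invoke the callback and read by the test thread, so it is accessed concurrently. A JDK-only sketch of that hand-off follows, with a plain thread standing in for the library's callback thread; `QueueHandoffSketch` and `handOff` are hypothetical names.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class QueueHandoffSketch {

    /**
     * Offers a payload from a separate "callback" thread and polls for it on
     * the calling thread, returning null if nothing arrives within the timeout.
     */
    static String handOff(String payload, long timeoutMillis) {
        BlockingDeque<String> messages = new LinkedBlockingDeque<>();

        // Stand-in for the thread on which the receiver callback would run.
        Thread callback = new Thread(() -> messages.offer(payload), "callback-thread");
        callback.start();

        try {
            // Blocks until a message is available or the timeout elapses.
            return messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Creating one `LinkedBlockingDeque` per instance is therefore reasonable: the thread-safe queue protects the hand-off between threads within a single instance.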

Basic constructor looks something like this:

  public PubSubSubscriber(String projectId, String subscriptionId)  {
    this.PROJECT_ID = projectId;
    this.SUBSCRIPTION_ID = subscriptionId;
    this.messages = new LinkedBlockingDeque<>();
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName
        .of(this.PROJECT_ID, this.SUBSCRIPTION_ID);

    this.subscriber = Subscriber.newBuilder(subscriptionName,
          ((PubsubMessage message, AckReplyConsumer consumer) -> {
            // Hand the message to the polling thread, then acknowledge it.
            this.messages.offer(message);
            consumer.ack();
          })
        )
        .build();
  }

The createPullSubscription method (which resides in the same class) looks like:

  public static void createPullSubscription(String projectId, String topicId, String subscriptionId)
      throws IOException {

    ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
        projectId, subscriptionId);

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
      // An empty PushConfig makes this a pull subscription.
      Subscription subscription = subscriptionAdminClient.createSubscription(subscriptionName, topicName,
          PushConfig.getDefaultInstance(), 0);
    } catch (ApiException e) {
      System.out.println(e.getStatusCode().getCode());
      System.out.println(e.isRetryable());
    }
  }

Test code in main method:

public static void main(String... args) throws IOException {
  PubSubSubscriber.createPullSubscription("my-project", "my-topic", "my-test-sub");
  PubSubSubscriber ps = new PubSubSubscriber("my-project", "my-test-sub");

  // this simply calls ps.subscriber.startAsync().awaitRunning();
  ps.start();

  // Receive Messages on the subscription based on a key and value to look for (my own method that uses poll())
  JsonArray messages = ps.getMessages("id", testId);

  // this simply calls ps.subscriber.stopAsync().awaitTerminated();
  ps.stop();

  // also my own static method to delete
  PubSubSubscriber.deleteSubscription("my-project", "my-test-sub");
}

Stack trace

Feb 03, 2020 4:40:02 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1 onFailure
SEVERE: terminated streaming with exception
com.google.api.gax.rpc.NotFoundException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=projects/my-project/subscriptions/my-test-sub).
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:45)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1.onFailure(StreamingSubscriberConnection.java:238)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1039)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1165)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:958)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:749)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
	at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
	at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52)
	at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:174)
	at com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:103)
	at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84)
	at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:86)
	at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:149)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:521)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:641)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:529)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:703)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:692)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=projects/my-project/subscriptions/my-test-sub).
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:45)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
	at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
	... 25 more
Caused by: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=projects/my-project/subscriptions/my-test-sub).
	at io.grpc.Status.asRuntimeException(Status.java:533)
	... 24 more

Any additional information below

  • The error doesn't occur during instantiation of the class for a new subscription. It occurs when I attempt to pull messages. (this.messages.poll())
  • I've tried modifying flow control settings, parallel pull counts, making state checks, etc. Nothing seems to change the outcome.
  • The only "pattern" I've been able to identify is that it rarely happens twice in a row, and it's pretty common for it to happen every other run, but sometimes it will work 2-3 times consecutively before crashing with that error. The resource and project names for the generated subscription have remained unchanged for each run.

Is there something obvious I'm missing here? Is it possible I'm not shutting down threads somehow? I'm using startAsync() and stopAsync() to start and stop, and have even tried awaiting the Running/Terminated states. Is there anything else regarding the resource status that I can continuously check before attempting to connect, to ensure that the resource is available? Any and all help (including ripping apart my naive implementation) is appreciated.

Thanks!

Synthesis failed for java-pubsub

Hello! Autosynth couldn't regenerate java-pubsub. 💔

Here's the output from running synth.py:

Cloning into 'working_repo'...
Switched to a new branch 'autosynth'
Cloning into '/tmpfs/tmp/tmp2arqvjk5/googleapis'...
Cloning into '/tmpfs/tmp/tmp2arqvjk5/synthtool'...
2020-05-05 21:32:56 [INFO] PR already exists: https://github.com/googleapis/java-pubsub/pull/174
2020-05-05 21:32:56 [INFO] PR already exists: https://github.com/googleapis/java-pubsub/pull/175
Switched to branch 'autosynth-synthtool'
Note: checking out '486ed4130a17a0fa411a30e7c48bd04082e68458'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b <new-branch-name>

HEAD is now at 486ed41 deps: update dependency com.google.api.grpc:proto-google-common-protos to v1.18.0 (#176)
Note: checking out 'ab883569eb0257bbf16a6d825fd018b3adde3912'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b <new-branch-name>

HEAD is now at ab88356 fix: make .kokoro-autosynth executable (#522)
Note: checking out 'b5e9b0b09cc15c263f07da92fa64db31b2e4dd07'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b <new-branch-name>

HEAD is now at b5e9b0b0 feat: add `order` field to `TagField` and `TagTemplateField` message in tags.proto fix: broken document links & polish comments feat: add csharp/ruby/PHP namespaces chore: update copyright for proto files
Switched to a new branch 'autosynth-synthtool-19'
2020-05-05 21:32:56 [INFO] Running synthtool
2020-05-05 21:32:56 [INFO] ['/tmpfs/src/github/synthtool/env/bin/python3', '-m', 'synthtool', '--metadata', 'synth.metadata', 'synth.py', '--']
2020-05-05 21:32:56,805 synthtool > Executing /tmpfs/src/github/synthtool/working_repo/synth.py.
On branch autosynth-synthtool-19
nothing to commit, working tree clean
2020-05-05 21:32:56,935 synthtool > Ensuring dependencies.
2020-05-05 21:32:56,946 synthtool > Cloning googleapis.
2020-05-05 21:32:56,947 synthtool > Using precloned repo /tmpfs/tmp/tmp2arqvjk5/googleapis
2020-05-05 21:32:56,952 synthtool > Generating code for: //google/pubsub/v1:google-cloud-pubsub-v1-java.
2020-05-05 21:32:57,062 synthtool > Failed executing bazel build //google/pubsub/v1:google-cloud-pubsub-v1-java:

Starting local Bazel server and connecting to it...
Server crashed during startup. Now printing /home/kbuilder/.cache/bazel/_bazel_kbuilder/d453506891d057cfcc6976d9195ca24d/server/jvm.out
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000715200000, 247463936, 0) failed; error='Not enough space' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 247463936 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmpfs/tmp/tmp2arqvjk5/googleapis/hs_err_pid25733.log

2020-05-05 21:32:57,063 synthtool > Wrote metadata to synth.metadata.
Traceback (most recent call last):
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmpfs/src/github/synthtool/synthtool/__main__.py", line 102, in <module>
    main()
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/__main__.py", line 94, in main
    spec.loader.exec_module(synth_module)  # type: ignore
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/tmpfs/src/github/synthtool/working_repo/synth.py", line 425, in <module>
    bazel_target=f'//google/{service}/{version}:google-cloud-{service}-{version}-java',
  File "/tmpfs/src/github/synthtool/synthtool/languages/java.py", line 310, in bazel_library
    library = gapic.java_library(service=service, version=version, **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/gcp/gapic_bazel.py", line 63, in java_library
    return self._generate_code(service, version, "java", **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/gcp/gapic_bazel.py", line 177, in _generate_code
    shell.run(bazel_run_args)
  File "/tmpfs/src/github/synthtool/synthtool/shell.py", line 39, in run
    raise exc
  File "/tmpfs/src/github/synthtool/synthtool/shell.py", line 33, in run
    encoding="utf-8",
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/subprocess.py", line 438, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['bazel', 'build', '//google/pubsub/v1:google-cloud-pubsub-v1-java']' returned non-zero exit status 37.
2020-05-05 21:32:57 [ERROR] Synthesis failed
HEAD is now at 486ed41 deps: update dependency com.google.api.grpc:proto-google-common-protos to v1.18.0 (#176)
Switched to branch 'autosynth-synthtool'
Note: checking out '486ed4130a17a0fa411a30e7c48bd04082e68458'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b <new-branch-name>

HEAD is now at 486ed41 deps: update dependency com.google.api.grpc:proto-google-common-protos to v1.18.0 (#176)
HEAD is now at ab88356 fix: make .kokoro-autosynth executable (#522)
Previous HEAD position was b5e9b0b0 feat: add `order` field to `TagField` and `TagTemplateField` message in tags.proto fix: broken document links & polish comments feat: add csharp/ruby/PHP namespaces chore: update copyright for proto files
HEAD is now at a3a0bf0f BREAKING_CHANGE: Removing TimeSeriesQueryLanguageCondition as an alert condition type. The condition type is unsupported and unused. It was originally added for the Monitoring Query Language Alpha feature. refactor!: Drop support for TimeSeriesQueryLanguageCondition as an alert condition type.
Switched to a new branch 'autosynth-49'
2020-05-05 21:32:57 [INFO] Running synthtool
2020-05-05 21:32:57 [INFO] ['/tmpfs/src/github/synthtool/env/bin/python3', '-m', 'synthtool', '--metadata', 'synth.metadata', 'synth.py', '--']
2020-05-05 21:32:57,357 synthtool > Executing /tmpfs/src/github/synthtool/working_repo/synth.py.
On branch autosynth-49
nothing to commit, working tree clean
2020-05-05 21:32:57,487 synthtool > Ensuring dependencies.
2020-05-05 21:32:57,498 synthtool > Cloning googleapis.
2020-05-05 21:32:57,499 synthtool > Using precloned repo /tmpfs/tmp/tmp2arqvjk5/googleapis
2020-05-05 21:32:57,503 synthtool > Generating code for: //google/pubsub/v1:google-cloud-pubsub-v1-java.
2020-05-05 21:32:57,614 synthtool > Failed executing bazel build //google/pubsub/v1:google-cloud-pubsub-v1-java:

Starting local Bazel server and connecting to it...
Server crashed during startup. Now printing /home/kbuilder/.cache/bazel/_bazel_kbuilder/d453506891d057cfcc6976d9195ca24d/server/jvm.out
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000715200000, 247463936, 0) failed; error='Not enough space' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 247463936 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmpfs/tmp/tmp2arqvjk5/googleapis/hs_err_pid25768.log

2020-05-05 21:32:57,615 synthtool > Wrote metadata to synth.metadata.
Traceback (most recent call last):
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmpfs/src/github/synthtool/synthtool/__main__.py", line 102, in <module>
    main()
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/__main__.py", line 94, in main
    spec.loader.exec_module(synth_module)  # type: ignore
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/tmpfs/src/github/synthtool/working_repo/synth.py", line 425, in <module>
    bazel_target=f'//google/{service}/{version}:google-cloud-{service}-{version}-java',
  File "/tmpfs/src/github/synthtool/synthtool/languages/java.py", line 310, in bazel_library
    library = gapic.java_library(service=service, version=version, **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/gcp/gapic_bazel.py", line 63, in java_library
    return self._generate_code(service, version, "java", **kwargs)
  File "/tmpfs/src/github/synthtool/synthtool/gcp/gapic_bazel.py", line 177, in _generate_code
    shell.run(bazel_run_args)
  File "/tmpfs/src/github/synthtool/synthtool/shell.py", line 39, in run
    raise exc
  File "/tmpfs/src/github/synthtool/synthtool/shell.py", line 33, in run
    encoding="utf-8",
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/subprocess.py", line 438, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['bazel', 'build', '//google/pubsub/v1:google-cloud-pubsub-v1-java']' returned non-zero exit status 37.
2020-05-05 21:32:57 [ERROR] Synthesis failed
HEAD is now at 486ed41 deps: update dependency com.google.api.grpc:proto-google-common-protos to v1.18.0 (#176)
Switched to branch 'autosynth'
Traceback (most recent call last):
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 584, in <module>
    main()
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 465, in main
    return _inner_main(temp_dir)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 574, in _inner_main
    commit_count = synthesize_loop(x, multiple_prs, change_pusher, synthesizer)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 360, in synthesize_loop
    synthesize_inner_loop(toolbox, synthesizer)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 370, in synthesize_inner_loop
    synthesizer, len(toolbox.versions) - 1
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 259, in synthesize_version_in_new_branch
    synthesizer.synthesize(self.environ)
  File "/tmpfs/src/github/synthtool/autosynth/synthesizer.py", line 115, in synthesize
    synth_proc.check_returncode()  # Raise an exception.
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/subprocess.py", line 389, in check_returncode
    self.stderr)
subprocess.CalledProcessError: Command '['/tmpfs/src/github/synthtool/env/bin/python3', '-m', 'synthtool', '--metadata', 'synth.metadata', 'synth.py', '--']' returned non-zero exit status 1.

Google internal developers can see the full log here.

Expose ProxyDetector from gRPC library

Is your feature request related to a problem? Please describe.
There is no way to define proxy parameters without defining variables at the JVM level, which can create side effects for other systems running in the same JVM.

Describe the solution you'd like
I opened a feature request at grpc/grpc-java#7043 and they suggested using ProxyDetector, but I didn't find a way to leverage this in the PubSub client.

Describe alternatives you've considered
Running things in different JVMs, but that's a no-go for us.

Synthesis failed for java-pubsub

Hello! Autosynth couldn't regenerate java-pubsub. 💔

Here's the output from running synth.py:

der/.cache/synthtool/synthtool
.github/ISSUE_TEMPLATE/bug_report.md
.github/ISSUE_TEMPLATE/feature_request.md
.github/ISSUE_TEMPLATE/support_request.md
.github/PULL_REQUEST_TEMPLATE.md
.github/release-please.yml
.github/trusted-contribution.yml
.github/workflows/ci.yaml
.kokoro/build.bat
.kokoro/build.sh
.kokoro/coerce_logs.sh
.kokoro/common.cfg
.kokoro/common.sh
.kokoro/continuous/common.cfg
.kokoro/continuous/dependencies.cfg
.kokoro/continuous/integration.cfg
.kokoro/continuous/java11.cfg
.kokoro/continuous/java7.cfg
.kokoro/continuous/java8-osx.cfg
.kokoro/continuous/java8-win.cfg
.kokoro/continuous/java8.cfg
.kokoro/continuous/lint.cfg
.kokoro/continuous/propose_release.cfg
.kokoro/continuous/samples.cfg
.kokoro/dependencies.sh
.kokoro/linkage-monitor.sh
.kokoro/nightly/common.cfg
.kokoro/nightly/dependencies.cfg
.kokoro/nightly/integration.cfg
.kokoro/nightly/java11.cfg
.kokoro/nightly/java7.cfg
.kokoro/nightly/java8-osx.cfg
.kokoro/nightly/java8-win.cfg
.kokoro/nightly/java8.cfg
.kokoro/nightly/lint.cfg
.kokoro/nightly/samples.cfg
.kokoro/presubmit/clirr.cfg
.kokoro/presubmit/common.cfg
.kokoro/presubmit/dependencies.cfg
.kokoro/presubmit/integration.cfg
.kokoro/presubmit/java11.cfg
.kokoro/presubmit/java7.cfg
.kokoro/presubmit/java8-osx.cfg
.kokoro/presubmit/java8-win.cfg
.kokoro/presubmit/java8.cfg
.kokoro/presubmit/linkage-monitor.cfg
.kokoro/presubmit/lint.cfg
.kokoro/presubmit/samples.cfg
.kokoro/release/bump_snapshot.cfg
.kokoro/release/common.cfg
.kokoro/release/common.sh
.kokoro/release/drop.cfg
.kokoro/release/drop.sh
.kokoro/release/promote.cfg
.kokoro/release/promote.sh
.kokoro/release/publish_javadoc.cfg
.kokoro/release/publish_javadoc.sh
.kokoro/release/snapshot.cfg
.kokoro/release/snapshot.sh
.kokoro/release/stage.cfg
.kokoro/release/stage.sh
.kokoro/trampoline.sh
CODE_OF_CONDUCT.md
CONTRIBUTING.md
LICENSE
README.md
codecov.yaml
java.header
license-checks.xml
renovate.json
samples/install-without-bom/pom.xml
samples/pom.xml
samples/snapshot/pom.xml
samples/snippets/pom.xml
2020-05-14 16:40:03,227 synthtool [DEBUG] > existing pom file found (samples/pom.xml) - keeping the existing
2020-05-14 16:40:03,228 synthtool [DEBUG] > existing pom file found (samples/snippets/pom.xml) - keeping the existing
2020-05-14 16:40:03,228 synthtool [DEBUG] > existing pom file found (samples/install-without-bom/pom.xml) - keeping the existing
2020-05-14 16:40:03,228 synthtool [DEBUG] > existing pom file found (samples/snapshot/pom.xml) - keeping the existing
2020-05-14 16:40:03,240 synthtool [DEBUG] > Wrote metadata to synth.metadata.
2020-05-14 16:40:03,295 autosynth [INFO] > Changed files:
2020-05-14 16:40:03,295 autosynth [INFO] > M README.md
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriptionAdminClient.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriptionAdminSettings.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminClient.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcSubscriberStub.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStub.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/SubscriberStub.java
 M google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/SubscriberStubSettings.java
 M google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MockPublisherImpl.java
 M google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MockSubscriberImpl.java
 M google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriptionAdminClientTest.java
 M google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/TopicAdminClientTest.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PushConfig.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/PushConfigOrBuilder.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/ReceivedMessage.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/ReceivedMessageOrBuilder.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/Subscription.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/SubscriptionOrBuilder.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/TopicName.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/UpdateTopicRequest.java
 M proto-google-cloud-pubsub-v1/src/main/java/com/google/pubsub/v1/UpdateTopicRequestOrBuilder.java
 M proto-google-cloud-pubsub-v1/src/main/proto/google/pubsub/v1/pubsub.proto
 M synth.metadata
2020-05-14 16:40:03,296 autosynth [DEBUG] > Running: git log a7759f81c25396207d46532ed389ad4d34879857 -1 --no-decorate --pretty=%B%n%nSource-Author: %an <%ae>%nSource-Date: %ad
2020-05-14 16:40:03,300 autosynth [DEBUG] > Running: git add -A
2020-05-14 16:40:03,311 autosynth [DEBUG] > Running: git commit -m fix: don't crash trying to double branch when forking the toolbox (#537)

Co-authored-by: Jeffrey Rennie <[email protected]>

Source-Author: Jeff Ching <[email protected]>
Source-Date: Tue May 12 10:14:22 2020 -0700
Source-Repo: googleapis/synthtool
Source-Sha: a7759f81c25396207d46532ed389ad4d34879857
Source-Link: https://github.com/googleapis/synthtool/commit/a7759f81c25396207d46532ed389ad4d34879857
[autosynth-75 e14c2f0] fix: don't crash trying to double branch when forking the toolbox (#537)
 26 files changed, 834 insertions(+), 189 deletions(-)
2020-05-14 16:40:03,342 autosynth [DEBUG] > Running: git reset --hard HEAD
HEAD is now at e14c2f0 fix: don't crash trying to double branch when forking the toolbox (#537)
2020-05-14 16:40:03,350 autosynth [DEBUG] > Running: git checkout autosynth
Switched to branch 'autosynth'
2020-05-14 16:40:03,357 autosynth [DEBUG] > Running: git diff HEAD..autosynth-75 -- . :(exclude)synth.metadata
2020-05-14 16:40:03,361 autosynth [DEBUG] > Running: git diff HEAD..autosynth-75 -- synth.metadata
2020-05-14 16:40:03,365 autosynth [DEBUG] > Running: git diff HEAD..autosynth-76 -- . :(exclude)synth.metadata
2020-05-14 16:40:03,370 autosynth [DEBUG] > Running: git diff HEAD autosynth-76
2020-05-14 16:40:03,374 autosynth [DEBUG] > Running: git apply /tmpfs/tmp/tmp_b8nd9dz/autosynth-76.patch
2020-05-14 16:40:03,377 autosynth [DEBUG] > Running: git add -A
2020-05-14 16:40:03,383 autosynth [DEBUG] > Running: git commit -m docs: update CONTRIBUTING.md to include code formatting (#534)

Co-authored-by: Jeff Ching <[email protected]>
Co-authored-by: Jeffrey Rennie <[email protected]>

Source-Author: Brian Chen <[email protected]>
Source-Date: Tue May 12 10:24:59 2020 -0700
Source-Repo: googleapis/synthtool
Source-Sha: 5b48b0716a36ca069db3038da7e205c87a22ed19
Source-Link: https://github.com/googleapis/synthtool/commit/5b48b0716a36ca069db3038da7e205c87a22ed19
[autosynth 35da8fe] docs: update CONTRIBUTING.md to include code formatting (#534)
 2 files changed, 13 insertions(+), 4 deletions(-)
2020-05-14 16:40:03,393 autosynth [DEBUG] > Running: git diff autosynth-76..autosynth-82 -- . :(exclude)synth.metadata
2020-05-14 16:40:03,398 autosynth [DEBUG] > Running: git diff autosynth-76..autosynth-82 -- synth.metadata
2020-05-14 16:40:03,402 autosynth [DEBUG] > Running: git diff autosynth-82..autosynth-94 -- . :(exclude)synth.metadata
2020-05-14 16:40:03,406 autosynth [DEBUG] > Running: git diff autosynth-82..autosynth-94 -- synth.metadata
2020-05-14 16:40:03,410 autosynth [DEBUG] > Running: git push --force origin autosynth
To https://github.com/googleapis/java-pubsub.git
 + de14eb7...35da8fe autosynth -> autosynth (forced update)
2020-05-14 16:40:08,145 autosynth [DEBUG] > Running: git log -5 --pretty=%b
2020-05-14 16:40:08,729 autosynth [ERROR] > Error making request (422): Validation Failed
2020-05-14 16:40:08,729 autosynth [DEBUG] > {'message': 'Validation Failed', 'errors': [{'resource': 'PullRequest', 'code': 'custom', 'message': 'A pull request already exists for googleapis:autosynth.'}], 'documentation_url': 'https://developer.github.com/v3/pulls/#create-a-pull-request'}
Traceback (most recent call last):
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/kbuilder/.pyenv/versions/3.6.9/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 600, in <module>
    main()
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 472, in main
    return _inner_main(temp_dir)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 590, in _inner_main
    commit_count = synthesize_loop(x, multiple_prs, change_pusher, synthesizer)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 368, in synthesize_loop
    toolbox.push_changes(change_pusher)
  File "/tmpfs/src/github/synthtool/autosynth/synth.py", line 292, in push_changes
    pr = change_pusher.push_changes(self.commit_count, self.branch, pr_title)
  File "/tmpfs/src/github/synthtool/autosynth/change_pusher.py", line 92, in push_changes
    body=build_pr_body(synth_log, trailers),
  File "/tmpfs/src/github/synthtool/autosynth/github.py", line 84, in create_pull_request
    return cast(Dict, _get_json_or_raise_exception(response))
  File "/tmpfs/src/github/synthtool/autosynth/github.py", line 389, in _get_json_or_raise_exception
    response.raise_for_status()
  File "/tmpfs/src/github/synthtool/env/lib/python3.6/site-packages/requests/models.py", line 941, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 422 Client Error: Unprocessable Entity for url: https://api.github.com/repos/googleapis/java-pubsub/pulls

Google internal developers can see the full log here.

How to enable or disable DLQ

I can see an option for DLQ (dead-letter queue), but it is not clear how to enable or disable it or which configuration is required.

PubSub: Support maximum ACK deadline extension in streaming pull

We worked with the PubSub backend team recently to diagnose a problem that our application experiences occasionally, with NACK'd messages failing to be redelivered in a timely fashion. We traced the issue back to the way the client manages ACK deadline extension for streaming pull.

Our application uses streaming pull for processing long-running operations (i.e. hours) which may yield and NACK so that another instance can finish processing the message (using PubSub to guarantee that the message is eventually ACK'd). When messages are processed to completion, the average time between receiving the message in the client and producing an ACK can be very long compared to the ackDeadline seconds configured on the subscription.

The issue is that when MessageDispatcher automatically extends the ACK deadline, it computes the duration of the extension using the 99.9th percentile latency to ACK, with an upper limit of Subscriber.MAX_ACK_DEADLINE_SECONDS (10 minutes). That is 60 times longer than the ackDeadline that we configure on our subscription and 10 times longer than our application can tolerate! If the ACK deadline is extended by 10 minutes and the instance processing the message crashes, the VM dies (or is network isolated), or an ACK/NACK is otherwise lost, the message will not be redelivered for up to 10 minutes. This causes outages for our application, because we can make no progress on that work for 10 minutes.

In addition, even for our other workloads, where the average processing time for a message is around 2s, the outliers in the latency distribution mean that the 99.9th percentile latency is quite a lot higher than our average (roughly 20s), so ackDeadline extension has surprising results. We would expect the death of a node running one of these operations to cause the messages to be redelivered within 10s, but in reality it can take 20s, which is non-obvious without profiling the ACK response time and reading the source code of MessageDispatcher.
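
To make the effect concrete, here is a minimal sketch of a percentile-based extension. It assumes a nearest-rank 99.9th percentile over observed ACK latencies, clamped to the 10s/10m bounds mentioned in this issue. The names AckDeadlineSketch, percentileSeconds and extensionSeconds are hypothetical and are not the MessageDispatcher API; the real client may build its distribution differently.

```java
import java.util.Arrays;

// Hypothetical illustration; not the client's MessageDispatcher implementation.
class AckDeadlineSketch {
    static final int MIN_ACK_DEADLINE_SECONDS = 10;   // lower bound cited in the issue
    static final int MAX_ACK_DEADLINE_SECONDS = 600;  // upper bound cited in the issue (10 minutes)

    // Nearest-rank percentile, expressed in per-mille (999 = 99.9th percentile).
    // The caller must pass a non-empty array.
    static int percentileSeconds(int[] ackLatenciesSeconds, int perMille) {
        int[] sorted = ackLatenciesSeconds.clone();
        Arrays.sort(sorted);
        // ceil(n * perMille / 1000) in integer arithmetic, never below rank 1
        int rank = Math.max(1, (sorted.length * perMille + 999) / 1000);
        return sorted[rank - 1];
    }

    // Extension the sketch would request: the percentile clamped to the fixed bounds.
    static int extensionSeconds(int[] ackLatenciesSeconds) {
        if (ackLatenciesSeconds.length == 0) {
            return MIN_ACK_DEADLINE_SECONDS; // no samples yet: fall back to the minimum
        }
        int p999 = percentileSeconds(ackLatenciesSeconds, 999);
        return Math.min(MAX_ACK_DEADLINE_SECONDS, Math.max(MIN_ACK_DEADLINE_SECONDS, p999));
    }

    public static void main(String[] args) {
        // 998 messages acked in 2s, plus 2 outliers acked in 20s
        int[] latencies = new int[1000];
        Arrays.fill(latencies, 2);
        latencies[0] = 20;
        latencies[1] = 20;
        System.out.println(extensionSeconds(latencies)); // prints 20
    }
}
```

In this sketch two slow messages out of a thousand are enough to double the extension from the 10s minimum to 20s, which matches the behavior described above.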

To avoid these problems and make it easier to build systems which rely on deterministic message redelivery on ACK/NACK failure, the Subscriber should support configuration to:

  1. Override the 99.9 percentile latency calculation for determining ackDeadline extension (and just use what the user has configured)
    AND/OR
  2. Specify the maximum (and minimum?) value(s) that can be used for ACK deadline extension (to override the hard-coded 10s/10m boundaries). For example, in our application I would specify a value like 30s for the Subscriber(s) consuming from our long-running job topic and 15s for the Subscriber(s) consuming from short-running job topic.

In lieu of this, we have implemented our own domain-specific republication logic. It infers that a message must be in the bad state (ACK deadline extended beyond what is reasonable for our application) and republishes the message; we then de-duplicate on the consumer side, but this is not ideal.

pubsub: expose a MaxDurationPerAckExtension setting for subscribers

For languages that support lease management, we have bound the percentile distribution to 10s - 10m. It would be useful if the upper bound were configurable.

  • A user might process messages very slowly (8m to ack).
  • If the application fails 5s after receiving a message (whose ack deadline was set to 8m, because that's the deadline the percentile distribution gave), the user has to wait 7m55s for the message to be redelivered.
  • Users should be able to set MaxDurationPerAckExtension = 20s, which caps an individual modAck RPC to 20s. This would have the downside of causing more modack RPCs to be sent, but the upside that the redelivery time would only be 15s.
  • This setting differs from MaxAckExtensionPeriod, which defines the total amount of time we'll hold a message.
  • In addition, MaxDurationPerAckExtension should be disabled if set to be 0 (or less), and should default to 0.
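
The proposed semantics can be sketched as follows. This is an illustration only: PerExtensionCapSketch and its methods are hypothetical names, not the client API, and the sketch assumes the cap is applied after the percentile has been bounded to the 10s to 10m range.

```java
// Hypothetical illustration of the proposed setting; not the client library's code.
class PerExtensionCapSketch {
    static final int MIN_SECONDS = 10;   // lower bound of the percentile distribution
    static final int MAX_SECONDS = 600;  // upper bound of the percentile distribution (10m)

    // percentileSeconds: deadline suggested by the latency distribution.
    // maxDurationPerAckExtensionSeconds: proposed cap; 0 or less disables it.
    static int modAckSeconds(int percentileSeconds, int maxDurationPerAckExtensionSeconds) {
        int bounded = Math.min(MAX_SECONDS, Math.max(MIN_SECONDS, percentileSeconds));
        if (maxDurationPerAckExtensionSeconds <= 0) {
            return bounded; // setting disabled: keep the existing behavior
        }
        return Math.min(bounded, maxDurationPerAckExtensionSeconds);
    }

    // Time until the deadline lapses if the application dies failAfterSeconds
    // after the deadline was set.
    static int redeliveryWaitSeconds(int deadlineSeconds, int failAfterSeconds) {
        return Math.max(0, deadlineSeconds - failAfterSeconds);
    }

    public static void main(String[] args) {
        int uncapped = modAckSeconds(480, 0);  // 8m deadline from the distribution
        int capped = modAckSeconds(480, 20);   // capped to 20s per modAck
        System.out.println(redeliveryWaitSeconds(uncapped, 5)); // prints 475 (7m55s)
        System.out.println(redeliveryWaitSeconds(capped, 5));   // prints 15
    }
}
```

The trade-off is the one noted in the list: a smaller cap means each lease must be renewed more often, in exchange for a shorter wait before redelivery.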

For an idea of how this is implemented, please refer to the Go implementation, Github PR.

java-pubsub: add support for retry policy

Add an additional field for subscriptions that allows configuring a RetryPolicy. RetryPolicy allows configuration of how long a message can be delayed before delivery is retried. The default policy is the current behavior, in that messages are redelivered as soon as possible after a NACK or message expiration.

Reference implementation in Go will be tracked under googleapis/google-cloud-go#1942

pubsub: add support for dead letter topics

Add fields necessary to create/update subscriptions with a dead letter policy. This allows messages to be redelivered to a separate dead letter topic after a configured number of delivery attempts have failed (nack/ack_deadline expired).

In addition to adding DeadLetterPolicy to a Subscription, we need to properly expose the delivery_attempt field in the client. This can be done in one of two ways; for implementation details, please consult the guide that Prad wrote. Feel free to ping me if you need a link to this.

PubSub: subscriber.stopAsync().awaitTerminated() does not destroy respective threads

Environment details

OS type and version: macOS 10.14.5 / Debian 4.9.168-1 (App Engine Flex)
Java version: Oracle 1.8.0_181 / openjdk version "1.8.0_212"
google-cloud-java version: 1.75.0

Steps to reproduce

  1. Create a subscriber, call startAsync() followed by stopAsync() as per documentation
  2. Observe the number of threads before and after calling stopAsync()
  3. The number of threads is growing

Code example

import com.google.api.gax.core.ExecutorProvider
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.ServiceOptions
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main() {
    val subscriptionName = ProjectSubscriptionName.of(
        ServiceOptions.getDefaultProjectId(), "test"
    )

    for (x in 1..2) {
        var i = 0
        val subscriber = Subscriber.newBuilder(subscriptionName)
            { message, consumer -> consumer.ack() }
            .build()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.startAsync().awaitRunning()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.stopAsync().awaitTerminated()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
    }

    for (x in 3..8) {
        var i = 0
        val executorProvider: ExecutorProvider =
            FixedExecutorProvider.create(Executors.newSingleThreadScheduledExecutor())
        val subscriber = Subscriber.newBuilder(subscriptionName)
            { message, consumer -> consumer.ack() }
            .setExecutorProvider(executorProvider)
            .setSystemExecutorProvider(executorProvider)
            .build()
        subscriber.startAsync().awaitRunning()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        subscriber.stopAsync().awaitTerminated()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        executorProvider.executor.shutdown()
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries.map { it.key.name }}")
        if (!executorProvider.executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
            val hangingTasks = executorProvider.executor.shutdownNow()
            println("Hanging tasks: $hangingTasks")
        }
        println("$x/${++i}: ${Thread.activeCount()}: ${Thread.getAllStackTraces().entries}")
    }
}
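
The executor cleanup at the end of the second loop (shutdown(), then awaitTermination(), then shutdownNow() on timeout) follows the usual two-phase pattern for java.util.concurrent.ExecutorService. A minimal Java sketch of that pattern as a reusable helper is shown below; the class and method names are hypothetical, and the helper only covers executors the caller created, not threads or channels owned by the subscriber itself.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; uses only the JDK.
final class GracefulShutdown {

    /**
     * Stops accepting new tasks, waits up to the timeout for running tasks,
     * then cancels what is left and waits once more.
     * Returns true if the executor terminated.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // interrupt tasks that are still running
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.submit(() -> { });
        boolean terminated = shutdownAndAwait(executor, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

Checking the return value makes it possible to tell a clean stop from one where tasks were still hanging after both waits.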

Expected results

The number of active threads before calling startAsync() and after calling stopAsync() should be equal.

Actual results

When no executor providers are supplied explicitly, calling startAsync() creates five threads, and calling stopAsync() does not stop any of them.

When Executors.newSingleThreadScheduledExecutor() is supplied as both executorProvider and systemExecutorProvider and shutdown() is called explicitly on the executor, the result is "better" in that the number of threads grows much more slowly, but it still grows. Further, if three or more such subscribers with a custom executor provider are started and stopped, "ManagedChannel allocation site" ("was not shutdown properly") exceptions appear, which suggests that the underlying gRPC state is not being cleaned up properly upon stopAsync().

1/1: 2: [Monitor Ctrl-Break, Finalizer, Reference Handler, main, Signal Dispatcher]
1/2: 7: [grpc-default-worker-ELG-1-1, pool-1-thread-2, Attach Listener, grpc-default-executor-0, Monitor Ctrl-Break, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler, main, Signal Dispatcher]
1/3: 9: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, Attach Listener, Monitor Ctrl-Break, main, Signal Dispatcher, grpc-default-executor-0, pool-1-thread-4, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/1: 9: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, Attach Listener, Monitor Ctrl-Break, main, Signal Dispatcher, grpc-default-executor-0, pool-1-thread-4, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/2: 14: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, pool-4-thread-2, Attach Listener, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
2/3: 16: [grpc-default-worker-ELG-1-1, pool-1-thread-5, pool-1-thread-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/1: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/2: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/3: 19: [grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-5, pool-1-thread-2, grpc-default-executor-2, pool-4-thread-2, Attach Listener, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, Signal Dispatcher, pool-4-thread-3, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-4, pool-4-thread-5, pool-6-thread-1, pool-4-thread-1, pool-1-thread-3, pool-1-thread-1, Finalizer, Reference Handler]
3/4: 18: [Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@609db546, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@20f5281c, Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@56c4278e, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@301eda63, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@3d246ea3, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@341814d3, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@4397ad89, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@59cba5a, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@1bd39d3c, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@6f19ac19, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@119cbf96, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@71329995, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@768fc0f2, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@5454d35e, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@20c0a64d, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@455b6df1, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@4ddbbdf8, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@3f67593e, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@1ab06251, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@41ab013, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@14bee915, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@1115ec15]
4/1: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/2: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/3: 21: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, pool-1-thread-3, Finalizer, Reference Handler, pool-8-thread-1]
4/4: 20: [Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@58ce9668, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@172b013, Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@56673b2c, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@2796aeae, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@b4711e2, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@1fa1cab1, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@70f02c32, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@62010f5c, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@51fadaff, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@401f7633, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@31ff43be, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@5b6ec132, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@5c44c582, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@67d18ed7, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@2c78d320, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@132e0cc, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@7b205dbd, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@106cc338, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@7a67e3c6, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@6cc558c6, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@15713d56, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@63f259c3, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@26ceffa8, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@600b90df]
5/1: 23: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-10-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/2: 23: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-10-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/3: 22: [pool-1-thread-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
5/4: 22: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@22f31dec, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@34c01041, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@76f4b65, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@c94fd30, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@36328d33, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@2c4d1ac, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@7f0d96f2, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@545b995e, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@76a2ddf3, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@524f3b3a, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@41e68d87, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@49ff7d8c, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@29526c05, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2ef14fe, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@77102b91, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@45312be2, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@7fb95505, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@58be6e8, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@7331196b, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@3f9342d4, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@ab7395e, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@50d13246, Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@2bd08376, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@e70f13a, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@3d3e5463, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@64a40280]
Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=9, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:33)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=1, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:18)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=16, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:33)
	at TestKt.main(Test.kt)

Geg 28, 2019 8:11:54 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:94)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:52)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:43)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:449)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
	at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263)
	at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
	at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
	at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:248)
	at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
	at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
	at TestKt.main(Test.kt:18)
	at TestKt.main(Test.kt)

6/1: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, pool-12-thread-1, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/2: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, pool-12-thread-1, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/3: 25: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler]
6/4: 24: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@6c0d9d86, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@4ce1d6d0, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@24111ef1, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@1f3f02ee, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@1fde5d22, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@5dcb4f5f, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@71812481, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@16ce702d, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@7b94089b, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@7ca20101, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@47f9738, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@6155d082, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@3a5ecce3, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@561868a0, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2ea6e30c, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@6138e79a, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@2dcd168a, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@388526fb, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@21a21c64, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@7803bfd, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@42bc14c1, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@531f4093, Thread[grpc-default-worker-ELG-1-6,5,main]=[Ljava.lang.StackTraceElement;@62ef27a8, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@6436a7db, Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@460ebd80, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@6f3c660a, 
Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@74f5ce22, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@25aca718]
7/1: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-14-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/2: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, pool-14-thread-1, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/3: 27: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-6]
7/4: 26: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@2a448449, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@32f232a5, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@43f82e78, Thread[grpc-default-worker-ELG-1-4,5,main]=[Ljava.lang.StackTraceElement;@e54303, Thread[pool-4-thread-2,5,main]=[Ljava.lang.StackTraceElement;@e8df99a, Thread[grpc-default-worker-ELG-1-7,5,main]=[Ljava.lang.StackTraceElement;@2dc995f4, Thread[pool-4-thread-4,5,main]=[Ljava.lang.StackTraceElement;@2f40e5db, Thread[Monitor Ctrl-Break,5,main]=[Ljava.lang.StackTraceElement;@517566b, Thread[grpc-default-executor-1,5,main]=[Ljava.lang.StackTraceElement;@64b73e7a, Thread[main,5,main]=[Ljava.lang.StackTraceElement;@530712d, Thread[pool-4-thread-3,5,main]=[Ljava.lang.StackTraceElement;@2df6226d, Thread[pool-1-thread-4,5,main]=[Ljava.lang.StackTraceElement;@12ed9db6, Thread[grpc-default-executor-4,5,main]=[Ljava.lang.StackTraceElement;@4ff4357f, Thread[pool-4-thread-5,5,main]=[Ljava.lang.StackTraceElement;@49cb9cb5, Thread[grpc-default-executor-3,5,main]=[Ljava.lang.StackTraceElement;@55322aab, Thread[pool-4-thread-1,5,main]=[Ljava.lang.StackTraceElement;@2b4c1d96, Thread[pool-1-thread-1,5,main]=[Ljava.lang.StackTraceElement;@45fd9a4d, Thread[grpc-default-worker-ELG-1-1,5,main]=[Ljava.lang.StackTraceElement;@50468873, Thread[grpc-default-worker-ELG-1-3,5,main]=[Ljava.lang.StackTraceElement;@146587a2, Thread[pool-1-thread-2,5,main]=[Ljava.lang.StackTraceElement;@5f0e9815, Thread[Attach Listener,9,system]=[Ljava.lang.StackTraceElement;@76884e4b, Thread[Signal Dispatcher,9,system]=[Ljava.lang.StackTraceElement;@126945f9, Thread[grpc-default-worker-ELG-1-2,5,main]=[Ljava.lang.StackTraceElement;@2a898881, Thread[grpc-default-worker-ELG-1-6,5,main]=[Ljava.lang.StackTraceElement;@16c63f5, Thread[grpc-default-executor-0,5,main]=[Ljava.lang.StackTraceElement;@35229f85, 
Thread[grpc-default-worker-ELG-1-5,5,main]=[Ljava.lang.StackTraceElement;@6d3c5255, Thread[pool-1-thread-3,5,main]=[Ljava.lang.StackTraceElement;@b1712f3, Thread[Finalizer,8,system]=[Ljava.lang.StackTraceElement;@6986bbaf, Thread[Reference Handler,10,system]=[Ljava.lang.StackTraceElement;@4879dfad, Thread[grpc-default-executor-6,5,main]=[Ljava.lang.StackTraceElement;@4758820d]
8/1: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-16-thread-1, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/2: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, pool-16-thread-1, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/3: 29: [pool-1-thread-5, grpc-default-executor-5, grpc-default-executor-2, grpc-default-worker-ELG-1-4, pool-4-thread-2, grpc-default-worker-ELG-1-7, pool-4-thread-4, Monitor Ctrl-Break, grpc-default-executor-1, main, pool-4-thread-3, grpc-default-worker-ELG-1-8, pool-1-thread-4, grpc-default-executor-4, pool-4-thread-5, grpc-default-executor-3, pool-4-thread-1, pool-1-thread-1, grpc-default-worker-ELG-1-1, grpc-default-worker-ELG-1-3, pool-1-thread-2, Attach Listener, Signal Dispatcher, grpc-default-worker-ELG-1-2, grpc-default-worker-ELG-1-6, grpc-default-executor-0, grpc-default-worker-ELG-1-5, pool-1-thread-3, Finalizer, Reference Handler, grpc-default-executor-7, grpc-default-executor-6]
8/4: 28: [Thread[pool-1-thread-5,5,main]=[Ljava.lang.StackTraceElement;@1e8ce150, Thread[grpc-default-executor-5,5,main]=[Ljava.lang.StackTraceElement;@604f2bd2, Thread[grpc-default-executor-2,5,main]=[Ljava.lang.StackTraceElement;@1d3a

Stack trace

"pool-20-thread-6" #139 prio=5 os_prio=0 tid=0x00007f33ed9f1000 nid=0x184 waiting on condition [0x00007f33a9556000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007b6a0c940> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"grpc-default-executor-4" #128 daemon prio=5 os_prio=0 tid=0x00007f33d816e000 nid=0x158 waiting on condition [0x00007f33aa968000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x0000000726f346b0> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

"Gax-18" #95 daemon prio=5 os_prio=0 tid=0x00007f33b403f800 nid=0x84 waiting on condition [0x00007f33aaa69000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007bb5e2f30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

PubSub documentation used

Related issues
