Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,7 @@ public final void invalidate() {

@Override
public void close() {
session.onTransactionDone();
span.end();
synchronized (lock) {
isClosed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,14 @@ public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) {
}

@Override
public void close() {}
public void close() {
try {
this.readContextFuture.get().close();
} catch (Throwable ignore) {
// Ignore any errors during close, as this error has already propagated to the user through
// other means.
}
}

/**
* Represents a {@link ReadContext} using a multiplexed session that is not yet ready. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -63,6 +64,8 @@ static class MultiplexedSessionTransaction extends SessionImpl {

private final int singleUseChannelHint;

private boolean done;

MultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
Expand All @@ -73,6 +76,7 @@ static class MultiplexedSessionTransaction extends SessionImpl {
this.client = client;
this.singleUse = singleUse;
this.singleUseChannelHint = singleUseChannelHint;
this.client.numSessionsAcquired.incrementAndGet();
setCurrentSpan(span);
}

Expand Down Expand Up @@ -103,6 +107,20 @@ void onReadDone() {
}
}

@Override
void onTransactionDone() {
boolean markedDone = false;
synchronized (this) {
if (!this.done) {
this.done = true;
markedDone = true;
}
}
if (markedDone) {
client.numSessionsReleased.incrementAndGet();
}
}

@Override
public void close() {
// no-op, we don't want to delete the multiplexed session.
Expand Down Expand Up @@ -152,6 +170,10 @@ public void close() {
private final AtomicReference<ResourceNotFoundException> resourceNotFoundException =
new AtomicReference<>();

private final AtomicLong numSessionsAcquired = new AtomicLong();

private final AtomicLong numSessionsReleased = new AtomicLong();

/**
* This flag is set to true if the server return UNIMPLEMENTED when we try to create a multiplexed
* session. TODO: Remove once this is guaranteed to be available.
Expand Down Expand Up @@ -239,6 +261,14 @@ boolean isValid() {
return resourceNotFoundException.get() == null;
}

AtomicLong getNumSessionsAcquired() {
return this.numSessionsAcquired;
}

AtomicLong getNumSessionsReleased() {
return this.numSessionsReleased;
}

boolean isMultiplexedSessionsSupported() {
return !this.unimplemented.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ void onError(SpannerException spannerException) {}

void onReadDone() {}

void onTransactionDone() {}

TraceWrapper getTracer() {
return tracer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not directly related to this change, but we should start to replace the use of com.google.common.base.Supplier with java.util.function.Supplier.

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
Expand Down Expand Up @@ -156,7 +155,8 @@ void maybeWaitOnMinSessions() {
}
}

private abstract static class CachedResultSetSupplier implements Supplier<ResultSet> {
private abstract static class CachedResultSetSupplier
implements com.google.common.base.Supplier<ResultSet> {

private ResultSet cached;

Expand Down Expand Up @@ -2265,7 +2265,6 @@ public String getName() {
@Override
public void close() {
synchronized (lock) {
numMultiplexedSessionsReleased++;
if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) {
SessionPool.this.resourceNotFoundException =
MoreObjects.firstNonNull(
Expand Down Expand Up @@ -2771,15 +2770,9 @@ enum Position {
@GuardedBy("lock")
private long numSessionsAcquired = 0;

@GuardedBy("lock")
private long numMultiplexedSessionsAcquired = 0;

@GuardedBy("lock")
private long numSessionsReleased = 0;

@GuardedBy("lock")
private long numMultiplexedSessionsReleased = 0;

@GuardedBy("lock")
private long numIdleSessionsRemoved = 0;

Expand Down Expand Up @@ -2830,7 +2823,9 @@ static SessionPool createPool(
SessionClient sessionClient,
TraceWrapper tracer,
List<LabelValue> labelValues,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
final SessionPoolOptions sessionPoolOptions = spannerOptions.getSessionPoolOptions();

// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
Expand All @@ -2846,7 +2841,9 @@ static SessionPool createPool(
tracer,
labelValues,
spannerOptions.getOpenTelemetry(),
attributes);
attributes,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
}

static SessionPool createPool(
Expand Down Expand Up @@ -2884,7 +2881,9 @@ static SessionPool createPool(
tracer,
SPANNER_DEFAULT_LABEL_VALUES,
openTelemetry,
null);
null,
new AtomicLong(),
new AtomicLong());
}

static SessionPool createPool(
Expand All @@ -2898,7 +2897,9 @@ static SessionPool createPool(
TraceWrapper tracer,
List<LabelValue> labelValues,
OpenTelemetry openTelemetry,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
SessionPool pool =
new SessionPool(
poolOptions,
Expand All @@ -2912,7 +2913,9 @@ static SessionPool createPool(
tracer,
labelValues,
openTelemetry,
attributes);
attributes,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
pool.initPool();
return pool;
}
Expand All @@ -2929,7 +2932,9 @@ private SessionPool(
TraceWrapper tracer,
List<LabelValue> labelValues,
OpenTelemetry openTelemetry,
Attributes attributes) {
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
this.options = options;
this.databaseRole = databaseRole;
this.executorFactory = executorFactory;
Expand All @@ -2940,8 +2945,13 @@ private SessionPool(
this.initialReleasePosition = initialReleasePosition;
this.poolMaintainer = new PoolMaintainer();
this.tracer = tracer;
this.initOpenCensusMetricsCollection(metricRegistry, labelValues);
this.initOpenTelemetryMetricsCollection(openTelemetry, attributes);
this.initOpenCensusMetricsCollection(
metricRegistry,
labelValues,
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
this.initOpenTelemetryMetricsCollection(
openTelemetry, attributes, numMultiplexedSessionsAcquired, numMultiplexedSessionsReleased);
this.waitOnMinSessionsLatch =
options.getMinSessions() > 0 ? new CountDownLatch(1) : new CountDownLatch(0);
this.waitOnMultiplexedSessionsLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -3143,7 +3153,7 @@ boolean isValid() {

/**
* Returns a multiplexed session. The method fallbacks to a regular session if {@link
* SessionPoolOptions#useMultiplexedSession} is not set.
* SessionPoolOptions#getUseMultiplexedSession} is not set.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reference was wrong, and showed up as a warning in IntelliJ

*/
SessionFutureWrapper getMultiplexedSessionWithFallback() throws SpannerException {
if (useMultiplexedSessions()) {
Expand Down Expand Up @@ -3250,8 +3260,6 @@ private void incrementNumSessionsInUse(boolean isMultiplexed) {
maxSessionsInUse = numSessionsInUse;
}
numSessionsAcquired++;
} else {
numMultiplexedSessionsAcquired++;
}
}
}
Expand Down Expand Up @@ -3775,7 +3783,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
* exporter, it allows users to monitor client behavior.
*/
private void initOpenCensusMetricsCollection(
MetricRegistry metricRegistry, List<LabelValue> labelValues) {
MetricRegistry metricRegistry,
List<LabelValue> labelValues,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
if (!SpannerOptions.isEnabledOpenCensusMetrics()) {
return;
}
Expand Down Expand Up @@ -3860,18 +3871,14 @@ private void initOpenCensusMetricsCollection(
labelValuesWithRegularSessions, this, sessionPool -> sessionPool.numSessionsAcquired);
numAcquiredSessionsMetric.removeTimeSeries(labelValuesWithMultiplexedSessions);
numAcquiredSessionsMetric.createTimeSeries(
labelValuesWithMultiplexedSessions,
this,
sessionPool -> sessionPool.numMultiplexedSessionsAcquired);
labelValuesWithMultiplexedSessions, this, unused -> numMultiplexedSessionsAcquired.get());

numReleasedSessionsMetric.removeTimeSeries(labelValuesWithRegularSessions);
numReleasedSessionsMetric.createTimeSeries(
labelValuesWithRegularSessions, this, sessionPool -> sessionPool.numSessionsReleased);
numReleasedSessionsMetric.removeTimeSeries(labelValuesWithMultiplexedSessions);
numReleasedSessionsMetric.createTimeSeries(
labelValuesWithMultiplexedSessions,
this,
sessionPool -> sessionPool.numMultiplexedSessionsReleased);
labelValuesWithMultiplexedSessions, this, unused -> numMultiplexedSessionsReleased.get());

List<LabelValue> labelValuesWithBeingPreparedType = new ArrayList<>(labelValues);
labelValuesWithBeingPreparedType.add(NUM_SESSIONS_BEING_PREPARED);
Expand Down Expand Up @@ -3909,7 +3916,10 @@ private void initOpenCensusMetricsCollection(
* an exporter, it allows users to monitor client behavior.
*/
private void initOpenTelemetryMetricsCollection(
OpenTelemetry openTelemetry, Attributes attributes) {
OpenTelemetry openTelemetry,
Attributes attributes,
AtomicLong numMultiplexedSessionsAcquired,
AtomicLong numMultiplexedSessionsReleased) {
if (openTelemetry == null || !SpannerOptions.isEnabledOpenTelemetryMetrics()) {
return;
}
Expand Down Expand Up @@ -3981,7 +3991,8 @@ private void initOpenTelemetryMetricsCollection(
.buildWithCallback(
measurement -> {
measurement.record(this.numSessionsAcquired, attributesRegularSession);
measurement.record(this.numMultiplexedSessionsAcquired, attributesMultiplexedSession);
measurement.record(
numMultiplexedSessionsAcquired.get(), attributesMultiplexedSession);
});

meter
Expand All @@ -3991,7 +4002,8 @@ private void initOpenTelemetryMetricsCollection(
.buildWithCallback(
measurement -> {
measurement.record(this.numSessionsReleased, attributesRegularSession);
measurement.record(this.numMultiplexedSessionsReleased, attributesMultiplexedSession);
measurement.record(
numMultiplexedSessionsReleased.get(), attributesMultiplexedSession);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -271,17 +272,29 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
attributesBuilder.put("database", db.getDatabase());
attributesBuilder.put("instance_id", db.getInstanceId().getName());

boolean useMultiplexedSession =
getOptions().getSessionPoolOptions().getUseMultiplexedSession();
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
useMultiplexedSession
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
: null;
AtomicLong numMultiplexedSessionsAcquired =
useMultiplexedSession
? multiplexedSessionDatabaseClient.getNumSessionsAcquired()
: new AtomicLong();
AtomicLong numMultiplexedSessionsReleased =
useMultiplexedSession
? multiplexedSessionDatabaseClient.getNumSessionsReleased()
: new AtomicLong();
SessionPool pool =
SessionPool.createPool(
getOptions(),
SpannerImpl.this.getSessionClient(db),
this.tracer,
labelValues,
attributesBuilder.build());
MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient =
getOptions().getSessionPoolOptions().getUseMultiplexedSession()
? new MultiplexedSessionDatabaseClient(SpannerImpl.this.getSessionClient(db))
: null;
attributesBuilder.build(),
numMultiplexedSessionsAcquired,
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3859,12 +3859,12 @@ public void testCreateSessionsFailure_shouldNotPropagateToCloseMethod() {
// Simulate session creation failures on the backend.
mockSpanner.setCreateSessionExecutionTime(
SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
DatabaseClient client =
spannerWithEmptySessionPool.getDatabaseClient(
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// This will not cause any failure as getting a session from the pool is guaranteed to be
// non-blocking, and any exceptions will be delayed until actual query execution.
mockSpanner.freeze();
DatabaseClient client =
spannerWithEmptySessionPool.getDatabaseClient(
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
mockSpanner.unfreeze();
SpannerException e = assertThrows(SpannerException.class, rs::next);
Expand Down
Loading