mutations) throws SpannerException {
return createMultiplexedSessionTransaction(/* singleUse= */ false).write(mutations);
@@ -745,7 +558,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
- return createMultiplexedSessionTransaction(/* singleUse= */ true)
+ return createMultiplexedSessionTransaction(/* singleUse= */ false)
.executePartitionedUpdate(stmt, options);
}
@@ -771,7 +584,7 @@ final class MultiplexedSessionMaintainer {
this.clock = clock;
}
- void start() {
+ private synchronized void start() {
// Schedule the maintainer to run once every ten minutes (by default).
long loopFrequencyMillis =
MultiplexedSessionDatabaseClient.this
@@ -786,7 +599,7 @@ void start() {
this::maintain, loopFrequencyMillis, loopFrequencyMillis, TimeUnit.MILLISECONDS);
}
- void stop() {
+ private synchronized void stop() {
if (this.scheduledFuture != null) {
this.scheduledFuture.cancel(false);
}
@@ -811,9 +624,6 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
// ignore any errors during re-creation of the multiplexed session. This means that
// we continue to use the session that has passed its expiration date for now, and
// that a new attempt at creating a new session will be done in 10 minutes from now.
- // The only exception to this rule is if the server returns UNIMPLEMENTED. In that
- // case we invalidate the client and fall back to regular sessions.
- maybeMarkUnimplemented(t);
}
});
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MutableCredentials.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MutableCredentials.java
new file mode 100644
index 00000000000..9d09b9fe268
--- /dev/null
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MutableCredentials.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.spanner;
+
+import com.google.auth.CredentialTypeForMetrics;
+import com.google.auth.Credentials;
+import com.google.auth.RequestMetadataCallback;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import javax.annotation.Nonnull;
+
+/**
+ * A mutable {@link Credentials} implementation that delegates authentication behavior to a scoped
+ * {@link ServiceAccountCredentials} instance.
+ *
+ * This class is intended for scenarios where an application needs to replace the underlying
+ * service account credentials for a long-running Spanner Client.
+ *
+ *
All operations inherited from {@link Credentials} are forwarded to the current delegate,
+ * including request metadata retrieval and token refresh. Calling {@link
+ * #updateCredentials(ServiceAccountCredentials)} replaces the delegate with a newly scoped
+ * credentials instance created from the same scopes that were provided when this object was
+ * constructed.
+ */
+public class MutableCredentials extends Credentials {
+ private volatile ServiceAccountCredentials delegate;
+ private final Set scopes;
+
+ /** Creates a MutableCredentials instance with default spanner scopes. */
+ public MutableCredentials(ServiceAccountCredentials credentials) {
+ this(credentials, SpannerOptions.SCOPES);
+ }
+
+ public MutableCredentials(
+ @Nonnull ServiceAccountCredentials credentials, @Nonnull Set scopes) {
+ Objects.requireNonNull(credentials, "credentials must not be null");
+ Objects.requireNonNull(scopes, "scopes must not be null");
+ if (scopes.isEmpty()) {
+ throw new IllegalArgumentException("Scopes must not be empty");
+ }
+ this.scopes = new java.util.HashSet<>(scopes);
+ delegate = (ServiceAccountCredentials) credentials.createScoped(this.scopes);
+ }
+
+ /**
+ * Replaces the current delegate with a newly scoped credentials instance.
+ *
+ * Note any in-flight RPC may continue to use the old credentials.
+ *
+ *
The provided {@link ServiceAccountCredentials} is scoped using the same scopes that were
+ * supplied when this {@link MutableCredentials} instance was created.
+ *
+ * @param credentials the new base service account credentials to scope and use for client
+ * authorization.
+ */
+ public void updateCredentials(@Nonnull ServiceAccountCredentials credentials) {
+ Objects.requireNonNull(credentials, "credentials must not be null");
+ delegate = (ServiceAccountCredentials) credentials.createScoped(scopes);
+ }
+
+ @Override
+ public String getAuthenticationType() {
+ return delegate.getAuthenticationType();
+ }
+
+ @Override
+ public Map> getRequestMetadata(URI uri) throws IOException {
+ return delegate.getRequestMetadata(uri);
+ }
+
+ @Override
+ public boolean hasRequestMetadata() {
+ return delegate.hasRequestMetadata();
+ }
+
+ @Override
+ public boolean hasRequestMetadataOnly() {
+ return delegate.hasRequestMetadataOnly();
+ }
+
+ @Override
+ public void refresh() throws IOException {
+ delegate.refresh();
+ }
+
+ @Override
+ public void getRequestMetadata(URI uri, Executor executor, RequestMetadataCallback callback) {
+ delegate.getRequestMetadata(uri, executor, callback);
+ }
+
+ @Override
+ public String getUniverseDomain() throws IOException {
+ return delegate.getUniverseDomain();
+ }
+
+ @Override
+ public CredentialTypeForMetrics getMetricsCredentialType() {
+ return delegate.getMetricsCredentialType();
+ }
+}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java
index c5a09bc3eea..0545221804c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java
@@ -21,7 +21,9 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ListValue;
+import com.google.protobuf.Timestamp;
import java.io.Serializable;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -87,6 +89,12 @@ public enum Op {
/** Deletes rows from a table. Succeeds whether or not the named rows were present. */
DELETE,
+
+ /** Send a message to a queue, optionally with specified delivery time. */
+ SEND,
+
+ /** Acknowledge a message in a queue. Ack only succeeds if the message still exists. */
+ ACK,
}
private final String table;
@@ -94,6 +102,12 @@ public enum Op {
private final ImmutableList columns;
private final ImmutableList values;
private final KeySet keySet;
+ // Queue related fields
+ private final String queue;
+ private final Key key;
+ private final Value payload;
+ private final Instant deliveryTime;
+ private final boolean ignoreNotFound;
private Mutation(
String table,
@@ -101,11 +115,30 @@ private Mutation(
@Nullable ImmutableList columns,
@Nullable ImmutableList values,
@Nullable KeySet keySet) {
+ this(table, operation, columns, values, keySet, null, null, null, null, false);
+ }
+
+ private Mutation(
+ @Nullable String table,
+ Op operation,
+ @Nullable ImmutableList columns,
+ @Nullable ImmutableList values,
+ @Nullable KeySet keySet,
+ @Nullable String queue,
+ @Nullable Key key,
+ @Nullable Value payload,
+ @Nullable Instant deliveryTime,
+ boolean ignoreNotFound) {
this.table = table;
this.operation = operation;
this.columns = columns;
this.values = values;
this.keySet = keySet;
+ this.queue = queue;
+ this.key = key;
+ this.payload = payload;
+ this.deliveryTime = deliveryTime;
+ this.ignoreNotFound = ignoreNotFound;
}
/**
@@ -153,6 +186,22 @@ public static Mutation delete(String table, KeySet keySet) {
return new Mutation(table, Op.DELETE, null, null, checkNotNull(keySet));
}
+ /**
+ * Returns a builder that can be used to construct an {@link Op#SEND} mutation against {@code
+ * queue}; see the {@code SEND} documentation for mutation semantics.
+ */
+ public static SendBuilder newSendBuilder(String queue) {
+ return new SendBuilder(queue);
+ }
+
+ /**
+ * Returns a builder that can be used to construct an {@link Op#ACK} mutation against {@code
+ * queue}; see the {@code ACK} documentation for mutation semantics.
+ */
+ public static AckBuilder newAckBuilder(String queue) {
+ return new AckBuilder(queue);
+ }
+
/**
* Builder for {@link Op#INSERT}, {@link Op#INSERT_OR_UPDATE}, {@link Op#UPDATE}, and {@link
* Op#REPLACE} mutations.
@@ -227,6 +276,66 @@ private void checkDuplicateColumns(ImmutableList columnNames) {
}
}
+ /** Builder for {@link Op#SEND} mutation. */
+ public static class SendBuilder {
+ private final String queue;
+ private Key key;
+ private Value payload;
+ private Instant deliveryTime;
+
+ private SendBuilder(String queue) {
+ this.queue = checkNotNull(queue);
+ }
+
+ public SendBuilder setKey(Key key) {
+ this.key = checkNotNull(key);
+ return this;
+ }
+
+ public SendBuilder setPayload(Value payload) {
+ this.payload = checkNotNull(payload);
+ return this;
+ }
+
+ public SendBuilder setDeliveryTime(Instant deliveryTime) {
+ this.deliveryTime = deliveryTime;
+ return this;
+ }
+
+ public Mutation build() {
+ checkState(key != null, "Key must be set for Send mutation");
+ checkState(payload != null, "Payload must be set for Send mutation");
+ return new Mutation(
+ null, Op.SEND, null, null, null, queue, key, payload, deliveryTime, false);
+ }
+ }
+
+ /** Builder for {@link Op#ACK} mutation. */
+ public static class AckBuilder {
+ private final String queue;
+ private Key key;
+ private boolean ignoreNotFound = false;
+
+ private AckBuilder(String queue) {
+ this.queue = checkNotNull(queue);
+ }
+
+ public AckBuilder setKey(Key key) {
+ this.key = checkNotNull(key);
+ return this;
+ }
+
+ public AckBuilder setIgnoreNotFound(boolean ignoreNotFound) {
+ this.ignoreNotFound = ignoreNotFound;
+ return this;
+ }
+
+ public Mutation build() {
+ checkState(key != null, "Key must be set for Ack mutation");
+ return new Mutation(null, Op.ACK, null, null, null, queue, key, null, null, ignoreNotFound);
+ }
+ }
+
/** Returns the name of the table that this mutation will affect. */
public String getTable() {
return table;
@@ -248,27 +357,72 @@ public Iterable getColumns() {
}
/**
- * For all types except {@link Op#DELETE}, returns the values that this mutation will write. The
- * number of elements returned is always the same as the number returned by {@link #getColumns()},
- * and the {@code i}th value corresponds to the {@code i}th column.
+ * For all types except {@link Op#DELETE}, {@link Op#SEND}, and {@link Op#ACK}, returns the values
+ * that this mutation will write. The number of elements returned is always the same as the number
+ * returned by {@link #getColumns()}, and the {@code i}th value corresponds to the {@code i}th
+ * column.
*
- * @throws IllegalStateException if {@code operation() == Op.DELETE}
+ * @throws IllegalStateException if {@code operation() == Op.DELETE or operation() == Op.SEND or
+ * operation() == Op.ACK}
*/
public Iterable getValues() {
- checkState(operation != Op.DELETE, "values() cannot be called for a DELETE mutation");
+ checkState(
+ operation != Op.DELETE && operation != Op.SEND && operation != Op.ACK,
+ "values() cannot be called for a DELETE/SEND/ACK mutation");
return values;
}
+ /** Returns the name of the queue that this mutation will affect. */
+ public String getQueue() {
+ checkState(
+ operation == Op.SEND || operation == Op.ACK,
+ "getQueue() can only be called " + "for SEND or ACK mutations");
+ return queue;
+ }
+
+ /** Returns the key of the message to the queue that this mutation will affect. */
+ public Key getKey() {
+ checkState(
+ operation == Op.SEND || operation == Op.ACK,
+ "getKey() can only be called for " + "SEND or ACK mutations");
+ return key;
+ }
+
+ /** Returns the payload of the message to the queue that this mutation will affect. */
+ public Value getPayload() {
+ checkState(operation == Op.SEND, "getPayload() can only be called for a SEND mutation");
+ return payload;
+ }
+
+ /** Returns the delivery timestamp of the message to the queue that this mutation will affect. */
+ @Nullable
+ public Instant getDeliveryTime() {
+ checkState(operation == Op.SEND, "getDeliverTime() can only be called for a SEND mutation");
+ return deliveryTime;
+ }
+
+ /**
+ * Returns whether an error will be ignored for an ACK mutation that affects a message that does
+ * not exist
+ */
+ public boolean getIgnoreNotFound() {
+ checkState(operation == Op.ACK, "getIgnoreNotFound() can only be called for an ACK mutation");
+ return ignoreNotFound;
+ }
+
/**
- * For all types except {@link Op#DELETE}, constructs a map from column name to value. This is
- * mainly intended as a convenience for testing; direct access via {@link #getColumns()} and
- * {@link #getValues()} is more efficient.
+ * For all types except {@link Op#DELETE}, {@link Op#SEND}, and {@link Op#ACK}, constructs a map
+ * from column name to value. This is mainly intended as a convenience for testing; direct access
+ * via {@link #getColumns()} and {@link #getValues()} is more efficient.
*
- * @throws IllegalStateException if {@code operation() == Op.DELETE}, or if any duplicate columns
- * are present. Detection of duplicates does not consider case.
+ * @throws IllegalStateException if {@code operation() == Op.DELETE or operation() == Op.SEND or
+ * operation() == Op.ACK}, or if any duplicate columns are present. Detection of duplicates
+ * does not consider case.
*/
public Map asMap() {
- checkState(operation != Op.DELETE, "asMap() cannot be called for a DELETE mutation");
+ checkState(
+ operation != Op.DELETE && operation != Op.SEND && operation != Op.ACK,
+ "asMap() cannot be called for a DELETE/SEND/ACK mutation");
LinkedHashMap map = new LinkedHashMap<>();
for (int i = 0; i < columns.size(); ++i) {
Value existing = map.put(columns.get(i), values.get(i));
@@ -310,6 +464,25 @@ void toString(StringBuilder b) {
opName = "delete";
isWrite = false;
break;
+ case SEND:
+ // return directly for SEND
+ b.append("send(").append(queue).append('{');
+ b.append("key=").append(key);
+ b.append(", payload=").append(payload);
+ if (deliveryTime != null) {
+ b.append(", deliveryTime=").append(deliveryTime);
+ }
+ b.append("})");
+ return;
+ case ACK:
+ // return directly for ACK
+ b.append("ack(").append(queue).append('{');
+ b.append("key=").append(key);
+ if (ignoreNotFound) {
+ b.append(", ignoreNotFound=true");
+ }
+ b.append("})");
+ return;
default:
throw new AssertionError("Unhandled Op: " + operation);
}
@@ -348,8 +521,24 @@ public boolean equals(Object o) {
}
Mutation that = (Mutation) o;
- return operation == that.operation
- && Objects.equals(table, that.table)
+ if (operation != that.operation) {
+ return false;
+ }
+
+ if (operation == Op.SEND) {
+ return Objects.equals(queue, that.queue)
+ && Objects.equals(key, that.key)
+ && Objects.equals(payload, that.payload)
+ && Objects.equals(deliveryTime, that.deliveryTime);
+ }
+
+ if (operation == Op.ACK) {
+ return Objects.equals(queue, that.queue)
+ && Objects.equals(key, that.key)
+ && Objects.equals(ignoreNotFound, that.ignoreNotFound);
+ }
+
+ return Objects.equals(table, that.table)
&& Objects.equals(columns, that.columns)
&& areValuesEqual(values, that.values)
&& Objects.equals(keySet, that.keySet);
@@ -357,7 +546,8 @@ && areValuesEqual(values, that.values)
@Override
public int hashCode() {
- return Objects.hash(operation, table, columns, values, keySet);
+ return Objects.hash(
+ operation, table, columns, values, keySet, key, payload, deliveryTime, ignoreNotFound);
}
/**
@@ -435,16 +625,8 @@ static com.google.spanner.v1.Mutation toProtoAndReturnRandomMutation(
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
mutation.keySet.appendToProto(keySet);
} else {
- if (proto != null) {
- com.google.spanner.v1.Mutation builtMutation = proto.build();
- out.add(builtMutation);
- // Skip tracking the largest insert mutation if there are mutations other than INSERT.
- if (allMutationsExcludingInsert.isEmpty()
- && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
- largestInsertMutation = builtMutation;
- }
- maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
- }
+ largestInsertMutation =
+ flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation);
proto = com.google.spanner.v1.Mutation.newBuilder();
com.google.spanner.v1.Mutation.Delete.Builder delete =
proto.getDeleteBuilder().setTable(mutation.table);
@@ -452,6 +634,33 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
mutation.keySet.appendToProto(keySet);
}
write = null;
+ } else if (mutation.operation == Op.SEND) {
+ largestInsertMutation =
+ flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation);
+ proto = com.google.spanner.v1.Mutation.newBuilder();
+ com.google.spanner.v1.Mutation.Send.Builder send =
+ proto
+ .getSendBuilder()
+ .setQueue(mutation.queue)
+ .setKey(mutation.key.toProto())
+ .setPayload(mutation.payload.toProto());
+ if (mutation.getDeliveryTime() != null) {
+ Instant deliveryTime = mutation.getDeliveryTime();
+ Timestamp.Builder timeBuilder =
+ send.getDeliverTimeBuilder()
+ .setSeconds(deliveryTime.getEpochSecond())
+ .setNanos(deliveryTime.getNano());
+ send.setDeliverTime(timeBuilder);
+ }
+ } else if (mutation.operation == Op.ACK) {
+ largestInsertMutation =
+ flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation);
+ proto = com.google.spanner.v1.Mutation.newBuilder();
+ proto
+ .getAckBuilder()
+ .setQueue(mutation.queue)
+ .setKey(mutation.getKey().toProto())
+ .setIgnoreNotFound(mutation.ignoreNotFound);
} else {
ListValue.Builder values = ListValue.newBuilder();
for (Value value : mutation.getValues()) {
@@ -464,16 +673,8 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
// Same as previous mutation: coalesce values to reduce request size.
write.addValues(values);
} else {
- if (proto != null) {
- com.google.spanner.v1.Mutation builtMutation = proto.build();
- out.add(builtMutation);
- // Skip tracking the largest insert mutation if there are mutations other than INSERT.
- if (allMutationsExcludingInsert.isEmpty()
- && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
- largestInsertMutation = builtMutation;
- }
- maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
- }
+ largestInsertMutation =
+ flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation);
proto = com.google.spanner.v1.Mutation.newBuilder();
switch (mutation.operation) {
case INSERT:
@@ -498,9 +699,26 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
last = mutation;
}
// Flush last item.
+ largestInsertMutation =
+ flushMutation(out, proto, allMutationsExcludingInsert, largestInsertMutation);
+
+ // Select a random mutation based on the heuristic.
+ if (!allMutationsExcludingInsert.isEmpty()) {
+ return allMutationsExcludingInsert.get(
+ ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size()));
+ } else {
+ return largestInsertMutation;
+ }
+ }
+
+ private static com.google.spanner.v1.Mutation flushMutation(
+ List out,
+ com.google.spanner.v1.Mutation.Builder proto,
+ List allMutationsExcludingInsert,
+ com.google.spanner.v1.Mutation largestInsertMutation) {
if (proto != null) {
com.google.spanner.v1.Mutation builtMutation = proto.build();
- out.add(proto.build());
+ out.add(builtMutation);
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
if (allMutationsExcludingInsert.isEmpty()
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
@@ -508,14 +726,7 @@ && checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
}
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
}
-
- // Select a random mutation based on the heuristic.
- if (!allMutationsExcludingInsert.isEmpty()) {
- return allMutationsExcludingInsert.get(
- ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size()));
- } else {
- return largestInsertMutation;
- }
+ return largestInsertMutation;
}
// Returns true if the input mutation is of type INSERT and has more values than the current
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
index 1e6ce34d672..116e1aa4fc5 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java
@@ -20,6 +20,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ReadRequest.LockHint;
import com.google.spanner.v1.ReadRequest.OrderBy;
+import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import com.google.spanner.v1.TransactionOptions.ReadWrite.ReadLockMode;
@@ -265,6 +266,37 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}
+ /**
+ * Specifying this will add the given client context to the request. The client context is used to
+ * pass side-channel or configuration information to the backend, such as a user ID for a
+ * parameterized secure view.
+ */
+ public static ReadQueryUpdateTransactionOption clientContext(
+ RequestOptions.ClientContext clientContext) {
+ return new ClientContextOption(clientContext);
+ }
+
+ RequestOptions toRequestOptionsProto(boolean isTransactionOption) {
+ if (!hasPriority() && !hasTag() && !hasClientContext()) {
+ return RequestOptions.getDefaultInstance();
+ }
+ RequestOptions.Builder builder = RequestOptions.newBuilder();
+ if (hasPriority()) {
+ builder.setPriority(priority());
+ }
+ if (hasTag()) {
+ if (isTransactionOption) {
+ builder.setTransactionTag(tag());
+ } else {
+ builder.setRequestTag(tag());
+ }
+ }
+ if (hasClientContext()) {
+ builder.setClientContext(clientContext());
+ }
+ return builder.build();
+ }
+
public static TransactionOption maxCommitDelay(Duration maxCommitDelay) {
Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive");
return new MaxCommitDelayOption(maxCommitDelay);
@@ -462,6 +494,20 @@ void appendToOptions(Options options) {
}
}
+ static final class ClientContextOption extends InternalOption
+ implements ReadQueryUpdateTransactionOption {
+ private final RequestOptions.ClientContext clientContext;
+
+ ClientContextOption(RequestOptions.ClientContext clientContext) {
+ this.clientContext = clientContext;
+ }
+
+ @Override
+ void appendToOptions(Options options) {
+ options.clientContext = clientContext;
+ }
+ }
+
static final class TagOption extends InternalOption implements ReadQueryUpdateTransactionOption {
private final String tag;
@@ -574,6 +620,7 @@ void appendToOptions(Options options) {
private String filter;
private RpcPriority priority;
private String tag;
+ private RequestOptions.ClientContext clientContext;
private String etag;
private Boolean validateOnly;
private Boolean withExcludeTxnFromChangeStreams;
@@ -666,6 +713,14 @@ Priority priority() {
return priority == null ? null : priority.proto;
}
+ boolean hasClientContext() {
+ return clientContext != null;
+ }
+
+ RequestOptions.ClientContext clientContext() {
+ return clientContext;
+ }
+
boolean hasTag() {
return tag != null;
}
@@ -777,6 +832,9 @@ public String toString() {
if (priority != null) {
b.append("priority: ").append(priority).append(' ');
}
+ if (clientContext != null) {
+ b.append("clientContext: ").append(clientContext).append(' ');
+ }
if (tag != null) {
b.append("tag: ").append(tag).append(' ');
}
@@ -850,6 +908,7 @@ public boolean equals(Object o) {
&& Objects.equals(pageToken(), that.pageToken())
&& Objects.equals(filter(), that.filter())
&& Objects.equals(priority(), that.priority())
+ && Objects.equals(clientContext(), that.clientContext())
&& Objects.equals(tag(), that.tag())
&& Objects.equals(etag(), that.etag())
&& Objects.equals(validateOnly(), that.validateOnly())
@@ -894,6 +953,9 @@ public int hashCode() {
if (priority != null) {
result = 31 * result + priority.hashCode();
}
+ if (clientContext != null) {
+ result = 31 * result + clientContext.hashCode();
+ }
if (tag != null) {
result = 31 * result + tag.hashCode();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
index 5f0d497c74c..394b8bfbd9e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java
@@ -16,6 +16,7 @@
package com.google.cloud.spanner;
+import static com.google.cloud.spanner.AbstractReadContext.getChannelHintOptions;
import static com.google.common.base.Preconditions.checkState;
import com.google.api.core.InternalApi;
@@ -42,6 +43,7 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -56,12 +58,16 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction
private final Ticker ticker;
private final IsRetryableInternalError isRetryableInternalErrorPredicate;
private volatile boolean isValid = true;
+ private final Map channelHintOptions;
PartitionedDmlTransaction(SessionImpl session, SpannerRpc rpc, Ticker ticker) {
this.session = session;
this.rpc = rpc;
this.ticker = ticker;
this.isRetryableInternalErrorPredicate = new IsRetryableInternalError();
+ this.channelHintOptions =
+ getChannelHintOptions(
+ session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
}
/**
@@ -79,15 +85,13 @@ long executeStreamingPartitionedUpdate(
boolean foundStats = false;
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
- XGoogSpannerRequestId reqId =
- session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
- UpdateOption[] allOptions = new UpdateOption[updateOptions.length + 1];
- System.arraycopy(updateOptions, 0, allOptions, 0, updateOptions.length);
- allOptions[allOptions.length - 1] = new Options.RequestIdOption(reqId);
- Options options = Options.fromUpdateOptions(allOptions);
+ Options options = Options.fromUpdateOptions(updateOptions);
try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
+ // The channel ID is set to zero here. It will be filled in later by SpannerRpc when it reads
+ // the channel hint from the options that are passed in.
+ XGoogSpannerRequestId requestId = this.session.getRequestIdCreator().nextRequestId(0);
while (true) {
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
@@ -95,7 +99,7 @@ long executeStreamingPartitionedUpdate(
try {
ServerStream stream =
rpc.executeStreamingPartitionedDml(
- request, reqId.withOptions(session.getOptions()), remainingTimeout);
+ request, channelHintOptions, requestId, remainingTimeout);
for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
@@ -110,8 +114,12 @@ long executeStreamingPartitionedUpdate(
} catch (UnavailableException e) {
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
- reqId.incrementAttempt();
request = resumeOrRestartRequest(resumeToken, statement, request, options);
+ if (resumeToken.isEmpty()) {
+ // Create a new xGoogSpannerRequestId if there is no resume token, as that means that
+ // the entire transaction will be retried.
+ requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
+ }
} catch (InternalException e) {
if (!isRetryableInternalErrorPredicate.apply(e)) {
throw e;
@@ -119,8 +127,12 @@ long executeStreamingPartitionedUpdate(
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
- reqId.incrementAttempt();
request = resumeOrRestartRequest(resumeToken, statement, request, options);
+ if (resumeToken.isEmpty()) {
+ // Create a new xGoogSpannerRequestId if there is no resume token, as that means that
+ // the entire transaction will be retried.
+ requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
+ }
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
@@ -128,22 +140,18 @@ long executeStreamingPartitionedUpdate(
updateCount = 0L;
request = newTransactionRequestFrom(statement, options);
// Create a new xGoogSpannerRequestId.
- reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
- } catch (SpannerException e) {
- e.setRequestId(reqId);
- throw e;
+ requestId = session.getRequestIdCreator().nextRequestId(session.getChannel());
}
}
if (!foundStats) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
- "Partitioned DML response missing stats possibly due to non-DML statement as input",
- reqId);
+ "Partitioned DML response missing stats possibly due to non-DML statement as input");
}
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
return updateCount;
} catch (Exception e) {
- throw SpannerExceptionFactory.newSpannerException(e, reqId);
+ throw SpannerExceptionFactory.asSpannerException(e);
}
}
@@ -223,14 +231,11 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
- XGoogSpannerRequestId reqId =
- session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
- Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
+ Transaction tx = rpc.beginTransaction(request, channelHintOptions, true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
- "Failed to init transaction, missing transaction id\n" + session.getName(),
- reqId);
+ "Failed to init transaction, missing transaction id\n" + session.getName());
}
return tx.getId();
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
index 1240dd631ac..aac7f63c861 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java
@@ -69,10 +69,10 @@ abstract class ResumableStreamIterator extends AbstractIterator stream;
+ private int attempts;
private ByteString resumeToken;
private boolean finished;
- public XGoogSpannerRequestId xGoogRequestId;
- private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;
+ private final XGoogSpannerRequestId requestId;
/**
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
@@ -119,7 +119,8 @@ protected ResumableStreamIterator(
this.errorHandler = errorHandler;
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
- this.xGoogRequestIdCreator = xGoogRequestIdCreator;
+ // The channel is automatically updated by the gRPC client when the request is actually sent.
+ this.requestId = xGoogRequestIdCreator.nextRequestId(0);
}
private ExponentialBackOff newBackOff() {
@@ -187,27 +188,15 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
- throw newSpannerExceptionForCancellation(context, null, this.xGoogRequestId);
+ throw newSpannerExceptionForCancellation(context, null);
}
} catch (InterruptedException interruptExcept) {
- throw newSpannerExceptionForCancellation(context, interruptExcept, this.xGoogRequestId);
+ throw newSpannerExceptionForCancellation(context, interruptExcept);
} finally {
context.removeListener(listener);
}
}
- public void ensureNonNullXGoogRequestId() {
- if (this.xGoogRequestId == null) {
- this.xGoogRequestId =
- this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
- }
- }
-
- public void incrementXGoogRequestIdAttempt() {
- this.ensureNonNullXGoogRequestId();
- this.xGoogRequestId.incrementAttempt();
- }
-
private enum DirectExecutor implements Executor {
INSTANCE;
@@ -218,7 +207,9 @@ public void execute(Runnable command) {
}
abstract CloseableIterator startStream(
- @Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
+ @Nullable ByteString resumeToken,
+ AsyncResultSet.StreamMessageListener streamMessageListener,
+ XGoogSpannerRequestId requestId);
/**
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
@@ -242,6 +233,11 @@ public boolean isWithBeginTransaction() {
return stream != null && stream.isWithBeginTransaction();
}
+ @Override
+ public boolean isLastStatement() {
+ return stream != null && stream.isLastStatement();
+ }
+
@Override
@InternalApi
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
@@ -299,7 +295,6 @@ protected PartialResultSet computeNext() {
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
- incrementXGoogRequestIdAttempt();
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
@@ -321,14 +316,12 @@ protected PartialResultSet computeNext() {
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
stream = null;
- xGoogRequestId = null;
continue;
}
}
}
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
- spannerException.setRequestId(this.xGoogRequestId);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry", e);
@@ -347,13 +340,8 @@ private void startGrpcStreaming() {
try (IScope scope = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
- stream = checkNotNull(startStream(resumeToken, streamMessageListener));
+ stream = checkNotNull(startStream(resumeToken, streamMessageListener, requestId));
stream.requestPrefetchChunks();
- if (this.xGoogRequestId == null) {
- this.xGoogRequestId =
- this.xGoogRequestIdCreator.nextRequestId(
- 1 /* channelId shall be replaced by the instantiated class. */, 0);
- }
}
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
index 20c86bdf25b..1fb49f2ced3 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
@@ -36,7 +36,7 @@
import javax.annotation.concurrent.GuardedBy;
/** Client for creating single sessions and batches of sessions. */
-class SessionClient implements AutoCloseable, XGoogSpannerRequestId.RequestIdCreator {
+class SessionClient implements AutoCloseable {
static class SessionId {
private static final PathTemplate NAME_TEMPLATE =
PathTemplate.create(
@@ -110,15 +110,8 @@ Object value() {
return ImmutableMap.copyOf(tmp);
}
- static Map createRequestOptions(
- long channelId, XGoogSpannerRequestId requestId) {
- return ImmutableMap.of(
- Option.CHANNEL_HINT, channelId,
- Option.REQUEST_ID, requestId);
- }
-
- static Map createRequestOptions(XGoogSpannerRequestId requestId) {
- return ImmutableMap.of(Option.REQUEST_ID, requestId);
+ static Map createRequestOptions(long channelId) {
+ return ImmutableMap.of(Option.CHANNEL_HINT, channelId);
}
private final class BatchCreateSessionsRunnable implements Runnable {
@@ -140,7 +133,7 @@ public void run() {
List sessions;
int remainingSessionsToCreate = sessionCount;
ISpan span =
- spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, commonAttributes);
+ spanner.getTracer().spanBuilder(SpannerImpl.BATCH_CREATE_SESSIONS, databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
spanner
.getTracer()
@@ -185,7 +178,7 @@ interface SessionConsumer {
private final ExecutorFactory executorFactory;
private final ScheduledExecutorService executor;
private final DatabaseId db;
- private final Attributes commonAttributes;
+ private final Attributes databaseAttributes;
// SessionClient is created long before a DatabaseClientImpl is created,
// as batch sessions are firstly created then later attached to each Client.
@@ -204,7 +197,7 @@ interface SessionConsumer {
this.db = db;
this.executorFactory = executorFactory;
this.executor = executorFactory.get();
- this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
+ this.databaseAttributes = spanner.getTracer().createDatabaseAttributes(db);
}
@Override
@@ -220,12 +213,6 @@ DatabaseId getDatabaseId() {
return db;
}
- @Override
- public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
- return XGoogSpannerRequestId.of(
- this.nthId, channelId, this.nthRequest.incrementAndGet(), attempt);
- }
-
/** Create a single session. */
SessionImpl createSession() {
// The sessionChannelCounter could overflow, but that will just flip it to Integer.MIN_VALUE,
@@ -235,8 +222,8 @@ SessionImpl createSession() {
channelId = sessionChannelCounter;
sessionChannelCounter++;
}
- XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
- ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.commonAttributes);
+ ISpan span =
+ spanner.getTracer().spanBuilder(SpannerImpl.CREATE_SESSION, this.databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
@@ -245,16 +232,15 @@ SessionImpl createSession() {
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
- createRequestOptions(channelId, reqId));
+ createRequestOptions(channelId));
SessionReference sessionReference =
new SessionReference(
session.getName(),
+ spanner.getOptions().getDatabaseRole(),
session.getCreateTime(),
session.getMultiplexed(),
optionMap(SessionOption.channelHint(channelId)));
- SessionImpl sessionImpl = new SessionImpl(spanner, sessionReference);
- sessionImpl.setRequestIdCreator(this);
- return sessionImpl;
+ return new SessionImpl(spanner, sessionReference);
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
@@ -289,10 +275,7 @@ SessionImpl createMultiplexedSession() {
ISpan span =
spanner
.getTracer()
- .spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.commonAttributes);
- // MultiplexedSession doesn't use a channelId hence this hard-coded value.
- int channelId = 0;
- XGoogSpannerRequestId reqId = nextRequestId(channelId, 1);
+ .spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION, this.databaseAttributes);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
@@ -301,14 +284,17 @@ SessionImpl createMultiplexedSession() {
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
- createRequestOptions(reqId),
+ null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner,
new SessionReference(
- session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
- sessionImpl.setRequestIdCreator(this);
+ session.getName(),
+ spanner.getOptions().getDatabaseRole(),
+ session.getCreateTime(),
+ session.getMultiplexed(),
+ null));
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
return sessionImpl;
@@ -422,8 +408,6 @@ private List internalBatchCreateSessions(
.spanBuilderWithExplicitParent(SpannerImpl.BATCH_CREATE_SESSIONS_REQUEST, parent);
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
try (IScope s = spanner.getTracer().withSpan(span)) {
- XGoogSpannerRequestId reqId =
- XGoogSpannerRequestId.of(this.nthId, channelHint, this.nthRequest.incrementAndGet(), 1);
List sessions =
spanner
.getRpc()
@@ -432,7 +416,7 @@ private List internalBatchCreateSessions(
sessionCount,
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
- createRequestOptions(channelHint, reqId));
+ createRequestOptions(channelHint));
span.addAnnotation(
String.format(
"Request for %d sessions returned %d sessions", sessionCount, sessions.size()));
@@ -444,10 +428,10 @@ private List internalBatchCreateSessions(
spanner,
new SessionReference(
session.getName(),
+ spanner.getOptions().getDatabaseRole(),
session.getCreateTime(),
session.getMultiplexed(),
optionMap(SessionOption.channelHint(channelHint))));
- sessionImpl.setRequestIdCreator(this);
res.add(sessionImpl);
}
return res;
@@ -464,8 +448,6 @@ SessionImpl sessionWithId(String name) {
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
- SessionImpl sessionImpl = new SessionImpl(spanner, new SessionReference(name, options));
- sessionImpl.setRequestIdCreator(this);
- return sessionImpl;
+ return new SessionImpl(spanner, new SessionReference(name, /* databaseRole= */ null, options));
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 68c37561c9d..e70ee390df1 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -126,31 +126,18 @@ interface SessionTransaction {
private final Clock clock;
private final Map options;
private final ErrorHandler errorHandler;
- private XGoogSpannerRequestId.RequestIdCreator requestIdCreator;
SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
this(spanner, sessionReference, NO_CHANNEL_HINT);
}
SessionImpl(SpannerImpl spanner, SessionReference sessionReference, int channelHint) {
- this(spanner, sessionReference, channelHint, new XGoogSpannerRequestId.NoopRequestIdCreator());
- }
-
- SessionImpl(
- SpannerImpl spanner,
- SessionReference sessionReference,
- int channelHint,
- XGoogSpannerRequestId.RequestIdCreator requestIdCreator) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.sessionReference = sessionReference;
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
this.options = createOptions(sessionReference, channelHint);
this.errorHandler = createErrorHandler(spanner.getOptions());
- this.requestIdCreator = requestIdCreator;
- if (this.requestIdCreator == null) {
- throw new IllegalStateException("requestIdCreator must be non-null");
- }
}
static Map createOptions(
@@ -173,6 +160,11 @@ public String getName() {
return sessionReference.getName();
}
+ @Override
+ public String getDatabaseRole() {
+ return sessionReference.getDatabaseRole();
+ }
+
/**
* Updates the session reference with the fallback session. This should only be used for updating
* session reference with regular session in case of unimplemented error in multiplexed session.
@@ -189,6 +181,10 @@ ErrorHandler getErrorHandler() {
return this.errorHandler;
}
+ SpannerImpl getSpanner() {
+ return spanner;
+ }
+
void setCurrentSpan(ISpan span) {
currentSpan = span;
}
@@ -306,12 +302,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
- () -> {
- // On Aborted, we have to start a fresh request id.
- final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
- return new CommitResponse(
- spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
- });
+ () -> new CommitResponse(spanner.getRpc().commit(request, getOptions())));
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
@@ -320,14 +311,6 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
}
- private XGoogSpannerRequestId reqIdOrFresh(Options options) {
- XGoogSpannerRequestId reqId = options.reqId();
- if (reqId == null) {
- reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
- }
- return reqId;
- }
-
private RequestOptions getRequestOptions(TransactionOption... transactionOptions) {
Options requestOptions = Options.fromTransactionOptions(transactionOptions);
if (requestOptions.hasPriority() || requestOptions.hasTag()) {
@@ -356,7 +339,6 @@ public ServerStream batchWriteAtLeastOnce(
.addAllMutationGroups(mutationGroupsProto);
RequestOptions batchWriteRequestOptions = getRequestOptions(transactionOptions);
Options allOptions = Options.fromTransactionOptions(transactionOptions);
- final XGoogSpannerRequestId reqId = reqIdOrFresh(allOptions);
if (batchWriteRequestOptions != null) {
requestBuilder.setRequestOptions(batchWriteRequestOptions);
}
@@ -365,9 +347,7 @@ public ServerStream batchWriteAtLeastOnce(
}
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
try (IScope s = tracer.withSpan(span)) {
- return spanner
- .getRpc()
- .batchWriteAtLeastOnce(requestBuilder.build(), reqId.withOptions(getOptions()));
+ return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), getOptions());
} catch (Throwable e) {
span.setStatus(e);
throw SpannerExceptionFactory.newSpannerException(e);
@@ -471,8 +451,7 @@ public ApiFuture asyncClose() {
if (getIsMultiplexed()) {
return com.google.api.core.ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
- XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
- return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions()));
+ return spanner.getRpc().asyncDeleteSession(getName(), getOptions());
}
@Override
@@ -482,8 +461,7 @@ public void close() {
}
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
try (IScope s = tracer.withSpan(span)) {
- XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
- spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions()));
+ spanner.getRpc().deleteSession(getName(), getOptions());
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
@@ -511,14 +489,27 @@ ApiFuture beginTransactionAsync(
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
+ RequestOptions.Builder optionsBuilder =
+ transactionOptions.toRequestOptionsProto(true).toBuilder();
+ RequestOptions.ClientContext defaultClientContext = spanner.getOptions().getClientContext();
+ if (defaultClientContext != null) {
+ RequestOptions.ClientContext.Builder builder = defaultClientContext.toBuilder();
+ if (optionsBuilder.hasClientContext()) {
+ builder.mergeFrom(optionsBuilder.getClientContext());
+ }
+ optionsBuilder.setClientContext(builder.build());
+ }
+ if (!sessionReference.getIsMultiplexed()) {
+ optionsBuilder.clearTransactionTag();
+ }
+ RequestOptions requestOptions = optionsBuilder.build();
+ if (!requestOptions.equals(RequestOptions.getDefaultInstance())) {
+ requestBuilder.setRequestOptions(requestOptions);
+ }
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture requestFuture;
- XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
try (IScope ignore = tracer.withSpan(span)) {
- requestFuture =
- spanner
- .getRpc()
- .beginTransactionAsync(request, reqId.withOptions(channelHint), routeToLeader);
+ requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
}
requestFuture.addListener(
() -> {
@@ -526,7 +517,7 @@ ApiFuture beginTransactionAsync(
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
- ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId);
+ ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end();
res.set(txn);
@@ -535,7 +526,7 @@ ApiFuture beginTransactionAsync(
span.end();
res.setException(
SpannerExceptionFactory.newSpannerException(
- e.getCause() == null ? e : e.getCause(), reqId));
+ e.getCause() == null ? e : e.getCause()));
} catch (InterruptedException e) {
span.setStatus(e);
span.end();
@@ -597,12 +588,8 @@ TraceWrapper getTracer() {
return tracer;
}
- public void setRequestIdCreator(XGoogSpannerRequestId.RequestIdCreator creator) {
- this.requestIdCreator = creator;
- }
-
public XGoogSpannerRequestId.RequestIdCreator getRequestIdCreator() {
- return this.requestIdCreator;
+ return this.spanner.getRpc().getRequestIdCreator();
}
int getChannel() {
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionNotFoundException.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionNotFoundException.java
index c17384db3ec..f4a62b1954a 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionNotFoundException.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionNotFoundException.java
@@ -35,7 +35,7 @@ public class SessionNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause) {
- this(token, message, resourceInfo, cause, null, null);
+ this(token, message, resourceInfo, cause, null);
}
/** Private constructor. Use {@link SpannerExceptionFactory} to create instances. */
@@ -44,8 +44,7 @@ public class SessionNotFoundException extends ResourceNotFoundException {
@Nullable String message,
ResourceInfo resourceInfo,
@Nullable Throwable cause,
- @Nullable ApiException apiException,
- @Nullable XGoogSpannerRequestId reqId) {
- super(token, message, resourceInfo, cause, apiException, reqId);
+ @Nullable ApiException apiException) {
+ super(token, message, resourceInfo, cause, apiException);
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
deleted file mode 100644
index 42a67a66296..00000000000
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ /dev/null
@@ -1,3516 +0,0 @@
-/*
- * Copyright 2017 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.spanner;
-
-import static com.google.cloud.spanner.MetricRegistryConstants.COUNT;
-import static com.google.cloud.spanner.MetricRegistryConstants.GET_SESSION_TIMEOUTS;
-import static com.google.cloud.spanner.MetricRegistryConstants.IS_MULTIPLEXED;
-import static com.google.cloud.spanner.MetricRegistryConstants.MAX_ALLOWED_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.MAX_ALLOWED_SESSIONS_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.MAX_IN_USE_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.MAX_IN_USE_SESSIONS_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.METRIC_PREFIX;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_ACQUIRED_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_ACQUIRED_SESSIONS_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_IN_USE_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_READ_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_RELEASED_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_RELEASED_SESSIONS_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_AVAILABLE;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_BEING_PREPARED;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_POOL;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_POOL_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_SESSIONS_IN_USE;
-import static com.google.cloud.spanner.MetricRegistryConstants.NUM_WRITE_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.SESSIONS_TIMEOUTS_DESCRIPTION;
-import static com.google.cloud.spanner.MetricRegistryConstants.SESSIONS_TYPE;
-import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_DEFAULT_LABEL_VALUES;
-import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS;
-import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_MULTIPLEXED_SESSIONS;
-import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_TYPE;
-import static com.google.cloud.spanner.SpannerExceptionFactory.asSpannerException;
-import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.core.SettableApiFuture;
-import com.google.api.gax.core.ExecutorProvider;
-import com.google.api.gax.rpc.ServerStream;
-import com.google.cloud.Timestamp;
-import com.google.cloud.Tuple;
-import com.google.cloud.grpc.GrpcTransportOptions;
-import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
-import com.google.cloud.spanner.Options.QueryOption;
-import com.google.cloud.spanner.Options.ReadOption;
-import com.google.cloud.spanner.Options.TransactionOption;
-import com.google.cloud.spanner.Options.UpdateOption;
-import com.google.cloud.spanner.SessionClient.SessionConsumer;
-import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
-import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
-import com.google.cloud.spanner.SpannerImpl.ClosedException;
-import com.google.cloud.spanner.spi.v1.SpannerRpc;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.ForwardingListenableFuture;
-import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.protobuf.Empty;
-import com.google.spanner.v1.BatchWriteResponse;
-import com.google.spanner.v1.ResultSetStats;
-import io.opencensus.metrics.DerivedLongCumulative;
-import io.opencensus.metrics.DerivedLongGauge;
-import io.opencensus.metrics.LabelValue;
-import io.opencensus.metrics.MetricOptions;
-import io.opencensus.metrics.MetricRegistry;
-import io.opencensus.metrics.Metrics;
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.common.AttributesBuilder;
-import io.opentelemetry.api.metrics.Meter;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
-/**
- * Maintains a pool of sessions. This class itself is thread safe and is meant to be used
- * concurrently across multiple threads.
- */
-class SessionPool {
-
- private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
- private final TraceWrapper tracer;
- static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
-
- /**
- * If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
- * for the creation of at least {@link SessionPoolOptions#getMinSessions()} in the pool using the
- * given duration. If the waiting times out, a {@link SpannerException} with the {@link
- * ErrorCode#DEADLINE_EXCEEDED} is thrown.
- */
- void maybeWaitOnMinSessions() {
- final long timeoutNanos = options.getWaitForMinSessions().toNanos();
- if (timeoutNanos <= 0) {
- return;
- }
-
- try {
- if (!waitOnMinSessionsLatch.await(timeoutNanos, TimeUnit.NANOSECONDS)) {
- final long timeoutMillis = options.getWaitForMinSessions().toMillis();
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.DEADLINE_EXCEEDED,
- "Timed out after waiting " + timeoutMillis + "ms for session pool creation");
- }
- } catch (InterruptedException e) {
- throw SpannerExceptionFactory.propagateInterrupt(e);
- }
- }
-
- private abstract static class CachedResultSetSupplier
- implements com.google.common.base.Supplier {
-
- private ResultSet cached;
-
- abstract ResultSet load();
-
- ResultSet reload() {
- return cached = load();
- }
-
- @Override
- public ResultSet get() {
- if (cached == null) {
- cached = load();
- }
- return cached;
- }
- }
-
- /**
- * Wrapper around {@code ReadContext} that releases the session to the pool once the call is
- * finished, if it is a single use context.
- */
- private static class AutoClosingReadContext
- implements ReadContext {
- /**
- * {@link AsyncResultSet} implementation that keeps track of the async operations that are still
- * running for this {@link ReadContext} and that should finish before the {@link ReadContext}
- * releases its session back into the pool.
- */
- private class AutoClosingReadContextAsyncResultSetImpl extends AsyncResultSetImpl {
- private AutoClosingReadContextAsyncResultSetImpl(
- ExecutorProvider executorProvider, ResultSet delegate, int bufferRows) {
- super(executorProvider, delegate, bufferRows);
- }
-
- @Override
- public ApiFuture setCallback(Executor exec, ReadyCallback cb) {
- Runnable listener =
- () -> {
- synchronized (lock) {
- if (asyncOperationsCount.decrementAndGet() == 0 && closed) {
- // All async operations for this read context have finished.
- AutoClosingReadContext.this.close();
- }
- }
- };
- try {
- asyncOperationsCount.incrementAndGet();
- addListener(listener);
- return super.setCallback(exec, cb);
- } catch (Throwable t) {
- removeListener(listener);
- asyncOperationsCount.decrementAndGet();
- throw t;
- }
- }
- }
-
- private final Function readContextDelegateSupplier;
- private T readContextDelegate;
- private final SessionPool sessionPool;
- private final SessionReplacementHandler sessionReplacementHandler;
- private final boolean isSingleUse;
- private final AtomicInteger asyncOperationsCount = new AtomicInteger();
-
- private final Object lock = new Object();
-
- @GuardedBy("lock")
- private boolean sessionUsedForQuery = false;
-
- @GuardedBy("lock")
- private I session;
-
- @GuardedBy("lock")
- private boolean closed;
-
- @GuardedBy("lock")
- private boolean delegateClosed;
-
- private AutoClosingReadContext(
- Function delegateSupplier,
- SessionPool sessionPool,
- SessionReplacementHandler sessionReplacementHandler,
- I session,
- boolean isSingleUse) {
- this.readContextDelegateSupplier = delegateSupplier;
- this.sessionPool = sessionPool;
- this.sessionReplacementHandler = sessionReplacementHandler;
- this.session = session;
- this.isSingleUse = isSingleUse;
- }
-
- T getReadContextDelegate() {
- synchronized (lock) {
- if (readContextDelegate == null) {
- while (true) {
- try {
- this.readContextDelegate = readContextDelegateSupplier.apply(this.session);
- break;
- } catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- }
- }
- }
- }
- return readContextDelegate;
- }
-
- private ResultSet wrap(final CachedResultSetSupplier resultSetSupplier) {
- return new ForwardingResultSet(resultSetSupplier) {
- private boolean beforeFirst = true;
-
- @Override
- public boolean next() throws SpannerException {
- while (true) {
- try {
- return internalNext();
- } catch (SessionNotFoundException e) {
- while (true) {
- // Keep the replace-if-possible outside the try-block to let the exception bubble up
- // if it's too late to replace the session.
- replaceSessionIfPossible(e);
- try {
- replaceDelegate(resultSetSupplier.reload());
- break;
- } catch (SessionNotFoundException snfe) {
- e = snfe;
- // retry on yet another session.
- }
- }
- }
- }
- }
-
- private boolean internalNext() {
- try {
- boolean ret = super.next();
- if (beforeFirst) {
- synchronized (lock) {
- session.get().markUsed();
- beforeFirst = false;
- sessionUsedForQuery = true;
- }
- }
- if (!ret && isSingleUse) {
- close();
- }
- return ret;
- } catch (SessionNotFoundException e) {
- throw e;
- } catch (SpannerException e) {
- synchronized (lock) {
- if (!closed && isSingleUse) {
- session.get().setLastException(e);
- AutoClosingReadContext.this.close();
- }
- }
- throw e;
- }
- }
-
- @Override
- public void close() {
- try {
- super.close();
- } finally {
- if (isSingleUse) {
- AutoClosingReadContext.this.close();
- }
- }
- }
- };
- }
-
- private void replaceSessionIfPossible(SessionNotFoundException notFound) {
- synchronized (lock) {
- if (isSingleUse || !sessionUsedForQuery) {
- // This class is only used by read-only transactions, so we know that we only need a
- // read-only session.
- session = sessionReplacementHandler.replaceSession(notFound, session);
- readContextDelegate = readContextDelegateSupplier.apply(session);
- } else {
- throw notFound;
- }
- }
- }
-
- @Override
- public ResultSet read(
- final String table,
- final KeySet keys,
- final Iterable columns,
- final ReadOption... options) {
- return wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().read(table, keys, columns, options);
- }
- });
- }
-
- @Override
- public AsyncResultSet readAsync(
- final String table,
- final KeySet keys,
- final Iterable columns,
- final ReadOption... options) {
- Options readOptions = Options.fromReadOptions(options);
- final int bufferRows =
- readOptions.hasBufferRows()
- ? readOptions.bufferRows()
- : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
- return new AutoClosingReadContextAsyncResultSetImpl(
- sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
- wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().read(table, keys, columns, options);
- }
- }),
- bufferRows);
- }
-
- @Override
- public ResultSet readUsingIndex(
- final String table,
- final String index,
- final KeySet keys,
- final Iterable columns,
- final ReadOption... options) {
- return wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().readUsingIndex(table, index, keys, columns, options);
- }
- });
- }
-
- @Override
- public AsyncResultSet readUsingIndexAsync(
- final String table,
- final String index,
- final KeySet keys,
- final Iterable columns,
- final ReadOption... options) {
- Options readOptions = Options.fromReadOptions(options);
- final int bufferRows =
- readOptions.hasBufferRows()
- ? readOptions.bufferRows()
- : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
- return new AutoClosingReadContextAsyncResultSetImpl(
- sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
- wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate()
- .readUsingIndex(table, index, keys, columns, options);
- }
- }),
- bufferRows);
- }
-
- @Override
- @Nullable
- public Struct readRow(String table, Key key, Iterable columns) {
- try {
- while (true) {
- try {
- synchronized (lock) {
- session.get().markUsed();
- }
- return getReadContextDelegate().readRow(table, key, columns);
- } catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- }
- }
- } finally {
- synchronized (lock) {
- sessionUsedForQuery = true;
- }
- if (isSingleUse) {
- close();
- }
- }
- }
-
- @Override
- public ApiFuture readRowAsync(String table, Key key, Iterable columns) {
- try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) {
- return AbstractReadContext.consumeSingleRowAsync(rs);
- }
- }
-
- @Override
- @Nullable
- public Struct readRowUsingIndex(String table, String index, Key key, Iterable columns) {
- try {
- while (true) {
- try {
- synchronized (lock) {
- session.get().markUsed();
- }
- return getReadContextDelegate().readRowUsingIndex(table, index, key, columns);
- } catch (SessionNotFoundException e) {
- replaceSessionIfPossible(e);
- }
- }
- } finally {
- synchronized (lock) {
- sessionUsedForQuery = true;
- }
- if (isSingleUse) {
- close();
- }
- }
- }
-
- @Override
- public ApiFuture readRowUsingIndexAsync(
- String table, String index, Key key, Iterable columns) {
- try (AsyncResultSet rs = readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
- return AbstractReadContext.consumeSingleRowAsync(rs);
- }
- }
-
- @Override
- public ResultSet executeQuery(final Statement statement, final QueryOption... options) {
- return wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().executeQuery(statement, options);
- }
- });
- }
-
- @Override
- public AsyncResultSet executeQueryAsync(
- final Statement statement, final QueryOption... options) {
- Options queryOptions = Options.fromQueryOptions(options);
- final int bufferRows =
- queryOptions.hasBufferRows()
- ? queryOptions.bufferRows()
- : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
- return new AutoClosingReadContextAsyncResultSetImpl(
- sessionPool.sessionClient.getSpanner().getAsyncExecutorProvider(),
- wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().executeQuery(statement, options);
- }
- }),
- bufferRows);
- }
-
- @Override
- public ResultSet analyzeQuery(final Statement statement, final QueryAnalyzeMode queryMode) {
- return wrap(
- new CachedResultSetSupplier() {
- @Override
- ResultSet load() {
- return getReadContextDelegate().analyzeQuery(statement, queryMode);
- }
- });
- }
-
- @Override
- public void close() {
- synchronized (lock) {
- if (closed && delegateClosed) {
- return;
- }
- closed = true;
- if (asyncOperationsCount.get() == 0) {
- if (readContextDelegate != null) {
- readContextDelegate.close();
- }
- session.close();
- delegateClosed = true;
- }
- }
- }
- }
-
- private static class AutoClosingReadTransaction
- extends AutoClosingReadContext implements ReadOnlyTransaction {
-
- AutoClosingReadTransaction(
- Function txnSupplier,
- SessionPool sessionPool,
- SessionReplacementHandler sessionReplacementHandler,
- I session,
- boolean isSingleUse) {
- super(txnSupplier, sessionPool, sessionReplacementHandler, session, isSingleUse);
- }
-
- @Override
- public Timestamp getReadTimestamp() {
- return getReadContextDelegate().getReadTimestamp();
- }
- }
-
- interface SessionReplacementHandler {
- T replaceSession(SessionNotFoundException notFound, T sessionFuture);
-
- T denyListSession(RetryOnDifferentGrpcChannelException retryException, T sessionFuture);
- }
-
- class PooledSessionReplacementHandler implements SessionReplacementHandler {
- @Override
- public PooledSessionFuture replaceSession(
- SessionNotFoundException e, PooledSessionFuture session) {
- if (!options.isFailIfSessionNotFound() && session.get().isAllowReplacing()) {
- synchronized (lock) {
- numSessionsInUse--;
- numSessionsReleased++;
- checkedOutSessions.remove(session);
- markedCheckedOutSessions.remove(session);
- }
- session.leakedException = null;
- invalidateSession(session.get());
- return getSession();
- } else {
- throw e;
- }
- }
-
- @Override
- public PooledSessionFuture denyListSession(
- RetryOnDifferentGrpcChannelException retryException, PooledSessionFuture session) {
- // The feature was not enabled when the session pool was created.
- if (denyListedChannels == null) {
- throw SpannerExceptionFactory.asSpannerException(retryException.getCause());
- }
-
- int channel = session.get().getChannel();
- synchronized (lock) {
- // Calculate the size manually by iterating over the possible keys. We do this because the
- // size of a cache can be stale, and manually checking for each possible key will make sure
- // we get the correct value, and it will update the cache.
- int currentSize = 0;
- for (int i = 0; i < numChannels; i++) {
- if (denyListedChannels.getIfPresent(i) != null) {
- currentSize++;
- }
- }
- if (currentSize < numChannels - 1) {
- denyListedChannels.put(channel, DENY_LISTED);
- } else {
- // We have now deny-listed all channels. Give up and just throw the original error.
- throw SpannerExceptionFactory.asSpannerException(retryException.getCause());
- }
- }
- session.get().releaseToPosition = Position.LAST;
- session.close();
- return getSession();
- }
- }
-
- interface SessionNotFoundHandler {
- /**
- * Handles the given {@link SessionNotFoundException} by possibly converting it to a different
- * exception that should be thrown.
- */
- SpannerException handleSessionNotFound(SessionNotFoundException notFound);
- }
-
- static class SessionPoolResultSet extends ForwardingResultSet {
- private final SessionNotFoundHandler handler;
-
- private SessionPoolResultSet(SessionNotFoundHandler handler, ResultSet delegate) {
- super(delegate);
- this.handler = Preconditions.checkNotNull(handler);
- }
-
- @Override
- public boolean next() {
- try {
- return super.next();
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
- }
-
- static class AsyncSessionPoolResultSet extends ForwardingAsyncResultSet {
- private final SessionNotFoundHandler handler;
-
- private AsyncSessionPoolResultSet(SessionNotFoundHandler handler, AsyncResultSet delegate) {
- super(delegate);
- this.handler = Preconditions.checkNotNull(handler);
- }
-
- @Override
- public ApiFuture setCallback(Executor executor, final ReadyCallback callback) {
- return super.setCallback(
- executor,
- resultSet -> {
- try {
- return callback.cursorReady(resultSet);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- });
- }
-
- @Override
- public boolean next() {
- try {
- return super.next();
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public CursorState tryNext() {
- try {
- return super.tryNext();
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
- }
-
- /**
- * {@link TransactionContext} that is used in combination with an {@link
- * AutoClosingTransactionManager}. This {@link TransactionContext} handles {@link
- * SessionNotFoundException}s by replacing the underlying session with a fresh one, and then
- * throws an {@link AbortedException} to trigger the retry-loop that has been created by the
- * caller.
- */
- static class SessionPoolTransactionContext implements TransactionContext {
- private final SessionNotFoundHandler handler;
- final TransactionContext delegate;
-
- SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate) {
- this.handler = Preconditions.checkNotNull(handler);
- this.delegate = delegate;
- }
-
- @Override
- public ResultSet read(
- String table, KeySet keys, Iterable columns, ReadOption... options) {
- return new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options));
- }
-
- @Override
- public AsyncResultSet readAsync(
- String table, KeySet keys, Iterable columns, ReadOption... options) {
- return new AsyncSessionPoolResultSet(
- handler, delegate.readAsync(table, keys, columns, options));
- }
-
- @Override
- public ResultSet readUsingIndex(
- String table, String index, KeySet keys, Iterable columns, ReadOption... options) {
- return new SessionPoolResultSet(
- handler, delegate.readUsingIndex(table, index, keys, columns, options));
- }
-
- @Override
- public AsyncResultSet readUsingIndexAsync(
- String table, String index, KeySet keys, Iterable columns, ReadOption... options) {
- return new AsyncSessionPoolResultSet(
- handler, delegate.readUsingIndexAsync(table, index, keys, columns, options));
- }
-
- @Override
- public Struct readRow(String table, Key key, Iterable columns) {
- try {
- return delegate.readRow(table, key, columns);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public ApiFuture readRowAsync(String table, Key key, Iterable columns) {
- try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) {
- return ApiFutures.catching(
- AbstractReadContext.consumeSingleRowAsync(rs),
- SessionNotFoundException.class,
- input -> {
- throw handler.handleSessionNotFound(input);
- },
- MoreExecutors.directExecutor());
- }
- }
-
- @Override
- public void buffer(Mutation mutation) {
- delegate.buffer(mutation);
- }
-
- @Override
- public ApiFuture bufferAsync(Mutation mutation) {
- return delegate.bufferAsync(mutation);
- }
-
- @Override
- public Struct readRowUsingIndex(String table, String index, Key key, Iterable columns) {
- try {
- return delegate.readRowUsingIndex(table, index, key, columns);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public ApiFuture readRowUsingIndexAsync(
- String table, String index, Key key, Iterable columns) {
- try (AsyncResultSet rs = readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
- return ApiFutures.catching(
- AbstractReadContext.consumeSingleRowAsync(rs),
- SessionNotFoundException.class,
- input -> {
- throw handler.handleSessionNotFound(input);
- },
- MoreExecutors.directExecutor());
- }
- }
-
- @Override
- public void buffer(Iterable mutations) {
- delegate.buffer(mutations);
- }
-
- @Override
- public ApiFuture bufferAsync(Iterable mutations) {
- return delegate.bufferAsync(mutations);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public ResultSetStats analyzeUpdate(
- Statement statement, QueryAnalyzeMode analyzeMode, UpdateOption... options) {
- try (ResultSet resultSet = analyzeUpdateStatement(statement, analyzeMode, options)) {
- return resultSet.getStats();
- }
- }
-
- @Override
- public ResultSet analyzeUpdateStatement(
- Statement statement, QueryAnalyzeMode analyzeMode, UpdateOption... options) {
- try {
- return delegate.analyzeUpdateStatement(statement, analyzeMode, options);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public long executeUpdate(Statement statement, UpdateOption... options) {
- try {
- return delegate.executeUpdate(statement, options);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) {
- return ApiFutures.catching(
- delegate.executeUpdateAsync(statement, options),
- SessionNotFoundException.class,
- input -> {
- throw handler.handleSessionNotFound(input);
- },
- MoreExecutors.directExecutor());
- }
-
- @Override
- public long[] batchUpdate(Iterable statements, UpdateOption... options) {
- try {
- return delegate.batchUpdate(statements, options);
- } catch (SessionNotFoundException e) {
- throw handler.handleSessionNotFound(e);
- }
- }
-
- @Override
- public ApiFuture batchUpdateAsync(
- Iterable statements, UpdateOption... options) {
- return ApiFutures.catching(
- delegate.batchUpdateAsync(statements, options),
- SessionNotFoundException.class,
- input -> {
- throw handler.handleSessionNotFound(input);
- },
- MoreExecutors.directExecutor());
- }
-
- @Override
- public ResultSet executeQuery(Statement statement, QueryOption... options) {
- return new SessionPoolResultSet(handler, delegate.executeQuery(statement, options));
- }
-
- @Override
- public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
- return new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options));
- }
-
- @Override
- public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) {
- return new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode));
- }
-
- @Override
- public void close() {
- delegate.close();
- }
- }
-
- private static class AutoClosingTransactionManager
- implements TransactionManager, SessionNotFoundHandler {
- private TransactionManager delegate;
- private T session;
- private final SessionReplacementHandler sessionReplacementHandler;
- private final TransactionOption[] options;
- private boolean closed;
- private boolean restartedAfterSessionNotFound;
-
- AutoClosingTransactionManager(
- T session,
- SessionReplacementHandler sessionReplacementHandler,
- TransactionOption... options) {
- this.session = session;
- this.options = options;
- this.sessionReplacementHandler = sessionReplacementHandler;
- }
-
- @Override
- public TransactionContext begin() {
- this.delegate = session.get().transactionManager(options);
- // This cannot throw a SessionNotFoundException, as it does not call the BeginTransaction RPC.
- // Instead, the BeginTransaction will be included with the first statement of the transaction.
- return internalBegin();
- }
-
- @Override
- public TransactionContext begin(AbortedException exception) {
- // For regular sessions, the input exception is ignored and the behavior is equivalent to
- // calling {@link #begin()}.
- return begin();
- }
-
- private TransactionContext internalBegin() {
- TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin());
- session.get().markUsed();
- return res;
- }
-
- @Override
- public SpannerException handleSessionNotFound(SessionNotFoundException notFoundException) {
- session = sessionReplacementHandler.replaceSession(notFoundException, session);
- CachedSession cachedSession = session.get();
- delegate = cachedSession.getDelegate().transactionManager(options);
- restartedAfterSessionNotFound = true;
- return createAbortedExceptionWithMinimalRetryDelay(notFoundException);
- }
-
- private static SpannerException createAbortedExceptionWithMinimalRetryDelay(
- SessionNotFoundException notFoundException) {
- return SpannerExceptionFactory.newSpannerException(
- ErrorCode.ABORTED,
- notFoundException.getMessage(),
- SpannerExceptionFactory.createAbortedExceptionWithRetryDelay(
- notFoundException.getMessage(), notFoundException, 0, 1));
- }
-
- @Override
- public void commit() {
- try {
- delegate.commit();
- } catch (SessionNotFoundException e) {
- throw handleSessionNotFound(e);
- } finally {
- if (getState() != TransactionState.ABORTED) {
- close();
- }
- }
- }
-
- @Override
- public void rollback() {
- try {
- delegate.rollback();
- } finally {
- close();
- }
- }
-
- @Override
- public TransactionContext resetForRetry() {
- while (true) {
- try {
- if (restartedAfterSessionNotFound) {
- TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin());
- restartedAfterSessionNotFound = false;
- return res;
- } else {
- return new SessionPoolTransactionContext(this, delegate.resetForRetry());
- }
- } catch (SessionNotFoundException e) {
- session = sessionReplacementHandler.replaceSession(e, session);
- CachedSession cachedSession = session.get();
- delegate = cachedSession.getDelegate().transactionManager(options);
- restartedAfterSessionNotFound = true;
- }
- }
- }
-
- @Override
- public Timestamp getCommitTimestamp() {
- return delegate.getCommitTimestamp();
- }
-
- @Override
- public CommitResponse getCommitResponse() {
- return delegate.getCommitResponse();
- }
-
- @Override
- public void close() {
- if (closed) {
- return;
- }
- closed = true;
- try {
- if (delegate != null) {
- delegate.close();
- }
- } finally {
- session.close();
- }
- }
-
- @Override
- public TransactionState getState() {
- if (restartedAfterSessionNotFound) {
- return TransactionState.ABORTED;
- } else {
- return delegate == null ? null : delegate.getState();
- }
- }
- }
-
- /**
- * {@link TransactionRunner} that automatically handles {@link SessionNotFoundException}s by
- * replacing the underlying session and then restarts the transaction.
- */
- static final class SessionPoolTransactionRunner
- implements TransactionRunner {
-
- private I session;
- private final SessionReplacementHandler sessionReplacementHandler;
- private final TransactionOption[] options;
- private TransactionRunner runner;
-
- SessionPoolTransactionRunner(
- I session,
- SessionReplacementHandler sessionReplacementHandler,
- TransactionOption... options) {
- this.session = session;
- this.options = options;
- this.sessionReplacementHandler = sessionReplacementHandler;
- }
-
- private TransactionRunner getRunner() {
- if (this.runner == null) {
- this.runner = session.get().readWriteTransaction(options);
- }
- return runner;
- }
-
- @Override
- @Nullable
- public T run(TransactionCallable callable) {
- try {
- T result;
- while (true) {
- try {
- result = getRunner().run(callable);
- break;
- } catch (SessionNotFoundException e) {
- session = sessionReplacementHandler.replaceSession(e, session);
- CachedSession cachedSession = session.get();
- runner = cachedSession.getDelegate().readWriteTransaction();
- } catch (RetryOnDifferentGrpcChannelException retryException) {
- // This error is thrown by the RetryOnDifferentGrpcChannelErrorHandler in the specific
- // case that a transaction failed with a DEADLINE_EXCEEDED error. This is an
- // experimental feature that is disabled by default, and that can be removed in a
- // future version.
- session = sessionReplacementHandler.denyListSession(retryException, session);
- CachedSession cachedSession = session.get();
- runner = cachedSession.getDelegate().readWriteTransaction();
- }
- }
- session.get().markUsed();
- return result;
- } catch (SpannerException e) {
- //noinspection ThrowableNotThrown
- session.get().setLastException(e);
- throw e;
- } finally {
- session.close();
- }
- }
-
- @Override
- public Timestamp getCommitTimestamp() {
- return getRunner().getCommitTimestamp();
- }
-
- @Override
- public CommitResponse getCommitResponse() {
- return getRunner().getCommitResponse();
- }
-
- @Override
- public TransactionRunner allowNestedTransaction() {
- getRunner().allowNestedTransaction();
- return this;
- }
- }
-
- private static class SessionPoolAsyncRunner implements AsyncRunner {
- private volatile I session;
- private final SessionReplacementHandler sessionReplacementHandler;
- private final TransactionOption[] options;
- private SettableApiFuture commitResponse;
-
- private SessionPoolAsyncRunner(
- I session,
- SessionReplacementHandler sessionReplacementHandler,
- TransactionOption... options) {
- this.session = session;
- this.options = options;
- this.sessionReplacementHandler = sessionReplacementHandler;
- }
-
- @Override
- public ApiFuture runAsync(final AsyncWork work, Executor executor) {
- commitResponse = SettableApiFuture.create();
- final SettableApiFuture res = SettableApiFuture.create();
- executor.execute(
- () -> {
- SpannerException exception = null;
- R r = null;
- AsyncRunner runner = null;
- while (true) {
- SpannerException se = null;
- try {
- runner = session.get().runAsync(options);
- r = runner.runAsync(work, MoreExecutors.directExecutor()).get();
- break;
- } catch (ExecutionException e) {
- se = asSpannerException(e.getCause());
- } catch (InterruptedException e) {
- se = SpannerExceptionFactory.propagateInterrupt(e);
- } catch (Throwable t) {
- se = SpannerExceptionFactory.newSpannerException(t);
- } finally {
- if (se instanceof SessionNotFoundException) {
- try {
- // The replaceSession method will re-throw the SessionNotFoundException if the
- // session cannot be replaced with a new one.
- session =
- sessionReplacementHandler.replaceSession(
- (SessionNotFoundException) se, session);
- } catch (SessionNotFoundException e) {
- exception = e;
- break;
- }
- } else {
- exception = se;
- break;
- }
- }
- }
- session.get().markUsed();
- session.close();
- setCommitResponse(runner);
- if (exception != null) {
- res.setException(exception);
- } else {
- res.set(r);
- }
- });
- return res;
- }
-
- private void setCommitResponse(AsyncRunner delegate) {
- try {
- commitResponse.set(delegate.getCommitResponse().get());
- } catch (Throwable t) {
- commitResponse.setException(t);
- }
- }
-
- @Override
- public ApiFuture getCommitTimestamp() {
- checkState(commitResponse != null, "runAsync() has not yet been called");
- return ApiFutures.transform(
- commitResponse, CommitResponse::getCommitTimestamp, MoreExecutors.directExecutor());
- }
-
- @Override
- public ApiFuture getCommitResponse() {
- checkState(commitResponse != null, "runAsync() has not yet been called");
- return commitResponse;
- }
- }
-
- // Exception class used just to track the stack trace at the point when a session was handed out
- // from the pool.
- final class LeakedSessionException extends RuntimeException {
- private static final long serialVersionUID = 1451131180314064914L;
-
- private LeakedSessionException() {
- super("Session was checked out from the pool at " + clock.instant());
- }
-
- private LeakedSessionException(String message) {
- super(message);
- }
- }
-
- private enum SessionState {
- AVAILABLE,
- BUSY,
- CLOSING,
- }
-
- private PooledSessionFuture createPooledSessionFuture(
- ListenableFuture future, ISpan span) {
- return new PooledSessionFuture(future, span);
- }
-
- /** Wrapper class for the {@link SessionFuture} implementations. */
- interface SessionFutureWrapper extends DatabaseClient {
-
- /** Method to resolve {@link SessionFuture} implementation for different use-cases. */
- T get();
-
- default Dialect getDialect() {
- return get().getDialect();
- }
-
- default String getDatabaseRole() {
- return get().getDatabaseRole();
- }
-
- default Timestamp write(Iterable mutations) throws SpannerException {
- return get().write(mutations);
- }
-
- default CommitResponse writeWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- return get().writeWithOptions(mutations, options);
- }
-
- default Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
- return get().writeAtLeastOnce(mutations);
- }
-
- default CommitResponse writeAtLeastOnceWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- return get().writeAtLeastOnceWithOptions(mutations, options);
- }
-
- default ServerStream batchWriteAtLeastOnce(
- Iterable mutationGroups, TransactionOption... options)
- throws SpannerException {
- return get().batchWriteAtLeastOnce(mutationGroups, options);
- }
-
- default ReadContext singleUse() {
- return get().singleUse();
- }
-
- default ReadContext singleUse(TimestampBound bound) {
- return get().singleUse(bound);
- }
-
- default ReadOnlyTransaction singleUseReadOnlyTransaction() {
- return get().singleUseReadOnlyTransaction();
- }
-
- default ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
- return get().singleUseReadOnlyTransaction(bound);
- }
-
- default ReadOnlyTransaction readOnlyTransaction() {
- return get().readOnlyTransaction();
- }
-
- default ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
- return get().readOnlyTransaction(bound);
- }
-
- default TransactionRunner readWriteTransaction(TransactionOption... options) {
- return get().readWriteTransaction(options);
- }
-
- default TransactionManager transactionManager(TransactionOption... options) {
- return get().transactionManager(options);
- }
-
- default AsyncRunner runAsync(TransactionOption... options) {
- return get().runAsync(options);
- }
-
- default AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
- return get().transactionManagerAsync(options);
- }
-
- default long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
- return get().executePartitionedUpdate(stmt, options);
- }
- }
-
- class PooledSessionFutureWrapper implements SessionFutureWrapper {
- PooledSessionFuture pooledSessionFuture;
-
- public PooledSessionFutureWrapper(PooledSessionFuture pooledSessionFuture) {
- this.pooledSessionFuture = pooledSessionFuture;
- }
-
- @Override
- public PooledSessionFuture get() {
- return this.pooledSessionFuture;
- }
- }
-
- interface SessionFuture extends Session {
-
- /**
- * We need to do this because every implementation of {@link SessionFuture} today extends {@link
- * SimpleForwardingListenableFuture}. The get() method in parent {@link
- * java.util.concurrent.Future} classes specifies checked exceptions in method signature.
- *
- * This method is a workaround we don't have to handle checked exceptions specified by other
- * interfaces.
- */
- CachedSession get();
-
- default void addListener(Runnable listener, Executor exec) {}
- }
-
- class PooledSessionFuture extends SimpleForwardingListenableFuture
- implements SessionFuture {
-
- private boolean closed;
- private volatile LeakedSessionException leakedException;
- private final AtomicBoolean inUse = new AtomicBoolean();
- private final CountDownLatch initialized = new CountDownLatch(1);
- private final ISpan span;
-
- @VisibleForTesting
- PooledSessionFuture(ListenableFuture delegate, ISpan span) {
- super(delegate);
- this.span = span;
- }
-
- @VisibleForTesting
- void clearLeakedException() {
- this.leakedException = null;
- }
-
- private void markCheckedOut() {
-
- if (options.isTrackStackTraceOfSessionCheckout()) {
- this.leakedException = new LeakedSessionException();
- synchronized (SessionPool.this.lock) {
- SessionPool.this.markedCheckedOutSessions.add(this);
- }
- }
- }
-
- @Override
- public Timestamp write(Iterable mutations) throws SpannerException {
- return writeWithOptions(mutations).getCommitTimestamp();
- }
-
- @Override
- public CommitResponse writeWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- try {
- return get().writeWithOptions(mutations, options);
- } finally {
- close();
- }
- }
-
- @Override
- public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
- return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
- }
-
- @Override
- public CommitResponse writeAtLeastOnceWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- try {
- return get().writeAtLeastOnceWithOptions(mutations, options);
- } finally {
- close();
- }
- }
-
- @Override
- public ServerStream batchWriteAtLeastOnce(
- Iterable mutationGroups, TransactionOption... options)
- throws SpannerException {
- try {
- return get().batchWriteAtLeastOnce(mutationGroups, options);
- } finally {
- close();
- }
- }
-
- @Override
- public ReadContext singleUse() {
- try {
- return new AutoClosingReadContext<>(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.singleUse();
- },
- SessionPool.this,
- pooledSessionReplacementHandler,
- this,
- true);
- } catch (Exception e) {
- close();
- throw e;
- }
- }
-
- @Override
- public ReadContext singleUse(final TimestampBound bound) {
- try {
- return new AutoClosingReadContext<>(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.singleUse(bound);
- },
- SessionPool.this,
- pooledSessionReplacementHandler,
- this,
- true);
- } catch (Exception e) {
- close();
- throw e;
- }
- }
-
- @Override
- public ReadOnlyTransaction singleUseReadOnlyTransaction() {
- return internalReadOnlyTransaction(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.singleUseReadOnlyTransaction();
- },
- true);
- }
-
- @Override
- public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) {
- return internalReadOnlyTransaction(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.singleUseReadOnlyTransaction(bound);
- },
- true);
- }
-
- @Override
- public ReadOnlyTransaction readOnlyTransaction() {
- return internalReadOnlyTransaction(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.readOnlyTransaction();
- },
- false);
- }
-
- @Override
- public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) {
- return internalReadOnlyTransaction(
- session -> {
- PooledSession ps = session.get();
- return ps.delegate.readOnlyTransaction(bound);
- },
- false);
- }
-
- private ReadOnlyTransaction internalReadOnlyTransaction(
- Function transactionSupplier,
- boolean isSingleUse) {
- try {
- return new AutoClosingReadTransaction<>(
- transactionSupplier,
- SessionPool.this,
- pooledSessionReplacementHandler,
- this,
- isSingleUse);
- } catch (Exception e) {
- close();
- throw e;
- }
- }
-
- @Override
- public TransactionRunner readWriteTransaction(TransactionOption... options) {
- return new SessionPoolTransactionRunner<>(this, pooledSessionReplacementHandler, options);
- }
-
- @Override
- public TransactionManager transactionManager(TransactionOption... options) {
- return new AutoClosingTransactionManager<>(this, pooledSessionReplacementHandler, options);
- }
-
- @Override
- public AsyncRunner runAsync(TransactionOption... options) {
- return new SessionPoolAsyncRunner<>(this, pooledSessionReplacementHandler, options);
- }
-
- @Override
- public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
- return new SessionPoolAsyncTransactionManager<>(
- pooledSessionReplacementHandler, this, options);
- }
-
- @Override
- public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
- try {
- return get(true).executePartitionedUpdate(stmt, options);
- } finally {
- close();
- }
- }
-
- @Override
- public String getName() {
- return get().getName();
- }
-
- @Override
- public void close() {
- try {
- asyncClose().get();
- } catch (InterruptedException e) {
- throw SpannerExceptionFactory.propagateInterrupt(e);
- } catch (ExecutionException e) {
- throw asSpannerException(e.getCause());
- }
- }
-
- @Override
- public ApiFuture asyncClose() {
- synchronized (this) {
- // Don't add the session twice to the pool if a resource is being closed multiple times.
- if (closed) {
- return ApiFutures.immediateFuture(Empty.getDefaultInstance());
- }
- closed = true;
- }
- try {
- PooledSession delegate = getOrNull();
- if (delegate != null) {
- return delegate.asyncClose();
- }
- } finally {
- synchronized (lock) {
- leakedException = null;
- checkedOutSessions.remove(this);
- markedCheckedOutSessions.remove(this);
- }
- }
- return ApiFutures.immediateFuture(Empty.getDefaultInstance());
- }
-
- private PooledSession getOrNull() {
- try {
- return get();
- } catch (Throwable t) {
- return null;
- }
- }
-
- @Override
- public PooledSession get() {
- return get(false);
- }
-
- PooledSession get(final boolean eligibleForLongRunning) {
- if (inUse.compareAndSet(false, true)) {
- PooledSession res = null;
- try {
- res = super.get();
- } catch (Throwable e) {
- // ignore the exception as it will be handled by the call to super.get() below.
- }
- if (res != null) {
- res.markBusy(span);
- span.addAnnotation("Using Session", "sessionId", res.getName());
- synchronized (lock) {
- incrementNumSessionsInUse();
- checkedOutSessions.add(this);
- }
- res.eligibleForLongRunning = eligibleForLongRunning;
- }
- initialized.countDown();
- }
- try {
- initialized.await();
- return super.get();
- } catch (ExecutionException e) {
- throw SpannerExceptionFactory.newSpannerException(e.getCause());
- } catch (InterruptedException e) {
- throw SpannerExceptionFactory.propagateInterrupt(e);
- }
- }
-
- public int getChannel() {
- return get().getChannel();
- }
- }
-
- interface CachedSession extends Session {
-
- SessionImpl getDelegate();
-
- void markBusy(ISpan span);
-
- void markUsed();
-
- SpannerException setLastException(SpannerException exception);
-
- AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options);
-
- void setAllowReplacing(boolean b);
- }
-
- class PooledSession implements CachedSession {
-
- @VisibleForTesting final SessionImpl delegate;
- private volatile SpannerException lastException;
- private volatile boolean allowReplacing = true;
-
- /**
- * This ensures that the session is added at a random position in the pool the first time it is
- * actually added to the pool.
- */
- @GuardedBy("lock")
- private Position releaseToPosition = initialReleasePosition;
-
- /**
- * Property to mark if the session is eligible to be long-running. This can only be true if the
- * session is executing certain types of transactions (for ex - Partitioned DML) which can be
- * long-running. By default, most transaction types are not expected to be long-running and
- * hence this value is false.
- */
- private volatile boolean eligibleForLongRunning = false;
-
- /**
- * Property to mark if the session is no longer part of the session pool. For ex - A session
- * which is long-running gets cleaned up and removed from the pool.
- */
- private volatile boolean isRemovedFromPool = false;
-
- /**
- * Property to mark if a leaked session exception is already logged. Given a session maintainer
- * thread runs repeatedly at a defined interval, this property allows us to ensure that an
- * exception is logged only once per leaked session. This is to avoid noisy repeated logs around
- * session leaks for long-running sessions.
- */
- private volatile boolean isLeakedExceptionLogged = false;
-
- @GuardedBy("lock")
- private SessionState state;
-
- private PooledSession(SessionImpl delegate) {
- this.delegate = Preconditions.checkNotNull(delegate);
- this.state = SessionState.AVAILABLE;
-
- // initialise the lastUseTime field for each session.
- this.markUsed();
- }
-
- int getChannel() {
- Long channelHint = (Long) delegate.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
- return channelHint == null
- ? 0
- : (int) (channelHint % sessionClient.getSpanner().getOptions().getNumChannels());
- }
-
- @Override
- public String toString() {
- return getName();
- }
-
- @VisibleForTesting
- @Override
- public void setAllowReplacing(boolean allowReplacing) {
- this.allowReplacing = allowReplacing;
- }
-
- @VisibleForTesting
- void setEligibleForLongRunning(boolean eligibleForLongRunning) {
- this.eligibleForLongRunning = eligibleForLongRunning;
- }
-
- @Override
- public Timestamp write(Iterable mutations) throws SpannerException {
- return writeWithOptions(mutations).getCommitTimestamp();
- }
-
- @Override
- public CommitResponse writeWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- try {
- markUsed();
- return delegate.writeWithOptions(mutations, options);
- } catch (SpannerException e) {
- throw lastException = e;
- }
- }
-
- @Override
- public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException {
- return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
- }
-
- @Override
- public CommitResponse writeAtLeastOnceWithOptions(
- Iterable mutations, TransactionOption... options) throws SpannerException {
- try {
- markUsed();
- return delegate.writeAtLeastOnceWithOptions(mutations, options);
- } catch (SpannerException e) {
- throw lastException = e;
- }
- }
-
- @Override
- public ServerStream batchWriteAtLeastOnce(
- Iterable mutationGroups, TransactionOption... options)
- throws SpannerException {
- try {
- markUsed();
- return delegate.batchWriteAtLeastOnce(mutationGroups, options);
- } catch (SpannerException e) {
- throw lastException = e;
- }
- }
-
- @Override
- public long executePartitionedUpdate(Statement stmt, UpdateOption... options)
- throws SpannerException {
- try {
- markUsed();
- return delegate.executePartitionedUpdate(stmt, options);
- } catch (SpannerException e) {
- throw lastException = e;
- }
- }
-
- @Override
- public ReadContext singleUse() {
- return delegate.singleUse();
- }
-
- @Override
- public ReadContext singleUse(TimestampBound bound) {
- return delegate.singleUse(bound);
- }
-
- @Override
- public ReadOnlyTransaction singleUseReadOnlyTransaction() {
- return delegate.singleUseReadOnlyTransaction();
- }
-
- @Override
- public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
- return delegate.singleUseReadOnlyTransaction(bound);
- }
-
- @Override
- public ReadOnlyTransaction readOnlyTransaction() {
- return delegate.readOnlyTransaction();
- }
-
- @Override
- public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
- return delegate.readOnlyTransaction(bound);
- }
-
- @Override
- public TransactionRunner readWriteTransaction(TransactionOption... options) {
- return delegate.readWriteTransaction(options);
- }
-
- @Override
- public AsyncRunner runAsync(TransactionOption... options) {
- return delegate.runAsync(options);
- }
-
- @Override
- public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) {
- return delegate.transactionManagerAsync(options);
- }
-
- @Override
- public ApiFuture asyncClose() {
- close();
- return ApiFutures.immediateFuture(Empty.getDefaultInstance());
- }
-
- @Override
- public void close() {
- synchronized (lock) {
- numSessionsInUse--;
- numSessionsReleased++;
- }
- if ((lastException != null && isSessionNotFound(lastException)) || isRemovedFromPool) {
- invalidateSession(this);
- } else {
- if (isDatabaseOrInstanceNotFound(lastException)) {
- // Mark this session pool as no longer valid and then release the session into the pool as
- // there is nothing we can do with it anyways.
- synchronized (lock) {
- SessionPool.this.resourceNotFoundException =
- MoreObjects.firstNonNull(
- SessionPool.this.resourceNotFoundException,
- (ResourceNotFoundException) lastException);
- }
- }
- lastException = null;
- isRemovedFromPool = false;
- if (state != SessionState.CLOSING) {
- state = SessionState.AVAILABLE;
- }
- releaseSession(this, false);
- }
- }
-
- @Override
- public String getName() {
- return delegate.getName();
- }
-
- private void keepAlive() {
- markUsed();
- final ISpan previousSpan = delegate.getCurrentSpan();
- delegate.setCurrentSpan(tracer.getBlankSpan());
- try (ResultSet resultSet =
- delegate
- .singleUse(TimestampBound.ofMaxStaleness(60, TimeUnit.SECONDS))
- .executeQuery(Statement.newBuilder("SELECT 1").build())) {
- resultSet.next();
- } finally {
- delegate.setCurrentSpan(previousSpan);
- }
- }
-
- private void determineDialectAsync(final SettableFuture dialect) {
- Preconditions.checkNotNull(dialect);
- executor.submit(
- () -> {
- try {
- dialect.set(determineDialect());
- } catch (Throwable t) {
- // Catch-all as we want to propagate all exceptions to anyone who might be interested
- // in the database dialect, and there's nothing sensible that we can do with it here.
- dialect.setException(t);
- } finally {
- releaseSession(this, false);
- }
- });
- }
-
- private Dialect determineDialect() {
- try (ResultSet dialectResultSet =
- delegate.singleUse().executeQuery(DETERMINE_DIALECT_STATEMENT)) {
- if (dialectResultSet.next()) {
- return Dialect.fromName(dialectResultSet.getString(0));
- } else {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.NOT_FOUND, "No dialect found for database");
- }
- }
- }
-
- @Override
- public SessionImpl getDelegate() {
- return this.delegate;
- }
-
- @Override
- public void markBusy(ISpan span) {
- this.delegate.setCurrentSpan(span);
- this.state = SessionState.BUSY;
- }
-
- private void markClosing() {
- this.state = SessionState.CLOSING;
- }
-
- @Override
- public void markUsed() {
- delegate.markUsed(clock.instant());
- }
-
- @Override
- public SpannerException setLastException(SpannerException exception) {
- this.lastException = exception;
- return exception;
- }
-
- boolean isAllowReplacing() {
- return this.allowReplacing;
- }
-
- @Override
- public TransactionManager transactionManager(TransactionOption... options) {
- return delegate.transactionManager(options);
- }
- }
-
- private final class WaiterFuture extends ForwardingListenableFuture {
- private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L;
- private final SettableFuture waiter = SettableFuture.create();
-
- @Override
- @Nonnull
- protected ListenableFuture extends PooledSession> delegate() {
- return waiter;
- }
-
- private void put(PooledSession session) {
- waiter.set(session);
- }
-
- private void put(SpannerException e) {
- waiter.setException(e);
- }
-
- @Override
- public PooledSession get() {
- long currentTimeout = options.getInitialWaitForSessionTimeoutMillis();
- while (true) {
- ISpan span = tracer.spanBuilder(WAIT_FOR_SESSION);
- try (IScope ignore = tracer.withSpan(span)) {
- PooledSession s =
- pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
- if (s == null) {
- // Set the status to DEADLINE_EXCEEDED and retry.
- numWaiterTimeouts.incrementAndGet();
- tracer.getCurrentSpan().setStatus(ErrorCode.DEADLINE_EXCEEDED);
- currentTimeout = Math.min(currentTimeout * 2, MAX_SESSION_WAIT_TIMEOUT);
- } else {
- return s;
- }
- } catch (Exception e) {
- if (e instanceof SpannerException
- && ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
- numWaiterTimeouts.incrementAndGet();
- tracer.getCurrentSpan().setStatus(ErrorCode.RESOURCE_EXHAUSTED);
- }
- span.setStatus(e);
- throw e;
- } finally {
- span.end();
- }
- }
- }
-
- private PooledSession pollUninterruptiblyWithTimeout(
- long timeoutMillis, Duration acquireSessionTimeout) {
- boolean interrupted = false;
- try {
- while (true) {
- try {
- return acquireSessionTimeout == null
- ? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
- : waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- interrupted = true;
- } catch (TimeoutException e) {
- if (acquireSessionTimeout != null) {
- SpannerException exception =
- SpannerExceptionFactory.newSpannerException(
- ErrorCode.RESOURCE_EXHAUSTED,
- "Timed out after waiting "
- + acquireSessionTimeout.toMillis()
- + "ms for acquiring session. To mitigate error"
- + " SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher"
- + " timeout or increase the number of sessions in the session pool.\n"
- + createCheckedOutSessionsStackTraces());
- if (waiter.setException(exception)) {
- // Only throw the exception if setting it on the waiter was successful. The
- // waiter.setException(..) method returns false if some other thread in the meantime
- // called waiter.set(..), which means that a session became available between the
- // time that the TimeoutException was thrown and now.
- throw exception;
- }
- }
- return null;
- } catch (ExecutionException e) {
- throw SpannerExceptionFactory.newSpannerException(e.getCause());
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- /**
- * Background task to maintain the pool. Tasks:
- *
- *
- * - Removes idle sessions from the pool. Sessions that go above MinSessions that have not
- * been used for the last 55 minutes will be removed from the pool. These will automatically
- * be garbage collected by the backend.
- *
- Keeps alive sessions that have not been used for a user configured time in order to keep
- * MinSessions sessions alive in the pool at any time. The keep-alive traffic is smeared out
- * over a window of 10 minutes to avoid bursty traffic.
- *
- Removes unexpected long running transactions from the pool. Only certain transaction
- * types (for ex - Partitioned DML / Batch Reads) can be long running. This tasks checks the
- * sessions which have been inactive for a longer than usual duration (for ex - 60 minutes)
- * and removes such sessions from the pool.
- *
- */
- final class PoolMaintainer {
-
- // Length of the window in millis over which we keep track of maximum number of concurrent
- // sessions in use.
- private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10));
- // Frequency of the timer loop.
- @VisibleForTesting final long loopFrequency = options.getLoopFrequency();
- // Number of loop iterations in which we need to close all the sessions waiting for closure.
- @VisibleForTesting final long numClosureCycles = windowLength.toMillis() / loopFrequency;
- private final Duration keepAliveMillis =
- Duration.ofMillis(TimeUnit.MINUTES.toMillis(options.getKeepAliveIntervalMinutes()));
- // Number of loop iterations in which we need to keep alive all the sessions
- @VisibleForTesting final long numKeepAliveCycles = keepAliveMillis.toMillis() / loopFrequency;
-
- /**
- * Variable maintaining the last execution time of the long-running transaction cleanup task.
- *
- * The long-running transaction cleanup needs to be performed every X minutes. The X minutes
- * recurs multiple times within the invocation of the pool maintainer thread. For ex - If the
- * main thread runs every 10s and the long-running transaction clean-up needs to be performed
- * every 2 minutes, then we need to keep a track of when was the last time that this task
- * executed and makes sure we only execute it every 2 minutes and not every 10 seconds.
- */
- @VisibleForTesting Instant lastExecutionTime;
-
- /**
- * The previous numSessionsAcquired seen by the maintainer. This is used to calculate the
- * transactions per second, which again is used to determine whether to randomize the order of
- * the session pool.
- */
- private long prevNumSessionsAcquired;
-
- boolean closed = false;
-
- @GuardedBy("lock")
- ScheduledFuture> scheduledFuture;
-
- @GuardedBy("lock")
- boolean running;
-
- void init() {
- lastExecutionTime = clock.instant();
-
- // Scheduled pool maintenance worker.
- synchronized (lock) {
- scheduledFuture =
- executor.scheduleAtFixedRate(
- this::maintainPool, loopFrequency, loopFrequency, TimeUnit.MILLISECONDS);
- }
- }
-
- void close() {
- synchronized (lock) {
- if (!closed) {
- closed = true;
- scheduledFuture.cancel(false);
- if (!running) {
- decrementPendingClosures(1);
- }
- }
- }
- }
-
- boolean isClosed() {
- synchronized (lock) {
- return closed;
- }
- }
-
- // Does various pool maintenance activities.
- void maintainPool() {
- Instant currTime;
- synchronized (lock) {
- if (SessionPool.this.isClosed()) {
- return;
- }
- running = true;
- if (loopFrequency >= 1000L) {
- SessionPool.this.transactionsPerSecond =
- (SessionPool.this.numSessionsAcquired - prevNumSessionsAcquired)
- / (loopFrequency / 1000L);
- }
- this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired;
-
- currTime = clock.instant();
- // Reset the start time for recording the maximum number of sessions in the pool
- if (currTime.isAfter(SessionPool.this.lastResetTime.plus(Duration.ofMinutes(10)))) {
- SessionPool.this.maxSessionsInUse = SessionPool.this.numSessionsInUse;
- SessionPool.this.lastResetTime = currTime;
- }
- }
-
- removeIdleSessions(currTime);
- // Now go over all the remaining sessions and see if they need to be kept alive explicitly.
- keepAliveSessions(currTime);
- replenishPool();
- synchronized (lock) {
- running = false;
- if (SessionPool.this.isClosed()) {
- decrementPendingClosures(1);
- }
- }
- removeLongRunningSessions(currTime);
- }
-
- private void removeIdleSessions(Instant currTime) {
- synchronized (lock) {
- // Determine the minimum last use time for a session to be deemed to still be alive. Remove
- // all sessions that have a lastUseTime before that time, unless it would cause us to go
- // below MinSessions.
- Instant minLastUseTime = currTime.minus(options.getRemoveInactiveSessionAfterDuration());
- Iterator iterator = sessions.descendingIterator();
- while (iterator.hasNext()) {
- PooledSession session = iterator.next();
- if (session.delegate.getLastUseTime() != null
- && session.delegate.getLastUseTime().isBefore(minLastUseTime)) {
- if (session.state != SessionState.CLOSING) {
- boolean isRemoved = removeFromPool(session);
- if (isRemoved) {
- numIdleSessionsRemoved++;
- if (idleSessionRemovedListener != null) {
- idleSessionRemovedListener.apply(session);
- }
- }
- iterator.remove();
- }
- }
- }
- }
- }
-
- private void keepAliveSessions(Instant currTime) {
- long numSessionsToKeepAlive = 0;
- synchronized (lock) {
- if (numSessionsInUse >= (options.getMinSessions() + options.getMaxIdleSessions())) {
- // At least MinSessions are in use, so we don't have to ping any sessions.
- return;
- }
- // In each cycle only keep alive a subset of sessions to prevent burst of traffic.
- numSessionsToKeepAlive =
- (long)
- Math.ceil(
- (double)
- ((options.getMinSessions() + options.getMaxIdleSessions())
- - numSessionsInUse)
- / numKeepAliveCycles);
- }
- // Now go over all the remaining sessions and see if they need to be kept alive explicitly.
- Instant keepAliveThreshold = currTime.minus(keepAliveMillis);
-
- // Keep chugging till there is no session that needs to be kept alive.
- while (numSessionsToKeepAlive > 0) {
- Tuple sessionToKeepAlive;
- synchronized (lock) {
- sessionToKeepAlive = findSessionToKeepAlive(sessions, keepAliveThreshold, 0);
- }
- if (sessionToKeepAlive == null) {
- break;
- }
- try {
- logger.log(Level.FINE, "Keeping alive session " + sessionToKeepAlive.x().getName());
- numSessionsToKeepAlive--;
- sessionToKeepAlive.x().keepAlive();
- releaseSession(sessionToKeepAlive);
- } catch (SpannerException e) {
- handleException(e, sessionToKeepAlive);
- }
- }
- }
-
- private void replenishPool() {
- synchronized (lock) {
- // If we have gone below min pool size, create that many sessions.
- int sessionCount = options.getMinSessions() - (totalSessions() + numSessionsBeingCreated);
- if (sessionCount > 0) {
- createSessions(getAllowedCreateSessions(sessionCount), false);
- }
- }
- }
-
- // cleans up sessions which are unexpectedly long-running.
- void removeLongRunningSessions(Instant currentTime) {
- try {
- if (SessionPool.this.isClosed()) {
- return;
- }
- final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions =
- options.getInactiveTransactionRemovalOptions();
- final Instant minExecutionTime =
- lastExecutionTime.plus(inactiveTransactionRemovalOptions.getExecutionFrequency());
- if (currentTime.isBefore(minExecutionTime)) {
- return;
- }
- lastExecutionTime = currentTime; // update this only after we have decided to execute task
- if (options.closeInactiveTransactions()
- || options.warnInactiveTransactions()
- || options.warnAndCloseInactiveTransactions()) {
- removeLongRunningSessions(currentTime, inactiveTransactionRemovalOptions);
- }
- } catch (final Throwable t) {
- logger.log(Level.WARNING, "Failed removing long running transactions", t);
- }
- }
-
- private void removeLongRunningSessions(
- final Instant currentTime,
- final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions) {
- synchronized (lock) {
- final double usedSessionsRatio = getRatioOfSessionsInUse();
- if (usedSessionsRatio > inactiveTransactionRemovalOptions.getUsedSessionsRatioThreshold()) {
- Iterator iterator = checkedOutSessions.iterator();
- while (iterator.hasNext()) {
- final PooledSessionFuture sessionFuture = iterator.next();
- // the below get() call on future object is non-blocking since checkedOutSessions
- // collection is populated only when the get() method in {@code PooledSessionFuture} is
- // called.
- final PooledSession session = (PooledSession) sessionFuture.get();
- final Duration durationFromLastUse =
- Duration.between(session.getDelegate().getLastUseTime(), currentTime);
- if (!session.eligibleForLongRunning
- && durationFromLastUse.compareTo(
- inactiveTransactionRemovalOptions.getIdleTimeThreshold())
- > 0) {
- if ((options.warnInactiveTransactions() || options.warnAndCloseInactiveTransactions())
- && !session.isLeakedExceptionLogged) {
- if (options.warnAndCloseInactiveTransactions()) {
- logger.log(
- Level.WARNING,
- String.format("Removing long-running session => %s", session.getName()),
- sessionFuture.leakedException);
- session.isLeakedExceptionLogged = true;
- } else if (options.warnInactiveTransactions()) {
- logger.log(
- Level.WARNING,
- String.format(
- "Detected long-running session => %s. To automatically remove"
- + " long-running sessions, set SessionOption"
- + " ActionOnInactiveTransaction to WARN_AND_CLOSE by invoking"
- + " setWarnAndCloseIfInactiveTransactions() method.",
- session.getName()),
- sessionFuture.leakedException);
- session.isLeakedExceptionLogged = true;
- }
- }
- if ((options.closeInactiveTransactions()
- || options.warnAndCloseInactiveTransactions())
- && session.state != SessionState.CLOSING) {
- final boolean isRemoved = removeFromPool(session);
- if (isRemoved) {
- session.isRemovedFromPool = true;
- numLeakedSessionsRemoved++;
- if (longRunningSessionRemovedListener != null) {
- longRunningSessionRemovedListener.apply(session);
- }
- }
- iterator.remove();
- }
- }
- }
- }
- }
- }
- }
-
- enum Position {
- FIRST,
- LAST,
- RANDOM
- }
-
- /**
- * This statement is (currently) used to determine the dialect of the database that is used by the
- * session pool. This statement is subject to change when the INFORMATION_SCHEMA contains a table
- * where the dialect of the database can be read directly, and any tests that want to detect the
- * specific 'determine dialect statement' should rely on this constant instead of the actual
- * value.
- */
- @VisibleForTesting
- static final Statement DETERMINE_DIALECT_STATEMENT =
- Statement.newBuilder(
- "select option_value "
- + "from information_schema.database_options "
- + "where option_name='database_dialect'")
- .build();
-
- private final SessionPoolOptions options;
- private final SettableFuture dialect = SettableFuture.create();
- private final String databaseRole;
- private final SessionClient sessionClient;
- private final int numChannels;
- private final ScheduledExecutorService executor;
- private final ExecutorFactory executorFactory;
-
- final PoolMaintainer poolMaintainer;
- private final Clock clock;
-
- /**
- * initialReleasePosition determines where in the pool sessions are added when they are released
- * into the pool the first time. This is always RANDOM in production, but some tests use FIRST to
- * be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an
- * unbalanced session pool where all sessions belonging to one gRPC channel are added to the same
- * region in the pool.
- */
- private final Position initialReleasePosition;
-
- private final Object lock = new Object();
- private final Random random = new Random();
-
- @GuardedBy("lock")
- private boolean detectDialectStarted;
-
- @GuardedBy("lock")
- private int pendingClosure;
-
- @GuardedBy("lock")
- private SettableFuture closureFuture;
-
- @GuardedBy("lock")
- private ClosedException closedException;
-
- @GuardedBy("lock")
- private ResourceNotFoundException resourceNotFoundException;
-
- @GuardedBy("lock")
- private final LinkedList sessions = new LinkedList<>();
-
- @GuardedBy("lock")
- private final Queue waiters = new LinkedList<>();
-
- @GuardedBy("lock")
- private int numSessionsBeingCreated = 0;
-
- @GuardedBy("lock")
- private int numSessionsInUse = 0;
-
- @GuardedBy("lock")
- private int maxSessionsInUse = 0;
-
- @GuardedBy("lock")
- private Instant lastResetTime = Clock.INSTANCE.instant();
-
- @GuardedBy("lock")
- private long numSessionsAcquired = 0;
-
- @GuardedBy("lock")
- private long numSessionsReleased = 0;
-
- @GuardedBy("lock")
- private long numIdleSessionsRemoved = 0;
-
- @GuardedBy("lock")
- private long transactionsPerSecond = 0L;
-
- @GuardedBy("lock")
- private long numLeakedSessionsRemoved = 0;
-
- private final AtomicLong numWaiterTimeouts = new AtomicLong();
-
- @GuardedBy("lock")
- private final Set allSessions = new HashSet<>();
-
- @GuardedBy("lock")
- @VisibleForTesting
- final Set checkedOutSessions = new HashSet<>();
-
- @GuardedBy("lock")
- private final Set markedCheckedOutSessions = new HashSet<>();
-
- private final SessionConsumer sessionConsumer = new SessionConsumerImpl();
-
- @VisibleForTesting Function idleSessionRemovedListener;
-
- @VisibleForTesting Function longRunningSessionRemovedListener;
- private final CountDownLatch waitOnMinSessionsLatch;
- private final PooledSessionReplacementHandler pooledSessionReplacementHandler =
- new PooledSessionReplacementHandler();
-
- private static final Object DENY_LISTED = new Object();
- private final Cache