diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 36ecbc8863ad..b6575aa3e62b 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -30,6 +30,7 @@ import com.google.api.gax.longrunning.OperationFutureImpl; import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.paging.Page; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ServerStream; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.BaseService; @@ -112,11 +113,15 @@ import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; /** Default implementation of the Cloud Spanner interface. */ class SpannerImpl extends BaseService implements Spanner { private static final int MIN_BACKOFF_MS = 1000; private static final int MAX_BACKOFF_MS = 32000; + private static final int DEFAULT_MAX_ATTEMPT = 6; + private static final Duration DEFAULT_TOTAL_TIMEOUT = Duration.ofSeconds(50); private static final PathTemplate OP_NAME_TEMPLATE = PathTemplate.create( "projects/{project}/instances/{instance}/databases/{database}/operations/{operation}"); @@ -237,13 +242,36 @@ public void cancelled(Context context) { *

TODO: Consider replacing with RetryHelper from gcloud-core. */ static T runWithRetries(Callable callable) { + return runWithRetries(callable, null); + } + + static T runWithRetries(Callable callable, RetrySettings retrySettings) { + Instant start = Instant.now(); + Instant end; + // Use same backoff setting as abort, somewhat arbitrarily. Span span = tracer.getCurrentSpan(); ExponentialBackOff backOff = newBackOff(); Context context = Context.current(); int attempt = 0; + int maxAttempt = DEFAULT_MAX_ATTEMPT; + Duration retryDuration; + Duration totalTimeout = DEFAULT_TOTAL_TIMEOUT; + if (retrySettings != null) { + maxAttempt = retrySettings.getMaxAttempts(); + totalTimeout = retrySettings.getTotalTimeout(); + } while (true) { attempt++; + if (attempt >= maxAttempt) { + throw newSpannerException(ErrorCode.INTERNAL, "Exceeded maxAttempt " + maxAttempt); + } + end = Instant.now(); + retryDuration = Duration.between(start, end); + if (retryDuration.compareTo(totalTimeout) > 0) { + throw newSpannerException( + ErrorCode.INTERNAL, "Exceeded totalTimeout " + totalTimeout.toMillis()); + } try { span.addAnnotation( "Starting operation", @@ -282,7 +310,8 @@ public com.google.spanner.v1.Session call() throws Exception { return gapicRpc.createSession( db.getName(), getOptions().getSessionLabels(), options); } - }); + }, + getOptions().getRetrySettings()); span.end(); return new SessionImpl(session.getName(), options); } catch (RuntimeException e) { diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java index 8dffda872401..9d81cd3ced18 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerImplTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.spi.v1.SpannerRpc; import java.util.HashMap; @@ -33,6 +34,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.threeten.bp.Duration; /** Unit tests for {@link SpannerImpl}. */ @RunWith(JUnit4.class) @@ -130,7 +132,62 @@ public Void call() throws Exception { }); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); - assertThat(e.getMessage().contains("Unexpected exception thrown")); + assertThat(e.getMessage()).contains("Unexpected exception thrown"); + } + } + + @Test + public void testConnectionMaxAttempt() { + Map labels = new HashMap<>(); + labels.put("env", "dev"); + Mockito.when(spannerOptions.getSessionLabels()).thenReturn(labels); + String dbName = "projects/p1/instances/i1/databases/d1"; + DatabaseId db = DatabaseId.of(dbName); + + Mockito.when(spannerOptions.getTransportOptions()) + .thenReturn(GrpcTransportOptions.newBuilder().build()); + Mockito.when(spannerOptions.getSessionPoolOptions()) + .thenReturn(SessionPoolOptions.newBuilder().build()); + Mockito.when(rpc.createSession(Mockito.eq(dbName), Mockito.eq(labels), options.capture())) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.UNAVAILABLE, "Unable to resolve host spanner.googleapis.com")); + try { + impl.getDatabaseClient(db).singleUse(); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("Exceeded maxAttempt"); + impl.close(); + } + } + + @Test + public void testConnectionTotalTimeout() { + Map labels = new HashMap<>(); + labels.put("env", "dev"); + String dbName = "projects/p1/instances/i1/databases/d1"; + DatabaseId db = DatabaseId.of(dbName); + RetrySettings retrySettings = + RetrySettings.newBuilder() + .setTotalTimeout(Duration.ofSeconds(4)) + .setMaxAttempts(10) + .build(); + Mockito.when(spannerOptions.getRetrySettings()).thenReturn(retrySettings); + Mockito.when(spannerOptions.getSessionLabels()).thenReturn(labels); + Mockito.when(spannerOptions.getTransportOptions()) + .thenReturn(GrpcTransportOptions.newBuilder().build()); + Mockito.when(spannerOptions.getSessionPoolOptions()) + .thenReturn(SessionPoolOptions.newBuilder().build()); + Mockito.when(rpc.createSession(Mockito.eq(dbName), Mockito.eq(labels), options.capture())) + .thenThrow( + SpannerExceptionFactory.newSpannerException( + ErrorCode.UNAVAILABLE, "Unable to resolve host spanner.googleapis.com")); + try { + impl.getDatabaseClient(db).singleUse(); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INTERNAL); + assertThat(e.getMessage()).contains("Exceeded totalTimeout"); + impl.close(); } } }