package software.amazon.awssdk.core.async;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.exception.NonRetryableException;
import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream;
import software.amazon.awssdk.core.internal.util.NoopSubscription;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;

@SdkPublicApi
/* loaded from: input_file:BOOT-INF/lib/sdk-core-2.25.55.jar:software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.class */
public final class BlockingInputStreamAsyncRequestBody implements AsyncRequestBody {
    private static final Duration DEFAULT_SUBSCRIBE_TIMEOUT = Duration.ofSeconds(10);
    private final InputStreamConsumingPublisher delegate = new InputStreamConsumingPublisher();
    private final CountDownLatch subscribedLatch = new CountDownLatch(1);
    private final AtomicBoolean subscribeCalled = new AtomicBoolean(false);
    private final Long contentLength;
    private final Duration subscribeTimeout;

    /* loaded from: input_file:BOOT-INF/lib/sdk-core-2.25.55.jar:software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody$Builder.class */
    public static final class Builder {
        private Duration subscribeTimeout;
        private Long contentLength;

        private Builder() {
        }

        public Builder subscribeTimeout(Duration duration) {
            this.subscribeTimeout = duration;
            return this;
        }

        public Builder contentLength(Long l) {
            this.contentLength = l;
            return this;
        }

        public BlockingInputStreamAsyncRequestBody build() {
            return new BlockingInputStreamAsyncRequestBody(this);
        }
    }

    BlockingInputStreamAsyncRequestBody(Builder builder) {
        this.contentLength = builder.contentLength;
        this.subscribeTimeout = Validate.isPositiveOrNull(builder.subscribeTimeout, "subscribeTimeout") != null ? builder.subscribeTimeout : DEFAULT_SUBSCRIBE_TIMEOUT;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // software.amazon.awssdk.core.async.AsyncRequestBody
    public Optional<Long> contentLength() {
        return Optional.ofNullable(this.contentLength);
    }

    public long writeInputStream(InputStream inputStream) {
        try {
            waitForSubscriptionIfNeeded();
            return this.contentLength != null ? this.delegate.doBlockingWrite(new SdkLengthAwareInputStream(inputStream, this.contentLength.longValue())) : this.delegate.doBlockingWrite(inputStream);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.delegate.cancel();
            throw new RuntimeException(e);
        }
    }

    public void cancel() {
        this.delegate.cancel();
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        if (this.subscribeCalled.compareAndSet(false, true)) {
            this.delegate.subscribe(subscriber);
            this.subscribedLatch.countDown();
        } else {
            subscriber.onSubscribe(new NoopSubscription(subscriber));
            subscriber.onError(NonRetryableException.create("A retry was attempted, but AsyncRequestBody.forBlockingInputStream does not support retries. Consider using AsyncRequestBody.fromInputStream with an input stream that supports mark/reset to get retry support."));
        }
    }

    private void waitForSubscriptionIfNeeded() throws InterruptedException {
        long seconds = this.subscribeTimeout.getSeconds();
        if (!this.subscribedLatch.await(seconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException("The service request was not made within " + seconds + " seconds of doBlockingWrite being invoked. Make sure to invoke the service request BEFORE invoking doBlockingWrite if your caller is single-threaded.");
        }
    }
}
