package org.elasticsearch.xpack.inference.external.http.sender;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.inference.InferenceServiceResults;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue;
import org.elasticsearch.xpack.inference.external.http.RequestExecutor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elasticsearch/xpack/inference/external/http/sender/RequestExecutorService.class */
public class RequestExecutorService implements RequestExecutor {
    private static final AdjustableCapacityBlockingQueue.QueueCreator<RejectableTask> QUEUE_CREATOR;
    private static final Logger logger;
    private final String serviceName;
    private final AdjustableCapacityBlockingQueue<RejectableTask> queue;
    private final AtomicBoolean running;
    private final CountDownLatch terminationLatch;
    private final HttpClientContext httpContext;
    private final ThreadPool threadPool;
    private final CountDownLatch startupLatch;
    private final BlockingQueue<Runnable> controlQueue;
    private final SingleRequestManager requestManager;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RequestExecutorService(String str, ThreadPool threadPool, @Nullable CountDownLatch countDownLatch, RequestExecutorServiceSettings requestExecutorServiceSettings, SingleRequestManager singleRequestManager) {
        this(str, threadPool, QUEUE_CREATOR, countDownLatch, requestExecutorServiceSettings, singleRequestManager);
    }

    RequestExecutorService(String str, ThreadPool threadPool, AdjustableCapacityBlockingQueue.QueueCreator<RejectableTask> queueCreator, @Nullable CountDownLatch countDownLatch, RequestExecutorServiceSettings requestExecutorServiceSettings, SingleRequestManager singleRequestManager) {
        this.running = new AtomicBoolean(true);
        this.terminationLatch = new CountDownLatch(1);
        this.controlQueue = new LinkedBlockingQueue();
        this.serviceName = (String) Objects.requireNonNull(str);
        this.threadPool = (ThreadPool) Objects.requireNonNull(threadPool);
        this.httpContext = HttpClientContext.create();
        this.queue = new AdjustableCapacityBlockingQueue<>(queueCreator, Integer.valueOf(requestExecutorServiceSettings.getQueueCapacity()));
        this.startupLatch = countDownLatch;
        this.requestManager = (SingleRequestManager) Objects.requireNonNull(singleRequestManager);
        Objects.requireNonNull(requestExecutorServiceSettings);
        requestExecutorServiceSettings.registerQueueCapacityCallback((v1) -> {
            onCapacityChange(v1);
        });
    }

    private void onCapacityChange(int i) {
        logger.debug(() -> {
            return Strings.format("Setting queue capacity to [%s]", new Object[]{Integer.valueOf(i)});
        });
        if (this.controlQueue.offer(() -> {
            updateCapacity(i);
        })) {
            this.queue.offer(new NoopTask());
        } else {
            logger.warn("Failed to change request batching service queue capacity. Control queue was full, please try again later.");
        }
    }

    private void updateCapacity(int i) {
        try {
            this.queue.setCapacity(i);
        } catch (Exception e) {
            logger.warn(Strings.format("Failed to set the capacity of the task queue to [%s] for request batching service [%s]", new Object[]{Integer.valueOf(i), this.serviceName}), e);
        }
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public void start() {
        try {
            signalStartInitiated();
            while (this.running.get()) {
                handleTasks();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.running.set(false);
            notifyRequestsOfShutdown();
            this.terminationLatch.countDown();
        }
    }

    private void signalStartInitiated() {
        if (this.startupLatch != null) {
            this.startupLatch.countDown();
        }
    }

    private void handleTasks() throws InterruptedException {
        try {
            RejectableTask take = this.queue.take();
            Runnable poll = this.controlQueue.poll();
            if (poll != null) {
                poll.run();
            }
            if (this.running.get()) {
                executeTask(take);
            } else {
                logger.debug(() -> {
                    return Strings.format("Http executor service [%s] exiting", new Object[]{this.serviceName});
                });
                rejectTaskBecauseOfShutdown(take);
            }
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            logger.warn(Strings.format("Http executor service [%s] failed while retrieving task for execution", new Object[]{this.serviceName}), e2);
        }
    }

    private void executeTask(RejectableTask rejectableTask) {
        try {
            this.requestManager.execute(rejectableTask, this.httpContext);
        } catch (Exception e) {
            logger.warn(Strings.format("Http executor service [%s] failed to execute request [%s]", new Object[]{this.serviceName, rejectableTask}), e);
        }
    }

    private synchronized void notifyRequestsOfShutdown() {
        if (!$assertionsDisabled && !isShutdown()) {
            throw new AssertionError("Requests should only be notified if the executor is shutting down");
        }
        try {
            ArrayList arrayList = new ArrayList();
            this.queue.drainTo(arrayList);
            rejectTasks(arrayList, this::rejectTaskBecauseOfShutdown);
        } catch (Exception e) {
            logger.warn(Strings.format("Failed to notify tasks of queuing service [%s] shutdown", new Object[]{this.serviceName}));
        }
    }

    private void rejectTaskBecauseOfShutdown(RejectableTask rejectableTask) {
        try {
            rejectableTask.onRejection(new EsRejectedExecutionException(Strings.format("Failed to send request, queue service [%s] has shutdown prior to executing request", new Object[]{this.serviceName}), true));
        } catch (Exception e) {
            logger.warn(Strings.format("Failed to notify request [%s] for service [%s] of rejection after queuing service shutdown", new Object[]{rejectableTask, this.serviceName}));
        }
    }

    private void rejectTasks(List<RejectableTask> list, Consumer<RejectableTask> consumer) {
        Iterator<RejectableTask> it = list.iterator();
        while (it.hasNext()) {
            consumer.accept(it.next());
        }
    }

    public int queueSize() {
        return this.queue.size();
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public void shutdown() {
        if (this.running.compareAndSet(true, false)) {
            this.queue.offer(new NoopTask());
        }
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public boolean isShutdown() {
        return !this.running.get();
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public boolean isTerminated() {
        return this.terminationLatch.getCount() == 0;
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.terminationLatch.await(j, timeUnit);
    }

    @Override // org.elasticsearch.xpack.inference.external.http.RequestExecutor
    public void execute(RequestManager requestManager, InferenceInputs inferenceInputs, @Nullable TimeValue timeValue, ActionListener<InferenceServiceResults> actionListener) {
        completeExecution(new RequestTask(requestManager, inferenceInputs, timeValue, this.threadPool, ContextPreservingActionListener.wrapPreservingContext(actionListener, this.threadPool.getThreadContext())));
    }

    private void completeExecution(RequestTask requestTask) {
        if (isShutdown()) {
            requestTask.onRejection(new EsRejectedExecutionException(Strings.format("Failed to enqueue task because the http executor service [%s] has already shutdown", new Object[]{this.serviceName}), true));
        } else if (!this.queue.offer(requestTask)) {
            requestTask.onRejection(new EsRejectedExecutionException(Strings.format("Failed to execute task because the http executor service [%s] queue is full", new Object[]{this.serviceName}), false));
        } else if (isShutdown()) {
            notifyRequestsOfShutdown();
        }
    }

    int remainingQueueCapacity() {
        return this.queue.remainingCapacity();
    }

    static {
        $assertionsDisabled = !RequestExecutorService.class.desiredAssertionStatus();
        QUEUE_CREATOR = new AdjustableCapacityBlockingQueue.QueueCreator<RejectableTask>() { // from class: org.elasticsearch.xpack.inference.external.http.sender.RequestExecutorService.1
            @Override // org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue.QueueCreator
            public BlockingQueue<RejectableTask> create(int i) {
                return i <= 0 ? create() : new LinkedBlockingQueue(i);
            }

            @Override // org.elasticsearch.xpack.inference.common.AdjustableCapacityBlockingQueue.QueueCreator
            public BlockingQueue<RejectableTask> create() {
                return new LinkedBlockingQueue();
            }
        };
        logger = LogManager.getLogger(RequestExecutorService.class);
    }
}
