package org.xlightweb;

import com.hyphenate.util.HanziToPinyin;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xlightweb.client.HttpClient;
import org.xsocket.DataConverter;
import org.xsocket.Execution;

/* loaded from: classes2.dex */
public final class EventDataSource implements IEventDataSource {
    public static final int DEFAULT_READ_TIMEOUT_MILLIS = Integer.MAX_VALUE;
    private static final int DEFAULT_RECONNECT_TIME_MILLIS = 10000;
    private NonBlockingBodyDataSource bodyDataSource;
    private final Object bodyDataSourceGuard;
    private final String[] headerlines;
    private final HttpClient httpClient;
    private final List<Event> inQueue;
    private int inQueueVersion;
    private boolean isIgnoreCommentMessages;
    private final AtomicBoolean isOpen;
    private final AtomicBoolean isReconnecting;
    private AtomicReference<String> lastEventIdRef;
    private final AtomicInteger numReconnects;
    private final AtomicInteger reconnectTrials;
    private int reconnectionTimeMillis;
    private long timeLastConnectTrial;
    private final String uriString;
    private final EventStreamHandlerAdapter webEventHandlerAdapter;
    private static final Logger LOG = Logger.getLogger(EventDataSource.class.getName());
    private static final int DEFAULT_MAX_RECONNECT_TRIALS = Integer.parseInt(System.getProperty("org.xlightweb.eventdatasource.maxreconnectrials", "5"));

    /* JADX INFO: Access modifiers changed from: private */
    @Execution(0)
    /* loaded from: classes.dex */
    public final class EventHandler implements IBodyDataHandler, IBodyDestroyListener, IBodyCompleteListener {
        private EventHandler() {
        }

        @Override // org.xlightweb.IBodyCompleteListener
        public void onComplete() {
            EventDataSource.this.performReconnect();
        }

        @Override // org.xlightweb.IBodyDataHandler
        public boolean onData(NonBlockingBodyDataSource nonBlockingBodyDataSource) throws BufferUnderflowException {
            Event event = null;
            try {
                int indexOf = nonBlockingBodyDataSource.indexOf("\n\n");
                if (indexOf != -1) {
                    event = Event.parse(nonBlockingBodyDataSource.readStringByLength(indexOf + 2));
                } else {
                    int indexOf2 = nonBlockingBodyDataSource.indexOf("\r\n\r\n");
                    if (indexOf2 != -1) {
                        event = Event.parse(nonBlockingBodyDataSource.readStringByLength(indexOf2 + 4));
                    }
                }
                if (event != null) {
                    if (!EventDataSource.this.isIgnoreCommentMessages || !event.isCommentMessage()) {
                        synchronized (EventDataSource.this.inQueue) {
                            EventDataSource.this.lastEventIdRef.set(event.getId());
                            if (event.getRetryMillis() != null) {
                                EventDataSource.this.reconnectionTimeMillis = event.getRetryMillis().intValue();
                            }
                            EventDataSource.access$1208(EventDataSource.this);
                            EventDataSource.this.inQueue.add(event);
                            EventDataSource.this.inQueue.notifyAll();
                        }
                        EventDataSource.this.webEventHandlerAdapter.onMessage(EventDataSource.this);
                    } else if (EventDataSource.LOG.isLoggable(Level.FINE)) {
                        EventDataSource.LOG.fine("comment message received. ignoring it (property isIgnoreCommentMessages=true)");
                    }
                }
            } catch (IOException e) {
                if (EventDataSource.LOG.isLoggable(Level.FINE)) {
                    EventDataSource.LOG.fine("[" + nonBlockingBodyDataSource.getId() + "] error occured by parsing event " + e.toString());
                }
                nonBlockingBodyDataSource.destroy();
            }
            return true;
        }

        @Override // org.xlightweb.IBodyDestroyListener
        public void onDestroyed() throws IOException {
            EventDataSource.this.performReconnect();
        }
    }

    public EventDataSource(HttpClient httpClient, String str, IEventHandler iEventHandler) throws MalformedURLException, IOException {
        this(httpClient, str, true, iEventHandler, new String[0]);
    }

    EventDataSource(HttpClient httpClient, String str, boolean z, IEventHandler iEventHandler, int i, String... strArr) throws MalformedURLException, IOException {
        this.reconnectionTimeMillis = 10000;
        this.reconnectTrials = new AtomicInteger(0);
        this.isReconnecting = new AtomicBoolean(false);
        this.numReconnects = new AtomicInteger();
        this.timeLastConnectTrial = 0L;
        this.bodyDataSourceGuard = new Object();
        this.isOpen = new AtomicBoolean(true);
        this.isIgnoreCommentMessages = true;
        this.inQueue = new ArrayList();
        this.inQueueVersion = 0;
        this.lastEventIdRef = new AtomicReference<>();
        this.httpClient = httpClient;
        this.uriString = str;
        this.isIgnoreCommentMessages = z;
        this.headerlines = strArr;
        this.webEventHandlerAdapter = new EventStreamHandlerAdapter(iEventHandler);
        connect();
    }

    public EventDataSource(HttpClient httpClient, String str, boolean z, IEventHandler iEventHandler, String... strArr) throws MalformedURLException, IOException {
        this(httpClient, str, z, iEventHandler, DEFAULT_MAX_RECONNECT_TRIALS, strArr);
    }

    static /* synthetic */ int access$1208(EventDataSource eventDataSource) {
        int i = eventDataSource.inQueueVersion;
        eventDataSource.inQueueVersion = i + 1;
        return i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect() throws MalformedURLException, IOException {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("open data event stream " + this.uriString);
        }
        GetRequest getRequest = new GetRequest(this.uriString);
        getRequest.setHeader("Accept", "text/event-stream");
        getRequest.setHeader("Cache-Control", "no-cache");
        for (String str : this.headerlines) {
            getRequest.addHeaderLine(str);
        }
        String str2 = this.lastEventIdRef.get();
        if (str2 != null) {
            getRequest.setHeader("Last-Event-ID", str2);
        }
        this.timeLastConnectTrial = System.currentTimeMillis();
        IHttpResponse call = this.httpClient.call(getRequest);
        if (call.getStatus() < 200 || call.getStatus() > 299) {
            throw new IOException("got " + call.getStatus() + HanziToPinyin.Token.SEPARATOR + call.getReason());
        }
        if (!call.getContentType().toLowerCase().startsWith("text/event-stream")) {
            throw new IOException("got content type " + call.getContentType() + " instead text/event-stream");
        }
        synchronized (this.bodyDataSourceGuard) {
            this.bodyDataSource = call.getNonBlockingBody();
            EventHandler eventHandler = new EventHandler();
            this.bodyDataSource.setDataHandler(eventHandler);
            this.bodyDataSource.addDestroyListener(eventHandler);
            this.bodyDataSource.addCompleteListener(eventHandler);
        }
        this.webEventHandlerAdapter.onConnect(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void performReconnect() {
        if (this.isOpen.get() && !this.isReconnecting.getAndSet(true)) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("connection is terminated. Try to reconnect to " + this.uriString);
            }
            TimerTask timerTask = new TimerTask() { // from class: org.xlightweb.EventDataSource.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        if (EventDataSource.LOG.isLoggable(Level.FINE)) {
                            EventDataSource.LOG.fine("try reconnect " + EventDataSource.this.uriString);
                        }
                        EventDataSource.this.connect();
                        EventDataSource.this.numReconnects.incrementAndGet();
                        EventDataSource.this.reconnectTrials.set(0);
                        EventDataSource.this.isReconnecting.set(false);
                        if (EventDataSource.LOG.isLoggable(Level.FINE)) {
                            EventDataSource.LOG.fine("reconnected to " + EventDataSource.this.uriString);
                        }
                    } catch (IOException e) {
                        if (EventDataSource.LOG.isLoggable(Level.FINE)) {
                            EventDataSource.LOG.fine("reconnecting " + EventDataSource.this.uriString + " failed " + e.toString());
                        }
                        EventDataSource.this.isReconnecting.set(false);
                        EventDataSource.this.performReconnect();
                    }
                }
            };
            long currentTimeMillis = (this.reconnectionTimeMillis + this.timeLastConnectTrial) - System.currentTimeMillis();
            if (currentTimeMillis <= 0) {
                processMultithreaded(timerTask);
                return;
            }
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("reconnecting " + this.uriString + " in " + DataConverter.toFormatedDuration(currentTimeMillis));
            }
            HttpUtils.schedule(timerTask, currentTimeMillis);
        }
    }

    @Override // org.xlightweb.IReadableWebStream
    public int availableMessages() {
        int size;
        synchronized (this.inQueue) {
            size = this.inQueue.size();
        }
        return size;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        synchronized (this.bodyDataSourceGuard) {
            if (this.isOpen.getAndSet(false)) {
                this.bodyDataSource.close();
            }
        }
        this.webEventHandlerAdapter.onDisconnect(this);
    }

    @Override // org.xlightweb.IReadableWebStream
    public void closeQuitly() {
        try {
            close();
        } catch (IOException e) {
        }
    }

    @Override // org.xlightweb.IReadableWebStream
    public void destroy() {
        synchronized (this.bodyDataSourceGuard) {
            if (this.isOpen.getAndSet(false)) {
                this.bodyDataSource.destroy();
            }
        }
        this.webEventHandlerAdapter.onDisconnect(this);
    }

    @Override // org.xlightweb.IReadableWebStream
    public String getId() {
        return this.bodyDataSource.getId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getInQueueVersion() {
        int i;
        synchronized (this.inQueue) {
            i = this.inQueueVersion;
        }
        return i;
    }

    @Override // org.xlightweb.IEventDataSource
    public String getLastEventId() {
        return this.lastEventIdRef.get();
    }

    @Override // org.xlightweb.IEventDataSource
    public int getNumReconnects() {
        return this.numReconnects.get();
    }

    @Override // org.xlightweb.IEventDataSource
    public int getReconnectionTimeMillis() {
        return 0;
    }

    @Override // org.xlightweb.IEventDataSource
    public boolean isIgnoreCommentMessages() {
        return this.isIgnoreCommentMessages;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processMultithreaded(Runnable runnable) {
        this.bodyDataSource.getExecutor().processMultithreaded(runnable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void processNonthreaded(Runnable runnable) {
        this.bodyDataSource.getExecutor().processNonthreaded(runnable);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.xlightweb.IReadableWebStream
    public Event readMessage() throws IOException, SocketTimeoutException, ClosedChannelException {
        return readMessage(Integer.MAX_VALUE);
    }

    @Override // org.xlightweb.IEventDataSource
    public Event readMessage(int i) throws IOException, SocketTimeoutException, ClosedChannelException {
        long currentTimeMillis = System.currentTimeMillis();
        long j = i;
        do {
            synchronized (this.inQueue) {
                if (!this.inQueue.isEmpty()) {
                    this.inQueueVersion++;
                    return this.inQueue.remove(0);
                }
                try {
                    this.inQueue.wait(j);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                j = HttpUtils.computeRemainingTime(currentTimeMillis, i);
            }
        } while (j > 0);
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("receive timeout " + DataConverter.toFormatedDuration(i) + " reached. throwing timeout exception");
        }
        throw new SocketTimeoutException("timeout " + DataConverter.toFormatedDuration(i) + " reached");
    }

    @Override // org.xlightweb.IEventDataSource
    public void setIgnoreCommentMessages(boolean z) {
        this.isIgnoreCommentMessages = z;
    }

    @Override // org.xlightweb.IEventDataSource
    public void setReconnectionTimeMillis(int i) {
        this.reconnectionTimeMillis = i;
    }
}
