package com.qq.taf.net;

import com.qq.taf.proxy.TafLoggerCenter;
import com.qq.taf.proxy.utils.Millis100TimeProvider;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: classes.dex */
public class Processor implements Runnable {
    protected SessionWriter a;
    protected MessageReceiver b;
    protected HandlerExecutor c;
    protected Selector d;
    protected ConcurrentLinkedQueue<SelectionKey> e;
    protected ConcurrentLinkedQueue<Ternary<Integer, SocketChannel, Integer>> f;
    protected ConcurrentLinkedQueue<SelectionKey> g;
    protected long h;
    private Filter i;

    public Processor(String str, Filter filter, SessionWriter sessionWriter, MessageReceiver messageReceiver, HandlerExecutor handlerExecutor, int i) {
        try {
            this.i = filter;
            this.a = sessionWriter;
            this.b = messageReceiver;
            this.c = handlerExecutor;
            this.d = Selector.open();
            this.e = new ConcurrentLinkedQueue<>();
            this.f = new ConcurrentLinkedQueue<>();
            this.g = new ConcurrentLinkedQueue<>();
            new Thread(this, str + "-Processor-" + i).start();
        } catch (Throwable th) {
            Logger.a("processor init error", th);
            Exception.a(th);
        }
    }

    private void b(SelectionKey selectionKey, long j) {
        Session session;
        int k;
        if (selectionKey.isValid() && (k = (session = (Session) selectionKey.attachment()).k()) >= 0 && j - Math.max(session.a(), Math.max(session.b(), session.c())) > k) {
            TafLoggerCenter.a("connection io timeout|" + k + "ms|" + session.e() + "|" + session.d());
            session.a(true);
        }
    }

    private void c(SelectionKey selectionKey, long j) {
        int read;
        Session session = (Session) selectionKey.attachment();
        try {
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            int g = session.g();
            ByteBuffer allocate = ByteBuffer.allocate(g);
            int i = 0;
            while (true) {
                read = socketChannel.read(allocate);
                if (read <= 0) {
                    break;
                } else {
                    i += read;
                }
            }
            if (i > 0) {
                allocate.flip();
                this.i.a(session, allocate, this.b);
                if (i == g) {
                    session.h();
                } else if (i < (g >>> 1)) {
                    session.i();
                }
                session.b(j);
                session.a(i);
            } else {
                session.i();
            }
            if (read < 0) {
                session.a(false);
            }
        } catch (Throwable th) {
            session.a(true);
            this.c.a(session, th);
        }
    }

    protected void a(Ternary<Integer, SocketChannel, Integer> ternary, long j) {
        try {
            int intValue = ternary.a.intValue();
            SocketChannel socketChannel = ternary.b;
            int intValue2 = ternary.c.intValue();
            socketChannel.configureBlocking(false);
            socketChannel.socket().setReuseAddress(true);
            socketChannel.socket().setTcpNoDelay(true);
            socketChannel.socket().setKeepAlive(true);
            SelectionKey register = socketChannel.register(this.d, 1);
            Session session = new Session(intValue, register, this, this.i, this.a, this.c, intValue2);
            register.attach(session);
            session.a(j);
            this.c.b(session);
        } catch (Throwable th) {
            Logger.a("processor register error", th);
        }
    }

    public void a(SelectionKey selectionKey) {
        boolean isEmpty = this.g.isEmpty();
        this.g.offer(selectionKey);
        if (isEmpty) {
            this.d.wakeup();
        }
    }

    protected void a(SelectionKey selectionKey, long j) {
        ByteBuffer peek;
        if (selectionKey.isValid()) {
            Session session = (Session) selectionKey.attachment();
            try {
                session.b(false);
                a(selectionKey, false);
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                ConcurrentLinkedQueue<ByteBuffer> f = session.f();
                int i = 0;
                while (true) {
                    peek = f.peek();
                    if (peek == null) {
                        break;
                    }
                    if (peek == Session.a) {
                        f.clear();
                        b(selectionKey);
                        peek = null;
                        break;
                    }
                    int i2 = 0;
                    for (int i3 = 0; i3 < 32 && (i2 = socketChannel.write(peek)) <= 0; i3++) {
                    }
                    if (i2 == 0) {
                        break;
                    }
                    i += i2;
                    if (!peek.hasRemaining()) {
                        f.poll();
                    }
                }
                if (i > 0) {
                    session.c(j);
                    session.b(i);
                }
                if (peek != null) {
                    a(selectionKey, true);
                    session.b(true);
                }
            } catch (Throwable th) {
                session.a(true);
                this.c.a(session, th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void a(SelectionKey selectionKey, boolean z) {
        selectionKey.interestOps(z ? 5 : 1);
    }

    public void a(SocketChannel socketChannel, int i, int i2) {
        this.f.offer(new Ternary<>(Integer.valueOf(i), socketChannel, Integer.valueOf(i2)));
        this.d.wakeup();
    }

    public void b(SelectionKey selectionKey) {
        this.e.offer(selectionKey);
    }

    protected void c(SelectionKey selectionKey) {
        try {
            if (selectionKey.isValid()) {
                Session session = (Session) selectionKey.attachment();
                selectionKey.cancel();
                selectionKey.attach(null);
                ((SocketChannel) selectionKey.channel()).close();
                this.c.a(session);
            }
        } catch (Throwable th) {
            Logger.a("processor cancel error", th);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.h = Millis100TimeProvider.a.a();
            while (true) {
                int select = this.d.select(100L);
                long a = Millis100TimeProvider.a.a();
                while (true) {
                    Ternary<Integer, SocketChannel, Integer> poll = this.f.poll();
                    if (poll == null) {
                        break;
                    } else {
                        a(poll, a);
                    }
                }
                if (select > 0) {
                    Iterator<SelectionKey> it = this.d.selectedKeys().iterator();
                    long a2 = Millis100TimeProvider.a.a();
                    while (it.hasNext()) {
                        SelectionKey next = it.next();
                        if (next.isReadable()) {
                            c(next, a2);
                        }
                        if (next.isWritable()) {
                            ((Session) next.attachment()).b(true);
                        }
                        it.remove();
                    }
                }
                long a3 = Millis100TimeProvider.a.a();
                while (true) {
                    SelectionKey poll2 = this.g.poll();
                    if (poll2 == null) {
                        break;
                    } else {
                        a(poll2, a3);
                    }
                }
                while (true) {
                    SelectionKey poll3 = this.e.poll();
                    if (poll3 == null) {
                        break;
                    } else {
                        c(poll3);
                    }
                }
                long a4 = Millis100TimeProvider.a.a();
                if (a4 - this.h > 5000) {
                    this.h = a4;
                    Iterator<SelectionKey> it2 = this.d.keys().iterator();
                    while (it2.hasNext()) {
                        b(it2.next(), a4);
                    }
                }
            }
        } catch (Throwable th) {
            Logger.a("processor run error", th);
        }
    }
}
