package com.taobao.tao.messagekit.base.model;

import android.support.annotation.Nullable;
import com.taobao.codetrack.sdk.util.ReportUtil;
import com.taobao.tao.messagekit.base.MsgRouter;
import com.taobao.tao.messagekit.base.ResponseManager;
import com.taobao.tao.messagekit.core.MsgEnvironment;
import com.taobao.tao.messagekit.core.model.Ack;
import com.taobao.tao.messagekit.core.model.Package;
import com.taobao.tao.messagekit.core.utils.MsgLog;
import com.taobao.tao.messagekit.core.utils.MsgMonitor;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.Map;

/* loaded from: classes13.dex */
public abstract class BaseConnection<T, R> {
    protected static final int BROKEN_LINE = 3;
    public static final String CONNECTION_CODE = "ConnectionCode";
    public static final String CONNECTION_TYPE = "ConnectionType";
    protected static final String TAG = "BaseConnection";
    private Converter2Data<List<Package>, T> converter2Data;
    private Converter2Msg<Received<R>, List<Package>> converter2Msg;
    protected int status;
    protected int type;

    /* loaded from: classes13.dex */
    public interface Converter2Data<T, R> extends ObservableTransformer<T, R> {
    }

    /* loaded from: classes13.dex */
    public interface Converter2Msg<T, R> extends ObservableTransformer<T, R> {
        Ack a(int i, Map<String, Object> map);
    }

    /* loaded from: classes13.dex */
    public static class Received<R> {

        /* renamed from: a, reason: collision with root package name */
        public String f6953a;
        public int b;
        public String c;
        public R d;

        static {
            ReportUtil.a(-1872419374);
        }

        public Received(String str, int i, String str2, R r) {
            this.f6953a = str;
            this.b = i;
            this.c = str2;
            this.d = r;
        }
    }

    static {
        ReportUtil.a(2052554163);
    }

    public boolean available() {
        return this.status < 3;
    }

    public Converter2Data<List<Package>, T> getConverter2Data() {
        if (this.converter2Data != null || !MsgEnvironment.c()) {
            return this.converter2Data;
        }
        throw new Error("Converter2Data " + this.type + " not set");
    }

    public Converter2Msg<Received<R>, List<Package>> getConverter2Msg() {
        if (this.converter2Msg != null || !MsgEnvironment.c()) {
            return this.converter2Msg;
        }
        throw new Error("Converter2Msg " + this.type + " not set");
    }

    public void onConnectChanged(int i, @Nullable Map<String, String> map) {
    }

    public void onReceive(Received<R> received) {
        if (received != null && getConverter2Msg() != null) {
            MsgLog.a(TAG, "receive >>>", received.c, received.f6953a, Integer.valueOf(received.b));
            Observable.just(received).subscribeOn(Schedulers.a()).compose(getConverter2Msg()).flatMap(new Function<List<Package>, Observable<Package>>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.4
                @Override // io.reactivex.functions.Function
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public Observable<Package> apply(List<Package> list) throws Exception {
                    MsgLog.b(BaseConnection.TAG, "parse msgs:", Integer.valueOf(list.size()));
                    return Observable.fromIterable(list);
                }
            }).filter(new Predicate<Package>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.3
                @Override // io.reactivex.functions.Predicate
                /* renamed from: a, reason: merged with bridge method [inline-methods] */
                public boolean test(Package r6) throws Exception {
                    MsgLog.a(BaseConnection.TAG, r6);
                    if (r6.msg instanceof Ack) {
                        ResponseManager.ResponseObserver a2 = MsgRouter.a().f().a((String) null, r6.msg.getID());
                        if (a2 != null) {
                            r6.context = a2.f6948a.context;
                            Observable.just(r6).subscribe(a2);
                        }
                    } else {
                        if (r6.msg.type() != 3) {
                            return true;
                        }
                        Observable.just(r6).subscribe(MsgRouter.a().d());
                        MsgMonitor.a("MKT", "cs", 1.0d);
                    }
                    return false;
                }
            }).subscribe(MsgRouter.a().c());
        } else if (MsgEnvironment.c()) {
            throw new Error("Converter2Msg " + this.type + " not set");
        }
    }

    public void onResponse(final String str, final int i, final Map<String, Object> map) {
        Object[] objArr = new Object[7];
        objArr[0] = "type:";
        objArr[1] = Integer.valueOf(this.type);
        objArr[2] = str;
        objArr[3] = "response:";
        objArr[4] = Integer.valueOf(i);
        objArr[5] = "service:";
        objArr[6] = map != null ? map.get("service_id") : null;
        MsgLog.b(TAG, objArr);
        Observable.just(str).subscribeOn(Schedulers.a()).flatMap(new Function<String, Observable<ResponseManager.ResponseObserver>>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.2
            @Override // io.reactivex.functions.Function
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public Observable<ResponseManager.ResponseObserver> apply(String str2) throws Exception {
                return Observable.fromIterable(MsgRouter.a().f().a(str));
            }
        }).subscribe(new Consumer<ResponseManager.ResponseObserver>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.1
            @Override // io.reactivex.functions.Consumer
            /* renamed from: a, reason: merged with bridge method [inline-methods] */
            public void accept(ResponseManager.ResponseObserver responseObserver) throws Exception {
                if (responseObserver == null) {
                    return;
                }
                Ack ack = new Ack(responseObserver.f6948a.msg);
                int transCode = BaseConnection.this.transCode(i, (String) (map != null ? map.get("re_msg") : null));
                ack.setStatus(transCode);
                Package r2 = new Package(ack);
                r2.dataId = str;
                r2.context = responseObserver.f6948a.context;
                Observable.just(r2).subscribe(responseObserver);
                if (-30000 == transCode || 1000 == transCode) {
                    BaseConnection.this.status = 0;
                    MsgMonitor.a("MKT", "MKT_ACCS_RATE");
                    return;
                }
                if (BaseConnection.this.status < 3) {
                    BaseConnection.this.status++;
                }
                MsgMonitor.a("MKT", "MKT_ACCS_RATE", "" + i, (String) null);
            }
        });
    }

    public abstract void send(Package r1);

    public void setConverter2Data(Converter2Data<List<Package>, T> converter2Data) {
        this.converter2Data = converter2Data;
    }

    public void setConverter2Msg(Converter2Msg<Received<R>, List<Package>> converter2Msg) {
        this.converter2Msg = converter2Msg;
    }

    public abstract int transCode(int i, String str);

    public int type() {
        return this.type;
    }
}
