/*
 * Decompiled with CFR 0.152.
 */
package com.gluonhq.snl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.gluonhq.snl.ConnectivityListener;
import com.gluonhq.snl.LegacyNetworkClient;
import com.gluonhq.snl.QuicNetworkClient;
import com.gluonhq.snl.Response;
import com.gluonhq.snl.SendGroupMessageResponse;
import com.gluonhq.snl.SendMessageResponse;
import com.gluonhq.snl.WebsocketResponse;
import com.google.protobuf.ByteString;
import io.privacyresearch.equation.NetworkMonitor;
import io.privacyresearch.equation.net.NetworkConfiguration;
import io.privacyresearch.equation.net.SignalUrl;
import io.privacyresearch.equation.signal.SignalBridge;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.whispersystems.signalservice.api.Util;
import org.whispersystems.signalservice.api.crypto.UnidentifiedAccess;
import org.whispersystems.signalservice.api.messages.SignalServiceEnvelope;
import org.whispersystems.signalservice.api.push.GroupMismatchedDevices;
import org.whispersystems.signalservice.api.push.GroupStaleDevices;
import org.whispersystems.signalservice.api.push.MismatchedDevices;
import org.whispersystems.signalservice.api.push.OutgoingPushMessageList;
import org.whispersystems.signalservice.api.push.StaleDevices;
import org.whispersystems.signalservice.api.push.exceptions.GroupMismatchedDevicesException;
import org.whispersystems.signalservice.api.push.exceptions.GroupStaleDevicesException;
import org.whispersystems.signalservice.api.push.exceptions.MalformedResponseException;
import org.whispersystems.signalservice.api.push.exceptions.MismatchedDevicesException;
import org.whispersystems.signalservice.api.push.exceptions.PushNetworkException;
import org.whispersystems.signalservice.api.push.exceptions.ServerRejectedException;
import org.whispersystems.signalservice.api.push.exceptions.StaleDevicesException;
import org.whispersystems.signalservice.api.util.CredentialsProvider;
import org.whispersystems.signalservice.internal.websocket.WebSocketProtos;

public abstract class NetworkClient {
    final SignalUrl signalUrl;
    final String signalAgent;
    final boolean allowStories;
    final Optional<CredentialsProvider> credentialsProvider;
    final Optional<ConnectivityListener> connectivityListener;
    final NetworkMonitor monitor;
    private static final Logger LOG = Logger.getLogger(NetworkClient.class.getName());
    final byte[] CLOSING = new byte[]{-1, 1, -2, 3, -5, 8, -13, 21};
    private final String VERB_CLOSING = "CLOSING";
    final BlockingQueue<byte[]> rawByteQueue = new LinkedBlockingQueue<byte[]>();
    private final BlockingQueue<WebSocketProtos.WebSocketRequestMessage> wsRequestMessageQueue = new LinkedBlockingQueue<WebSocketProtos.WebSocketRequestMessage>();
    private final Map<Long, CompletableFuture> outgoingRequests = new HashMap<Long, CompletableFuture>();
    private final Thread formatProcessingThread;
    private boolean closed = false;
    private static final String SERVER_DELIVERED_TIMESTAMP_HEADER = "X-Signal-Timestamp";
    boolean websocketCreated = false;
    private WsStatus wsStatus = WsStatus.INIT;
    private final Object statusLock = new Object();

    public static NetworkClient createNetworkClient(NetworkMonitor monitor, NetworkConfiguration config, NetworkConfiguration.Purpose purpose, Optional<CredentialsProvider> cp) {
        return NetworkClient.createNetworkClient(monitor, config.getSignalUrl(purpose), cp, SignalBridge.SIGNAL_USER_AGENT, Optional.empty(), false, config.isUseQuic(), config.getProxy());
    }

    public static NetworkClient createNetworkClient(NetworkMonitor monitor, SignalUrl url, Optional<CredentialsProvider> cp, String agent, Optional<ConnectivityListener> cl, boolean allowStories, boolean useQuic, String proxy) {
        LOG.info("Creating Networkclient with url " + (url != null ? url.getUrl() : "NULL") + ", using quic? " + useQuic + " and proxy = " + proxy);
        if (useQuic) {
            return new QuicNetworkClient(monitor, url, cp, agent, cl, allowStories, proxy);
        }
        return new LegacyNetworkClient(monitor, url, cp, agent, cl, allowStories);
    }

    public NetworkClient(NetworkMonitor monitor, SignalUrl url, Optional<CredentialsProvider> cp, String signalAgent, Optional<ConnectivityListener> connectivityListener, boolean allowStories) {
        this.signalUrl = url;
        this.signalAgent = signalAgent;
        this.allowStories = allowStories;
        this.credentialsProvider = cp;
        this.connectivityListener = connectivityListener;
        this.monitor = monitor;
        LOG.info("Created NetworkClient " + String.valueOf(this) + " with URL " + String.valueOf(url) + ", cp = " + String.valueOf(cp) + " and cl = " + String.valueOf(connectivityListener));
        this.formatProcessingThread = new Thread(){

            @Override
            public void run() {
                NetworkClient.this.processFormatConversion();
            }
        };
        this.formatProcessingThread.start();
    }

    public Future halt() {
        LOG.info("We need to halt " + String.valueOf(this));
        this.setWsStatus(WsStatus.HALT);
        LOG.info("Done setting status");
        return CompletableFuture.runAsync(() -> this.shutdown());
    }

    public boolean supportsJson() {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createWebSocket() throws IOException {
        Object object = this.statusLock;
        synchronized (object) {
            if (this.wsStatus == WsStatus.HALT) {
                LOG.info("We are nog going to recreate a websocket.");
            }
            if (this.wsStatus == WsStatus.CREATING) {
                LOG.info("WebSocket is already being created for " + String.valueOf(this) + ". Have some patience");
                return;
            }
            if (this.wsStatus == WsStatus.READY) {
                LOG.severe("Can't create a READY websocket");
                throw new IllegalStateException();
            }
            this.setWsStatus(WsStatus.CREATING);
        }
        LOG.info("Creating websocket for " + String.valueOf(this) + ", using credentialsprovider? " + this.credentialsProvider.isPresent() + " and network = " + this.monitor.networkStatus);
        int interval = 1;
        while (this.wsStatus != WsStatus.READY) {
            if (!this.monitor.networkStatus) {
                CountDownLatch cdl = new CountDownLatch(1);
                Consumer<Boolean> callback = a -> {
                    if (a.booleanValue()) {
                        LOG.info("We're notified that the network is back!");
                        cdl.countDown();
                    }
                };
                this.monitor.addNetworkListener(callback);
                try {
                    if (!this.monitor.networkStatus) {
                        cdl.await();
                    }
                    this.monitor.removeNetworkListener(callback);
                }
                catch (InterruptedException ex) {
                    LOG.log(Level.SEVERE, null, ex);
                }
            }
            LOG.info("Now network is " + this.monitor.networkStatus);
            Object baseUrl = this.signalUrl.getUrl().replace("https://", "wss://").replace("http://", "ws://");
            if (!((String)baseUrl).endsWith("provisioning/")) {
                baseUrl = (String)baseUrl + "/v1/websocket/";
            }
            String auth = null;
            if (this.credentialsProvider.isPresent()) {
                CredentialsProvider cp = this.credentialsProvider.get();
                String identifier = cp.getAci() != null ? cp.getDeviceUuid() : cp.getE164();
                auth = "Basic " + Base64.getEncoder().encodeToString((identifier + ":" + cp.getPassword()).getBytes("UTF-8"));
            }
            this.implCreateWebSocket((String)baseUrl, auth);
            LOG.info("After creating WS for " + String.valueOf(this) + ", status = " + String.valueOf((Object)this.wsStatus));
            if (this.wsStatus == WsStatus.READY) continue;
            try {
                LOG.warning("Status not yet ready, retry but first sleep " + interval + " seconds");
                Thread.sleep(1000 * interval);
                interval = Math.min(interval * 2, 30);
                LOG.warning("Done sleeping, status = " + String.valueOf((Object)this.wsStatus) + " and next wait interval = " + interval + " seconds");
            }
            catch (Exception e) {
                LOG.log(Level.SEVERE, "Error sleeping", e);
                e.printStackTrace();
            }
        }
    }

    void implCreateWebSocket(String baseurl, String auth) throws IOException {
        throw new UnsupportedOperationException();
    }

    void implCreateWebSocket(String baseurl) throws IOException {
        throw new UnsupportedOperationException();
    }

    CompletableFuture<WebSocket> sendToStream(WebSocketProtos.WebSocketMessage msg, OutgoingPushMessageList list) throws IOException {
        if (!this.websocketCreated) {
            this.createWebSocket();
        }
        return this.implSendToStream(msg, list);
    }

    CompletableFuture<WebSocket> implSendToStream(WebSocketProtos.WebSocketMessage msg, OutgoingPushMessageList list) throws IOException {
        throw new UnsupportedOperationException();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void requestWSShutdown() {
        LOG.info("Got a request to shutdown the current websocket, status = " + String.valueOf((Object)this.wsStatus));
        try {
            Object object = this.statusLock;
            synchronized (object) {
                if (this.wsStatus == WsStatus.INIT) {
                    LOG.severe("Can't shutdown a websocket that is still in INIT");
                    throw new IllegalStateException();
                }
                if (this.wsStatus == WsStatus.CREATING) {
                    LOG.severe("Can't shutdown a websocket that is in CREATING");
                    throw new IllegalStateException();
                }
                if (this.wsStatus != WsStatus.READY) {
                    LOG.severe("Can't shutdown a websocket that is not in READY");
                    throw new IllegalStateException();
                }
                this.setWsStatus(WsStatus.RESTARTING);
            }
            this.rawByteQueue.put(this.CLOSING);
        }
        catch (InterruptedException ex) {
            Logger.getLogger(NetworkClient.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    WsStatus getWsStatus() {
        return this.wsStatus;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void setWsStatus(WsStatus status) {
        LOG.info("Move status for " + String.valueOf(this) + " from " + String.valueOf((Object)this.wsStatus) + " to " + String.valueOf((Object)status));
        Object object = this.statusLock;
        synchronized (object) {
            this.wsStatus = status;
        }
    }

    public void shutdown() {
        LOG.info("Shutting down networkclient " + String.valueOf(this));
        this.implShutdown();
        LOG.info("Shutting down networkclient done for " + String.valueOf(this));
    }

    void implShutdown() {
    }

    public Future<SendGroupMessageResponse> sendToGroup(byte[] body, final byte[] joinedUnidentifiedAccess, long timestamp, boolean online) throws IOException {
        if (this.closed) {
            throw new IOException("Trying to use a closed networkclient " + String.valueOf(this));
        }
        LinkedList<String> headers = new LinkedList<String>(this){
            final /* synthetic */ NetworkClient this$0;
            {
                this.this$0 = this$0;
                this.add("content-type:application/vnd.signal-messenger.mrm");
                this.add("Unidentified-Access-Key:" + Base64.getEncoder().encodeToString(joinedUnidentifiedAccess));
            }
        };
        String path = String.format(Locale.US, "/v1/messages/multi_recipient?ts=%s&online=%s", timestamp, online);
        LOG.info("Sending groupmessage to " + path);
        WebSocketProtos.WebSocketRequestMessage requestMessage = WebSocketProtos.WebSocketRequestMessage.newBuilder().setId(new SecureRandom().nextLong()).setVerb("PUT").setPath(path).addAllHeaders((Iterable)headers).setBody(ByteString.copyFrom((byte[])body)).build();
        CompletableFuture<WebsocketResponse> response = this.sendRequest(requestMessage, null);
        CompletionStage answer = response.thenCompose(f -> {
            int status = f.getStatus();
            LOG.info("Response has status " + status);
            if (status != 200) {
                return CompletableFuture.failedFuture(this.parseGroupException(status, f.getBody()));
            }
            return CompletableFuture.completedFuture(new SendGroupMessageResponse());
        });
        return answer;
    }

    private Exception parseGroupException(int status, String body) {
        try {
            LOG.info("Parse exception, status = " + status + " and body = " + body);
            if (status == 404) {
                System.err.println("ERROR: sendGroup -> 404");
                return new IOException();
            }
            if (status == 409) {
                GroupMismatchedDevices[] mismatchedDevices = NetworkClient.readBodyJson(body, GroupMismatchedDevices[].class);
                return new GroupMismatchedDevicesException(mismatchedDevices);
            }
            if (status == 410) {
                GroupStaleDevices[] staleDevices = NetworkClient.readBodyJson(body, GroupStaleDevices[].class);
                return new GroupStaleDevicesException(staleDevices);
            }
            if (status == 508) {
                return new ServerRejectedException();
            }
            if (status < 200 || status >= 300) {
                System.err.println("will throw IOexception, response = " + body);
                return new IOException("Non-successful response: " + status);
            }
        }
        catch (Exception e) {
            LOG.severe("Error processing exception: " + String.valueOf(e));
            return new IllegalArgumentException(e);
        }
        return null;
    }

    private Exception parseException(int status, String body) {
        try {
            LOG.info("Parse exception, status = " + status + " and body = " + body);
            if (status == 404) {
                System.err.println("ERROR: sendGroup -> 404");
                return new IOException();
            }
            if (status == 409) {
                MismatchedDevices mismatchedDevices = NetworkClient.readBodyJson(body, MismatchedDevices.class);
                return new MismatchedDevicesException(mismatchedDevices);
            }
            if (status == 410) {
                StaleDevices staleDevices = NetworkClient.readBodyJson(body, StaleDevices.class);
                return new StaleDevicesException(staleDevices);
            }
            if (status == 508) {
                return new ServerRejectedException();
            }
            if (status < 200 || status >= 300) {
                System.err.println("will throw IOexception, response = " + body);
                return new IOException("Non-successful response: " + status);
            }
        }
        catch (Exception e) {
            LOG.severe("Error processing exception: " + String.valueOf(e));
            return new IllegalArgumentException(e);
        }
        return null;
    }

    public Future<SendMessageResponse> sendDirectOverStream(OutgoingPushMessageList list, Optional<UnidentifiedAccess> unidentifiedAccess, boolean story) throws IOException {
        LinkedList<String> headers = new LinkedList<String>(){
            {
                this.add("content-type:application/json");
            }
        };
        unidentifiedAccess.ifPresent(ua -> headers.add("Unidentified-Access-Key:" + Base64.getEncoder().encodeToString(((UnidentifiedAccess)unidentifiedAccess.get()).getUnidentifiedAccessKey())));
        WebSocketProtos.WebSocketRequestMessage requestMessage = WebSocketProtos.WebSocketRequestMessage.newBuilder().setId(new SecureRandom().nextLong()).setVerb("PUT").setPath(String.format("/v1/messages/%s?story=%s", list.getDestination(), story ? "true" : "false")).addAllHeaders((Iterable)headers).setBody(ByteString.copyFrom((byte[])Util.toJson((Object)list).getBytes())).build();
        LOG.info("Send direct msg to " + list.getDestination());
        LOG.finest("JSONLISTSIZE = " + Util.toJson((Object)list).getBytes().length);
        LOG.finest("jsonlist = " + Util.toJson((Object)list));
        LOG.info("request headers = " + String.valueOf(headers) + "\n    hence headers = " + String.valueOf(requestMessage.getHeadersList()));
        CompletableFuture<WebsocketResponse> response = this.sendRequest(requestMessage, list);
        CompletionStage answer = response.thenCompose(this::validateResponse);
        return answer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T> T read(long timeout, TimeUnit unit, Class<T> clazz) {
        try {
            long cooldown = 1L;
            while (this.wsStatus != WsStatus.READY) {
                try {
                    LOG.info("Waiting " + cooldown + " before we create a websocket");
                    Thread.sleep(cooldown);
                    cooldown = Math.min(300000L, cooldown * 2L);
                    LOG.info("Need to CreateWebSocket for " + String.valueOf(this));
                    this.createWebSocket();
                }
                catch (IOException ex) {
                    LOG.log(Level.SEVERE, null, ex);
                }
            }
            Object ex = this.statusLock;
            synchronized (ex) {
                if (this.wsStatus != WsStatus.READY) {
                    LOG.severe("Can't read when WS not ready");
                    throw new IllegalStateException("Can't read when websocket not ready");
                }
            }
            LOG.info("Wait for requestMessage...");
            WebSocketProtos.WebSocketRequestMessage request = this.wsRequestMessageQueue.take();
            if ("CLOSING".equals(request.getVerb())) {
                LOG.info("We are told to close this connection. Create a new one and try again.");
                this.websocketCreated = false;
                this.shutdown();
                return this.read(timeout, unit, clazz);
            }
            int left = this.wsRequestMessageQueue.size();
            LOG.info("IncomingQueue size after take = " + left);
            LOG.info("Got requestMessage, process now " + request.getVerb() + " " + request.getPath());
            Optional<T> sse = this.genhandleWebSocketRequestMessage(request, clazz);
            if (sse.isPresent()) {
                return sse.get();
            }
            return null;
        }
        catch (IOException | InterruptedException ex) {
            LOG.log(Level.SEVERE, null, ex);
            ex.printStackTrace();
            LOG.info("No idea why we would have an Exception here. Investigate");
            return null;
        }
    }

    public SignalServiceEnvelope read(long timeout, TimeUnit unit) {
        return this.read(timeout, unit, SignalServiceEnvelope.class);
    }

    <T> Optional<T> genhandleWebSocketRequestMessage(WebSocketProtos.WebSocketRequestMessage request, Class<T> type) throws IOException {
        LOG.info("Handle WS requestMessage ");
        if (this.isSocketEmptyRequest(request)) {
            return Optional.empty();
        }
        WebSocketProtos.WebSocketResponseMessage response = this.createWebSocketResponse(request);
        try {
            Optional<String> timestampHeader = NetworkClient.findHeader(request, SERVER_DELIVERED_TIMESTAMP_HEADER);
            long timestamp = 0L;
            if (timestampHeader.isPresent()) {
                try {
                    timestamp = Long.parseLong(timestampHeader.get());
                }
                catch (NumberFormatException e) {
                    LOG.warning("Failed to parse X-Signal-Timestamp");
                }
            }
            if (this.isSignalServiceEnvelope(request) && type.equals(SignalServiceEnvelope.class)) {
                LOG.info("body = " + String.valueOf(request.getBody()));
                SignalServiceEnvelope envelope = new SignalServiceEnvelope(request.getBody().toByteArray(), timestamp);
                LOG.finer("Request " + Objects.hashCode(request) + " has envelope " + Objects.hashCode(envelope));
                Optional<SignalServiceEnvelope> optional = Optional.of(envelope);
                return optional;
            }
            if (type.equals(WebSocketProtos.WebSocketRequestMessage.class)) {
                LOG.info("ProvisioningMessage!");
                Optional<WebSocketProtos.WebSocketRequestMessage> optional = Optional.of(request);
                return optional;
            }
            throw new IllegalArgumentException("Can not create an instance of " + String.valueOf(type) + " for this websocket message");
        }
        finally {
            LOG.info("[NC] sendResponse to ack websocket request msg");
            LOG.finer("[SSMP] readOrEmpty SHOULD send response");
            try {
                WebSocketProtos.WebSocketMessage msg = WebSocketProtos.WebSocketMessage.newBuilder().setType(WebSocketProtos.WebSocketMessage.Type.RESPONSE).setResponse(response).build();
                this.sendToStream(msg, null);
            }
            catch (Exception ioe) {
                LOG.log(Level.SEVERE, "IO exception in sending response", ioe);
            }
            LOG.fine("[SSMP] readOrEmpty did send response");
        }
    }

    private boolean isSignalServiceEnvelope(WebSocketProtos.WebSocketRequestMessage message) {
        return "PUT".equals(message.getVerb()) && "/api/v1/message".equals(message.getPath());
    }

    private boolean isSocketEmptyRequest(WebSocketProtos.WebSocketRequestMessage message) {
        return "PUT".equals(message.getVerb()) && "/api/v1/queue/empty".equals(message.getPath());
    }

    private boolean isProvisioningMessage(WebSocketProtos.WebSocketRequestMessage message) {
        return "PUT".equals(message.getVerb()) && "/v1/address".equals(message.getPath());
    }

    private WebSocketProtos.WebSocketResponseMessage createWebSocketResponse(WebSocketProtos.WebSocketRequestMessage request) {
        if (this.isSignalServiceEnvelope(request)) {
            return WebSocketProtos.WebSocketResponseMessage.newBuilder().setId(request.getId()).setStatus(200).setMessage("OK").build();
        }
        return WebSocketProtos.WebSocketResponseMessage.newBuilder().setId(request.getId()).setStatus(400).setMessage("Unknown").build();
    }

    private static Optional<String> findHeader(WebSocketProtos.WebSocketRequestMessage message, String targetHeader) {
        if (message.getHeadersCount() == 0) {
            return Optional.empty();
        }
        for (String header : message.getHeadersList()) {
            String[] split;
            if (!header.startsWith(targetHeader) || (split = header.split(":")).length != 2 || !split[0].trim().toLowerCase().equals(targetHeader.toLowerCase())) continue;
            return Optional.of(split[1].trim());
        }
        return Optional.empty();
    }

    private void processFormatConversion() {
        LOG.info("start processformatthread");
        while (!this.closed) {
            try {
                LOG.info("Wait for raw bytes on " + String.valueOf(this.rawByteQueue));
                byte[] raw = this.rawByteQueue.take();
                if (Arrays.equals(raw, this.CLOSING)) {
                    LOG.info("FormattingThread received closing bytes, stop loop");
                    WebSocketProtos.WebSocketRequestMessage closingMessage = WebSocketProtos.WebSocketRequestMessage.newBuilder().setVerb("CLOSING").build();
                    this.wsRequestMessageQueue.add(closingMessage);
                    continue;
                }
                LOG.info("Got " + raw.length + " bytes.");
                WebSocketProtos.WebSocketMessage message = WebSocketProtos.WebSocketMessage.parseFrom((byte[])raw);
                LOG.fine("Got message, type = " + String.valueOf(message.getType()));
                if (message.getType() == WebSocketProtos.WebSocketMessage.Type.REQUEST) {
                    LOG.finer("Add request message to queue");
                    this.wsRequestMessageQueue.put(message.getRequest());
                    int queueSize = this.wsRequestMessageQueue.size();
                    LOG.finer("IncomingQueue size after put = " + queueSize);
                    continue;
                }
                if (message.getType() != WebSocketProtos.WebSocketMessage.Type.RESPONSE) continue;
                CompletableFuture listener = this.outgoingRequests.get(message.getResponse().getId());
                LOG.finer("incoming message is Response for request with id " + message.getResponse().getId() + " and listener = " + String.valueOf(listener));
                LOG.finer("Headers = " + String.valueOf(message.getResponse().getHeadersList()));
                LOG.finer("msg size = " + message.getResponse().getBody().toByteArray().length);
                LOG.finer("Will complete this listener with status " + message.getResponse().getStatus());
                LOG.finer("And body = " + new String(message.getResponse().getBody().toByteArray()));
                if (listener == null) continue;
                listener.complete(new WebsocketResponse(message.getResponse().getStatus(), new String(message.getResponse().getBody().toByteArray()), (List<String>)message.getResponse().getHeadersList(), true));
            }
            catch (Throwable t) {
                t.printStackTrace();
            }
        }
        LOG.info("NetworkClient " + String.valueOf(this) + " is closed, formatting thread will stop.");
    }

    HttpResponse.BodyHandler createBodyHandler(final HttpRequest request) {
        HttpResponse.BodyHandler mbh = new HttpResponse.BodyHandler(){

            public HttpResponse.BodySubscriber apply(HttpResponse.ResponseInfo responseInfo) {
                String ct = responseInfo.headers().firstValue("content-type").orElse("");
                LOG.info("response statuscode = " + responseInfo.statusCode() + ", content-type = " + ct);
                if (responseInfo.statusCode() == 428) {
                    LOG.info("Got 428 response! all headers = " + String.valueOf(responseInfo.headers().map()));
                }
                if (ct.isBlank() && !request.uri().getHost().contains("cdn")) {
                    return HttpResponse.BodySubscribers.discarding();
                }
                if (ct.equals("application/json") || ct.equals("application/xml")) {
                    return HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8);
                }
                return HttpResponse.BodySubscribers.ofByteArray();
            }
        };
        return mbh;
    }

    public final Response sendRequest(HttpRequest request, byte[] raw) throws IOException {
        try {
            return this.asyncSendRequest(request, raw).get();
        }
        catch (InterruptedException | ExecutionException ex) {
            LOG.log(Level.SEVERE, null, ex);
            throw new IOException(ex);
        }
    }

    public final CompletableFuture<Response> asyncSendRequest(HttpRequest request, byte[] raw) throws IOException {
        if (this.closed) {
            throw new IOException("Trying to use a closed networkclient " + String.valueOf(this));
        }
        CompletableFuture<Response> response = this.implAsyncSendRequest(request, raw);
        response.thenApply(res -> this.validateResponse(request.uri(), (Response)res));
        return response;
    }

    public final Response sendRequest(URI uri, String method, byte[] body, Map<String, List<String>> headers) throws IOException {
        try {
            return this.asyncSendRequest(uri, method, body, headers).get();
        }
        catch (InterruptedException | ExecutionException ex) {
            LOG.log(Level.SEVERE, null, ex);
            throw new IOException(ex);
        }
    }

    public final CompletableFuture<Response> asyncSendRequest(URI uri, String method, byte[] body, Map<String, List<String>> headers) throws IOException {
        if (this.closed) {
            throw new IOException("Trying to use a closed networkclient " + String.valueOf(this));
        }
        CompletableFuture<Response> response = this.implAsyncSendRequest(uri, method, body, headers);
        response.thenApply(res -> this.validateResponse(uri, (Response)res));
        return response;
    }

    protected abstract CompletableFuture<Response> implAsyncSendRequest(HttpRequest var1, byte[] var2) throws IOException;

    protected abstract CompletableFuture<Response> implAsyncSendRequest(URI var1, String var2, byte[] var3, Map<String, List<String>> var4) throws IOException;

    private CompletableFuture<SendMessageResponse> validateResponse(WebsocketResponse websocketResponse) {
        CompletableFuture<SendMessageResponse> a = new CompletableFuture<SendMessageResponse>();
        int status = websocketResponse.getStatus();
        LOG.fine("Validate response, status = " + status);
        if (status != 200) {
            LOG.info("Status = " + status + ", response = " + websocketResponse.getBody());
            if (status >= 400 && status < 500) {
                Exception exception = this.parseException(status, websocketResponse.getBody());
                LOG.info("Created exception " + String.valueOf(exception));
                a.completeExceptionally(exception);
                return a;
            }
        }
        return CompletableFuture.completedFuture(new SendMessageResponse());
    }

    private Response validateResponse(URI uri, Response response) {
        try {
            int statusCode = response.getStatusCode();
            LOG.fine("validate response with statuscode " + statusCode);
            switch (statusCode) {
                case 409: {
                    if (uri.getHost().indexOf("storage") > -1) {
                        LOG.info("Got a 409 exception in a storage request, ignore in this layer");
                        break;
                    }
                    LOG.info("Got a 409 exception, throw MMDE");
                    MismatchedDevices mismatchedDevices = NetworkClient.readResponseJson(response, MismatchedDevices.class);
                    throw new MismatchedDevicesException(mismatchedDevices);
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return response;
    }

    private synchronized Future<WebsocketResponse> sendRequest(WebSocketProtos.WebSocketRequestMessage request) throws IOException {
        return this.sendRequest(request, null);
    }

    public synchronized CompletableFuture<WebsocketResponse> sendRequest(WebSocketProtos.WebSocketRequestMessage request, OutgoingPushMessageList list) throws IOException {
        if (this.closed) {
            throw new IOException("Trying to use a closed networkclient " + String.valueOf(this));
        }
        WebSocketProtos.WebSocketMessage message = WebSocketProtos.WebSocketMessage.newBuilder().setType(WebSocketProtos.WebSocketMessage.Type.REQUEST).setRequest(request).build();
        CompletableFuture<WebsocketResponse> future = new CompletableFuture<WebsocketResponse>();
        this.outgoingRequests.put(request.getId(), future);
        LOG.info("SENDING WSRM, path = " + request.getPath() + " with id = " + request.getId());
        LOG.finest("Bytes in wsm = " + message.toByteArray().length + " and request size = " + request.toByteArray().length + " and body size = " + request.getBody().toByteArray().length);
        this.sendToStream(message, list);
        LOG.fine("Did send to stream, returning future " + String.valueOf(future));
        return future;
    }

    private static <T> T readResponseJson(WebsocketResponse response, Class<T> clazz) throws PushNetworkException, MalformedResponseException {
        return NetworkClient.readBodyJson(response.getBody(), clazz);
    }

    private static <T> T readResponseJson(Response response, Class<T> clazz) throws PushNetworkException, MalformedResponseException {
        return NetworkClient.readBodyJson(response.body().string(), clazz);
    }

    private static <T> T readBodyJson(String json, Class<T> clazz) throws PushNetworkException, MalformedResponseException {
        try {
            return (T)Util.fromJson((String)json, clazz);
        }
        catch (JsonProcessingException e) {
            LOG.log(Level.SEVERE, "error parsing json response", e);
            throw new MalformedResponseException("Unable to parse entity", (IOException)((Object)e));
        }
        catch (IOException e) {
            throw new PushNetworkException((Exception)e);
        }
    }

    static enum WsStatus {
        INIT,
        CREATING,
        READY,
        RESTARTING,
        HALT;

    }
}

