import React, { useEffect, useState, useMemo, useContext, useCallback } from "react";
import { v1 as uuidv1 } from "uuid";
import Paho from "paho-mqtt";
import { inflate } from "pako";
import { entries } from "idb-keyval";

import { confirmEvent, retryEvents, sendHeartBeat } from "../Base/Utilities/SendEvent";
import { personsCatchup } from "../Base/Utilities/Catchup/PersonsCatchup";
import { processCatchup } from "../Base/Utilities/Catchup/ProcessCatchup";
import { useInterval } from "../Base/Hooks/Interval";
import { checkOnlineStatus } from "../Base/Utilities/Online";
import { getFixtureStore } from "../Base/Factories/getFixtureStore";
import { FixtureDetailsContext } from "../../Config/FixtureDetails";
import { Tracing } from "../../App.tracing";
import Logger from "../Base/Utilities/Logger";
import { clearDesiredPlayingState } from "../Base/Utilities/ManagePlayers";
import { clearDesiredActiveState } from "../Base/Utilities/Player";
import { clockCatchup } from "../Base/Utilities/Catchup/ClockCatchup";

const logger = Logger.getInstance();

const DataConnector = (props) => {
  const { connectUrl, clientId, topics, channelMessage, updateState, currentState, setRefreshFlag } = props;
  const [mqttClient, setMqttClient] = useState("");
  const [catchupIntervalValue, setCatchupIntervalValue] = useState(2000);
  const [rebuildRequired, setRebuildRequired] = useState(false);
  const [, updateAllState] = React.useState();
  const forceUpdate = React.useCallback(() => updateAllState({}), []);
  const { fixtureProfile } = useContext(FixtureDetailsContext);

  const catchupStatus = useMemo(
    () => ({ expected: null, received: { persons: 0, pbp: 0, internalEvents: 0 } }),
    // eslint-disable-next-line
    [currentState.fixtureId],
  );

  const updateCatchupsDone = useCallback(() => {
    logger.log("CATCHUP :: updateCatchupsDone", JSON.stringify(catchupStatus));
    // we need to wait for expected catchups to be populated before we can decide
    if (catchupStatus.expected === null) return;
    const personsTopic = Object.keys(catchupStatus.expected).find((s) => s.endsWith("/p/c"));
    const expectedPersons = catchupStatus.expected[personsTopic];
    // we haven't received all persons catchups
    if (expectedPersons !== catchupStatus.received?.persons) return;

    const pbpTopic = Object.keys(catchupStatus.expected).find((s) => s.endsWith("/pbp/c"));
    const expectedPbp = catchupStatus.expected[pbpTopic];
    // we haven't received all pbp catchups
    if (expectedPbp !== catchupStatus.received?.pbp) return;

    const internalTopic = Object.keys(catchupStatus.expected).find((s) => s.endsWith("/int/c"));
    const expectedInternal = catchupStatus.expected[internalTopic];
    // we haven't received all internal event catchups
    if (expectedInternal !== catchupStatus.received.internalEvents) return;

    logger.log("CATCHUP :: all catchups done", catchupStatus);
    updateState("catchupsDone", true);
  }, [updateState, catchupStatus]);

  const eventsPendingStore = useMemo(() => getFixtureStore(currentState.fixtureId, "event"), [currentState.fixtureId]);

  useEffect(() => {
    if (channelMessage) {
      sendMessage(channelMessage);
    }
    // eslint-disable-next-line
  }, [channelMessage]);

  function logConnectionState() {
    return `connected = ${currentState.connected} mqttConnected = ${currentState.mqttConnected} reconnecting = ${currentState.reconnecting}`;
  }

  function tryReconnectMqtt() {
    updateState("reconnecting", true);
    setRefreshFlag(true);
    forceUpdate();
  }

  useEffect(() => {
    if (currentState.forceReconnect === true) {
      logger.log("Conn:", "Try Forcing Connection", ` :: ${logConnectionState()}`);
      tryReconnectMqtt();
      updateState("forceReconnect", false);
    }
    // eslint-disable-next-line
  }, [currentState.forceReconnect]);

  function tryCatchupOnEvents() {
    if (eventsPendingStore) {
      entries(eventsPendingStore).then((entries) => {
        const mqttState = currentState.mqtt;
        if (entries.length > 0) {
          logger.log("Conn:", "Sending unsent events. Count: ", entries.length);
          setRebuildRequired(true);
        }
        if (
          mqttState &&
          mqttState.plays &&
          mqttState.plays.length > 0 &&
          currentState.entities &&
          currentState.entities.length > 0 &&
          entries.length === 0
        ) {
          setCatchupIntervalValue(null);
          if (rebuildRequired) {
            logger.log("Conn:", "Processing events");
            setRebuildRequired(false);
            processCatchup(
              mqttState.plays,
              currentState,
              updateState,
              props.manageEvents,
              props.managePlays,
              props.currentState.fixtureId,
              fixtureProfile,
              true,
            );
          }
        }
      });
    }
  }

  useInterval(tryCatchupOnEvents, catchupIntervalValue);

  useInterval(() => {
    // Send heartbeat every 3 seconds (keepalive)
    if (currentState.mqtt && currentState.mqtt.disconnected !== true) {
      sendHeartBeat(props);
    }
  }, 3000);

  useEffect(() => {
    logger.log("Conn:", "Connection Details Received (data2.js)");
    clientConnect(connectUrl, clientId, topics);
    setRefreshFlag(false);

    // eslint-disable-next-line
  }, [connectUrl]);

  useEffect(() => {
    return () => {
      if (mqttClient && mqttClient.isConnected()) {
        mqttClient.disconnect();
      }
    };
  }, [mqttClient]);

  useEffect(() => {
    if (currentState.mqtt && currentState.mqtt.pbpProcessed && !currentState.hasInitiallyLoaded) {
      updateState("hasInitiallyLoaded", true);
    }
    // eslint-disable-next-line
  }, [JSON.stringify(currentState.mqtt)]);

  function clientConnect(connectUrl, clientId, topics) {
    if ((connectUrl, clientId, topics)) {
      logger.log("Conn:", "Initiating Connection (data2.js)");
      var client = new Paho.Client(
        connectUrl.split("/mqtt")[0].replace("wss://", ""),
        443,
        "/mqtt" + connectUrl.split("/mqtt")[1],
        clientId,
      );
      var connectOptions = {
        useSSL: true,
        timeout: 30000,
        mqttVersion: 4,
        reconnect: false,
        keepAliveInterval: 60,
        onSuccess: function () {
          updateState("mqttConnected", true);
          setMqttClient(client);
          logger.log(
            "Conn:",
            "Connect: ",
            topics.find((topic) => topic.scope === "write:stream_events").topic + " - " + clientId + " (data2.js)",
          );
          updateState("gettingTeams", false);
          // client.disconnectedPublishing = true;
          // client.disconnectedBufferSize = 10000;

          client.subscribe(topics.find((topic) => topic.scope === "read:response").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_events").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_persons_catchup").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_persons").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_play_by_play_catchup").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_play_by_play").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_statistics").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_statistics_catchup").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_catchup_summary").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:stream_notification").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:internal_events").topic);
          client.subscribe(topics.find((topic) => topic.scope === "read:internal_events_catchup").topic);

          updateState("mqtt", {
            topic: topics.find((topic) => topic.scope === "write:stream_events").topic,
            client: client,
            entities: [],
          });
          // Send Heartbeat
          sendHeartBeat(props);
          //Retry pending messages
          retryEvents(props);
          setCatchupIntervalValue(2000);
          updateState("connected", true);
          updateState("mqttConnected", true);
          updateState("reconnecting", false);
          updateState("forceReconnect", false);
          tryCatchupOnEvents();
          forceUpdate();
        },
        onFailure: function (error) {
          // connect failed
          console.error("Conn:", "failed to connect to IOT (data2.js)", error);
          logger.log("Conn:", "failed to connect to IOT (data2.js)", error);

          try {
            const payload = JSON.stringify(error);
            Tracing.capture(new Error(payload));
          } catch (err) {
            Tracing.capture(new Error(error?.errorMessage));
          }
          updateState("reconnecting", false);
          setRefreshFlag(true);
        },
      };
      client.onMessageArrived = onMessageArrived;
      client.onMessageDelivered = onMessageDelivered;
      client.onConnectionLost = onConnectionLost;
      client.onConnect = onConnect;

      client.connect(connectOptions);
    }
  }

  function onMessageArrived(message) {
    const parsedMessage = JSON.parse(message.payloadString);

    if (parsedMessage?.data?.class === "error") {
      logger.log(`received-${parsedMessage.data.code}`, parsedMessage.data.message);

      if (parsedMessage.data.message.includes("'eventType': 'person'")) {
        clearDesiredPlayingState(currentState.fixtureId);
      }

      if (parsedMessage.data.message.includes("'eventType': 'substitution'")) {
        clearDesiredActiveState(currentState, updateState);
      }

      Tracing.capture(new Error(JSON.stringify(parsedMessage)));
    }

    if (parsedMessage.compressedData) {
      logger.log(`received-${parsedMessage.type}`, JSON.parse(deCompressMessage(parsedMessage.compressedData)));
      processMessage(deCompressMessage(parsedMessage.compressedData), parsedMessage.type);
    }
    switch (parsedMessage.type) {
      case "response":
        handleAck(parsedMessage);
        break;
      case "notification":
        handleNotificationMessage(parsedMessage);
        break;
      case "event":
        processMessage(parsedMessage.data, "event");
        break;
      default:
        break;
    }
  }

  function onMessageDelivered(message) {
    logger.log("delivered", JSON.parse(message.payloadString));
  }

  async function onConnectionLost(responseObject) {
    const isOnline = await checkOnlineStatus(connectUrl);
    if (responseObject.errorCode !== 0) {
      logger.log(
        "Conn:",
        "onConnectionLost:" + responseObject.errorMessage,
        JSON.stringify(responseObject) + "(data.js)",
      );

      try {
        // circular reference can break JSON.stringify, in this case we report only the errorMessage
        const payload = JSON.stringify(responseObject);
        Tracing.capture(new Error(payload));
      } catch (err) {
        Tracing.capture(new Error(responseObject.errorMessage));
      }

      updateState("mqttConnected", false);
      updateState("connected", isOnline);
      let mqttState = currentState.mqtt;
      mqttState.disconnected = true;
      updateState("mqtt", mqttState);
      logger.log("Conn:", "Lost - Retrying connection", ` :: ${logConnectionState()}`);

      updateState("reconnecting", false);
      setRefreshFlag(true);
    }
  }

  function onConnect(responseObject) {
    // if (responseObject.errorCode !== 0) {
    logger.log("onConnect:", responseObject);
    let mqttState = currentState.mqtt;
    mqttState.disconnected = false;
    mqttState.plays.length = 0;
    updateState("mqtt", mqttState);
    // }
  }

  function deCompressMessage(message) {
    // Decode base64 string
    let compressed = atob(message);
    // Convert binary string to character array
    var characterDataArray = compressed.split("").map(function (x) {
      return x.charCodeAt(0);
    });
    // Turn character code array into byte-array
    var binaryDataArray = new Uint8Array(characterDataArray);
    // Decompress base64 binary data utf-8 array
    var decompressedData = inflate(binaryDataArray);
    // Convert buffer / byteArray back to ascii string (utf-16)
    var decompressedMessage = new TextDecoder("utf-8").decode(decompressedData);

    return decompressedMessage;
  }

  function sendMessage(msg) {
    const messageContent = {
      type: "config",
      content: msg,
      demo: false,
    };
    const message = new Paho.Message(JSON.stringify(messageContent));
    message.destinationName = topics.find((topic) => topic.scope === "write:stream_events").topic;
    mqttClient.send(message);
  }

  function processMessage(message, type) {
    switch (type) {
      case "persons":
        let mqtt = currentState.mqtt;
        let entity = JSON.parse(message);
        let entityIndex = mqtt.entities.findIndex((team) => entity.entityId === team.entityId);
        if (entityIndex > -1) {
          mqtt.entities[entityIndex] = entity;
        } else {
          mqtt.entities.push(entity);
        }
        updateState("mqtt", mqtt);
        personsCatchup(message, currentState, updateState);
        if (!currentState.catchupsDone) {
          catchupStatus.received.persons = catchupStatus.received.persons + 1;
          updateCatchupsDone();
        }
        break;
      case "statistics":
        let statistics = JSON.parse(message);
        updateState("statistics", statistics);
        break;
      case "play_by_play":
        let mqttState = currentState.mqtt;
        if (!mqttState.plays) {
          mqttState.plays = [];
        }
        const pbpMessages = JSON.parse(message);
        const pbpPeriod = pbpMessages?.[0]?.periodId || 1;
        pbpMessages.forEach((event) => {
          var foundIndex = mqttState.plays.findIndex((playEvent) => playEvent.eventId === event.eventId);

          if (event.eventType === "scoreAdjustment") {
            event.playId = uuidv1();
            props.manageEvents(event.eventId, event);

            let playObject = {
              playId: event.playId,
              eventTime: event.eventTime,
              eventId: event.eventId,
            };

            if (event.class === "sport") {
              playObject.clock = event.clock;
              playObject.periodId = event.periodId;
            }

            props.managePlays(event.playId, playObject);
          }

          if (foundIndex > -1) {
            mqttState.plays[foundIndex] = event;
          } else {
            mqttState.plays.push(event);
          }
        });
        mqttState.plays = [...mqttState.plays].filter(
          (ev) => pbpPeriod !== ev.periodId || pbpMessages.some((msg) => msg.eventId === ev.eventId),
        );
        updateState("mqtt", mqttState);
        if (!currentState.catchupsDone) {
          catchupStatus.received.pbp = catchupStatus.received.pbp + 1;
          updateCatchupsDone();
        }

        if (currentState?.userMode === "scoreOperator" && fixtureProfile) {
          clockCatchup(mqttState.plays, currentState, updateState, fixtureProfile);
        }
        break;

      case "event":
        if (message.class === "summary") {
          logger.log("received summary", message.pendingEvents);
          catchupStatus.expected = { ...message.pendingEvents };
          updateCatchupsDone();
        }

        if (message.status === "deleted") {
          props.manageEvents(message.eventId, message);
        }
        break;

      case "internal_events":
        let mqttSt = currentState.mqtt;
        if (!mqttSt.internalEvents) {
          mqttSt.internalEvents = [];
        }
        const messages = JSON.parse(message).map((m) => m.data);
        mqttSt.internalEvents = [...messages];
        updateState("mqtt", mqttSt);
        if (!currentState.catchupsDone) {
          catchupStatus.received.internalEvents = catchupStatus.received.internalEvents + 1;
          updateCatchupsDone();
        }
        break;

      default:
        break;
    }
  }

  function handleAck(message) {
    let data = message.data;
    if (data) {
      switch (data.class) {
        case "ack": {
          const { deletedEventIds, mqtt } = currentState;
          const deletedMessageId = deletedEventIds.find((id) => data.message.includes(id));

          if (deletedMessageId) {
            mqtt.plays = mqtt.plays.filter((event) => event.eventId !== deletedMessageId);
            updateState("mqtt", mqtt);
          }
          confirmEvent(data, props);
          break;
        }
        default:
          break;
      }
    }
  }

  function handleNotificationMessage(message) {
    updateState("notifications", [...currentState.notifications, message]);
  }

  return null;
};

export default DataConnector;
