import { EventChannel } from '@redux-saga/core'
import { IFrame, IMessage, StompHeaders } from '@stomp/stompjs'
import { normalize, Schema } from 'normalizr'
import * as R from 'ramda'
import { AnyAction } from 'redux'
import { buffers, eventChannel } from 'redux-saga'
import {
  all,
  call,
  delay,
  put,
  select,
  take,
  takeEvery,
  takeLatest,
} from 'redux-saga/effects'
import { createSelector } from 'reselect'
import { ApiError, getSessionId } from '@pbt/pbt-ui-components'

import { wsNotificationData, wsSoapTaskData } from '~/api/schemas'
import { tokenHolder } from '~/api/utils/token'
import {
  closeConnection,
  connect,
  subscribeToTopic,
  unsubscribeFromTopic,
} from '~/api/websocket/websocket'
import {
  ConnectionState,
  RECONNECT_DELAY,
  WS_AUTH_CONNECT_RETRIES,
  WS_MESSAGE_BUFFER_SIZE,
  WsAction,
} from '~/constants/websocket'
import { getWSStatus } from '~/utils/websocketUtils'

import * as AuthActions from '../actions/types/auth'
import { FETCH_SECURE_CONSTANTS_SUCCESS } from '../actions/types/constants'
import { NEW_WS_NOTIFICATION } from '../actions/types/notifications'
import { WS_TASK_CREATE, WS_TASK_UPDATE } from '../actions/types/tasks'
import { UPDATE_CURRENT_USERS_BUSINESS_ID } from '../actions/types/users'
import type { RootState } from '../index'
import { getCurrentBusinessId } from '../reducers/auth'

// Action types of the websocket duck. Every type is namespaced with the
// `websocket/` prefix.

// Connection lifecycle.
export const WS_CONNECT = 'websocket/WS_CONNECT'
export const WS_CONNECT_SUCCESS = 'websocket/WS_CONNECT_SUCCESS'
export const WS_CONNECT_FAILURE = 'websocket/WS_CONNECT_FAILURE'
export const WS_AUTHORIZATION_ERROR = 'websocket/WS_AUTHORIZATION_ERROR'

// Disconnect lifecycle and message-level failures.
export const WS_DISCONNECT = 'websocket/WS_DISCONNECT'
export const WS_DISCONNECT_SUCCESS = 'websocket/WS_DISCONNECT_SUCCESS'
export const WS_MESSAGE_FAILURE = 'websocket/WS_MESSAGE_FAILURE'

// Topic subscription.
export const WS_SUBSCRIBE = 'websocket/WS_SUBSCRIBE'
export const WS_SUBSCRIBE_SUCCESS = 'websocket/WS_SUBSCRIBE_SUCCESS'
export const WS_SUBSCRIBE_FAILURE = 'websocket/WS_SUBSCRIBE_FAILURE'

// Topic unsubscription.
export const WS_UNSUBSCRIBE = 'websocket/WS_UNSUBSCRIBE'
export const WS_UNSUBSCRIBE_SUCCESS = 'websocket/WS_UNSUBSCRIBE_SUCCESS'

// Per-topic subscriber counting (see `subscriptionsCounterByTopic` in the state).
export const WS_SUBSCRIPTIONS_COUNT_DECREMENT =
  'websocket/WS_SUBSCRIPTIONS_COUNT_DECREMENT'
export const WS_SUBSCRIPTIONS_COUNT_INCREMENT =
  'websocket/WS_SUBSCRIPTIONS_COUNT_INCREMENT'

/** Builds the action that requests a new websocket connection. */
function wsConnect() {
  return { type: WS_CONNECT }
}

/** Builds the action signalling that the connection has been established. */
export function wsConnectSuccess() {
  return { type: WS_CONNECT_SUCCESS }
}

/**
 * Builds the action reporting a failed connection attempt.
 *
 * @param error - the error that interrupted the connection
 */
export function wsConnectFailure(error: ApiError) {
  return {
    type: WS_CONNECT_FAILURE,
    error,
  }
}

/**
 * Builds the action reporting an authorization error received from the broker.
 *
 * @param error - the STOMP error frame
 */
export function wsAuthorizationError(error: IFrame) {
  return {
    type: WS_AUTHORIZATION_ERROR,
    error,
  }
}

/** Builds the action that requests closing the websocket connection. */
export function wsDisconnect() {
  return { type: WS_DISCONNECT }
}

/** Builds the action signalling that the connection has been closed. */
export function wsDisconnectSuccess() {
  return { type: WS_DISCONNECT_SUCCESS }
}

/**
 * Builds the action describing a failure delivered through a websocket message.
 *
 * @param error - the error to store in the websocket state
 * @param errorDetails - additional payload attached to the action
 */
export function wsMessageFailure(error: ApiError, errorDetails: any) {
  return {
    type: WS_MESSAGE_FAILURE,
    error,
    errorDetails,
  }
}

/**
 * Builds the action that requests a subscription to a topic.
 *
 * @param topic - destination to subscribe to
 * @param wsMessageHandleDataByWsAction - for every incoming `WsAction`, the
 *   redux action type that should be dispatched for it
 * @param actionData - extra data forwarded to the message channel.
 *   NOTE(review): declared as a function type, yet `createWSMessageChannel`
 *   reads it like a plain object (`sessionId` lookup, object spread) — confirm
 *   the intended type with the callers.
 */
export function wsSubscribe(
  topic: string,
  wsMessageHandleDataByWsAction: {
    [K in WsAction]?: { actionType: string }
  },
  actionData?: () => {},
) {
  return {
    type: WS_SUBSCRIBE,
    topic,
    wsMessageHandleDataByWsAction,
    actionData,
  }
}

/**
 * Builds the action signalling that a topic subscription has been created.
 *
 * @param topic - the subscribed destination
 * @param subscriptionId - identifier needed later to unsubscribe
 */
export function wsSubscribeSuccess(topic: string, subscriptionId: string) {
  return {
    type: WS_SUBSCRIBE_SUCCESS,
    topic,
    subscriptionId,
  }
}

/**
 * Builds the action reporting a subscription-related failure.
 *
 * @param error - the error that occurred
 */
export function wsSubscribeFailure(error: ApiError) {
  return {
    type: WS_SUBSCRIBE_FAILURE,
    error,
  }
}

/**
 * Builds the action that requests unsubscribing from a topic.
 *
 * @param topic - destination to unsubscribe from
 */
export function wsUnsubscribe(topic: string) {
  return {
    type: WS_UNSUBSCRIBE,
    topic,
  }
}

/**
 * Builds the action signalling that a topic subscription has been removed.
 *
 * @param topic - the destination that is no longer subscribed
 */
export function wsUnsubscribeSuccess(topic: string) {
  return {
    type: WS_UNSUBSCRIBE_SUCCESS,
    topic,
  }
}

/**
 * Builds the action that lowers the subscriber counter of a topic by one.
 *
 * @param topic - destination whose counter is decremented
 */
export function wsSubscriptionsCountDecrement(topic: string) {
  return {
    type: WS_SUBSCRIPTIONS_COUNT_DECREMENT,
    topic,
  }
}

/**
 * Builds the action that raises the subscriber counter of a topic by one.
 *
 * @param topic - destination whose counter is incremented
 */
export function wsSubscriptionsCountIncrement(topic: string) {
  return {
    type: WS_SUBSCRIPTIONS_COUNT_INCREMENT,
    topic,
  }
}

/**
 * Builds the redux action dispatched for an incoming websocket message.
 *
 * @param type - redux action type to dispatch
 * @param headers - headers of the STOMP message
 * @param body - message payload
 * @returns an action carrying the given type, headers and body
 */
export function wsOnMessage(type: string, headers: StompHeaders, body: any) {
  return {
    type,
    headers,
    body,
  }
}

/** Shape of the websocket slice managed by `websocketReducer`. */
export type WebsocketState = {
  // Incremented on every WS_AUTHORIZATION_ERROR, reset on WS_CONNECT_SUCCESS
  authConnectRetries: number
  // Error carried by the last WS_CONNECT_FAILURE action
  connectionError: ApiError | null
  // Current state of the connection lifecycle
  connectionState: ConnectionState
  // Error carried by the last WS_MESSAGE_FAILURE action
  error: ApiError | null
  // Subscription id stored for each subscribed topic
  subscriptionIdByTopic: Record<string, string>
  // Subscriber count tracked for each subscribed topic
  subscriptionsCounterByTopic: Record<string, number>
}

/**
 * Initial websocket state: disconnected, no errors, no subscriptions and no
 * authorization retries recorded.
 */
export const INITIAL_STATE: WebsocketState = {
  connectionState: ConnectionState.DISCONNECTED,
  connectionError: null,
  error: null,
  subscriptionIdByTopic: {},
  subscriptionsCounterByTopic: {},
  authConnectRetries: 0,
}

/**
 * Reducer of the websocket slice.
 *
 * - `WS_CONNECT` / `WS_CONNECT_SUCCESS` / `WS_CONNECT_FAILURE` /
 *   `WS_DISCONNECT_SUCCESS` move `connectionState` through its lifecycle.
 *   A successful connect also resets the retry counter and drops every
 *   tracked subscription.
 * - `WS_SUBSCRIBE_SUCCESS` / `WS_UNSUBSCRIBE_SUCCESS` add and remove the
 *   bookkeeping of a topic (subscription id and subscriber counter).
 * - `WS_SUBSCRIPTIONS_COUNT_INCREMENT` / `WS_SUBSCRIPTIONS_COUNT_DECREMENT`
 *   adjust the subscriber counter of a topic; the counter never goes below 0.
 * - `WS_MESSAGE_FAILURE` stores the message error, `WS_AUTHORIZATION_ERROR`
 *   counts an authorization retry.
 *
 * @param state - current websocket state, `INITIAL_STATE` by default
 * @param action - dispatched redux action
 * @returns the next websocket state (the same reference for unrelated actions)
 */
export const websocketReducer = (
  state: WebsocketState = INITIAL_STATE,
  action: AnyAction,
): WebsocketState => {
  switch (action.type) {
    case WS_CONNECT:
      return { ...state, connectionState: ConnectionState.CONNECTING }
    case WS_CONNECT_SUCCESS:
      return {
        ...state,
        connectionState: ConnectionState.CONNECTED,
        authConnectRetries: 0,
        subscriptionIdByTopic: {},
        subscriptionsCounterByTopic: {},
      }
    case WS_CONNECT_FAILURE:
      return {
        ...state,
        connectionState: ConnectionState.WAITING_RECONNECT_DELAY,
        connectionError: action.error,
      }

    case WS_DISCONNECT_SUCCESS:
      return { ...state, connectionState: ConnectionState.DISCONNECTED }

    case WS_MESSAGE_FAILURE:
      return { ...state, error: action.error }

    case WS_SUBSCRIBE_SUCCESS: {
      return {
        ...state,
        subscriptionIdByTopic: {
          ...state.subscriptionIdByTopic,
          [action.topic]: action.subscriptionId,
        },
        subscriptionsCounterByTopic: {
          ...state.subscriptionsCounterByTopic,
          [action.topic]: 1,
        },
      }
    }

    case WS_SUBSCRIPTIONS_COUNT_DECREMENT: {
      // A topic without a counter entry counts as 0: `undefined - 1` is NaN
      // and `Math.max(0, NaN)` stays NaN, which would poison the counter.
      const currentCount = state.subscriptionsCounterByTopic[action.topic] ?? 0
      return {
        ...state,
        subscriptionsCounterByTopic: {
          ...state.subscriptionsCounterByTopic,
          [action.topic]: Math.max(0, currentCount - 1),
        },
      }
    }
    case WS_SUBSCRIPTIONS_COUNT_INCREMENT: {
      // Same default as above, so that an untracked topic ends up with 1
      // instead of NaN.
      const currentCount = state.subscriptionsCounterByTopic[action.topic] ?? 0
      return {
        ...state,
        subscriptionsCounterByTopic: {
          ...state.subscriptionsCounterByTopic,
          [action.topic]: currentCount + 1,
        },
      }
    }
    case WS_UNSUBSCRIBE_SUCCESS:
      return {
        ...state,
        subscriptionIdByTopic: R.omit(
          [action.topic],
          state.subscriptionIdByTopic,
        ),
        subscriptionsCounterByTopic: R.omit(
          [action.topic],
          state.subscriptionsCounterByTopic,
        ),
      }

    case WS_AUTHORIZATION_ERROR:
      return { ...state, authConnectRetries: state.authConnectRetries + 1 }

    default:
      return state
  }
}

/** Returns the websocket slice of the root state. */
export function getWebsocket(state: RootState): WebsocketState {
  return state.websocket
}

/** Returns the current connection state. */
export function getWebsocketConnectionState(state: RootState) {
  const { connectionState } = getWebsocket(state)
  return connectionState
}

/** Tells whether the connection state equals `ConnectionState.CONNECTED`. */
export function getWebsocketIsConnected(state: RootState) {
  return ConnectionState.CONNECTED === getWebsocketConnectionState(state)
}

/** Returns the error stored by the last connection failure, if any. */
export function getWebsocketConnectionError(state: RootState) {
  const { connectionError } = getWebsocket(state)
  return connectionError
}

/** Returns the error stored by the last message failure, if any. */
export function getWebsocketErrorMessage(state: RootState) {
  const { error } = getWebsocket(state)
  return error
}

/** Returns how many authorization retries have been recorded. */
export function getWebsocketAuthConnectRetries(state: RootState) {
  const { authConnectRetries } = getWebsocket(state)
  return authConnectRetries
}

/** Returns the map of subscription ids keyed by topic. */
export function getWebsocketSubscriptionIdByTopicMap(state: RootState) {
  const { subscriptionIdByTopic } = getWebsocket(state)
  return subscriptionIdByTopic
}

/**
 * Creates a selector that reads the subscription id of the given topic.
 * A new selector instance is built on every call.
 */
export function getWebsocketSubscriptionIdByTopic(topic: string) {
  return createSelector(
    getWebsocketSubscriptionIdByTopicMap,
    R.prop<any>(topic),
  )
}

/** Returns the map of subscriber counters keyed by topic. */
export function getWebsocketSubscriptionsCounterByTopicMap(state: RootState) {
  const { subscriptionsCounterByTopic } = getWebsocket(state)
  return subscriptionsCounterByTopic
}

/**
 * Creates a selector that reads the subscriber counter of the given topic.
 * A new selector instance is built on every call.
 */
export function getWebsocketSubscriptionsCounterByTopic(topic: string) {
  return createSelector(
    getWebsocketSubscriptionsCounterByTopicMap,
    R.prop<any>(topic),
  )
}

/**
 * Requests a websocket connection, but only when the stored connection state
 * is `DISCONNECTED`; in every other state nothing is dispatched.
 */
export function* wsConnectIfDisconnectedSaga() {
  const currentState: ConnectionState = yield select(
    getWebsocketConnectionState,
  )
  const isDisconnected = currentState === ConnectionState.DISCONNECTED
  if (!isDisconnected) {
    return
  }

  yield put(wsConnect())
}

// Module-level handles to the event channels created by the sagas below.
// NOTE(review): `subscriptionEventChannel` is reassigned on every new topic
// subscription, so only the most recently created channel is reachable here —
// confirm whether channels of earlier topics need to be closed as well.
let subscriptionEventChannel: EventChannel<null | {}>
let connectEventChannel: EventChannel<null | {}>

/**
 * Closes the subscription channel and then the connection channel, skipping
 * whichever has not been created yet.
 */
function clearEventChannels(): void {
  const openChannels = [subscriptionEventChannel, connectEventChannel]
  openChannels.forEach((channel) => {
    channel?.close()
  })
}

/**
 * Opens the websocket connection and wraps its lifecycle callbacks into an
 * event channel.
 *
 * The channel emits `wsConnectSuccess` / `wsDisconnectSuccess` actions for the
 * connect and disconnect callbacks. For an error frame it emits
 * `wsAuthorizationError` when `getWSStatus` reports 401, and an `Error`
 * carrying the frame's `message` header otherwise. Closing the channel emits
 * `wsDisconnect`.
 *
 * @param accessToken - token handed over to `connect`
 */
const createWSConnectEventChannel = (accessToken: string) => {
  const messageBuffer = buffers.expanding<any>(WS_MESSAGE_BUFFER_SIZE)

  return eventChannel((emit) => {
    function handleConnected() {
      emit(wsConnectSuccess())
    }

    function handleDisconnected() {
      emit(wsDisconnectSuccess())
    }

    function handleError(frame: IFrame) {
      const isUnauthorized = getWSStatus({ error: frame }) === 401
      emit(
        isUnauthorized
          ? wsAuthorizationError(frame)
          : new Error(frame?.headers?.message),
      )
    }

    connect(accessToken, handleConnected, handleDisconnected, handleError)

    // Invoked when the channel gets closed
    return () => emit(wsDisconnect())
  }, messageBuffer)
}

/**
 * Establishes the websocket connection and forwards every action emitted by
 * the connection channel to the store.
 *
 * Any error thrown along the way (including one taken from the channel) is
 * reported with `wsConnectFailure`; after `RECONNECT_DELAY` a new connection
 * attempt is requested.
 */
export function* wsConnectSaga() {
  try {
    const accessToken: string = yield call(tokenHolder.getToken)

    // Close the channels left over from a previous connection first
    yield clearEventChannels()
    connectEventChannel = yield createWSConnectEventChannel(accessToken)

    for (;;) {
      const channelAction: AnyAction = yield take(connectEventChannel)
      yield put(channelAction)
    }
  } catch (error) {
    yield put(wsConnectFailure(error as ApiError))
    // Automatic reconnect is built into the stompjs library, but a manual
    // reconnect is still needed, at least for retries after auth errors.
    yield delay(RECONNECT_DELAY)
    yield put(wsConnect())
  }
}

/** Handles `WS_DISCONNECT` by calling `closeConnection`. */
export function* wsDisconnectSaga() {
  yield closeConnection()
}

/**
 * Handles `WS_DISCONNECT_SUCCESS` by closing the subscription event channel
 * currently held in `subscriptionEventChannel`, if there is one.
 */
export function wsDisconnectedSaga() {
  subscriptionEventChannel?.close()
}

/** Reconnects by dispatching a disconnect request followed by a connect request. */
export function* wsReconnectSaga() {
  yield put(wsDisconnect())
  yield put(wsConnect())
}

// normalizr schema used to normalize the message body, keyed by the redux
// action type the message is dispatched as. Action types without an entry
// are dispatched with the raw body.
const SchemaByWsActionTypeMap: Record<string, Schema> = {
  [NEW_WS_NOTIFICATION]: wsNotificationData,
  [WS_TASK_CREATE]: wsSoapTaskData,
  [WS_TASK_UPDATE]: wsSoapTaskData,
}

/**
 * Subscribes to a topic and exposes the incoming messages as an event channel
 * of redux actions.
 *
 * Right after subscribing the channel emits `wsSubscribeSuccess`; closing the
 * channel emits `wsUnsubscribe`. Every message is parsed as JSON, optionally
 * normalized with the schema registered for its target action type, merged
 * with `actionData`, and emitted either as `wsMessageFailure` or as
 * `wsOnMessage`. Messages carrying the session id of the current tab are not
 * emitted (except for topics containing `invoiceLineItem`).
 *
 * A message that is not valid JSON, or whose body parses to `null`, is emitted
 * as an `Error` instead of throwing inside the subscription callback.
 * NOTE(review): this relies on redux-saga throwing `Error` instances taken
 * from a channel into the consuming saga, the same mechanism the connection
 * channel in this file uses — confirm against the redux-saga version in use.
 *
 * @param topic - destination to subscribe to
 * @param wsMessageHandleDataByWsAction - redux action type per incoming `WsAction`
 * @param businessId - business id passed to `subscribeToTopic`
 * @param actionData - optional data merged into every emitted payload; an own
 *   `sessionId` property takes precedence over the one found in the message
 */
const createWSMessageChannel = (
  topic: string,
  wsMessageHandleDataByWsAction: {
    [K in WsAction]?: { actionType: string }
  },
  businessId: string,
  actionData?: any,
) =>
  eventChannel((emit) => {
    const subscriptionId = subscribeToTopic(
      topic,
      businessId,
      (message: IMessage) => {
        let body: any
        try {
          body = JSON.parse(message.body)
        } catch (parseError) {
          // Route malformed payloads through the channel rather than letting
          // the exception escape into the websocket client's callback.
          emit(
            parseError instanceof Error
              ? parseError
              : new Error(String(parseError)),
          )
          return
        }
        if (R.isNil(body)) {
          // `body.actionType` below would throw on a null payload
          emit(
            new Error(`Empty websocket message received on topic "${topic}"`),
          )
          return
        }

        const wsMessageHandleData =
          wsMessageHandleDataByWsAction[body.actionType as WsAction] ||
          ({} as { actionType: string })
        const schema = SchemaByWsActionTypeMap[wsMessageHandleData.actionType]

        const data = schema ? normalize(body || {}, schema) : body
        const sessionId = R.has('sessionId', actionData)
          ? R.prop('sessionId', actionData)
          : R.prop('sessionId', data)
        const mergedData = !R.isNil(actionData)
          ? { ...actionData, ...data }
          : data

        // we distinguish error message here since error duck requires a different action format
        if (wsMessageHandleData.actionType === WS_MESSAGE_FAILURE) {
          emit(wsMessageFailure(body.message, mergedData))
          return
        }

        const tmpTriggerFromAllSessions = topic.includes('invoiceLineItem') // TODO: remove in RHAP-6699

        // trigger events only from other tabs/windows; messages without a
        // session id are always forwarded
        const isFromCurrentSession =
          Boolean(sessionId) && getSessionId() === sessionId
        if (isFromCurrentSession && !tmpTriggerFromAllSessions) {
          return
        }

        emit(
          wsOnMessage(
            wsMessageHandleData.actionType,
            message.headers,
            mergedData,
          ),
        )
      },
    )

    emit(wsSubscribeSuccess(topic, subscriptionId))

    return () => emit(wsUnsubscribe(topic))
  }, buffers.expanding<any>(WS_MESSAGE_BUFFER_SIZE))

/**
 * Endlessly takes actions from the given message channel and dispatches them.
 * An error raised while taking or dispatching is reported with
 * `wsSubscribeFailure`, after which the loop keeps going.
 *
 * @param wsMessageChannel - channel created for a topic subscription
 */
function* watchWSMessages(wsMessageChannel: EventChannel<null | {}>) {
  for (;;) {
    try {
      const incomingAction: AnyAction = yield take(wsMessageChannel)
      yield put(incomingAction)
    } catch (failure) {
      yield put(wsSubscribeFailure(failure as ApiError))
    }
  }
}

/**
 * Handles `WS_SUBSCRIBE`.
 *
 * Does nothing while the websocket is not connected. When the topic already
 * has subscribers, only its counter is incremented. Otherwise a message
 * channel is created for the topic and its messages are forwarded to the
 * store until the channel ends. Any error is reported with
 * `wsSubscribeFailure`.
 *
 * @param action - the `wsSubscribe` action
 */
export function* wsTopicSubscribeSaga({
  topic,
  wsMessageHandleDataByWsAction,
  actionData,
}: ReturnType<typeof wsSubscribe>) {
  try {
    const websocketIsConnected: boolean = yield select(getWebsocketIsConnected)
    if (!websocketIsConnected) {
      return
    }

    // `yield` has lower precedence than `||` / `??`, so a default written
    // inside the yield expression applies to the effect object, not to the
    // selected value. The default is therefore applied after the yield.
    const subscriptionsForTopic: number | undefined = yield select(
      getWebsocketSubscriptionsCounterByTopic(topic),
    )
    if ((subscriptionsForTopic ?? 0) > 0) {
      yield put(wsSubscriptionsCountIncrement(topic))
      return
    }

    const businessId: string = yield select(getCurrentBusinessId)
    subscriptionEventChannel = yield call(
      createWSMessageChannel,
      topic,
      wsMessageHandleDataByWsAction,
      businessId,
      actionData,
    )
    yield watchWSMessages(subscriptionEventChannel)
  } catch (error) {
    yield put(wsSubscribeFailure(error as ApiError))
  }
}

/**
 * Handles `WS_UNSUBSCRIBE`.
 *
 * Does nothing while the websocket is not connected. When the topic has
 * exactly one subscriber, the subscription is removed and
 * `wsUnsubscribeSuccess` is dispatched; for any other counter value only the
 * counter is decremented.
 *
 * @param action - the `wsUnsubscribe` action
 */
export function* wsTopicUnsubscribeSaga({
  topic,
}: ReturnType<typeof wsUnsubscribe>) {
  const isConnected: boolean = yield select(getWebsocketIsConnected)
  if (!isConnected) {
    return
  }

  const subscribersCount: number = yield select(
    getWebsocketSubscriptionsCounterByTopic(topic),
  )
  if (subscribersCount !== 1) {
    // Other subscribers remain (or the topic is not tracked): adjust the counter only
    yield put(wsSubscriptionsCountDecrement(topic))
    return
  }

  const topicSubscriptionId: string = yield select(
    getWebsocketSubscriptionIdByTopic(topic),
  )
  yield unsubscribeFromTopic(topicSubscriptionId)
  yield put(wsUnsubscribeSuccess(topic))
}

/**
 * Handles `WS_AUTHORIZATION_ERROR`.
 *
 * While the recorded number of authorization retries does not exceed
 * `WS_AUTH_CONNECT_RETRIES`, waits `RECONNECT_DELAY` and reconnects;
 * otherwise requests a disconnect.
 */
export function* wsAuthorizationErrorSaga() {
  const retriesSoFar: number = yield select(getWebsocketAuthConnectRetries)

  const mayRetry = retriesSoFar <= WS_AUTH_CONNECT_RETRIES
  if (!mayRetry) {
    yield put(wsDisconnect())
    return
  }

  yield delay(RECONNECT_DELAY)
  yield wsReconnectSaga()
}

/** Runs `wsConnectSaga` for the latest `WS_CONNECT` action. */
function* watchWSConnect() {
  yield takeLatest(WS_CONNECT, wsConnectSaga)
}

/**
 * Connects the websocket (if it is disconnected) once the secure constants
 * are fetched or the token is refreshed.
 */
function* watchWSConnectIfDisconnectedActions() {
  const connectTriggers = [
    FETCH_SECURE_CONSTANTS_SUCCESS,
    AuthActions.REFRESH_TOKEN_SUCCESS,
  ]
  yield takeLatest(connectTriggers, wsConnectIfDisconnectedSaga)
}

/**
 * Reconnects the websocket when the current user's business id is updated or
 * a silent login succeeds.
 */
function* watchWSReconnectActions() {
  const reconnectTriggers = [
    UPDATE_CURRENT_USERS_BUSINESS_ID,
    AuthActions.SILENT_LOGIN_SUCCESS,
  ]
  yield takeLatest(reconnectTriggers, wsReconnectSaga)
}

/** Runs `wsTopicSubscribeSaga` for every `WS_SUBSCRIBE` action. */
function* watchWSTopicSubscribe() {
  yield takeEvery(WS_SUBSCRIBE, wsTopicSubscribeSaga)
}

/** Runs `wsDisconnectSaga` for every `WS_DISCONNECT` action. */
function* watchDisconnect() {
  yield takeEvery(WS_DISCONNECT, wsDisconnectSaga)
}

/** Runs `wsDisconnectedSaga` for every `WS_DISCONNECT_SUCCESS` action. */
function* watchDisconnected() {
  yield takeEvery(WS_DISCONNECT_SUCCESS, wsDisconnectedSaga)
}

/** Runs `wsTopicUnsubscribeSaga` for every `WS_UNSUBSCRIBE` action. */
function* watchWSTopicUnsubscribe() {
  yield takeEvery(WS_UNSUBSCRIBE, wsTopicUnsubscribeSaga)
}

/** Runs `wsAuthorizationErrorSaga` for every `WS_AUTHORIZATION_ERROR` action. */
function* watchWSAuthorizationError() {
  yield takeEvery(WS_AUTHORIZATION_ERROR, wsAuthorizationErrorSaga)
}

/** Root saga of the websocket duck: starts all websocket watchers together. */
export function* websocketSaga() {
  const watchers = [
    watchDisconnect(),
    watchDisconnected(),
    watchWSConnect(),
    watchWSConnectIfDisconnectedActions(),
    watchWSTopicSubscribe(),
    watchWSTopicUnsubscribe(),
    watchWSReconnectActions(),
    watchWSAuthorizationError(),
  ]

  yield all(watchers)
}
