import {
  CatalogResponseModel,
  CatalogsResponseModel,
  NamespaceResponseModel,
  TableResponseModel,
} from './response-models'
import SparkDriver, { QueryResult, RawQueryResult } from 'spark-web-client'
import { duck } from './duck'
import { Table, tableFromIPC } from 'apache-arrow'
import { getConfig } from '../config/environment-config'
import { mockFilesHourly } from './mocks/mocked-files-hourly'
import { mockConsumptionHourly } from './mocks/mocked-consumptions-hourly'
import { mockIngestionsHourly } from './mocks/mocked-ingestions-hourly'
import { mockGlobalIngestionsHourly } from './mocks/mocked-global-ingestions-hourly'
import { mockGlobalFilesHourly } from './mocks/mocked-global-files-hourly'
import { appStore } from '../state/app-store'
import { convertArrowTableToPlainObject } from '../utils/table-utils'
import { mockedOptimizationEvents } from './mocks/mocked-optimization-events'

/** Spark client used by every query in this module, created against the backend host from the environment config. */
export const sparkDriver = new SparkDriver(getConfig().backendHost)

/**
 * Outcome of an operation that yields no payload.
 * `errorCode` and `errorMessage` are optional; within this file they are only filled in on results
 * where `hasFailed` is `true`.
 */
export type WebClientResultWithoutData = {
  hasFailed: boolean
  errorCode?: number
  errorMessage?: string
}

/**
 * Outcome of an operation that yields a payload of type `T`.
 * Within this file `data` is only filled in on results where `hasFailed` is `false`, and
 * `errorCode` / `errorMessage` only where it is `true`.
 */
export type WebClientResultWithData<T> = {
  hasFailed: boolean
  errorCode?: number
  errorMessage?: string
  data?: T
}

/**
 * Loads the table identified by `duckTableId` through `loadTable`, using `mockFilesHourly`
 * as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 * @param duckTableId table id forwarded to `loadTable`
 */
export const loadFilesForTable = async (
  accessToken: string,
  duckTableId: string
): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(accessToken, duckTableId, mockFilesHourly)
  return outcome
}

/**
 * Loads the table identified by `duckTableId` through `loadTable`, using `mockConsumptionHourly`
 * as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 * @param duckTableId table id forwarded to `loadTable`
 */
export const loadConsumptionsForTable = async (
  accessToken: string,
  duckTableId: string
): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(accessToken, duckTableId, mockConsumptionHourly)
  return outcome
}

/**
 * Loads the table identified by `duckTableId` through `loadTable`, using `mockIngestionsHourly`
 * as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 * @param duckTableId table id forwarded to `loadTable`
 */
export const loadIngestionForTable = async (
  accessToken: string,
  duckTableId: string
): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(accessToken, duckTableId, mockIngestionsHourly)
  return outcome
}

/**
 * Loads the table identified by `duckTableId` through `loadTable`, using `mockedOptimizationEvents`
 * as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 * @param duckTableId table id forwarded to `loadTable`
 */
export const loadOptimizationEventsForTable = async (
  accessToken: string,
  duckTableId: string
): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(accessToken, duckTableId, mockedOptimizationEvents)
  return outcome
}

/**
 * Loads the global (all tables) hourly ingestion insights table through `loadTable`,
 * using `mockGlobalIngestionsHourly` as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 */
export const loadGlobalIngestion = async (accessToken: string): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(
    accessToken,
    'use_catalog.system_lake_tables.ingestions_hourly',
    mockGlobalIngestionsHourly
  )
  return outcome
}

/**
 * Loads the global (all tables) hourly files insights table through `loadTable`,
 * using `mockGlobalFilesHourly` as the dataset for the mocked backend.
 * @param accessToken token forwarded to `loadTable`
 */
export const loadGlobalFiles = async (accessToken: string): Promise<WebClientResultWithoutData> => {
  const outcome = await loadTable(accessToken, 'use_catalog.system_lake_tables.files_hourly', mockGlobalFilesHourly)
  return outcome
}

/**
 * Selects every row of `fullTableId` and stores the result in DuckDB under a name derived from
 * the id (each dot replaced by an underscore).
 *
 * - If DuckDB already has a table with that name, nothing is loaded and a failed result with
 *   code 113 is returned.
 * - If the config flag `shouldCallMockedBackend` is set, `mockTable` is inserted instead of
 *   querying Spark.
 * - If the Spark query fails, its status code and details are returned.
 *
 * @param accessToken token passed to the Spark driver
 * @param fullTableId table to select from; also the source of the DuckDB table name
 * @param mockTable data inserted when the mocked backend is enabled
 */
const loadTable = async (
  accessToken: string,
  fullTableId: string,
  mockTable: Table
): Promise<WebClientResultWithoutData> => {
  const duckName = fullTableId.replace(/\./g, '_')
  console.debug(`[SPARK-API] loading table ${fullTableId} and storing it in duckdb with name ${duckName}`)

  const alreadyLoaded = await duck.runWithSession((session) => session.checkTableExists(duckName))
  if (alreadyLoaded) {
    console.error('WARNING, trying to load twice the table %s: returning the cached one.', fullTableId)
    return { hasFailed: true, errorCode: 113, errorMessage: 'Table already exists' }
  }

  if (getConfig().shouldCallMockedBackend) {
    await duck.runWithSession((session) => session.insertExternalArrowTableByIPC(mockTable, duckName))
    return { hasFailed: false }
  }

  // Only the Spark round trip is timed; the DuckDB insertion below is not part of execTime.
  const startedAt = performance.now()
  const sparkData: RawQueryResult = await sparkDriver.runQueryRaw(`SELECT * FROM ${fullTableId}`, accessToken)
  const execTime = performance.now() - startedAt

  if (sparkData.hasFailed) {
    return {
      hasFailed: true,
      errorCode: sparkData.status.code,
      errorMessage: sparkData.status.details,
    }
  }

  // Decode every response that carries an Arrow batch and gather the record batches into one table.
  const batches = sparkData.data
    .filter((response) => response.hasArrowBatch())
    .flatMap((response) => tableFromIPC(response.getArrowBatch().getData_asU8()).batches)
  const table = new Table(batches)

  await duck.runWithSession((session) => session.insertExternalArrowTableByIPC(table, duckName))
  console.debug(`[SPARK API] Table loading completed in ${execTime} ms and returned ${table.numRows} rows.`)

  return { hasFailed: false }
}

/**
 * Queries the metastore summary and reshapes its rows into catalogs, namespaces and tables.
 *
 * Rows whose catalog name is `use_catalog` are skipped. Every remaining row registers its catalog
 * and its namespace the first time each is seen, and contributes exactly one table entry; rows
 * without a table name contribute a placeholder entry filled with 'none'.
 *
 * @param accessToken token passed to the Spark driver
 * @returns the Spark status code/details when the query fails, otherwise the assembled model in `data`
 */
export const getCatalogs = async (accessToken: string): Promise<WebClientResultWithData<CatalogsResponseModel>> => {
  const queryResult: QueryResult = await sparkDriver.runQuery(
    'SELECT * FROM use_catalog.system_lake_tables.metastore_summary',
    accessToken
  )
  if (queryResult.hasFailed) {
    return {
      hasFailed: true,
      errorCode: queryResult.status.code,
      errorMessage: queryResult.status.details,
    }
  }

  // Shape read from the list columns that hold named entities (tables, views, functions).
  type NamedEntityList = {
    toArray(): { toJSON(): { namespace_name: { toArray(): unknown[] }; name: unknown } }[]
  }

  // Turns each entity into "<namespace parts>.<name>".
  const toQualifiedIds = (entities: NamedEntityList): string[] =>
    entities
      .toArray()
      .map((a) => a.toJSON())
      .map((row) => row.namespace_name.toArray().concat(row.name).join('.'))

  // Entry emitted for rows that do not describe a table; a fresh object is built for every such row.
  const placeholderTable = () =>
    ({
      id: 'none',
      description: 'none',
      tableType: 'none',
      createdBy: 'none',
      createdTime: 0,
      owner: 'none',
      location: 'none',
      index: 'none',
      cubeSize: 'none',
      columns: [],
      hasConsumptionInsights: false,
      hasIngestionInsights: false,
      hasContinuousOptimization: false,
    } as TableResponseModel)

  const catalogs: { [catalogName: string]: CatalogResponseModel } = {}
  const namespaces: { [namespaceName: string]: NamespaceResponseModel } = {}
  const tables: TableResponseModel[] = []

  const metastore = queryResult.table.toArray().filter((row) => row.catalog_name != 'use_catalog')
  for (const row of metastore) {
    const catalog = row.catalog_name
    const namespace = row.namespace_name.toArray()

    // Catalog-level columns are taken from the first row seen for that catalog.
    if (!catalogs[catalog]) {
      catalogs[catalog] = {
        id: catalog,
        description: row.catalog_description,
        defaultNamespace: row.catalog_default_namespace.toJSON(),
        namespacesIds: row.catalog_namespaces.toArray().map((a) => a.toJSON()),
        tablesIds: toQualifiedIds(row.catalog_tables),
        viewsIds: toQualifiedIds(row.catalog_views),
        functionsIds: toQualifiedIds(row.catalog_functions),
        catalogType: row.catalog_type,
      }
    }

    // Namespace-level columns are likewise taken from the first row seen for that namespace.
    if (!namespaces[namespace]) {
      namespaces[namespace] = {
        id: namespace,
        description: row.namespace_description,
        namespaceOwner: row.namespace_owner,
        isDefault: row.namespace_is_default,
        location: row.namespace_location,
        tablesIds: row.namespace_tables.toArray().map((a) => a.toJSON().name),
      }
    }

    if (row.table_name == null) {
      tables.push(placeholderTable())
      continue
    }

    const properties = row?.table_properties
    tables.push({
      id: row.table_name,
      description: row.table_description,
      tableType: row.table_provider,
      createdBy: row.table_created_by,
      createdTime: row.table_created_time,
      owner: row.table_owner,
      location: row.table_location,
      index: properties?.['columnsToIndex'],
      cubeSize: properties?.['cubeSize'],
      columns: row.table_columns.toArray().map((a) => ({
        name: a.name,
        description: a.description,
        dataType: a.data_type,
      })),
      // A feature counts as enabled as soon as its table property is present, whatever its value.
      hasConsumptionInsights: properties?.['use.consumption.enable'] !== undefined,
      hasIngestionInsights: properties?.['use.ingestion.source.file.path'] !== undefined,
      hasContinuousOptimization: properties?.['use.optimization.type'] !== undefined,
    } as TableResponseModel)
  }

  const data = {
    catalogs: Object.values(catalogs),
    namespaces: Object.values(namespaces),
    tables,
  } as CatalogsResponseModel
  return { hasFailed: false, data }
}

// Table property key that the ALTER TABLE statements below set or unset to switch optimization on or off.
const optimizationType = 'use.optimization.type'
// Value written under `optimizationType` when optimization is switched on.
const leveledCompaction = 'LeveledCompaction'
// Table property key under which the bytes-per-iteration setting is written.
const bytesPerIterArg = 'use.optimization.bytes-per-iter'

/**
 * Sets the optimization table properties (optimization type and bytes per iteration) on a table.
 *
 * @param accessToken token passed to the Spark driver
 * @param tableName table to alter; interpolated verbatim into the statement
 * @param bytesPerIter value written under the bytes-per-iteration table property
 * @returns `hasFailed: false` when the statement succeeds; otherwise the Spark status, or the
 *   `code` / `message` of whatever the driver threw
 */
export const optimizeTable = async (
  accessToken: string,
  tableName: string,
  bytesPerIter: number
): Promise<WebClientResultWithoutData> => {
  // NOTE(review): tableName is not escaped here; confirm callers only pass trusted identifiers.
  const statement = `ALTER TABLE ${tableName} SET TBLPROPERTIES ('${optimizationType}'='${leveledCompaction}', '${bytesPerIterArg}'='${bytesPerIter}')`
  try {
    const response = await sparkDriver.runQuery(statement, accessToken)
    // A resolved query can still carry a failure (getCatalogs checks the same flag), so inspect it
    // instead of reporting success unconditionally.
    if (response.hasFailed) {
      return {
        hasFailed: true,
        errorCode: response.status.code,
        errorMessage: response.status.details,
      }
    }
    return { hasFailed: false }
  } catch (error) {
    // The thrown value is not guaranteed to be an object; read its fields defensively.
    const { code, message } = (error ?? {}) as { code?: number; message?: string }
    return {
      hasFailed: true,
      errorCode: code,
      errorMessage: message,
    }
  }
}

/**
 * Switches continuous optimization on or off for a table by setting or unsetting its
 * optimization table properties.
 *
 * @param accessToken token passed to the Spark driver
 * @param tableName table to alter; interpolated verbatim into the statement
 * @param enable `true` to SET the properties, `false` to UNSET them
 * @param bytesPerIter value written under the bytes-per-iteration property (only used when enabling)
 * @returns `hasFailed: false` when the statement succeeds; otherwise the Spark status, or the
 *   `code` / `message` of whatever the driver threw
 */
export const manageTableContinuousOptimization = async (
  accessToken: string,
  tableName: string,
  enable: boolean,
  bytesPerIter: number
): Promise<WebClientResultWithoutData> => {
  // NOTE(review): tableName is not escaped here; confirm callers only pass trusted identifiers.
  const propertiesClause = enable
    ? `SET TBLPROPERTIES ('${optimizationType}'='${leveledCompaction}', '${bytesPerIterArg}'='${bytesPerIter}')`
    : `UNSET TBLPROPERTIES ('${optimizationType}', '${bytesPerIterArg}')`
  const query = `ALTER TABLE ${tableName} ${propertiesClause}`
  try {
    const response = await sparkDriver.runQuery(query, accessToken)
    // A resolved query can still carry a failure (getCatalogs checks the same flag), so inspect it
    // instead of reporting success unconditionally.
    if (response.hasFailed) {
      return {
        hasFailed: true,
        errorCode: response.status.code,
        errorMessage: response.status.details,
      }
    }
    return { hasFailed: false }
  } catch (error) {
    // The thrown value is not guaranteed to be an object; read its fields defensively.
    const { code, message } = (error ?? {}) as { code?: number; message?: string }
    return {
      hasFailed: true,
      errorCode: code,
      errorMessage: message,
    }
  }
}

// Keep the debug helpers on `window.use_debug` bound to the access token currently held in the store.
appStore.subscribe(() => {
  const { accessToken } = appStore.getState().account
  // Nothing to refresh while the published token is still the current one.
  if (accessToken === window.use_debug?.accessToken) return
  window.use_debug = {
    // Spread the previous helpers first so the fresh token and query helper below replace the
    // stale ones; spreading last would let the old token overwrite the new one.
    ...window.use_debug,
    accessToken,
    runQuerySpark: async (query: string) => {
      const q = await sparkDriver.runQuery(query, accessToken)
      console.table(convertArrowTableToPlainObject(q.table))
      return q
    },
  }
})

// Publish the Spark driver on the global debug object. The spread comes last, so any keys already
// present on window.use_debug (including an existing `spark`) are kept as they are.
window.use_debug = { spark: sparkDriver, ...window.use_debug }
