# ============================================================================
# LAKEHOUSE TO VENA ETL — PROLYTICS CONNECT
# Auto-generated notebook. Do not edit manually.
# Generated by: FabricVenaNotebookService
# ============================================================================

import base64
import json
import os
from datetime import datetime

import pandas as pd
import requests
from pyspark.sql import SparkSession

# ============================================================================
# CONFIGURATION — injected by PHP FabricVenaNotebookService
# ============================================================================

# Vena connection details.
# NOTE: the {{...}} tokens are template placeholders substituted by the
# PHP FabricVenaNotebookService before this notebook is published.
VENA_HUB = "{{VENA_HUB}}"              # e.g., "ca3", "us1"
VENA_API_USER = "{{VENA_API_USER}}"    # {org_id}.{user_id}  — stored for reference; not used in auth header directly
VENA_API_KEY = "{{VENA_API_KEY}}"      # full Vena credential: "{keyId}:{keySecret}" — used as-is for Basic Auth
VENA_TEMPLATE_ID = "{{VENA_TEMPLATE_ID}}"  # ETL template that receives the data

# Source configuration
SOURCE_TABLE_NAME = "{{SOURCE_TABLE_NAME}}"  # Lakehouse Delta table name
FILTER_EXPRESSION = "{{FILTER_EXPRESSION}}"  # Optional SQL WHERE clause (empty string = no filter)

# Transfer configuration
TRANSFER_MODE = "{{TRANSFER_MODE}}"    # 'data_array' or 'csv_file'
INCLUDE_HEADERS = {{INCLUDE_HEADERS}}  # True or False — prepend a header row in data_array mode
COLUMN_MAPPING = {{COLUMN_MAPPING}}    # JSON dict or None — source column -> Vena column renames

# Notebook identification (for logging)
TABLE_PREFIX = "{{TABLE_PREFIX}}"      # used in Spark app name and CSV export filename

# ============================================================================
# SPARK SESSION
# ============================================================================

# Get (or create) the shared Spark session for this notebook run.
spark = (
    SparkSession.builder
    .appName(f"LakehouseToVena_{TABLE_PREFIX}")
    .getOrCreate()
)

print("Spark session initialized successfully")

# ============================================================================
# VENA API CLIENT
# ============================================================================

class VenaAPIClient:
    """Direct HTTP client for the Vena ETL API (requests-based fallback).

    Used when the ``vepi`` library is not installed. Authenticates with
    HTTP Basic Auth using the raw '{keyId}:{keySecret}' credential and
    targets the public v1 ETL endpoints of the configured hub.
    """

    # Request timeouts in seconds. Class attributes (not literals inline)
    # so a caller can tune them; the file upload gets a longer budget
    # because multipart CSV uploads can be large.
    DATA_TIMEOUT = 60
    FILE_TIMEOUT = 120

    def __init__(self, hub: str, api_user: str, api_key: str, template_id: str):
        """Store connection details and build the hub-specific base URL.

        Args:
            hub: Vena hub identifier, e.g. "ca3" or "us1".
            api_user: '{org_id}.{user_id}' — kept for reference only; it is
                NOT used in the auth header (see _get_auth_header).
            api_key: full Vena credential '{keyId}:{keySecret}'.
            template_id: ETL template that receives the data.
        """
        self.hub = hub
        self.api_user = api_user
        self.api_key = api_key
        self.template_id = template_id
        self.base_url = f"https://{hub}.vena.io/api/public/v1"

    def _get_auth_header(self) -> str:
        """Generate Basic Auth header.

        VENA_API_KEY is stored as the full Vena credential string
        in the form '{keyId}:{keySecret}' (exactly as Vena displays it
        in the API key management UI).  We encode it directly — do NOT
        prepend VENA_API_USER, which would double the key-id segment
        and produce a 401.
        """
        encoded = base64.b64encode(self.api_key.encode()).decode()
        return f"Basic {encoded}"

    @staticmethod
    def _parse_response(response) -> dict:
        """Return the JSON body on HTTP 200; raise with status + body otherwise.

        Shared by both transfer endpoints so success/error handling stays
        consistent in one place.
        """
        if response.status_code == 200:
            return response.json()
        raise Exception(
            f"Vena API error {response.status_code}: {response.text}"
        )

    def start_with_data(self, data_array: list) -> dict:
        """Send 2D data array to Vena ETL template via startWithData endpoint.

        Args:
            data_array: list of rows (each a list of cell values), optionally
                with a header row first.

        Returns:
            Parsed JSON response from Vena.

        Raises:
            Exception: on any non-200 HTTP status.
        """
        url = f"{self.base_url}/etl/templates/{self.template_id}/startWithData"

        headers = {
            "Authorization": self._get_auth_header(),
            "Content-Type": "application/json",
        }

        payload = {
            "input": {
                "data": data_array
            }
        }

        print(f"Sending {len(data_array)} rows to Vena...")
        print(f"URL: {url}")

        response = requests.post(
            url, headers=headers, json=payload, timeout=self.DATA_TIMEOUT
        )
        return self._parse_response(response)

    def start_with_file(self, file_path: str) -> dict:
        """Upload a CSV file to Vena ETL template via startWithFile endpoint.

        Vena's startWithFile endpoint requires a multipart/form-data request
        with exactly two named parts:
          1. 'metadata' — a JSON object containing at least {"fileName": "<name>"}
          2. 'file'     — the raw CSV bytes

        Omitting the metadata part returns HTTP 422 "No metadata part found".

        Args:
            file_path: local path of the CSV file to upload.

        Returns:
            Parsed JSON response from Vena.

        Raises:
            Exception: on any non-200 HTTP status.
        """
        url = f"{self.base_url}/etl/templates/{self.template_id}/startWithFile"

        headers = {
            "Authorization": self._get_auth_header(),
            # Do NOT set Content-Type here — requests sets it automatically
            # with the correct boundary for multipart/form-data.
        }

        file_name = os.path.basename(file_path)
        metadata = json.dumps({
            "input": {
                "fileName": file_name,
                "partName": "file",
                "fileFormat": "CSV",
            }
        })

        with open(file_path, "rb") as f:
            files = {
                "metadata": (None, metadata, "application/json"),
                "file": (file_name, f, "text/csv"),
            }
            response = requests.post(
                url, headers=headers, files=files, timeout=self.FILE_TIMEOUT
            )

        return self._parse_response(response)


def get_vena_client() -> VenaAPIClient:
    """Build a Vena client, preferring the vepi library when installed.

    Falls back to the direct requests-based VenaAPIClient when vepi
    cannot be imported. Either object exposes the same two methods:
    start_with_data(data_array) and start_with_file(file_path).
    """
    try:
        from vepi import VenaETL  # type: ignore
    except ImportError:
        # vepi missing — the plain HTTP client covers both endpoints.
        print("vepi not available — using direct requests client")
        return VenaAPIClient(
            hub=VENA_HUB,
            api_user=VENA_API_USER,
            api_key=VENA_API_KEY,
            template_id=VENA_TEMPLATE_ID,
        )

    class VepiAdapter:
        """Wraps vepi.VenaETL behind the VenaAPIClient method names."""

        def __init__(self):
            self._etl = VenaETL(
                hub=VENA_HUB,
                api_user=VENA_API_USER,
                api_key=VENA_API_KEY,
                template_id=VENA_TEMPLATE_ID,
            )

        def start_with_data(self, data_array: list) -> dict:
            print(f"[vepi] Sending {len(data_array)} rows to Vena...")
            return self._etl.start_with_data(data_array)

        def start_with_file(self, file_path: str) -> dict:
            print(f"[vepi] Uploading file to Vena: {file_path}")
            return self._etl.start_with_file(file_path)

    print("Using vepi library for Vena API calls")
    return VepiAdapter()

# ============================================================================
# DATA EXTRACTION AND TRANSFORMATION
# ============================================================================

def read_lakehouse_table(table_name: str, filter_expr: str = None):
    """Load a Lakehouse Delta table as a Spark DataFrame.

    Args:
        table_name: Delta table name resolvable by spark.table().
        filter_expr: optional SQL WHERE clause; blank/None means no filter.

    Returns:
        The (possibly filtered) Spark DataFrame.
    """
    print(f"\nReading from Lakehouse table: {table_name}")

    frame = spark.table(table_name)

    # Only apply the predicate when it is non-empty after trimming.
    if filter_expr and filter_expr.strip():
        print(f"Applying filter: {filter_expr}")
        frame = frame.filter(filter_expr)

    n_rows, n_cols = frame.count(), len(frame.columns)
    print(f"Loaded {n_rows:,} rows, {n_cols} columns")

    return frame


def apply_column_mapping(df_pandas, column_mapping):
    """Rename columns per COLUMN_MAPPING; unknown source columns are ignored.

    A falsy or non-dict mapping leaves the DataFrame untouched.
    """
    if not column_mapping or not isinstance(column_mapping, dict):
        return df_pandas

    print(f"Applying column mapping: {column_mapping}")
    # Filter the mapping down to columns that actually exist so that
    # stale mapping entries never raise.
    known = df_pandas.columns
    safe_map = {old: new for old, new in column_mapping.items() if old in known}
    return df_pandas.rename(columns=safe_map)


def convert_to_vena_format(df_pandas, include_headers: bool) -> list:
    """Convert a pandas DataFrame to the 2D list-of-lists format Vena expects.

    Timestamps/datetimes are serialized to ISO-8601 strings and missing
    values (NaN/NaT) become None so the payload is JSON-safe.

    Args:
        df_pandas: source DataFrame.
        include_headers: when True, prepend the column names as the first row.

    Returns:
        List of rows (each a list of JSON-serializable cell values).
    """
    print("\nConverting data to Vena format...")

    # Cast to object dtype BEFORE the null replacement: on numeric columns
    # `where(..., None)` coerces None straight back to NaN, and NaN is not
    # valid JSON for the Vena API. Object dtype keeps None as a real None.
    # This also turns NaT into None instead of leaking the string 'NaT'.
    cells = df_pandas.astype(object).where(pd.notnull(df_pandas), None)

    # Serialize datetimes per cell. A row comprehension avoids
    # DataFrame.applymap, which is deprecated since pandas 2.1.
    data_array = [
        [
            value.isoformat() if isinstance(value, (pd.Timestamp, datetime)) else value
            for value in row
        ]
        for row in cells.values.tolist()
    ]

    if include_headers:
        headers = df_pandas.columns.tolist()
        data_array = [headers] + data_array
        print(f"Including headers: {headers}")

    print(f"Prepared {len(data_array)} rows for Vena (including header row if requested)")
    return data_array


def export_to_csv(df_pandas, file_path: str) -> str:
    """Persist the DataFrame as a CSV file for the file-upload transfer mode.

    Returns the path that was passed in, for convenient chaining.
    """
    print(f"\nExporting data to CSV: {file_path}")
    # index=False: Vena expects only the data columns, not pandas' row index.
    df_pandas.to_csv(path_or_buf=file_path, index=False)
    print(f"CSV file created: {file_path}")
    return file_path

# ============================================================================
# MAIN EXECUTION
# ============================================================================

def _emit_results_block(result: dict) -> None:
    """Print the machine-readable JSON summary between sentinel markers.

    The calling service scrapes the notebook output between the
    ###VENA_RESULTS_START### / ###VENA_RESULTS_END### markers.
    """
    print("\n" + "=" * 80)
    print("###VENA_RESULTS_START###")
    print(json.dumps(result, indent=2, default=str))
    print("###VENA_RESULTS_END###")
    print("=" * 80)


def run_vena_integration() -> dict:
    """Orchestrate the end-to-end Lakehouse → Vena transfer.

    Reads the configured Delta table, applies the optional filter and
    column mapping, then pushes the data to Vena via the configured
    transfer mode. Always emits a JSON summary (success or failure)
    between sentinel markers; re-raises on failure so the notebook run
    is marked failed.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("LAKEHOUSE TO VENA ETL — PROLYTICS CONNECT")
    print(rule)
    print(f"Vena Hub       : {VENA_HUB}")
    print(f"Template ID    : {VENA_TEMPLATE_ID}")
    print(f"Source Table   : {SOURCE_TABLE_NAME}")
    print(f"Transfer Mode  : {TRANSFER_MODE}")
    print(f"Start Time     : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(rule)

    try:
        # Extract: optional WHERE clause, then pull the table into pandas.
        where_clause = FILTER_EXPRESSION if FILTER_EXPRESSION.strip() else None
        pdf = read_lakehouse_table(SOURCE_TABLE_NAME, where_clause).toPandas()

        # Transform: optional column renames.
        pdf = apply_column_mapping(pdf, COLUMN_MAPPING)

        total_rows, total_cols = len(pdf), len(pdf.columns)

        # Load: pick the client (vepi preferred, requests fallback) and
        # dispatch on the configured transfer mode.
        client = get_vena_client()

        if TRANSFER_MODE == "data_array":
            vena_response = client.start_with_data(
                convert_to_vena_format(pdf, INCLUDE_HEADERS)
            )
        elif TRANSFER_MODE == "csv_file":
            csv_path = f"/lakehouse/default/Files/vena_export_{TABLE_PREFIX}.csv"
            export_to_csv(pdf, csv_path)
            vena_response = client.start_with_file(csv_path)
        else:
            raise ValueError(f"Invalid TRANSFER_MODE: {TRANSFER_MODE}")

        # Human-readable summary for the notebook log.
        print("\n" + rule)
        print("EXECUTION SUMMARY")
        print(rule)
        print("Status         : SUCCESS")
        print(f"Rows Read      : {total_rows:,}")
        print(f"Rows Sent      : {total_rows:,}")
        print(f"Columns        : {total_cols}")
        print(f"Vena Response  : {vena_response}")
        print(f"End Time       : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(rule)

        result = {
            "status": "success",
            "rows_read": total_rows,
            "rows_sent": total_rows,
            "columns_count": total_cols,
            "vena_response": str(vena_response),
            "transfer_mode": TRANSFER_MODE,
        }
        _emit_results_block(result)
        return result

    except Exception as e:
        print("\n" + rule)
        print("EXECUTION FAILED")
        print(rule)
        print(f"Error: {str(e)}")
        print(rule)

        _emit_results_block({
            "status": "failed",
            "error": str(e),
        })
        raise

# ============================================================================
# EXECUTE
# ============================================================================

# Module-level entry point: runs immediately when the notebook executes.
# `results` stays in the notebook namespace for downstream cells and the
# calling service to inspect.
results = run_vena_integration()
