#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Exotel Media Streams serializer for Pipecat."""

import base64
import json
from typing import Optional

from loguru import logger

from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.utils import create_stream_resampler
from pipecat.frames.frames import (
    AudioRawFrame,
    Frame,
    InputAudioRawFrame,
    InputDTMFFrame,
    InterruptionFrame,
    OutputTransportMessageFrame,
    OutputTransportMessageUrgentFrame,
    StartFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer


class ExotelFrameSerializer(FrameSerializer):
    """Serializer for Exotel Media Streams WebSocket protocol.

    This serializer handles converting between Pipecat frames and Exotel's WebSocket
    media streams protocol. It supports audio conversion, DTMF events, and automatic
    call termination.

    Note: Ref docs for events:
        https://support.exotel.com/support/solutions/articles/3000108630-working-with-the-stream-and-voicebot-applet
    """

    class InputParams(FrameSerializer.InputParams):
        """Configuration parameters for ExotelFrameSerializer.

        Parameters:
            exotel_sample_rate: Sample rate used by Exotel, defaults to 8000 Hz.
            sample_rate: Optional override for pipeline input sample rate.
            ignore_rtvi_messages: Inherited from base FrameSerializer, defaults to True.
        """

        exotel_sample_rate: int = 8000
        sample_rate: Optional[int] = None

    def __init__(
        self, stream_sid: str, call_sid: Optional[str] = None, params: Optional[InputParams] = None
    ):
        """Initialize the ExotelFrameSerializer.

        Args:
            stream_sid: The Exotel Media Stream SID.
            call_sid: The associated Exotel Call SID (optional, not used in this implementation).
            params: Configuration parameters.
        """
        super().__init__(params or ExotelFrameSerializer.InputParams())

        self._stream_sid = stream_sid
        self._call_sid = call_sid

        self._exotel_sample_rate = self._params.exotel_sample_rate
        self._sample_rate = 0  # Pipeline input rate

        self._input_resampler = create_stream_resampler()
        self._output_resampler = create_stream_resampler()

    async def setup(self, frame: StartFrame):
        """Sets up the serializer with pipeline configuration.

        Args:
            frame: The StartFrame containing pipeline configuration.
        """
        self._sample_rate = self._params.sample_rate or frame.audio_in_sample_rate

    async def serialize(self, frame: Frame) -> str | bytes | None:
        """Serializes a Pipecat frame to Exotel WebSocket format.

        Handles conversion of various frame types to Exotel WebSocket messages.

        Args:
            frame: The Pipecat frame to serialize.

        Returns:
            Serialized data as string or bytes, or None if the frame isn't handled.
        """
        if isinstance(frame, InterruptionFrame):
            answer = {"event": "clear", "streamSid": self._stream_sid}
            return json.dumps(answer)
        elif isinstance(frame, AudioRawFrame):
            data = frame.audio

            # Output: Exotel outputs PCM audio, but we need to resample to match requested sample_rate
            serialized_data = await self._output_resampler.resample(
                data, frame.sample_rate, self._exotel_sample_rate
            )
            if serialized_data is None or len(serialized_data) == 0:
                # Ignoring in case we don't have audio
                return None

            payload = base64.b64encode(serialized_data).decode("ascii")

            answer = {
                "event": "media",
                "streamSid": self._stream_sid,
                "media": {"payload": payload},
            }

            return json.dumps(answer)
        elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
            if self.should_ignore_frame(frame):
                return None
            return json.dumps(frame.message)

        return None

    async def deserialize(self, data: str | bytes) -> Frame | None:
        """Deserializes Exotel WebSocket data to Pipecat frames.

        Handles conversion of Exotel media events to appropriate Pipecat frames.

        Args:
            data: The raw WebSocket data from Exotel.

        Returns:
            A Pipecat frame corresponding to the Exotel event, or None if unhandled.
        """
        message = json.loads(data)

        if message["event"] == "media":
            payload_base64 = message["media"]["payload"]
            payload = base64.b64decode(payload_base64)

            deserialized_data = await self._input_resampler.resample(
                payload,
                self._exotel_sample_rate,
                self._sample_rate,
            )
            if deserialized_data is None or len(deserialized_data) == 0:
                # Ignoring in case we don't have audio
                return None

            # Input: Exotel takes PCM data, so just resample to match sample_rate
            audio_frame = InputAudioRawFrame(
                audio=deserialized_data,
                num_channels=1,  # Assuming mono audio from Exotel
                sample_rate=self._sample_rate,  # Use the configured pipeline input rate
            )
            return audio_frame
        elif message["event"] == "dtmf":
            digit = message.get("dtmf", {}).get("digit")

            try:
                return InputDTMFFrame(KeypadEntry(digit))
            except ValueError:
                # Handle case where string doesn't match any enum value
                logger.info(f"Invalid DTMF digit: {digit}")
                return None

        return None
