#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Deepgram Flux speech-to-text service implementation."""

import asyncio
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Optional
from urllib.parse import urlencode

from loguru import logger
from pydantic import BaseModel

from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    ErrorFrame,
    Frame,
    InterimTranscriptionFrame,
    StartFrame,
    TranscriptionFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
)
from pipecat.services.settings import NOT_GIVEN, STTSettings, _NotGiven, _warn_deprecated_param
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt

try:
    from websockets.asyncio.client import connect as websocket_connect
    from websockets.protocol import State
except ModuleNotFoundError as e:
    logger.error(f"Exception: {e}")
    logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
    raise Exception(f"Missing module: {e}")


class FluxMessageType(str, Enum):
    """Deepgram Flux WebSocket message types.

    These are the top-level message types that can be received from the
    Deepgram Flux WebSocket connection.
    """

    RECEIVE_CONNECTED = "Connected"
    RECEIVE_FATAL_ERROR = "Error"
    TURN_INFO = "TurnInfo"
    CONFIGURE_SUCCESS = "ConfigureSuccess"
    CONFIGURE_FAILURE = "ConfigureFailure"


class FluxEventType(str, Enum):
    """Deepgram Flux TurnInfo event types.

    These events are contained within TurnInfo messages and indicate
    different stages of speech processing and turn detection.
    """

    START_OF_TURN = "StartOfTurn"
    TURN_RESUMED = "TurnResumed"
    END_OF_TURN = "EndOfTurn"
    EAGER_END_OF_TURN = "EagerEndOfTurn"
    UPDATE = "Update"


@dataclass
class DeepgramFluxSTTSettings(STTSettings):
    """Settings for DeepgramFluxSTTService.

    Parameters:
        eager_eot_threshold: EagerEndOfTurn/TurnResumed threshold. Off by default.
            Lower values = more aggressive (faster response, more LLM calls).
            Higher values = more conservative (slower response, fewer LLM calls).
        eot_threshold: End-of-turn confidence required to finish a turn (default 0.7).
        eot_timeout_ms: Time in ms after speech to finish a turn regardless of EOT
            confidence (default 5000).
        keyterm: Keyterms to boost recognition accuracy for specialized terminology.
        min_confidence: Minimum confidence required to create a TranscriptionFrame.
    """

    eager_eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    eot_threshold: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    eot_timeout_ms: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    keyterm: list | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    min_confidence: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)


class DeepgramFluxSTTService(WebsocketSTTService):
    """Deepgram Flux speech-to-text service.

    Provides real-time speech recognition using Deepgram's WebSocket API with Flux capabilities.
    Supports configurable models, VAD events, and various audio processing options
    including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.

    Event handlers available (in addition to WebsocketSTTService events):

    - on_speech_started(service): Deepgram detected start of speech
    - on_utterance_end(service): Deepgram detected end of utterance
    - on_end_of_turn(service): Deepgram detected end of turn (EOT)
    - on_eager_end_of_turn(service): Deepgram predicted end of turn (EagerEOT)
    - on_turn_resumed(service): User resumed speaking after EagerEOT

    Example::

        @stt.event_handler("on_end_of_turn")
        async def on_end_of_turn(service):
            ...
    """

    Settings = DeepgramFluxSTTSettings
    _settings: DeepgramFluxSTTSettings
    _CONFIGURE_FIELDS = {"keyterm", "eot_threshold", "eager_eot_threshold", "eot_timeout_ms"}

    class InputParams(BaseModel):
        """Configuration parameters for Deepgram Flux API.

        .. deprecated:: 0.0.105
            Use ``settings=DeepgramFluxSTTSettings(...)`` instead.

        Parameters:
            eager_eot_threshold: Optional. EagerEndOfTurn/TurnResumed are off by default.
                You can turn them on by setting eager_eot_threshold to a valid value.
                Lower values = more aggressive EagerEndOfTurning (faster response, more LLM calls).
                Higher values = more conservative EagerEndOfTurning (slower response, fewer LLM calls).
            eot_threshold: Optional. End-of-turn confidence required to finish a turn (default 0.7).
                Lower values = turns end sooner (more interruptions, faster responses).
                Higher values = turns end later (fewer interruptions, more complete utterances).
            eot_timeout_ms: Optional. Time in milliseconds after speech to finish a turn
                regardless of EOT confidence (default 5000).
            keyterm: List of keyterms to boost recognition accuracy for specialized terminology.
            mip_opt_out: Optional. Opts out requests from the Deepgram Model Improvement Program
                (default False).
            tag: List of tags to label requests for identification during usage reporting.
            min_confidence: Optional. Minimum confidence required confidence to create a TranscriptionFrame
        """

        eager_eot_threshold: Optional[float] = None
        eot_threshold: Optional[float] = None
        eot_timeout_ms: Optional[int] = None
        keyterm: list = []
        mip_opt_out: Optional[bool] = None
        tag: list = []
        min_confidence: Optional[float] = None  # New parameter

    def __init__(
        self,
        *,
        api_key: str,
        url: str = "wss://api.deepgram.com/v2/listen",
        sample_rate: Optional[int] = None,
        mip_opt_out: Optional[bool] = None,
        model: Optional[str] = None,
        flux_encoding: str = "linear16",
        tag: Optional[list] = None,
        params: Optional[InputParams] = None,
        should_interrupt: bool = True,
        settings: Optional[DeepgramFluxSTTSettings] = None,
        **kwargs,
    ):
        """Initialize the Deepgram Flux STT service.

        Args:
            api_key: Deepgram API key for authentication. Required for API access.
            url: WebSocket URL for the Deepgram Flux API. Defaults to the preview endpoint.
            sample_rate: Audio sample rate in Hz. If None, uses the pipeline
                sample rate.
            mip_opt_out: Opt out of the Deepgram Model Improvement Program.
            model: Deepgram Flux model to use for transcription.

                .. deprecated:: 0.0.105
                    Use ``settings=DeepgramFluxSTTSettings(model=...)`` instead.

            flux_encoding: Audio encoding format required by Flux API. Must be "linear16".
                Raw signed little-endian 16-bit PCM encoding.
            tag: Tags to label requests for identification during usage reporting.
            params: InputParams instance containing detailed API configuration options.

                .. deprecated:: 0.0.105
                    Use ``settings=DeepgramFluxSTTSettings(...)`` instead.

            should_interrupt: Determine whether the bot should be interrupted when Flux detects that the user is speaking.
            settings: Runtime-updatable settings. When provided alongside deprecated
                parameters, ``settings`` values take precedence.
            **kwargs: Additional arguments passed to the parent WebsocketSTTService class.

        Examples:
            Basic usage with default parameters::

                stt = DeepgramFluxSTTService(api_key="your-api-key")

            Advanced usage with custom parameters::

                stt = DeepgramFluxSTTService(
                    api_key="your-api-key",
                    settings=DeepgramFluxSTTSettings(
                        model="flux-general-en",
                        eager_eot_threshold=0.5,
                        eot_threshold=0.8,
                        keyterm=["AI", "machine learning", "neural network"],
                        tag=["production", "voice-agent"],
                    ),
                )
        """
        # Note: For DeepgramFluxSTTService, differently from other processes, we need to create
        # the _receive_task inside _connect_websocket, because the websocket should only be
        # considered connected and ready to send audio once we receive from Flux the message
        # which confirms the connection has been established.
        # If we try to keep the logic reconnect_on_error, when receiving a message, the
        # _receive_task_handler would try to reconnect in case of error, invoking the
        # _connect_websocket again and leading to a case where the first _receive_task_handler
        # was never destroyed.
        # So we can keep it here as false, because inside the method send_with_retry, it will
        # already try to reconnect if needed.

        # 1. Initialize default_settings with hardcoded defaults
        default_settings = DeepgramFluxSTTSettings(
            model="flux-general-en",
            language=Language.EN,
            eager_eot_threshold=None,
            eot_threshold=None,
            eot_timeout_ms=None,
            keyterm=[],
            min_confidence=None,
        )

        # 2. Apply direct init arg overrides (deprecated)
        if model is not None:
            _warn_deprecated_param("model", DeepgramFluxSTTSettings, "model")
            default_settings.model = model

        # 3. Apply params overrides — only if settings not provided
        if params is not None:
            _warn_deprecated_param("params", DeepgramFluxSTTSettings)
            if not settings:
                default_settings.eager_eot_threshold = params.eager_eot_threshold
                default_settings.eot_threshold = params.eot_threshold
                default_settings.eot_timeout_ms = params.eot_timeout_ms
                default_settings.keyterm = params.keyterm or []
                if params.tag and tag is None:
                    tag = params.tag
                default_settings.min_confidence = params.min_confidence
                if params.mip_opt_out is not None:
                    mip_opt_out = params.mip_opt_out

        # 4. Apply settings delta (canonical API, always wins)
        if settings is not None:
            default_settings.apply_update(settings)

        super().__init__(
            sample_rate=sample_rate,
            reconnect_on_error=False,
            settings=default_settings,
            **kwargs,
        )
        self._api_key = api_key
        self._url = url
        self._should_interrupt = should_interrupt
        self._encoding = flux_encoding
        self._mip_opt_out = mip_opt_out
        self._tag = tag or []
        self._websocket_url = None
        self._receive_task = None

        # Flux event handlers
        self._register_event_handler("on_start_of_turn")
        self._register_event_handler("on_turn_resumed")
        self._register_event_handler("on_end_of_turn")
        self._register_event_handler("on_eager_end_of_turn")
        self._register_event_handler("on_update")
        self._connection_established_event = asyncio.Event()
        # Watchdog task to prevent dangling tasks
        # If we stop sending audio to Flux after we have received that the User has started speaking
        # we never receive the user stopped speaking event unless we resume sending audio to it.
        self._last_stt_time = None
        self._watchdog_task = None
        self._user_is_speaking = False

    async def _connect(self):
        """Connect to WebSocket and start background tasks.

        Establishes the WebSocket connection to the Deepgram Flux API and starts
        the background task for receiving transcription results.
        """
        await super()._connect()

        await self._connect_websocket()

    async def _disconnect(self):
        """Disconnect from WebSocket and clean up tasks.

        Gracefully disconnects from the Deepgram Flux API, cancels background tasks,
        and cleans up resources to prevent memory leaks.
        """
        await super()._disconnect()

        try:
            await self._disconnect_websocket()
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        finally:
            # Reset state only after everything is cleaned up
            self._websocket = None

    async def _send_silence(self, duration_secs: float = 0.5):
        """Send a block of silence of the specified duration (default 500 ms)."""
        sample_width = 2  # bytes per sample for 16-bit PCM
        num_channels = 1  # mono
        num_samples = int(self.sample_rate * duration_secs)
        silence = b"\x00" * (num_samples * sample_width * num_channels)
        await self._websocket.send(silence)

    async def _watchdog_task_handler(self):
        while self._websocket and self._websocket.state is State.OPEN:
            now = time.monotonic()
            # More than 500 ms without sending new audio to Flux
            if self._user_is_speaking and self._last_stt_time and now - self._last_stt_time > 0.5:
                logger.warning("Sending silence to Flux to prevent dangling task")
                await self._send_silence()
                self._last_stt_time = time.monotonic()
            # check every 100ms
            await asyncio.sleep(0.1)

    async def _connect_websocket(self):
        """Establish WebSocket connection to API.

        Creates a WebSocket connection to the Deepgram Flux API using the configured
        URL and authentication headers. Handles connection errors and reports them
        through the event handler system.
        """
        try:
            if self._websocket and self._websocket.state is State.OPEN:
                return

            self._connection_established_event.clear()
            self._user_is_speaking = False
            self._websocket = await websocket_connect(
                self._websocket_url,
                additional_headers={"Authorization": f"Token {self._api_key}"},
            )

            headers = {
                k: v for k, v in self._websocket.response.headers.items() if k.startswith("dg-")
            }
            logger.debug(f'{self}: Websocket connection initialized: {{"headers": {headers}}}')

            # Creating the receiver task
            if not self._receive_task:
                self._receive_task = self.create_task(
                    self._receive_task_handler(self._report_error)
                )

            # Creating the watchdog task
            if not self._watchdog_task:
                self._watchdog_task = self.create_task(self._watchdog_task_handler())

            # Now wait for the connection established event
            logger.debug("WebSocket connected, waiting for server confirmation...")
            await self._connection_established_event.wait()
            logger.debug("Connected to Deepgram Flux Websocket")
            await self._call_event_handler("on_connected")
        except Exception as e:
            await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
            self._websocket = None
            await self._call_event_handler("on_connection_error", f"{e}")

    async def _disconnect_websocket(self):
        """Close WebSocket connection and clean up state.

        Closes the WebSocket connection to the Deepgram Flux API and stops all
        metrics collection. Handles disconnection errors gracefully.
        """
        try:
            # Cancel background tasks BEFORE closing websocket
            if self._receive_task:
                await self.cancel_task(self._receive_task, timeout=2.0)
                self._receive_task = None
            if self._watchdog_task:
                await self.cancel_task(self._watchdog_task, timeout=2.0)
                self._watchdog_task = None
                self._last_stt_time = None

            self._connection_established_event.clear()
            await self.stop_all_metrics()

            if self._websocket:
                await self._send_close_stream()
                logger.debug("Disconnecting from Deepgram Flux Websocket")
                await self._websocket.close()
        except Exception as e:
            await self.push_error(error_msg=f"Error closing websocket: {e}", exception=e)
        finally:
            self._websocket = None
            await self._call_event_handler("on_disconnected")

    async def _send_close_stream(self) -> None:
        """Sends a CloseStream control message to the Deepgram Flux WebSocket API.

        This signals to the server that no more audio data will be sent.
        """
        try:
            if self._websocket:
                logger.debug("Sending CloseStream message to Deepgram Flux")
                message = {"type": "CloseStream"}
                await self._websocket.send(json.dumps(message))
        except Exception as e:
            await self.push_error(error_msg=f"Error sending closeStream: {e}", exception=e)

    async def _send_configure(self, fields: set[str]):
        """Send a Configure control message to update settings mid-stream.

        Builds a Configure JSON message containing only the fields that changed
        and sends it over the existing WebSocket connection.

        Args:
            fields: Set of changed field names to include in the message.
        """
        message: dict[str, Any] = {"type": "Configure"}

        if "keyterm" in fields:
            message["keyterms"] = self._settings.keyterm

        thresholds: dict[str, Any] = {}
        if "eot_threshold" in fields:
            thresholds["eot_threshold"] = self._settings.eot_threshold
        if "eager_eot_threshold" in fields:
            thresholds["eager_eot_threshold"] = self._settings.eager_eot_threshold
        if "eot_timeout_ms" in fields:
            thresholds["eot_timeout_ms"] = self._settings.eot_timeout_ms
        if thresholds:
            message["thresholds"] = thresholds

        logger.debug(f"{self}: sending Configure message: {message}")
        await self._websocket.send(json.dumps(message))

    def can_generate_metrics(self) -> bool:
        """Check if this service can generate processing metrics.

        Returns:
            True, as Deepgram service supports metrics generation.
        """
        return True

    async def _update_settings(self, delta: DeepgramFluxSTTSettings) -> dict[str, Any]:
        """Apply a settings delta.

        Configure-able fields (keyterm, eot_threshold, eager_eot_threshold,
        eot_timeout_ms) are sent to Deepgram via a Configure WebSocket message.
        Other fields are stored but cannot be applied to the active connection.
        """
        changed = await super()._update_settings(delta)

        if not changed:
            return changed

        configure_fields = changed.keys() & self._CONFIGURE_FIELDS
        if configure_fields and self._websocket and self._websocket.state is State.OPEN:
            await self._send_configure(configure_fields)

        self._warn_unhandled_updated_settings(changed.keys() - self._CONFIGURE_FIELDS)

        return changed

    async def start(self, frame: StartFrame):
        """Start the Deepgram Flux STT service.

        Initializes the service by constructing the WebSocket URL with all configured
        parameters and establishing the connection to begin transcription processing.

        Args:
            frame: The start frame containing initialization parameters and metadata.
        """
        await super().start(frame)

        url_params = [
            f"model={self._settings.model}",
            f"sample_rate={self.sample_rate}",
            f"encoding={self._encoding}",
        ]

        if self._settings.eager_eot_threshold is not None:
            url_params.append(f"eager_eot_threshold={self._settings.eager_eot_threshold}")

        if self._settings.eot_threshold is not None:
            url_params.append(f"eot_threshold={self._settings.eot_threshold}")

        if self._settings.eot_timeout_ms is not None:
            url_params.append(f"eot_timeout_ms={self._settings.eot_timeout_ms}")

        if self._mip_opt_out is not None:
            url_params.append(f"mip_opt_out={str(self._mip_opt_out).lower()}")

        # Add keyterm parameters (can have multiple)
        for keyterm in self._settings.keyterm:
            url_params.append(urlencode({"keyterm": keyterm}))

        # Add tag parameters (can have multiple)
        for tag_value in self._tag:
            url_params.append(urlencode({"tag": tag_value}))

        self._websocket_url = f"{self._url}?{'&'.join(url_params)}"
        await self._connect()

    async def stop(self, frame: EndFrame):
        """Stop the Deepgram Flux STT service.

        Args:
            frame: The end frame.
        """
        await super().stop(frame)
        await self._disconnect()

    async def cancel(self, frame: CancelFrame):
        """Cancel the Deepgram Flux STT service.

        Args:
            frame: The cancel frame.
        """
        await super().cancel(frame)
        await self._disconnect()

    async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
        """Send audio data to Deepgram Flux for transcription.

        Transmits raw audio bytes to the Deepgram Flux API for real-time speech
        recognition. Transcription results are received asynchronously through
        WebSocket callbacks and processed in the background.

        Args:
            audio: Raw audio bytes in linear16 format (signed little-endian 16-bit PCM).

        Yields:
            Frame: None (transcription results are delivered via WebSocket callbacks
                rather than as return values from this method).

        Raises:
            Exception: If the WebSocket connection is not established or if there
                are issues sending the audio data.
        """
        if not self._websocket:
            return

        try:
            self._last_stt_time = time.monotonic()
            await self.send_with_retry(audio, self._report_error)
        except Exception as e:
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
            return

        yield None

    async def start_metrics(self):
        """Start TTFB and processing metrics collection."""
        # TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
        # Ideally, TTFB should measure the time from when a user starts speaking
        # until we receive the first transcript. However, Deepgram Flux delivers
        # both the "user started speaking" event and the first transcript simultaneously,
        # making this timing measurement meaningless in this context.
        # await self.start_ttfb_metrics()
        await self.start_processing_metrics()

    @traced_stt
    async def _handle_transcription(
        self, transcript: str, is_final: bool, language: Optional[Language] = None
    ):
        """Handle a transcription result with tracing."""
        pass

    def _get_websocket(self):
        """Get the current WebSocket connection.

        Returns the active WebSocket connection instance, raising an exception
        if no connection is currently established.

        Returns:
            The active WebSocket connection instance.

        Raises:
            Exception: If no WebSocket connection is currently active.
        """
        if self._websocket:
            return self._websocket
        raise Exception("Websocket not connected")

    def _validate_message(self, data: Dict[str, Any]) -> bool:
        """Validate basic message structure from Deepgram Flux.

        Ensures the received message has the expected structure before processing.

        Args:
            data: The parsed JSON message data to validate.

        Returns:
            True if the message structure is valid, False otherwise.
        """
        if not isinstance(data, dict):
            logger.warning("Message is not a dictionary")
            return False

        if "type" not in data:
            logger.warning("Message missing 'type' field")
            return False

        return True

    async def _receive_messages(self):
        """Receive and process messages from WebSocket.

        Continuously receives messages from the Deepgram Flux WebSocket connection
        and processes various message types including connection status, transcription
        results, turn information, and error conditions. Handles different event types
        such as StartOfTurn, EndOfTurn, EagerEndOfTurn, and Update events.
        """
        async for message in self._get_websocket():
            if isinstance(message, str):
                try:
                    data = json.loads(message)
                    await self._handle_message(data)
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to decode JSON message: {e}")
                    # Skip malformed messages
                    continue
                except Exception as e:
                    await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
                    # Error will be handled inside WebsocketService->_receive_task_handler
                    raise
            else:
                logger.warning(f"Received non-string message: {type(message)}")

    async def _handle_message(self, data: Dict[str, Any]):
        """Handle a parsed WebSocket message from Deepgram Flux.

        Routes messages to appropriate handlers based on their type. Validates
        message structure before processing.

        Args:
            data: The parsed JSON message data from the WebSocket.
        """
        if not self._validate_message(data):
            return

        message_type = data.get("type")

        try:
            flux_message_type = FluxMessageType(message_type)
        except ValueError:
            logger.debug(f"Unhandled message type: {message_type or 'unknown'}")
            return

        match flux_message_type:
            case FluxMessageType.RECEIVE_CONNECTED:
                await self._handle_connection_established()
            case FluxMessageType.RECEIVE_FATAL_ERROR:
                await self._handle_fatal_error(data)
            case FluxMessageType.TURN_INFO:
                await self._handle_turn_info(data)
            case FluxMessageType.CONFIGURE_SUCCESS:
                logger.info(f"{self}: Configure accepted: {data}")
            case FluxMessageType.CONFIGURE_FAILURE:
                error_code = data.get("error_code", "unknown")
                description = data.get("description", "no description")
                error_msg = f"Configure rejected: [{error_code}] {description}"
                logger.warning(f"{self}: {error_msg}")
                await self.push_error(error_msg=error_msg)

    async def _handle_connection_established(self):
        """Handle successful connection establishment to Deepgram Flux.

        This event is fired when the WebSocket connection to Deepgram Flux
        is successfully established and ready to receive audio data for
        transcription processing.
        """
        logger.info("Connected to Flux - ready to stream audio")
        # Notify connection is established
        self._connection_established_event.set()

    async def _handle_fatal_error(self, data: Dict[str, Any]):
        """Handle fatal error messages from Deepgram Flux.

        Fatal errors indicate unrecoverable issues with the connection or
        configuration that require intervention. These errors will cause
        the connection to be terminated.

        Args:
            data: The error message data containing error details.

        Raises:
            Exception: Always raises to trigger error handling in the parent service.
        """
        error_msg = data.get("error", "Unknown error")
        deepgram_error = f"Fatal error: {error_msg}"
        logger.error(deepgram_error)
        # Error will be handled inside WebsocketService->_receive_task_handler
        raise Exception(deepgram_error)

    async def _handle_turn_info(self, data: Dict[str, Any]):
        """Handle TurnInfo events from Deepgram Flux.

        TurnInfo messages contain various turn-based events that indicate
        the state of speech processing, including turn boundaries, interim
        results, and turn finalization events.

        Args:
            data: The TurnInfo message data containing event type, transcript and some extra metadata.
        """
        event = data.get("event")
        transcript = data.get("transcript", "")

        try:
            flux_event_type = FluxEventType(event)
        except ValueError:
            logger.debug(f"Unhandled TurnInfo event: {event}")
            return

        match flux_event_type:
            case FluxEventType.START_OF_TURN:
                await self._handle_start_of_turn(transcript)
            case FluxEventType.TURN_RESUMED:
                await self._handle_turn_resumed(event)
            case FluxEventType.END_OF_TURN:
                await self._handle_end_of_turn(transcript, data)
            case FluxEventType.EAGER_END_OF_TURN:
                await self._handle_eager_end_of_turn(transcript, data)
            case FluxEventType.UPDATE:
                await self._handle_update(transcript)

    async def _handle_start_of_turn(self, transcript: str):
        """Handle StartOfTurn events from Deepgram Flux.

        StartOfTurn events are fired when Deepgram Flux detects the beginning
        of a new speaking turn. This triggers bot interruption to stop any
        ongoing speech synthesis and signals the start of user speech detection.

        The service will:
        - Send a BotInterruptionFrame upstream to stop bot speech
        - Send a UserStartedSpeakingFrame downstream to notify other components
        - Start metrics collection for measuring response times

        Args:
            transcript: maybe the first few words of the turn.
        """
        logger.debug("User started speaking")
        self._user_is_speaking = True
        await self.broadcast_frame(UserStartedSpeakingFrame)
        if self._should_interrupt:
            await self.broadcast_interruption()
        await self.start_metrics()
        await self._call_event_handler("on_start_of_turn", transcript)
        if transcript:
            logger.trace(f"Start of turn transcript: {transcript}")

    async def _handle_turn_resumed(self, event: str):
        """Handle TurnResumed events from Deepgram Flux.

        TurnResumed events indicate that speech has resumed after a brief pause
        within the same turn. This is primarily used for logging and debugging
        purposes and doesn't trigger any significant processing changes.

        Args:
            event: The event type string for logging purposes.
        """
        logger.trace(f"Received event TurnResumed: {event}")
        await self._call_event_handler("on_turn_resumed")

    def _calculate_average_confidence(self, transcript_data) -> Optional[float]:
        """Calculate the average confidence from transcript data.

        Return None if the data is missing or invalid.
        """
        # Example: Assume transcript_data has a list of words with confidence
        words = transcript_data.get("words")
        if not words or not isinstance(words, list):
            return None
        confidences = [
            w.get("confidence") for w in words if isinstance(w.get("confidence"), (float, int))
        ]
        if not confidences:
            return None
        return sum(confidences) / len(confidences)

    async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
        """Handle EndOfTurn events from Deepgram Flux.

        EndOfTurn events are fired when Deepgram Flux determines that a speaking
        turn has concluded, either due to sufficient silence or end-of-turn
        confidence thresholds being met. This provides the final transcript
        for the completed turn.

        The service will:
        - Create and send a final TranscriptionFrame with the complete transcript
        - Trigger transcription handling with tracing for metrics
        - Stop processing metrics collection
        - Send a UserStoppedSpeakingFrame to signal turn completion

        Args:
            transcript: The final transcript text for the completed turn.
            data: The TurnInfo message data containing event type, transcript and some extra metadata.
        """
        logger.debug("User stopped speaking")
        self._user_is_speaking = False

        # Compute the average confidence
        average_confidence = self._calculate_average_confidence(data)

        if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
            # EndOfTurn means Flux has determined the turn is complete,
            # so this TranscriptionFrame is always finalized
            await self.push_frame(
                TranscriptionFrame(
                    transcript,
                    self._user_id,
                    time_now_iso8601(),
                    self._settings.language,
                    result=data,
                    finalized=True,
                )
            )
        else:
            logger.warning(
                f"Transcription confidence below min_confidence threshold: {average_confidence}"
            )

        await self._handle_transcription(transcript, True, self._settings.language)
        await self.stop_processing_metrics()
        await self.broadcast_frame(UserStoppedSpeakingFrame)
        await self._call_event_handler("on_end_of_turn", transcript)

    async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
        """Handle EagerEndOfTurn events from Deepgram Flux.

        EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
        EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
        These provide interim transcripts that can be used for faster response
        generation while still allowing the user to continue speaking.

        EagerEndOfTurn events enable more responsive conversational AI by allowing
        the LLM to start processing likely final transcripts before the turn
        is definitively ended.

        Args:
            transcript: The interim transcript text that triggered the EagerEndOfTurn event.
            data: The TurnInfo message data containing event type, transcript and some extra metadata.
        """
        logger.trace(f"EagerEndOfTurn - {transcript}")
        # Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
        # medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
        # begin early.
        #
        # However, if speech resumes or the transcripts differ from the final EndOfTurn, the
        # EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
        #
        # Pipecat doesn't yet provide built-in Gate/control mechanisms to:
        # 1. Start LLM/TTS processing early on EagerEndOfTurn events
        # 2. Cancel in-flight processing when TurnResumed occurs
        #
        # By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
        # developers to implement custom EagerEndOfTurn handling in their applications while
        # maintaining compatibility with existing interim transcription workflows.
        #
        # TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
        # that can start response generation on EagerEndOfTurn and cancel or confirm it.
        await self.push_frame(
            InterimTranscriptionFrame(
                transcript,
                self._user_id,
                time_now_iso8601(),
                self._settings.language,
                result=data,
            )
        )
        await self._call_event_handler("on_eager_end_of_turn", transcript)

    async def _handle_update(self, transcript: str):
        """Handle Update events from Deepgram Flux.

        Update events provide incremental transcript updates during an ongoing
        turn. These events allow for real-time display of transcription progress
        and can be used to provide visual feedback to users about what's being
        recognized.

        The service stops TTFB (Time To First Byte) metrics when the first
        substantial update is received, indicating successful processing start.

        Args:
            transcript: The current partial transcript text for the ongoing turn.
        """
        if transcript:
            logger.trace(f"Update event: {transcript}")
            # TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
            # Ideally, TTFB should measure the time from when a user starts speaking
            # until we receive the first transcript. However, Deepgram Flux delivers
            # both the "user started speaking" event and the first transcript simultaneously,
            # making this timing measurement meaningless in this context.
            # await self.stop_ttfb_metrics()
            await self._call_event_handler("on_update", transcript)
