
    qik/                         d Z ddlmZmZmZmZmZmZ ddlm	Z	 ddl
mZmZmZmZmZmZ ddlmZ ddlmZ ddlmZmZ ddlmZ  G d	 d
e      Z G d de      Z G d de      Z ede      Z G d deee         Zy)zjService switcher for switching between different services at runtime, with different switching strategies.    )AnyGenericListOptionalTypeTypeVar)logger)
ErrorFrameFrameManuallySwitchServiceFrameServiceMetadataFrameServiceSwitcherFrame#ServiceSwitcherRequestMetadataFrame)ParallelPipeline)FunctionFilter)FrameDirectionFrameProcessor)
BaseObjectc                        e Zd ZdZdee   f fdZedee   fd       Zedefd       Z	de
dedee   fd	Zd
edee   fdZdedee   fdZ xZS )ServiceSwitcherStrategya  Base class for service switching strategies.

    Note:
        Strategy classes are instantiated internally by ServiceSwitcher.
        Developers should pass the strategy class (not an instance) to ServiceSwitcher.

    Event handlers available:

    - on_service_switched: Called when the active service changes.

    Example::

        @strategy.event_handler("on_service_switched")
        async def on_service_switched(strategy, service):
            ...
    servicesc                     t         |           t        |      dk(  rt        d      || _        |d   | _        | j                  d       y)a  Initialize the service switcher strategy with a list of services.

        Note:
            This is called internally by ServiceSwitcher. Do not instantiate directly.

        Args:
            services: List of frame processors to switch between.
        r   z2ServiceSwitcherStrategy needs at least one serviceon_service_switchedN)super__init__len	Exception	_services_active_service_register_event_handler)selfr   	__class__s     S/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/service_switcher.pyr   z ServiceSwitcherStrategy.__init__-   sJ     	x=APRR!'{$$%:;    returnc                     | j                   S z&Return the list of available services.r   r!   s    r#   r   z ServiceSwitcherStrategy.services@        ~~r$   c                     | j                   S )z$Return the currently active service.)r   r)   s    r#   active_servicez&ServiceSwitcherStrategy.active_serviceE   s     ###r$   frame	directionc                    K   yw)a  Handle a frame that controls service switching.

        The base implementation returns ``None`` for all frames. Subclasses
        override this to implement specific switching behaviors.

        Args:
            frame: The frame to handle.
            direction: The direction of the frame (upstream or downstream).

        Returns:
            The newly active service if a switch occurred, or None otherwise.
        N r!   r-   r.   s      r#   handle_framez$ServiceSwitcherStrategy.handle_frameJ   s         errorc                    K   yw)a  Handle an error from the active service.

        Called by ``ServiceSwitcher`` when a non-fatal ``ErrorFrame`` is pushed
        upstream by the currently active service. Subclasses can override this
        to implement automatic failover.

        Args:
            error: The error frame pushed by the active service.

        Returns:
            The newly active service if a switch occurred, or None otherwise.
        Nr0   )r!   r4   s     r#   handle_errorz$ServiceSwitcherStrategy.handle_error[   s      r3   servicec                    K   || j                   v rF|| _        |j                  t        |             d{    | j	                  d|       d{    |S y7 !7 	w)a  Set the active service to the given one, if it is in the list of available services.

        If it's not in the list, the request is ignored, as it may have been
        intended for another ServiceSwitcher in the pipeline.

        Args:
            service: The service to set as active.

        Returns:
            The newly active service, or None if the service was not found.
        )r7   Nr   )r   r   queue_framer   _call_event_handler)r!   r7   s     r#   _set_active_if_availablez0ServiceSwitcherStrategy._set_active_if_availablej   s`      dmm##*D %%&IRY&Z[[[**+@'JJJN \Js!   4AAAAAA)__name__
__module____qualname____doc__r   r   r   propertyr   r,   r   r   r   r2   r
   r6   r;   __classcell__r"   s   @r#   r   r      s    "<n!5 <& $~.   $ $ $)6D	.	!"
 x7O n R`Ia r$   r   c                   *    e Zd ZdZdededee   fdZy)ServiceSwitcherStrategyManualac  A strategy for switching between services manually.

    This strategy allows the user to manually select which service is active.
    The initial active service is the first one in the list.

    Example::

        stt_switcher = ServiceSwitcher(
            services=[stt_1, stt_2],
            strategy_type=ServiceSwitcherStrategyManual
        )
    r-   r.   r%   c                 v   K   t        |t              r#| j                  |j                         d{   S y7 w)a  Handle a frame that controls service switching.

        Args:
            frame: The frame to handle.
            direction: The direction of the frame (upstream or downstream).

        Returns:
            The newly active service if a switch occurred, or None otherwise.
        N)
isinstancer   r;   r7   r1   s      r#   r2   z*ServiceSwitcherStrategyManual.handle_frame   s4      e7866u}}EEE Fs   /979N)	r<   r=   r>   r?   r   r   r   r   r2   r0   r$   r#   rD   rD   ~   s)    )6D	.	!r$   rD   c                   &    e Zd ZdZdedee   fdZy)ServiceSwitcherStrategyFailovera  A strategy that automatically switches to a backup service on failure.

    When the active service produces a non-fatal error, this strategy switches
    to the next available service in the list. Recovery and fallback policies
    are left to application code via the ``on_service_switched`` event.

    Event handlers available:

    - on_service_switched: Called when the active service changes.

    Example::

        switcher = ServiceSwitcher(
            services=[primary_stt, backup_stt],
            strategy_type=ServiceSwitcherStrategyFailover,
        )

        @switcher.strategy.event_handler("on_service_switched")
        async def on_switched(strategy, service):
            # App decides when/how to recover the failed service
            ...
    r4   r%   c                   K   t        j                  d| j                  j                   d|j                          t        | j                        dk  rt        j                  d       y| j                  j                  | j                        }|dz   t        | j                        z  }| j                  | j                  |          d{   S 7 w)a  Handle an error from the active service by failing over.

        Switches to the next service in the list. The failed service remains
        in the list and can be switched back to manually or via application
        logic in the ``on_service_switched`` event handler.

        Args:
            error: The error frame pushed by the active service.

        Returns:
            The newly active service if a switch occurred, or None if no
            other service is available.
        zService z reported an error:    z'No other service available to switch toN)	r	   warningr   namer4   r   r   indexr;   )r!   r4   current_idxnext_idxs       r#   r6   z,ServiceSwitcherStrategyFailover.handle_error   s      	$"6"6";";!<<PQVQ\Q\P]^_t~~!#LLBCnn**4+?+?@!Os4>>'::224>>(3KLLLLs   C	CCCN)r<   r=   r>   r?   r
   r   r   r6   r0   r$   r#   rH   rH      s"    .M
 Mx7O Mr$   rH   StrategyType)boundc                        e Zd ZdZefdee   dee   f fdZ	e
defd       Ze
dee   fd       Zedee   dedee   fd	       Zed
ededefd       Zej&                  fdedef fdZdedef fdZ xZS )ServiceSwitchera  Parallel pipeline that routes frames to one active service at a time.

    Wraps each service in a pair of filters that gate frame flow based on
    which service is currently active. Switching is controlled by
    `ServiceSwitcherFrame` frames and delegated to a pluggable
    `ServiceSwitcherStrategy`.

    Example::

        switcher = ServiceSwitcher(services=[stt_1, stt_2])
    r   strategy_typec                 j     ||      }t        |   | j                  ||        || _        || _        y)a?  Initialize the service switcher with a list of services and a switching strategy.

        Args:
            services: List of frame processors to switch between.
            strategy_type: The strategy class to use for switching between services.
                Defaults to ``ServiceSwitcherStrategyManual``.
        N)r   r   _make_pipeline_definitionsr   	_strategy)r!   r   rT   rW   r"   s       r#   r   zServiceSwitcher.__init__   s7     "(+	$99(INO!"r$   r%   c                     | j                   S )z%Return the active switching strategy.)rW   r)   s    r#   strategyzServiceSwitcher.strategy   r*   r$   c                     | j                   S r'   r(   r)   s    r#   r   zServiceSwitcher.services   r*   r$   rY   c                 b    g }| D ]'  }|j                  t        j                  ||             ) |S N)appendrS   _make_pipeline_definition)r   rY   	pipelinesr7   s       r#   rV   z*ServiceSwitcher._make_pipeline_definitions   s<     	 	[G_FFwPXYZ	[r$   r7   c                      dt         dt        f fd}t        |t        j                  dd       t        |t        j
                  dd      gS )N_r%   c                 *   K   j                   k(  S wr\   )r,   )ra   r7   rY   s    r#   filterz9ServiceSwitcher._make_pipeline_definition.<locals>.filter  s     h5555s   T)rc   r.   filter_system_framesenable_direct_mode)r   boolr   r   
DOWNSTREAMUPSTREAM)r7   rY   rc   s   `` r#   r^   z)ServiceSwitcher._make_pipeline_definition  s[    	6E 	6d 	6 (33%)#'	 (11%)#'	
 	
r$   r-   r.   c                   K   t        |t              r$|j                  | j                  j                  k(  ryt        |t
              r.|j                  | j                  j                  j                  k7  ryt        |t              r/|j                  s#| j                  j                  |       d{    t        | 1  ||       d{    y7 7 w)a  Push a frame out of the service switcher.

        Suppresses `ServiceSwitcherRequestMetadataFrame` targeting the active
        service (since it has already been handled) and `ServiceMetadataFrame`
        from inactive services so only the active service's metadata reaches
        downstream processors. One case this happens is with `StartFrame` since
        all the filters let it pass, and `StartFrame` causes the service to
        generate `ServiceMetadataFrame`.

        Non-fatal ``ErrorFrame`` instances are forwarded to the strategy via
        ``handle_error`` so strategies like ``ServiceSwitcherStrategyFailover``
        can perform failover. The error frame is still propagated upstream so
        that application-level error handlers can observe it.
        N)rF   r   r7   rY   r,   r   service_namerL   r
   fatalr6   r   
push_frame)r!   r-   r.   r"   s      r#   rl   zServiceSwitcher.push_frame!  s     " e@A}} < << e12!!T]]%A%A%F%FF eZ(--,,U333g 	222 42s$   B-C0C1CC	CCc                    K   t        |t              r@| j                  j                  ||       d{   }|st        |   ||       d{    yyt        |   ||       d{    y7 97 !7 	w)zProcess a frame, handling frames which affect service switching.

        Args:
            frame: The frame to process.
            direction: The direction of the frame (upstream or downstream).
        N)rF   r   rY   r2   r   process_frame)r!   r-   r.   r7   r"   s       r#   rn   zServiceSwitcher.process_frameA  ss      e12 MM66uiHHG g+E9===  ''y999 I
 >9s3   0A3A-A3A/A3'A1(A3/A31A3)r<   r=   r>   r?   rD   r   r   r   rP   r   r@   rY   r   staticmethodr   r   rV   r^   r   rg   r   rl   rn   rA   rB   s   @r#   rS   rS      s   
 -J#~&# L)#" ,   $~.   ~&2I	c  

+B
	
 
: JXIbIb 3e 3 3@: :> : :r$   rS   N)r?   typingr   r   r   r   r   r   logurur	   pipecat.frames.framesr
   r   r   r   r   r   "pipecat.pipeline.parallel_pipeliner   *pipecat.processors.filters.function_filterr   "pipecat.processors.frame_processorr   r   pipecat.utils.base_objectr   r   rD   rH   rP   rS   r0   r$   r#   <module>rw      s    q > >   @ E M 0`j `F$; @.M&C .Mb ~-DE~:&(= ~:r$   