
    qi                     r    d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ  G d d	e      Zy)
zPipeline runner for managing pipeline task execution.

This module provides the PipelineRunner class that handles the execution
of pipeline tasks with signal handling, garbage collection, and lifecycle
management.
    N)Optional)logger)PipelineTaskParams)PipelineTask)
BaseObjectc                        e Zd ZdZdddddddee   deded	ed
eej                     f
 fdZ	de
fdZd Zd Zd Zd Zd Zd Zd Zd Z xZS )PipelineRunnera  Manages the execution of pipeline tasks with lifecycle and signal handling.

    Provides a high-level interface for running pipeline tasks with automatic
    signal handling (SIGINT/SIGTERM), optional garbage collection, and proper
    cleanup of resources.
    NTF)namehandle_siginthandle_sigtermforce_gcloopr
   r   r   r   r   c                    t         |   |       i | _        d| _        || _        |xs t        j                         | _        |r| j                          |r| j                          yy)a  Initialize the pipeline runner.

        Args:
            name: Optional name for the runner instance.
            handle_sigint: Whether to automatically handle SIGINT signals.
            handle_sigterm: Whether to automatically handle SIGTERM signals.
            force_gc: Whether to force garbage collection after task completion.
            loop: Event loop to use. If None, uses the current running loop.
        )r
   N)
super__init___tasks	_sig_task	_force_gcasyncioget_running_loop_loop_setup_sigint_setup_sigterm)selfr
   r   r   r   r   	__class__s         I/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/pipeline/runner.pyr   zPipelineRunner.__init__"   sb    $ 	d#!7W557
 !     taskc                   K   t        j                  d|  d|        || j                  |j                  <   	 t	        | j
                        }|j                  |       d{    | j                  |j                  = | j                          d{    | j                  r| j                   d{    | j                  r| j                          t        j                  d|  d|        y7 # t        j                  $ r Y w xY w7 w7 Yw)zjRun a pipeline task to completion.

        Args:
            task: The pipeline task to execute.
        Runner z started running )r   Nz finished running )r   debugr   r
   r   r   runr   CancelledErrorcleanupr   r   _gc_collect)r   r   paramss      r   r"   zPipelineRunner.runA   s      	wtf$5dV<=!%DII	'TZZ8F((6""" KK		" lln >>..  >>wtf$6tf=># #%% 		 	
 !sX   5D*C0 "C.#C0 '*DD	D1D2<D.C0 0DDDDDc                    K   t        j                  d|  d       t        j                  | j                  j                         D cg c]  }|j                          c}  d{    yc c}w 7 
w)zMSchedule all running tasks to stop when their current processing is complete.r    z* scheduled to stop when all tasks are doneN)r   r!   r   gatherr   valuesstop_when_doner   ts     r   r*   zPipelineRunner.stop_when_donea   sU     wtf$NOPnn4;;;M;M;OPaq//1PQQQPQs   AA2A+A2%A0&A2c                 p   K   t        j                  d|         | j                          d{    y7 w)%Cancel all running tasks immediately.zCancelling runner N)r   r!   _cancelr   s    r   cancelzPipelineRunner.cancelf   s)     )$01lln   ,646c                    K   t        j                  | j                  j                         D cg c]  }|j	                          c}  d{    yc c}w 7 
w)r.   N)r   r(   r   r)   r1   r+   s     r   r/   zPipelineRunner._cancelk   s<     nn4;;3E3E3GHaqxxzHIIIHIs   ,AAAAAc                      	 t        j                         }|j                  t        j                   fd       y# t
        $ r* t        j                  t        j                   fd       Y yw xY w)-Set up signal handlers for graceful shutdown.c                  $    j                         S N_sig_handlerargsr   s    r   <lambda>z.PipelineRunner._setup_sigint.<locals>.<lambda>s   s    ARARAT r   c                 $    j                         S r7   r8   sfr   s     r   r<   z.PipelineRunner._setup_sigint.<locals>.<lambda>v   s    d6G6G6I r   N)r   r   add_signal_handlersignalSIGINTNotImplementedErrorr   r   s   ` r   r   zPipelineRunner._setup_siginto   sP    	K++-D##FMM3TU" 	KMM&--)IJ	K   7; 0A.-A.c                      	 t        j                         }|j                  t        j                   fd       y# t
        $ r* t        j                  t        j                   fd       Y yw xY w)r5   c                  $    j                         S r7   r8   r:   s    r   r<   z/PipelineRunner._setup_sigterm.<locals>.<lambda>|   s    $BSBSBU r   c                 $    j                         S r7   r8   r>   s     r   r<   z/PipelineRunner._setup_sigterm.<locals>.<lambda>   s    t7H7H7J r   N)r   r   rA   rB   SIGTERMrD   rE   s   ` r   r   zPipelineRunner._setup_sigtermx   sP    	L++-D##FNN4UV" 	LMM&..*JK	LrF   c                 n    | j                   s)t        j                  | j                               | _         yy)z1Handle interrupt signals by cancelling all tasks.N)r   r   create_task_sig_cancelr0   s    r   r9   zPipelineRunner._sig_handler   s)    ~~$001A1A1CDDN r   c                 p   K   t        j                  d|         | j                          d{    y7 w)z4Cancel all running tasks due to signal interruption.z)Interruption detected. Cancelling runner N)r   warningr1   r0   s    r   rM   zPipelineRunner._sig_cancel   s)     B4&IJkkmr2   c                     t        j                         }t        j                  d| d       t        j                  dt         j                          y)z)Force garbage collection and log results.zGarbage collector: collected z	 objects.z)Garbage collector: uncollectable objects N)gccollectr   r!   garbage)r   	collecteds     r   r%   zPipelineRunner._gc_collect   s;    JJL	4YKyIJ@MNr   )__name__
__module____qualname____doc__r   strboolr   AbstractEventLoopr   r   r"   r*   r1   r/   r   r   r9   rM   r%   __classcell__)r   s   @r   r	   r	      s     #"$48" sm" 	"
 " " w001">?l ?@R

JKLE

Or   r	   )rX   r   rQ   rB   typingr   logurur   pipecat.pipeline.base_taskr   pipecat.pipeline.taskr   pipecat.utils.base_objectr   r	    r   r   <module>rc      s4     	    9 . 0uOZ uOr   