
    qi                         d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	m
Z
mZmZ ddlmZ e G d d             Z G d	 d
e      Ze G d d             Z G d de      Zy)zAsyncio task management.

This module provides task management functionality. Includes both abstract base
classes and concrete implementations for managing asyncio tasks with
comprehensive monitoring and cleanup capabilities.
    N)ABCabstractmethod)	dataclass)	CoroutineDictOptionalSequence)loggerc                   0    e Zd ZU dZej
                  ed<   y)TaskManagerParamszConfiguration parameters for task manager initialization.

    Parameters:
        loop: The asyncio event loop to use for task management.
    loopN)__name__
__module____qualname____doc__asyncioAbstractEventLoop__annotations__     T/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/utils/asyncio/task_manager.pyr   r      s     
#
##r   r   c                       e Zd ZdZedefd       Zedej                  fd       Z	ede
dedej                  fd       Zedd
ej                  dee   fd       Zedeej                     fd       Zy	)BaseTaskManagerzAbstract base class for asyncio task management.

    Provides the interface for creating, monitoring, and managing asyncio tasks.
    paramsc                      yzInitialize the task manager with configuration parameters.

        Args:
            params: Configuration parameters for task management.
        Nr   selfr   s     r   setupzBaseTaskManager.setup(        	r   returnc                      y)zuGet the event loop used by this task manager.

        Returns:
            The asyncio event loop instance.
        Nr   r   s    r   get_event_loopzBaseTaskManager.get_event_loop1   r    r   	coroutinenamec                      y)a]  Creates and schedules a new asyncio Task that runs the given coroutine.

        The task is added to a global set of created tasks.

        Args:
            coroutine: The coroutine to be executed within the task.
            name: The name to assign to the task for identification.

        Returns:
            The created task object.
        Nr   )r   r%   r&   s      r   create_taskzBaseTaskManager.create_task:   s     	r   Ntasktimeoutc                    K   yw)X  Cancels the given asyncio Task and awaits its completion with an optional timeout.

        This function removes the task from the set of registered tasks upon
        completion or failure.

        Args:
            task: The task to be cancelled.
            timeout: The optional timeout in seconds to wait for the task to cancel.
        Nr   )r   r)   r*   s      r   cancel_taskzBaseTaskManager.cancel_taskI   s      	s   c                      y)Returns the list of currently created/registered tasks.

        Returns:
            Sequence of currently managed asyncio tasks.
        Nr   r#   s    r   current_taskszBaseTaskManager.current_tasksV   r    r   N)r   r   r   r   r   r   r   r   r   r$   r   strTaskr(   r   floatr-   r	   r0   r   r   r   r   r   "   s    
 -    9 9   Y c gll   
gll 
Xe_ 
 
 x5  r   r   c                   0    e Zd ZU dZej
                  ed<   y)TaskDatazwInternal data structure for tracking task metadata.

    Parameters:
        task: The asyncio Task being managed.
    r)   N)r   r   r   r   r   r3   r   r   r   r   r6   r6   `   s     ,,r   r6   c                       e Zd ZdZddZdefdZdej                  fdZ	de
d	edej                  fd
Zddej                  dee   fdZdeej                     fdZdefdZdej                  fdZy)TaskManagerzConcrete implementation of BaseTaskManager.

    Manages asyncio tasks. Provides comprehensive task lifecycle management
    including creation, monitoring, cancellation, and cleanup.

    r!   Nc                      i | _         d| _        y)z5Initialize the task manager with empty task registry.N)_tasks_paramsr#   s    r   __init__zTaskManager.__init__s   s    +-48r   r   c                 ,    | j                   s|| _         yyr   )r;   r   s     r   r   zTaskManager.setupx   s     ||!DL r   c                 \    | j                   st        d      | j                   j                  S )zGet the event loop used by this task manager.

        Returns:
            The asyncio event loop instance.

        Raises:
            Exception: If the task manager is not properly set up.
        2TaskManager is not setup: unable to get event loop)r;   	Exceptionr   r#   s    r   r$   zTaskManager.get_event_loop   s'     ||PQQ||   r   r%   r&   c                 V   fd}| j                   st        d      | j                   j                  j                   |             }|j	                         |j                  | j                         | j                  t        |             t        j                   d       |S )a  Creates and schedules a new asyncio Task that runs the given coroutine.

        The task is added to a global set of created tasks.

        Args:
            coroutine: The coroutine to be executed within the task.
            name: The name to assign to the task for identification.

        Returns:
            The created task object.

        Raises:
            Exception: If the task manager is not properly set up.
        c            
      X  K   	  d {   S 7 # t         j                  $ r t        j                   d        t        $ rb} t        j                  | j                        }|d   }t        j                   d|j                   d|j                   d|         Y d } ~ y d } ~ ww xY ww)Nz: task cancelledz unexpected exception (:): )r   CancelledErrorr
   tracer@   	traceback
extract_tb__traceback__errorfilenamelineno)etblastr%   r&   s      r   run_coroutinez.TaskManager.create_task.<locals>.run_coroutine   s     	b&&)) v%567 b))!//:"vv%<T]]O1T[[MY\]^\_`aabs8   B*  B* 4B'AB"B*"B''B*r?   )r)   z: task created)r;   r@   r   r(   set_nameadd_done_callback_task_done_handler	_add_taskr6   r
   rG   )r   r%   r&   rQ   r)   s    ``  r   r(   zTaskManager.create_task   s     
	b ||PQQ||  ,,]_=dt667xT*+v^,-r   r)   r*   c           
        K   |j                         }|j                          	 |r t        j                  ||       d{    y| d{    y7 7 # t        j                  $ r t        j                  | d       Y yt        j                  $ r Y yt        $ rb}t        j                  |j                        }|d   }t        j                  | d|j                   d|j                   d|        Y d}~yd}~wt        $ r^}t        j                  |j                        }|d   }t        j                   | d|j                   d|j                   d|         d}~ww xY ww)	r,   )r*   Nz&: timed out waiting for task to cancelrC   z- unexpected exception while cancelling task (rD   rE   z- fatal base exception while cancelling task ()get_namecancelr   wait_forTimeoutErrorr
   warningrF   r@   rH   rI   rJ   rK   rL   rM   BaseExceptioncritical)r   r)   r*   r&   rN   rO   rP   s          r   r-   zTaskManager.cancel_task   sJ     }}	&&tW===

 >## 	LNNdV#IJK%% 	 	%%aoo6Bb6DLL&Edmm_TUVZVaVaUbbefgehi   	%%aoo6Bb6DOO&Edmm_TUVZVaVaUbbefgehi 	s   !E(A  AA E(A AA E(A A +E% E(E%E(E%AC;6E(;E%AE  E%%E(c                 p    | j                   j                         D cg c]  }|j                   c}S c c}w )r/   )r:   valuesr)   )r   datas     r   r0   zTaskManager.current_tasks   s)     '+kk&8&8&:;d		;;;s   3	task_datac                 V    |j                   j                         }|| j                  |<   y)zfAdd a task to the internal registry.

        Args:
            task_data: The task metadata.
        N)r)   rW   r:   )r   ra   r&   s      r   rU   zTaskManager._add_task   s$     ~~&&(%Dr   c                     |j                         }	 | j                  |= y# t        $ r$}t        j                  | d|        Y d}~yd}~ww xY w)zHandle task completion by removing the task from the registry.

        Args:
            task: The completed asyncio task.
        z1: unable to remove task data (already removed?): N)rW   r:   KeyErrorr
   rG   )r   r)   r&   rN   s       r   rT   zTaskManager._task_done_handler   sP     }}	XD! 	XLLD6!RSTRUVWW	Xs     	AAA)r!   Nr1   )r   r   r   r   r<   r   r   r   r   r$   r   r2   r3   r(   r   r4   r-   r	   r0   r6   rU   rT   r   r   r   r8   r8   k   s    9
"- "! 9 9 !$Y $c $gll $L"gll "Xe_ "H<x5 <&8 &
Xw|| 
Xr   r8   )r   r   rH   abcr   r   dataclassesr   typingr   r   r   r	   logurur
   r   r   r6   r8   r   r   r   <module>ri      sp      # ! 6 6  $ $ $;c ;|   HX/ HXr   