
    qi}/                       d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
mZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ d dl	mZ erddlmZ dZe G d d             Ze G d de             Z G d d      Z e G d de             Z! G d d      Z" G d d      Z# G d de#      Z$ G d de#      Z%ee e&gdf   Z'ee"e&gdf   Z(y)    )annotationsN)Callable)	dataclass)AsyncIteratorOptionalDictList   )
DataStream)ffi_pb2)room_pb2)	FfiClient)
split_utf8)TYPE_CHECKING)LocalParticipanti:  c                  J    e Zd ZU ded<   ded<   ded<   ded<   ded<   d	ed
<   y)BaseStreamInfostr	stream_id	mime_typetopicint	timestampzOptional[int]sizeOptional[Dict[str, str]]
attributesN__name__
__module____qualname____annotations__     I/opt/pipecat/venv/lib/python3.12/site-packages/livekit/rtc/data_stream.pyr   r   %   s"    NNJN
((r#   r   c                      e Zd ZU ded<   y)TextStreamInfoz	List[str]attachmentsNr   r"   r#   r$   r&   r&   /   s    r#   r&   c                  V    e Zd Z	 	 	 	 d	dZd
dZddZddZddZedd       Z	ddZ
y)TextStreamReaderc                6   || _         t        |j                  |j                  |j                  |j
                  |j                  t        |j                        t        |j                  j                              | _        t        j                         | _        y )Nr   r   r   r   r   r   r'   )_headerr&   r   r   r   r   total_lengthdictr   listtext_headerattached_stream_ids_infoasyncioQueue_queue)selfheaders     r$   __init__zTextStreamReader.__init__5   su     #&&&&,,&&$$F--.V//CCD

 ELMMOr#   c                V   K   | j                   j                  |       d {    y 7 wNr5   putr6   chunks     r$   _on_chunk_updatez!TextStreamReader._on_chunk_updateE        kkooe$$$   )')c                  K   | j                   j                  xs i | j                   _        | j                   j                  j                  |j                         | j                  j	                  d        d {    y 7 wr:   infor   updater5   r<   r6   trailers     r$   _on_stream_closez!TextStreamReader._on_stream_closeH   U     #yy339r				##G$6$67kkood###   A7B9A?:Bc                    | S r:   r"   r6   s    r$   	__aiter__zTextStreamReader.__aiter__M       r#   c                   K   | j                   j                          d {   }|t        |j                  j	                         }|S 7 (wr:   )r5   getStopAsyncIterationcontentdecode)r6   item
decodedStrs      r$   	__anext__zTextStreamReader.__anext__P   sA     [[__&&<$$\\((*
	 's   AA	)Ac                    | j                   S r:   r2   rL   s    r$   rD   zTextStreamReader.infoW       zzr#   c                :   K   d}| 2 3 d {   }||z  }7 
6 |S w)N r"   )r6   final_stringr>   s      r$   read_allzTextStreamReader.read_all[   s2      	" 	"%E!L	"4s   N)r7   proto_DataStream.HeaderreturnNoner>   zproto_DataStream.ChunkrG   zproto_DataStream.Trailer)r_   zAsyncIterator[str])r_   r   r_   r&   )r   r   r    r8   r?   rH   rM   rV   propertyrD   r]   r"   r#   r$   r)   r)   4   sI    T'T 
T %$
  r#   r)   c                      e Zd ZU ded<   y)ByteStreamInfor   nameNr   r"   r#   r$   rf   rf   b   s    
Ir#   rf   c                  H    e Zd Zdd	dZd
dZddZddZddZedd       Z	y)ByteStreamReaderc           
     &   || _         t        |j                  |j                  |j                  |j
                  |j                  t        |j                        |j                  j                        | _        t        j                  |      | _        y )Nr   r   r   r   r   r   rg   )r,   rf   r   r   r   r   r-   r.   r   byte_headerrg   r2   r3   r4   r5   )r6   r7   capacitys      r$   r8   zByteStreamReader.__init__h   sr    #&&&&,,&&$$F--.##((

 ELMMRZD[r#   c                V   K   | j                   j                  |       d {    y 7 wr:   r;   r=   s     r$   r?   z!ByteStreamReader._on_chunk_updateu   r@   rA   c                  K   | j                   j                  xs i | j                   _        | j                   j                  j                  |j                         | j                  j	                  d        d {    y 7 wr:   rC   rF   s     r$   rH   z!ByteStreamReader._on_stream_closex   rI   rJ   c                    | S r:   r"   rL   s    r$   rM   zByteStreamReader.__aiter__}   rN   r#   c                z   K   | j                   j                          d {   }|t        |j                  S 7 wr:   )r5   rP   rQ   rR   )r6   rT   s     r$   rV   zByteStreamReader.__anext__   s3     [[__&&<$$||	 's   ;9;c                    | j                   S r:   rX   rL   s    r$   rD   zByteStreamReader.info   rY   r#   N)r   )r7   r^   rm   r   r_   r`   ra   rb   )r_   zAsyncIterator[bytes])r_   bytesr_   rf   )
r   r   r    r8   r?   rH   rM   rV   rd   rD   r"   r#   r$   ri   ri   g   s0    \%$
  r#   ri   c                  h    e Zd Zdi dddddf	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d	dZd Zd
dZddZdddddZy)BaseStreamWriterr[   Nc	                h   || _         |t        t        j                               }t	        t
        j
                  j                         j                         dz        }	t        j                  ||	||||      | _
        d| _        || _        |xs | j                   j                  | _        d| _        y )Ni  )r   r   r   r   r   r-   r   F)_local_participantr   uuiduuid4r   datetimenowr   proto_DataStreamHeaderr,   _next_chunk_index_destination_identitiesidentity_sender_identity_closed)
r6   local_participantr   r   r   
total_sizer   destination_identitiessender_identityr   s
             r$   r8   zBaseStreamWriter.__init__   s     #4DJJL)I))--/99;dBC	'..!#
 '('=$ / S43J3J3S3Sr#   c                  K   t        j                  t        j                  | j                  | j
                  j                  j                  | j                  | j                              }t        j                  j                  j                         }	 t        j                  j                  |      |j                  fd       d {   }t        j                  j                  j!                  |       |j"                  j$                  rt'        |j"                  j$                        y 7 c# t        j                  j                  j!                  |       w xY ww)N)r7   local_participant_handler   r   )send_stream_headerc                ^    | j                   j                  j                   j                  k(  S r:   )r   async_ideresps    r$   <lambda>z/BaseStreamWriter._send_header.<locals>.<lambda>   s$    !..774;R;R;[;[[ r#   )	proto_ffi
FfiRequest
proto_roomSendStreamHeaderRequestr,   rx   _ffi_handlehandler   r   r   instancequeue	subscriberequestwait_forunsubscriber   errorConnectionError)r6   reqr   cbr   s       @r$   _send_headerzBaseStreamWriter._send_header   s	    "")AA||)-)@)@)L)L)S)S'+'C'C $ 5 5	 
 ""((224	8%%--c2D+0>>[, &B $$007  &&!""7"7"="=>> '& $$007s1   BE 6D2 D0D2 AE 0D2 2+EE c                  K   | j                   rt        d|       t        j                  t	        j
                  || j                  j                  j                  | j                  j                  | j                              }t        j                  j                  j                         }	 t        j                  j                  |      |j!                  fd       d {   }t        j                  j                  j#                  |       |j$                  j&                  rt)        |j$                  j&                        y 7 c# t        j                  j                  j#                  |       w xY ww)Nz*Cannot send chunk after stream is closed: )r>   r   r   r   )send_stream_chunkc                ^    | j                   j                  j                   j                  k(  S r:   )r   r   r   s    r$   r   z.BaseStreamWriter._send_chunk.<locals>.<lambda>   s$    !--66$:P:P:Y:YY r#   )r   RuntimeErrorr   r   r   SendStreamChunkRequestrx   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )r6   r>   r   r   r   r   s        @r$   _send_chunkzBaseStreamWriter._send_chunk   s%    <<!KE7STT""(??)-)@)@)L)L)S)S $ 7 7 @ @'+'C'C	
 ""((224	8%%--c2D+0>>Y, &B $$007%%!""6"6"<"<== && $$007s1   B,E:06E &E
'E +AE:
E +E77E:c                  K   t        j                  t        j                  || j                  j
                  j                  | j                  j                              }t        j                  j                  j                         }	 t        j                  j                  |      |j                  fd       d {   }t        j                  j                  j                  |       |j                  j                   rt#        |j$                  j                         y 7 c# t        j                  j                  j                  |       w xY ww)N)rG   r   r   )send_stream_trailerc                ^    | j                   j                  j                   j                  k(  S r:   )r   r   r   s    r$   r   z0BaseStreamWriter._send_trailer.<locals>.<lambda>   s$    !//88D<T<T<]<]] r#   )r   r   r   SendStreamTrailerRequestrx   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )r6   rG   r   r   r   r   s        @r$   _send_trailerzBaseStreamWriter._send_trailer   s    "" * C C)-)@)@)L)L)S)S $ 7 7 @ @!
 ""((224	8%%--c2D+0>>], &B $$007%%!""8"8">">?? && $$007s1   BE6D' D%D' AE%D' '+EE)reasonr   c                  K   | j                   rt        d      d| _         | j                  t        j                  | j
                  j                  ||             d {    y 7 w)NzStream already closedT)r   r   r   )rG   )r   r   r   r}   Trailerr,   r   )r6   r   r   s      r$   aclosezBaseStreamWriter.aclose   s\     <<677  $,,,,00J ! 
 	
 	
s   AA( A&!A()r   r   r   r   r   r   r   
str | Noner   
int | Noner   r   r   Optional[List[str]]r   r   ra   rb   )r   r   r   r   )r   r   r    r8   r   r   r   r   r"   r#   r$   rv   rv      s     /1 $!%6:&*+  -	
    !4 $8?,>0@* -/W[ 
r#   rv   c                  n     e Zd Zdi dddddd	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d fdZddZed	d       Z xZS )
TextStreamWriterr[   N)r   r   r   r   reply_to_idr   r   c                  t         	|   |||||d||       t        j                  j                  | j
                  j                  _        |r|| j
                  j                  _        t        | j
                  j                  | j
                  j                  | j
                  j                  | j
                  j                  | j
                  j                  t        | j
                  j                         t#        | j
                  j                  j$                              | _        t)        j*                         | _        y )Nz
text/plain)r   r   r   r+   )superr8   r}   OperationTypeCREATEr,   r0   operation_typereply_to_stream_idr&   r   r   r   r   r-   r.   r   r/   r1   r2   r3   Lock_write_lock)
r6   r   r   r   r   r   r   r   r   	__class__s
            r$   r8   zTextStreamWriter.__init__   s     	"#9+ 	 		
 3C2P2P2W2W  /:EDLL$$7#ll,,ll,,,,$$ll,,**DLL334T\\55IIJ

 #<<>r#   c                  K   | j                   4 d {    t        |t              D ]j  }|}| j                  }| xj                  dz  c_        t	        j
                  | j                  j                  ||      }| j                  |       d {    l d d d       d {    y 7 7 7 	# 1 d {  7  sw Y   y xY ww)Nr
   r   chunk_indexrR   )	r   r   STREAM_CHUNK_SIZEr   r}   Chunkr,   r   r   )r6   textr>   rR   r   	chunk_msgs         r$   writezTextStreamWriter.write  s     ## 
	2 
	2#D*;< 	2"44&&!+&,22"ll44 +#	
 &&y111	2
	2 
	2 
	2 2
	2 
	2 
	2 
	2sW   CB&CA7B,B(B,C B*!C(B,*C,B>2B53B>:Cc                    | j                   S r:   rX   rL   s    r$   rD   zTextStreamWriter.info)  rY   r#   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r_   r`   )r   r   rc   r   r   r    r8   r   rd   rD   __classcell__r   s   @r$   r   r      s    
 /1 $!%"&6:&*"*+"* 	"*
 -"* "* "*  "* !4"* $"* 
"*H2  r#   r   c                  l     e Zd Zddddddd	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 d fdZd	dZed
d       Z xZS )ByteStreamWriterr[   Nzapplication/octet-stream)r   r   r   r   r   r   c          
        t         	|   |||||||       || j                  j                  _        t        | j                  j                  | j                  j                  | j                  j                  | j                  j                  | j                  j                  t        | j                  j                        | j                  j                  j                        | _        t        j                         | _        y )N)r   r   rk   )r   r8   r,   rl   rg   rf   r   r   r   r   r-   r.   r   r2   r3   r   r   )
r6   r   rg   r   r   r   r   r   r   r   s
            r$   r8   zByteStreamWriter.__init__/  s     	#9 	 	
 )-  %#ll,,ll,,,,$$ll,,**DLL334))..

 #<<>r#   c                  K   | j                   4 d {    t        dt        |      t              D cg c]  }|||t        z     }}|D ]f  }t	        j
                  | j                  j                  | j                  |      }| j                  |       d {    | xj                  dz  c_        h d d d       d {    y 7 c c}w 7 37 # 1 d {  7  sw Y   y xY ww)Nr   r   r
   )
r   rangelenr   r}   r   r,   r   r   r   )r6   dataichunked_datar>   r   s         r$   r   zByteStreamWriter.writeP  s     ## 	, 	,9>q#d)M^9_45Q../L  & ,,22"ll44 $ 6 6!	
 &&y111&&!+&,	, 	, 	, 2	, 	, 	, 	,sb   C%CC%CCACCC4C%?C C%CC%C"CC"C%c                    | j                   S r:   rX   rL   s    r$   rD   zByteStreamWriter.info_  rY   r#   )r   r   rg   r   r   r   r   r   r   r   r   r   r   r   r   r   r_   r`   )r   rs   rt   r   r   s   @r$   r   r   .  s     /3 $!%36:*+* 	*
 * -* * * * !4* 
*B,  r#   r   ))
__future__r   r3   ry   r{   collections.abcr   dataclassesr   typingr   r   r   r	   _proto.room_pb2r   r}   _protor   r   r   r   _ffi_clientr   _utilsr   r   participantr   r   r   r&   r)   rf   ri   rv   r   r   r   TextStreamHandlerByteStreamHandlerr"   r#   r$   <module>r      s    #    $ ! 6 6 ; ( * "    -  ) ) ) ^  + +\ ^  " "Jh
 h
V4' 4n3' 3l .4d:; .4d:; r#   