
    qi(                        d Z ddlZddlmZ ddlmZ 	 ddlmZ ddlm	Z	m
Z
 ddlmZmZmZmZ ddlmZ dd	lmZ dd
lmZ  G d d      Zy# e$ r7Z ej0                  de         ej0                  d        ede       dZ[ww xY w)a  AWS SageMaker bidirectional streaming client.

This module provides a client for streaming bidirectional communication with
SageMaker endpoints using the HTTP/2 protocol. Supports sending audio, text,
and JSON data to SageMaker model endpoints and receiving streaming responses.
    N)Optional)logger)SageMakerRuntimeHTTP2Client)ConfigHTTPAuthSchemeResolver)*InvokeEndpointWithBidirectionalStreamInputRequestPayloadPartRequestStreamEventPayloadPartResponseStreamEvent)SigV4AuthScheme)EnvironmentCredentialsResolver)DuplexEventStreamzException: zWIn order to use SageMaker BiDi client, you need to `pip install pipecat-ai[sagemaker]`.zMissing module: c            	           e Zd ZdZ	 	 ddedededefdZd Zd Zdd
ede	e   fdZ
defdZdefdZdefdZde	e   fdZd Zedefd       Zy	)SageMakerBidiClientap  Client for bidirectional streaming with AWS SageMaker endpoints.

    Handles low-level HTTP/2 bidirectional streaming protocol for communicating
    with SageMaker model endpoints. Provides methods for sending various data
    types (audio, text, JSON) and receiving streaming responses.

    This client uses AWS SigV4 authentication and supports credential resolution
    from environment variables, AWS CLI configuration, and instance metadata.

    Example::

        client = SageMakerBidiClient(
            endpoint_name="my-deepgram-endpoint",
            region="us-east-2",
            model_invocation_path="v1/listen",
            model_query_string="model=nova-3&language=en"
        )
        await client.start_session()
        await client.send_audio_chunk(audio_bytes)
        response = await client.receive_response()
        await client.close_session()
    endpoint_nameregionmodel_invocation_pathmodel_query_stringc                     || _         || _        || _        || _        d| d| _        d| _        d| _        d| _        d| _        y)a}  Initialize the SageMaker BiDi client.

        Args:
            endpoint_name: Name of the SageMaker endpoint to connect to.
            region: AWS region where the endpoint is deployed.
            model_invocation_path: API path for the model invocation (e.g., "v1/listen").
            model_query_string: Query string parameters for the model (e.g., "model=nova-3").
        zhttps://runtime.sagemaker.z.amazonaws.com:8443NF)	r   r   r   r   bidi_endpoint_client_stream_output_stream
_is_active)selfr   r   r   r   s        \/opt/pipecat/venv/lib/python3.12/site-packages/pipecat/services/aws/sagemaker/bidi_client.py__init__zSageMakerBidiClient.__init__?   sY     +%:""49&ATU>B  	 #    c           
         t        j                  d| j                          t        j                  d| j                          t	        t        j                  d      xr t        j                  d            }|st        j                  d       t        | j                  | j                  t               t               dt        d      i	      }t        |
      | _        y)a'  Initialize the SageMaker Runtime HTTP2 client with AWS credentials.

        Creates and configures the SageMaker Runtime HTTP2 client with SigV4
        authentication. Attempts to resolve AWS credentials from environment
        variables, AWS CLI configuration, or instance metadata.
        z/Initializing SageMaker BiDi client for region: zUsing endpoint URI: AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEYzAWS credentials not found in environment variables. Attempting to use EnvironmentCredentialsResolver which will check AWS CLI configuration and instance metadata.zaws.auth#sigv4	sagemaker)service)endpoint_urir   !aws_credentials_identity_resolverauth_scheme_resolverauth_schemes)configN)r   debugr   r   boolosgetenvwarningr   r   r   r   r   r   )r   has_env_credsr(   s      r   _initialize_clientz&SageMakerBidiClient._initialize_clientZ   s     	Ft{{mTU+D,>,>+?@A RYY':;b		Ja@bcNN? ++;;.L.N!7!9*OK,PQ
 3&Ar   c                   K   | j                   s| j                          t        j                  d| j                          t        j                  d| j
                          t        j                  d| j                          t        | j                  | j
                  | j                        }	 | j                   j                  |       d{   | _	        d| _
        | j                  j                          d{   }|d   | _        t        j                  d       | j                  S 7 ]7 1# t        $ r2}t        j                  d	|        d
| _
        t        d|       d}~ww xY ww)a  Start a bidirectional streaming session with the SageMaker endpoint.

        Initializes the client if needed, creates the bidirectional stream, and
        establishes the connection to the SageMaker endpoint. Must be called
        before sending or receiving data.

        Returns:
            The output stream for receiving responses.

        Raises:
            RuntimeError: If client initialization or connection fails.
        z%Starting BiDi session with endpoint: zModel invocation path: zModel query string: )r   r   r   NT   z!BiDi session started successfullyzFailed to start BiDi session: Fz(Failed to start SageMaker BiDi session: )r   r/   r   r)   r   r   r   r   )invoke_endpoint_with_bidirectional_streamr   r   await_outputr   	ExceptionerrorRuntimeError)r   stream_inputoutputes       r   start_sessionz!SageMakerBidiClient.start_sessionw   sI     ||##%<T=O=O<PQR.t/I/I.JKL+D,C,C+DEF B,,"&"<"<#66
	O!%!W!W" DL #DO  <<4466F"()DLL<=&&& 7  	OLL9!=>#DO!I!MNN	OsN   B/E02D2 D.-D2 >D0?.D2 -E0.D2 0D2 2	E-;-E((E--E0N
data_bytes	data_typec                 8  K   | j                   r| j                  st        d      	 t        ||      }t	        |      }| j                  j
                  j                  |       d{    y7 # t        $ r}t        j                  d|         d}~ww xY ww)a  Send a chunk of data to the stream.

        Generic method for sending any type of data to the SageMaker endpoint.
        Use the convenience methods (send_audio_chunk, send_text, send_json)
        for common data types.

        Args:
            data_bytes: Raw bytes to send.
            data_type: Optional data type header. Common values are "BINARY" for
                audio/binary data and "UTF8" for text/JSON data.

        Raises:
            RuntimeError: If session is not active or send fails.
        BiDi session not active)bytes_r<   )valueNzFailed to send data: )
r   r   r6   r	   r
   input_streamsendr4   r   r5   )r   r;   r<   payloadeventr9   s         r   	send_datazSageMakerBidiClient.send_data   s      dll899	(
iPG1@E,,++00777 	LL045	s;   $BAA0 (A.)A0 -B.A0 0	B9BBBaudio_bytesc                 F   K   | j                  |d       d{    y7 w)aG  Send a chunk of audio data to the stream.

        Convenience method for sending audio data. Automatically sets the data
        type to "BINARY".

        Args:
            audio_bytes: Raw audio bytes to send (e.g., PCM audio data).

        Raises:
            RuntimeError: If session is not active or send fails.
        BINARYr<   N)rE   )r   rF   s     r   send_audio_chunkz$SageMakerBidiClient.send_audio_chunk   s      nn[Hn===s   !!textc                 d   K   | j                  |j                  d      d       d{    y7 w)a4  Send text data to the stream.

        Convenience method for sending text data. Automatically encodes the text
        as UTF-8 and sets the data type to "UTF8".

        Args:
            text: Text string to send.

        Raises:
            RuntimeError: If session is not active or send fails.
        utf-8UTF8rI   N)rE   encode)r   rK   s     r   	send_textzSageMakerBidiClient.send_text   s'      nnT[[1VnDDDs   &0.0datac                    K   ddl }| j                  |j                  |      j                  d      d       d{    y7 w)a  Send JSON data to the stream.

        Convenience method for sending JSON-encoded messages. Useful for control
        messages like KeepAlive or CloseStream. Automatically serializes the
        dictionary to JSON, encodes as UTF-8, and sets the data type to "UTF8".

        Args:
            data: Dictionary to send as JSON (e.g., {"type": "KeepAlive"}).

        Raises:
            RuntimeError: If session is not active or send fails.
        r   NrM   rN   rI   )jsonrE   dumpsrO   )r   rQ   rS   s      r   	send_jsonzSageMakerBidiClient.send_json   s5      	nnTZZ-44W=nPPPs   9AAAreturnc                    K   | j                   r| j                  st        d      	 | j                  j                          d{   }|S 7 # t        $ r}t        j                  d|         d}~ww xY ww)aS  Receive a response from the stream.

        Blocks until a response is available from the SageMaker endpoint. Returns
        None when the stream is closed.

        Returns:
            The response event containing payload data, or None if stream is closed.

        Raises:
            RuntimeError: If session is not active.
        r>   NzFailed to receive response: )r   r   r6   receiver4   r   r5   )r   resultr9   s      r   receive_responsez$SageMakerBidiClient.receive_response   so      d&9&9899	..6688FM 9 	LL7s;<	s:   $A7A AA 
A7A 	A4A//A44A7c                 Z  K   | j                   syt        j                  d       d| _         	 | j                  r,| j                  j                  j                          d{    t        j                  d       y7 # t        $ r"}t        j                  d|        Y d}~yd}~ww xY ww)zClose the bidirectional streaming session.

        Gracefully closes the input stream and marks the session as inactive.
        Safe to call multiple times.
        NzClosing BiDi session...Fz BiDi session closed successfullyzError closing BiDi session: )r   r   r)   r   rA   closer4   r-   )r   r9   s     r   close_sessionz!SageMakerBidiClient.close_session  s      ./	?||ll//55777LL;< 8 	?NN9!=>>	?s@   *B+3A=  A;!A= :B+;A= =	B(B#B+#B((B+c                     | j                   S )z|Check if the session is currently active.

        Returns:
            True if session is active, False otherwise.
        )r   )r   s    r   	is_activezSageMakerBidiClient.is_active  s     r   ) r`   )N)__name__
__module____qualname____doc__strr   r/   r:   bytesr   rE   rJ   rP   dictrU   r   rZ   r]   propertyr*   r_    r   r   r   r   '   s    6 &("$      #	 
   6B:+OZ% HSM 4>% >EC EQD Q"1D(E ,?& 4  r   r   )rd   r+   typingr   logurur   &aws_sdk_sagemaker_runtime_http2.clientr   &aws_sdk_sagemaker_runtime_http2.configr   r   &aws_sdk_sagemaker_runtime_http2.modelsr   r	   r
   r   smithy_aws_core.auth.sigv4r   smithy_aws_core.identityr   smithy_core.aio.eventstreamr   ModuleNotFoundErrorr9   r5   r4   r   ri   r   r   <module>rs      s    
  ,RU  ;G=t t  ,FLL;qc"#FLLa &qc*
++,s   ,A B2BB