
    qiD                         d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlZddlmZmZ ddlmZmZ  e j$                  d      Z G d	 d
e      Z G d de      Zy)    N)suppress)datetime)quote   )AbstractBufferedFileAbstractFileSystem)infer_storage_optionstokenizewebhdfsc                   N    e Zd ZdZ e ej                               ZdZ	 	 	 	 	 	 	 	 	 	 	 d fd	Z	e
d        Zd Zd dZ	 	 	 	 	 d!dZed        Zed	        Zed
        Zd Zd Zd Zd"dZd Zd Zd Zd#dZd Zd Zd Zd$dZd Z d Z!d"dZ"d Z#d"dZ$d Z%d Z&d Z' xZ(S )%WebHDFSa  
    Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.

    Four auth mechanisms are supported:

    insecure: no auth is done, and the user is assumed to be whoever they
        say they are (parameter ``user``), or a predefined value such as
        "dr.who" if not given
    spnego: when kerberos authentication is enabled, auth is negotiated by
        requests_kerberos https://github.com/requests/requests-kerberos .
        This establishes a session based on existing kinit login and/or
        specified principal/password; parameters are passed with ``kerb_kwargs``
    token: uses an existing Hadoop delegation token from another secured
        service. Indeed, this client can also generate such tokens when
        not insecure. Note that tokens expire, but can be renewed (by a
        previously specified user) and may allow for proxying.
    basic-auth: used when both parameter ``user`` and parameter ``password``
        are provided.

    )r   webHDFSc                    | j                   ryt        |   di | |
rdnd d| d| d| _        || _        |xs i | _        i | _        |	xs i | _        |||t        d      || j                  d<   || _	        || _
        ||t        d	      ||| j                  d
<   ||| j                  d<   |r|t        d      || _        || _        | j                          dt        ||       | _        y)a  
        Parameters
        ----------
        host: str
            Name-node address
        port: int
            Port for webHDFS
        kerberos: bool
            Whether to authenticate with kerberos for this connection
        token: str or None
            If given, use this token on every call to authenticate. A user
            and user-proxy may be encoded in the token and should not be also
            given
        user: str or None
            If given, assert the user name to connect with
        password: str or None
            If given, assert the password to use for basic auth. If password
            is provided, user must be provided also
        proxy_to: str or None
            If given, the user has the authority to proxy, and this value is
            the user in who's name actions are taken
        kerb_kwargs: dict
            Any extra arguments for HTTPKerberosAuth, see
            `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
        data_proxy: dict, callable or None
            If given, map data-node addresses. This can be necessary if the
            HDFS cluster is behind a proxy, running on Docker or otherwise has
            a mismatch between the host-names given by the name-node and the
            address by which to refer to them from the client. If a dict,
            maps host names ``host->data_proxy[host]``; if a callable, full
            URLs are passed, and function must conform to
            ``url->data_proxy(url)``.
        use_https: bool
            Whether to connect to the Name-node using HTTPS instead of HTTP
        session_cert: str or Tuple[str, str] or None
            Path to a certificate file, or tuple of (cert, key) files to use
            for the requests.Session
        session_verify: str, bool or None
            Path to a certificate file to use for verifying the requests.Session.
        kwargs
        Nhttpshttpz://:z/webhdfs/v1z_If passing a delegation token, must not set user or proxy_to, as these are encoded in the token
delegationzQIf passing a password, the user must also beset in order to set up the basic-authz	user.namedoaszJIf using Kerberos auth, do not specify the user, this is handled by kinit.webhdfs_ )_cachedsuper__init__urlkerbkerb_kwargsparsproxy
ValueErroruserpasswordsession_certsession_verify_connectr
   _fsid)selfhostportkerberostokenr    r!   proxy_tor   
data_proxy	use_httpsr"   r#   kwargs	__class__s                 P/opt/pipecat/venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.pyr   zWebHDFS.__init__.   s9   r <<"6"!*g7s4&${S	&,"	%2
8#7  
 ',DIIl#	 | < 
 )-		+& (DIIf(2 
 ),t 456
    c                     | j                   S N)r%   r&   s    r0   fsidzWebHDFS.fsid   s    zzr1   c                    t        j                         | _        | j                  r| j                  | j                  _        | j
                  | j                  _        | j                  r'ddlm	}  |di | j                  | j                  _        | j                  @| j                  3ddlm}  || j                  | j                        | j                  _        y y y )Nr   )HTTPKerberosAuth)HTTPBasicAuthr   )requestsSessionsessionr"   certr#   verifyr   requests_kerberosr7   r   authr    r!   requests.authr8   )r&   r7   r8   s      r0   r$   zWebHDFS._connect   s    '') $ 1 1DLL"1199: 0 D43C3C DDLL99 T]]%>3 -dii GDLL &? r1   c                    || j                  |      nd}| j                  | j                  t        |d      z         }|j	                         }|j                  | j                         |j                         |d<   t        j                  d||       | j                  j                  |j                         ||||      }	|	j                  dv rY	 |	j                         }
|
d   d	   }|
d   d
   }|dv rt        |      |dv rt        |      |dv rt!        |      t#        |      |	j'                          |	S # t        t$        f$ r Y #w xY w)N z/=)safeopzsending %s with %s)methodr   paramsdataallow_redirects)i  i  i  i  i  RemoteExceptionmessage	exception)IllegalArgumentExceptionUnsupportedOperationException)SecurityExceptionAccessControlException)FileNotFoundException)_strip_protocol_apply_proxyr   r   copyupdater   upperloggerdebugr;   requeststatus_codejsonr   PermissionErrorFileNotFoundErrorRuntimeErrorKeyErrorraise_for_status)r&   rD   rE   pathrG   redirectr.   r   argsouterrmsgexps                r0   _callzWebHDFS._call   sS   -1-=t##D)25D+A AB{{}DIIXXZT
)37ll""<<>$ # 
 ??77,hhj+,Y7+,[9 WW$S/)KK)#..55+C00&s++
 ) s    D; ;EEc           
      \    |xs | j                   }t        | |||| j                  |||      S )a^  

        Parameters
        ----------
        path: str
            File location
        mode: str
            'rb', 'wb', etc.
        block_size: int
            Client buffer size for read-ahead or write buffer
        autocommit: bool
            If False, writes to temporary file that only gets put in final
            location upon commit
        replication: int
            Number of copies of file on the cluster, write mode only
        permissions: str or int
            posix permissions, write mode only
        kwargs

        Returns
        -------
        WebHDFile instance
        )mode
block_sizetempdir
autocommitreplicationpermissions)	blocksize	WebHDFilerk   )r&   r`   ri   rj   rl   rm   rn   r.   s           r0   _openzWebHDFS._open   s<    B  14>>
!LL!##	
 		
r1   c                 B    | d   j                         | d<   | d   | d<   | S )Ntypelengthsize)lower)infos    r0   _process_infozWebHDFS._process_info   s*    F|))+VH~Vr1   c                     t        |      d   S )Nr`   )r	   )clsr`   s     r0   rQ   zWebHDFS._strip_protocol   s    $T*622r1   c                     t        |       }|j                  dd        |j                  dd        d|v r|j                  d      |d<   |S )Nr`   protocolusernamer    )r	   pop)urlpathrc   s     r0   _get_kwargs_from_urlszWebHDFS._get_kwargs_from_urls  sG    #G,
D!''*-CK
r1   c                 z    | j                  d|      }|j                         d   }||d<   | j                  |      S )NGETFILESTATUSr`   
FileStatusname)rg   rZ   rx   )r&   r`   rc   rw   s       r0   rw   zWebHDFS.info
  s>    jjtj4xxz,'V!!$''r1   c                     | j                  |      }|j                  dd      }|t        j                  |dz        S t	        d      )z=Return the created timestamp of a file as a datetime.datetimemodificationTimeN  z5Could not retrieve creation time (modification time).rw   getr   fromtimestampr]   r&   r`   rw   mtimes       r0   createdzWebHDFS.created  sI     yy+T2))%$,77RSSr1   c                     | j                  |      }|j                  dd      }|t        j                  |dz        S t	        d      )z>Return the modified timestamp of a file as a datetime.datetimer   Nr   z%Could not retrieve modification time.r   r   s       r0   modifiedzWebHDFS.modified  sG    yy+T2))%$,77BCCr1   c                    | j                  d|      }|j                         d   d   }|D ]0  }| j                  |       |j                  d      dz   |d   z   |d<   2 |rt	        |d 	      S t	        d
 |D              S )N
LISTSTATUSr   FileStatusesr   /
pathSuffixr   c                     | d   S )Nr   r   )is    r0   <lambda>zWebHDFS.ls.<locals>.<lambda>(  s
    qy r1   )keyc              3   &   K   | ]	  }|d      yw)r   Nr   ).0rw   s     r0   	<genexpr>zWebHDFS.ls.<locals>.<genexpr>*  s     94$v,9s   )rg   rZ   rx   rstripsorted)r&   r`   detailr.   rc   infosrw   s          r0   lsz
WebHDFS.ls!  s    jjDj1
>*<8 	GDt$;;s+c1D4FFDL	G %%89995999r1   c                 N    | j                  d|      }|j                         d   S )z8Total numbers of files, directories and bytes under pathGETCONTENTSUMMARYr   ContentSummaryrg   rZ   )r&   r`   rc   s      r0   content_summaryzWebHDFS.content_summary,  s'    jj,4j8xxz*++r1   c                 D   | j                  d|d      }d|j                  v r\| j                  |j                  d         }| j                  j	                  |      }|j                          |j                         d   S |j                          |j                         d   S )z/Checksum info of file, giving method and resultGETFILECHECKSUMF)r`   ra   LocationFileChecksum)rg   headersrR   r;   r   r_   rZ   )r&   r`   rc   locationout2s        r0   ukeyzWebHDFS.ukey1  s    jj*jF$((Z)@AH<<##H-D!!#99;~..  "88:n--r1   c                 J    | j                  d      }|j                         d   S )zGet user's home directoryGETHOMEDIRECTORYPathr   )r&   rc   s     r0   home_directoryzWebHDFS.home_directory=  s"    jj+,xxz&!!r1   c                     |r| j                  d|      }n| j                  d      }|j                         d   }|t        d      |d   S )zRetrieve token which can give the same authority to other uses

        Parameters
        ----------
        renewer: str or None
            User who may use this token; if None, will be current user
        GETDELEGATIONTOKEN)renewerTokenz1No token available for this user/security context	urlString)rg   rZ   r   )r&   r   rc   ts       r0   get_delegation_tokenzWebHDFS.get_delegation_tokenB  sT     **17*CC**12CHHJw9PQQ~r1   c                 P    | j                  dd|      }|j                         d   S )z/Make token live longer. Returns new expiry timeRENEWDELEGATIONTOKENputrE   r*   longr   )r&   r*   rc   s      r0   renew_delegation_tokenzWebHDFS.renew_delegation_tokenS  s(    jj/UjKxxz&!!r1   c                 ,    | j                  dd|       y)z Stop the token from being usefulCANCELDELEGATIONTOKENr   r   Nrg   )r&   r*   s     r0   cancel_delegation_tokenzWebHDFS.cancel_delegation_tokenX  s    

*5
Fr1   c                 .    | j                  dd||       y)a  Set the permission at path

        Parameters
        ----------
        path: str
            location to set (file or directory)
        mod: str or int
            posix epresentation or permission, give as oct string, e.g, '777'
            or 0o777
        SETPERMISSIONr   )rE   r`   
permissionNr   )r&   r`   mods      r0   chmodzWebHDFS.chmod\  s     	

?5t
Lr1   c                 N    i }|||d<   |||d<    | j                   dd|d| y)zChange owning user and/or groupNownergroupr   rE   r`   )SETOWNERr   )r&   r`   r   r   r.   s        r0   chownzWebHDFS.chowni  s=    #F7O#F7O

Ae$A&Ar1   c                 .    | j                  d|d|       y)a9  
        Set file replication factor

        Parameters
        ----------
        path: str
            File location (not for directories)
        replication: int
            Number of copies of file on the cluster. Should be smaller than
            number of data nodes; normally 3 on most systems.
        SETREPLICATIONr   )r`   rE   rm   Nr   )r&   r`   rm   s      r0   set_replicationzWebHDFS.set_replicationr  s     	

#$u+
Vr1   c                 ,    | j                  dd|       y )NMKDIRSr   r   r   r&   r`   r.   s      r0   mkdirzWebHDFS.mkdir  s    

8E
5r1   c                 f    |du r| j                  |      rt        |      | j                  |       y )NF)existsFileExistsErrorr   )r&   r`   exist_oks      r0   makedirszWebHDFS.makedirs  s,    uT!2!$''

4r1   c                 .    | j                  dd||       y )NRENAMEr   )rE   r`   destinationr   )r&   path1path2r.   s       r0   mvz
WebHDFS.mv  s    

8E5
Ir1   c                 B    | j                  dd||rd       y d       y )NDELETEdeletetruefalse)rE   r`   	recursiver   )r&   r`   r   r.   s       r0   rmz
WebHDFS.rm  s0    

 )f	 	 	
 07	 	 	
r1   c                 &    | j                  |       y r3   )r   r   s      r0   rm_filezWebHDFS.rm_file  s    r1   c                    | j                  |      5 }dj                  | j                  |      dt        j                  d       g      }	 | j                  |d      5 }t        j                  ||       d d d        | j                  ||       	 d d d        y # 1 sw Y   %xY w# t        $ r7 t        t              5  | j                  |       d d d         # 1 sw Y    xY ww xY w# 1 sw Y   y xY w)Nr   z.tmp.   wb)openjoin_parentsecrets	token_hexshutilcopyfileobjr   BaseExceptionr   r\   r   )r&   lpathrpathr.   lstream	tmp_fnamerstreams          r0   cp_filezWebHDFS.cp_file  s    YYu 	$,,u"5w?P?PQS?T>U7V!WXIYYy$/ 97&&w89	5)	 	
9 9 ! /0 'GGI&''	 	sM   9C(B%B5B%B"	B%%C%=C	C%C!C%%C((C1c                     | j                   r(t        | j                         r| j                  |      }|S | j                   r5| j                   j                         D ]  \  }}|j                  ||d      } |S )N   )r   callableitemsreplace)r&   r   kvs       r0   rR   zWebHDFS._apply_proxy  sk    ::(4::.zz(+H
 	 ZZ

((* 51#++Aq!45r1   )i  FNNNNNNFNT)r   NNT)rbNTNNFr3   )NN))__name__
__module____qualname____doc__strtempfile
gettempdirrk   r|   r   propertyr5   r$   rg   rq   staticmethodrx   classmethodrQ   r   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rR   __classcell__r/   s   @r0   r   r      s-   * %(%%'(G#H
 c7J  H$H +
Z  
 3 3  (TD	:,

."
""
GMBW6
J
r1   r   c                   B     e Zd ZdZ fdZddZd Zd Zd Zd Z	 xZ
S )	rp   z"A file living in HDFS over webHDFSc                    t        |   ||fi | |j                         }|j                  dd       |j	                  dd        |j                  dd       |j	                  dd        |j	                  dd      | _        |j	                  d      }|j	                  dd      du rR| j                  | _        t        j                  j                  |t        t        j                                     | _        y y )Nrn   rm   i  rk   rl   F)r   r   rS   r   r~   rn   r`   targetosr   r  uuiduuid4)r&   fsr`   r.   rk   r/   s        r0   r   zWebHDFile.__init__  s    T,V,::mT*2JJ}d+::mT*2JJ}d+!::mS9**Y'::lE*e3))DKWc$**,.?@DI 4r1   c                     | j                   j                  j                  | j                  | j                  j                         ddi      }|j                          y)zWrite one part of a multi-block file upload

        Parameters
        ==========
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        content-typeapplication/octet-stream)rG   r   T)r  r;   postr   buffergetvaluer_   )r&   finalrc   s      r0   _upload_chunkzWebHDFile._upload_chunk  sR     ggoo""MM%%'#%?@ # 

 	r1   c                 Z   | j                   j                         }d| j                  v rd\  }}n
d\  }}d|d<    | j                  j                  ||| j
                  fddi|}| j                  j                  |j                  d         }d	| j                  v r| j                  j                  j                  |d
di      }|j                           | j                  j                  dd| j
                  fddi|}| j                  j                  |j                  d         | _        yy)zCreate remote file/uploada)APPENDPOST)CREATEPUTr   	overwritera   Fr   wr  r  )r   r!  r"  N)r.   rS   ri   r  rg   r`   rR   r   r;   r   r_   r   )r&   r.   rD   rE   rc   r   r   s          r0   _initiate_uploadzWebHDFile._initiate_upload  s   !!#$)))JB(JB"(F;dggmmB		LELVL77''J(?@$))77??&&>3M"N ' D !!# 477==6499WuWPVWD GG00j1IJDM r1   c                    t        |d      }t        | j                  |      }||k\  s|| j                  k\  ry| j                  j	                  d| j
                  |||z
  d      }|j                          d|j                  v rY|j                  d   }| j                  j                  j                  | j                  j                  |            }|j                  S |j                  S )Nr   r1   OPENF)r`   offsetrt   ra   r   )maxminru   r  rg   r`   r_   r   r;   r   rR   content)r&   startendrc   r   r   s         r0   _fetch_rangezWebHDFile._fetch_range  s    E1$))S!C<5DII-ggmm5uu  
 	${{:.H77??&&tww';';H'EFD<<;;r1   c                 d    | j                   j                  | j                  | j                         y r3   )r  r   r`   r  r4   s    r0   commitzWebHDFile.commit  s    

499dkk*r1   c                 N    | j                   j                  | j                         y r3   )r  r   r`   r4   s    r0   discardzWebHDFile.discard  s    

499r1   r  )r  r  r  r  r   r  r'  r0  r2  r4  r  r  s   @r0   rp   rp     s&    ,A"K( +r1   rp   )loggingr  r   r   r	  r  
contextlibr   r   urllib.parser   r9   specr   r   utilsr	   r
   	getLoggerrV   r   rp   r   r1   r0   <module>r;     s\     	         ; 3			9	%V  VrI$ Ir1   