# -*- coding: utf-8 -*-
"Main module defining filesystem and file classes"
from __future__ import absolute_import

import ctypes
import logging
import os
import re
import warnings
import posixpath
from collections import deque

from .compatibility import FileNotFoundError, ConnectionError
from .conf import conf
from .utils import (read_block, seek_delimiter, ensure_bytes, ensure_string,
                    ensure_trailing_slash, MyNone)

logger = logging.getLogger(__name__)
_lib = None

class HDFileSystem(object):
    """ Connection to an HDFS namenode

    >>> hdfs = HDFileSystem(host='localhost', port=8020)  # doctest: +SKIP
    """
    # PID of the first process that called ``connect``; ``connect`` uses it
    # to warn when the library is re-used from a different process.
    _first_pid = None

    def __init__(self, host=MyNone, port=MyNone, connect=True, autoconf=True,
                 pars=None, **kwargs):
        """
        Parameters
        ----------
        host: str; port: int
            Overrides which take precedence over information in conf files
            and other passed parameters
        connect: bool (True)
            Whether to automatically attempt to establish a connection to the
            name-node.
        autoconf: bool (True)
            Whether to use the configuration found in the conf module as
            the set of defaults
        pars : {str: str}
            any parameters for hadoop, that you can find in hdfs-site.xml,
            This dict looks exactly like the one produced by conf - you can,
            for example, remove any problematic entries.
        kwargs: key/value
            Further override parameters. These are applied after the default
            conf and pars; the most typical things to set are:
            host : str (localhost)
                namenode hostname or IP address, in case of HA mode it is name
                of the cluster that can be found in "fs.defaultFS" option.
            port : int (8020)
                namenode RPC port usually 8020, in HA mode port must be None
            user, ticket_cache, token : str
                kerberos things

        Raises
        ------
        RuntimeError
            If both ``ticket_cache`` and ``token`` end up set in the
            resulting configuration.
        """
        self.conf = conf.copy() if autoconf else {}
        if pars:
            self.conf.update(pars)
        self.conf.update(kwargs)
        # MyNone is the "not supplied" sentinel, so an explicit None (e.g.
        # port=None) is still stored as an override.
        if host is not MyNone:
            self.conf['host'] = host
        if port is not MyNone:
            self.conf['port'] = port

        self._handle = None

        if self.conf.get('ticket_cache') and self.conf.get('token'):
            m = "It is not possible to use ticket_cache and token at same time"
            raise RuntimeError(m)

        if connect:
            self.connect()

    def __getstate__(self):
        # The C handle cannot be serialized; it is rebuilt in __setstate__.
        d = self.__dict__.copy()
        del d['_handle']
        logger.debug("Serialize with state: %s", d)
        return d

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._handle = None
        self.connect()

    @staticmethod
    def _last_error():
        """ First line of the most recent error reported by the C library """
        return ensure_string(_lib.hdfsGetLastError()).split('\n')[0]

    def _instance_or_conf(self, key):
        """ Value assigned on the instance for ``key`` if any, else conf's

        ``user`` and ``token`` are never set as attributes by ``__init__``;
        they live in ``self.conf``. An attribute assigned later (including an
        explicit ``None``, as ``cancel_token`` does) takes precedence.
        """
        if key in self.__dict__:
            return self.__dict__[key]
        return self.conf.get(key)

    @staticmethod
    def _copy_stream(source, sink, blocksize):
        """ Copy ``source`` into ``sink`` in ``blocksize`` reads until EOF """
        while True:
            chunk = source.read(blocksize)
            if not chunk:
                break
            sink.write(chunk)

    def connect(self):
        """ Connect to the name node

        This happens automatically at startup. Does nothing if a handle
        already exists.

        Raises
        ------
        ConnectionError
            If the C library does not return a filesystem handle.
        """
        get_lib()
        conf = self.conf.copy()
        if self._handle:
            return

        if HDFileSystem._first_pid is None:
            HDFileSystem._first_pid = os.getpid()
        elif HDFileSystem._first_pid != os.getpid():
            warnings.warn("Attempting to re-use hdfs3 in child process %d, "
                          "but it was initialized in parent process %d. "
                          "Beware that hdfs3 is not fork-safe and this may "
                          "lead to bugs or crashes."
                          % (os.getpid(), HDFileSystem._first_pid),
                          RuntimeWarning, stacklevel=2)

        o = _lib.hdfsNewBuilder()

        # The well-known keys are popped and applied through dedicated
        # builder calls; whatever remains is passed as generic conf strings.
        _lib.hdfsBuilderSetNameNode(o, ensure_bytes(conf.pop('host')))

        port = conf.pop('port', None)
        if port is not None:
            _lib.hdfsBuilderSetNameNodePort(o, port)

        user = conf.pop('user', None)
        if user is not None:
            _lib.hdfsBuilderSetUserName(o, ensure_bytes(user))

        ticket_cache = conf.pop('ticket_cache', None)
        if ticket_cache is not None:
            _lib.hdfsBuilderSetKerbTicketCachePath(o,
                                                   ensure_bytes(ticket_cache))

        token = conf.pop('token', None)
        if token is not None:
            _lib.hdfsBuilderSetToken(o, ensure_bytes(token))

        for par, val in conf.items():
            if _lib.hdfsBuilderConfSetStr(o, ensure_bytes(par),
                                          ensure_bytes(val)) != 0:
                warnings.warn('Setting conf parameter %s failed' % par)

        fs = _lib.hdfsBuilderConnect(o)
        _lib.hdfsFreeBuilder(o)
        if fs:
            logger.debug("Connect to handle %d", fs.contents.filesystem)
            self._handle = fs
        else:
            raise ConnectionError(
                'Connection Failed: {}'.format(self._last_error()))

    def delegate_token(self, user=None):
        """Generate delegate auth token.

        Parameters
        ----------
        user: bytes/str
            User to pass to delegation (defaults to user supplied to instance);
            this user is the only one that can renew the token.

        Returns
        -------
        The token returned by the library; it is also stored as
        ``self.token``.

        Raises
        ------
        ValueError
            If no user is given and none is configured.
        RuntimeError
            If the library returns no token.
        """
        user = user or self._instance_or_conf('user')
        if not user:
            raise ValueError('Delegation requires a user')
        out = _lib.hdfsGetDelegationToken(self._handle, ensure_bytes(user))
        if out:
            self.token = out
            return out
        else:
            raise RuntimeError('Token delegation failed')

    def renew_token(self, token=None):
        """
        Renew delegation token

        Parameters
        ----------
        token: str or None
            If None, uses the instance's token. It is an error to do that if
            there is no token.

        Returns
        -------
        New expiration time for the token
        """
        token = token or self._instance_or_conf('token')
        if token is None:
            raise ValueError('There is no token to renew')
        return _lib.hdfsRenewDelegationToken(self._handle, ensure_bytes(token))

    def cancel_token(self, token=None):
        """
        Revoke delegation token

        Parameters
        ----------
        token: str or None
            If None, uses the instance's token. It is an error to do that if
            there is no token.

        Raises
        ------
        ValueError
            If there is no token to cancel.
        RuntimeError
            If the library reports failure.
        """
        current = self._instance_or_conf('token')
        token = token or current
        if token is None:
            raise ValueError('There is no token to cancel')
        out = _lib.hdfsCancelDelegationToken(self._handle, ensure_bytes(token))
        if out:
            raise RuntimeError('Token cancel failed')
        if token == current:
            # now our token is invalid - this FS may not work
            self.token = None

    def disconnect(self):
        """ Disconnect from name node """
        if self._handle:
            logger.debug("Disconnect from handle %d",
                         self._handle.contents.filesystem)
            _lib.hdfsDisconnect(self._handle)
        self._handle = None

    def open(self, path, mode='rb', replication=0, buff=0, block_size=0):
        """ Open a file for reading or writing

        Parameters
        ----------
        path: string
            Path of file on HDFS
        mode: string
            One of 'rb', 'wb', or 'ab'
        replication: int
            Replication factor; if zero, use system default (only on write)
        buff: int (=0)
            Client buffer size (bytes); if 0, use default.
        block_size: int
            Size of data-node blocks if writing

        Returns
        -------
        HDFile

        Raises
        ------
        IOError
            If the filesystem is not connected, or when appending to an
            existing file with replication > 1.
        ValueError
            If ``block_size`` is given with a mode other than 'wb'.
        NotImplementedError
            If ``mode`` does not contain 'b'.
        """
        if not self._handle:
            raise IOError("Filesystem not connected")
        if block_size and mode != 'wb':
            raise ValueError('Block size only valid when writing new file')
        # Cheap local checks first so the existence lookup is only made when
        # it can matter.
        if 'a' in mode and replication > 1 and self.exists(path):
            raise IOError("Appending to an existing file with replication > 1"
                          " is unsupported")
        if 'b' not in mode:
            raise NotImplementedError("Text mode not supported, use mode='%s'"
                                      " and manage bytes" % (mode + 'b'))
        return HDFile(self, path, mode, replication=replication, buff=buff,
                      block_size=block_size)

    def du(self, path, total=False, deep=False):
        """Returns file sizes on a path.

        Parameters
        ----------
        path : string
            where to look
        total : bool (False)
            to add up the sizes to a grand total
        deep : bool (False)
            whether to recurse into subdirectories

        Returns
        -------
        dict mapping ``path`` to the summed size if ``total``, otherwise
        mapping each listed name to its size.
        """
        fi = self.ls(path, True)
        if deep:
            # ``fi`` grows while it is scanned, so directories discovered
            # along the way are themselves expanded later in the same pass.
            idx = 0
            while idx < len(fi):
                entry = fi[idx]
                if entry['kind'] == 'directory':
                    fi.extend(self.ls(entry['name'], True))
                idx += 1
        if total:
            return {path: sum(f['size'] for f in fi)}
        return {p['name']: p['size'] for p in fi}

    def df(self):
        """ Used/free disc space on the HDFS system

        Returns a dict with keys 'capacity', 'used' and 'percent-free'.
        """
        cap = _lib.hdfsGetCapacity(self._handle)
        used = _lib.hdfsGetUsed(self._handle)
        return {'capacity': cap,
                'used': used,
                'percent-free': 100 * (cap - used) / cap}

    def get_block_locations(self, path, start=0, length=0):
        """ Fetch physical locations of blocks

        Parameters
        ----------
        path : string
            file to query
        start : int
            byte offset at which to begin
        length : int
            number of bytes to cover; if zero, the file's size is used

        Returns
        -------
        list of dicts with keys 'hosts', 'length' and 'offset'
        """
        if not self._handle:
            raise IOError("Filesystem not connected")
        start = int(start)
        length = int(length) or self.info(path)['size']
        nblocks = ctypes.c_int(0)
        out = _lib.hdfsGetFileBlockLocations(self._handle, ensure_bytes(path),
                                             ctypes.c_int64(start),
                                             ctypes.c_int64(length),
                                             ctypes.byref(nblocks))
        locs = []
        for i in range(nblocks.value):
            block = out[i]
            hosts = [block.hosts[j] for j in range(block.numOfNodes)]
            locs.append({'hosts': hosts, 'length': block.length,
                         'offset': block.offset})
        _lib.hdfsFreeFileBlockLocations(out, nblocks)
        return locs

    def info(self, path):
        """ File information (as a dict)

        Raises FileNotFoundError if nothing exists at ``path``.
        """
        if not self.exists(path):
            raise FileNotFoundError(path)
        fi = _lib.hdfsGetPathInfo(self._handle, ensure_bytes(path)).contents
        # Convert to a dict before the C structure is released.
        out = info_to_dict(fi)
        _lib.hdfsFreeFileInfo(ctypes.byref(fi), 1)
        return ensure_string(out)

    def isdir(self, path):
        """Return True if path refers to an existing directory."""
        try:
            info = self.info(path)
            return info['kind'] == 'directory'
        except EnvironmentError:
            return False

    def isfile(self, path):
        """Return True if path refers to an existing file."""
        try:
            info = self.info(path)
            return info['kind'] == 'file'
        except EnvironmentError:
            return False

    def walk(self, path):
        """Directory tree generator, see ``os.walk``

        Yields ``(path, dirs, files)`` for ``path`` and then, recursively,
        for each sub-directory found.
        """
        full_dirs = []
        dirs = []
        files = []

        for info in self.ls(path, True):
            name = info['name']
            tail = posixpath.split(name)[1]
            if info['kind'] == 'directory':
                full_dirs.append(name)
                dirs.append(tail)
            else:
                files.append(tail)

        yield path, dirs, files

        for d in full_dirs:
            for res in self.walk(d):
                yield res

    def glob(self, path):
        """ Get list of paths matching glob-like pattern (i.e., with "*"s).

        If passed a directory, gets all contained files; if passed path
        to a file, without any "*", returns one-element list containing that
        filename. If the path does not exist and contains no wildcard, an
        empty list is returned. Does not support python3.5's "**" notation.
        """
        path = ensure_string(path)
        try:
            f = self.info(path)
            if f['kind'] == 'directory' and '*' not in path:
                path = ensure_trailing_slash(path) + '*'
            else:
                return [f['name']]
        except IOError:
            pass

        # Position of the first wildcard ('*' or '?'); everything up to the
        # last '/' before it is a literal directory used as the walk root.
        wildcards = [path.index(ch) for ch in '*?' if ch in path]
        if not wildcards:
            # Nothing exists at the literal path and there is no pattern.
            return []
        head = path[:min(wildcards)]
        root = head[:head.rindex('/') + 1] if '/' in head else '/'

        allpaths = []
        for dirname, dirs, fils in self.walk(root):
            allpaths.extend(posixpath.join(dirname, d) for d in dirs)
            allpaths.extend(posixpath.join(dirname, f) for f in fils)

        # '.' is escaped first so that it only matches a literal dot; the
        # '.' introduced afterwards for '?' is the regex any-character.
        pattern = re.compile("^" + path.replace('//', '/')
                             .rstrip('/')
                             .replace('.', r'\.')
                             .replace('*', '[^/]*')
                             .replace('?', '.') + "$")
        return [p for p in allpaths
                if pattern.match(p.replace('//', '/').rstrip('/'))]

    def ls(self, path, detail=False):
        """ List files at path

        Parameters
        ----------
        path : string/bytes
            location at which to list files
        detail : bool (=False)
            if True, each list item is a dict of file properties;
            otherwise, returns list of filenames

        Raises FileNotFoundError if nothing exists at ``path``.
        """
        if not self.exists(path):
            raise FileNotFoundError(path)
        num = ctypes.c_int(0)
        fi = _lib.hdfsListDirectory(self._handle, ensure_bytes(path),
                                    ctypes.byref(num))
        # Copy into Python dicts before the C array is released.
        out = [ensure_string(info_to_dict(fi[i])) for i in range(num.value)]
        _lib.hdfsFreeFileInfo(fi, num.value)
        if detail:
            return out
        else:
            return [o['name'] for o in out]

    @property
    def host(self):
        return self.conf.get('host', '')

    @property
    def port(self):
        return self.conf.get('port', '')

    def __repr__(self):
        if self._handle is None:
            state = 'Disconnected'
        else:
            state = 'Connected'
        return 'hdfs://%s:%s, %s' % (self.host, self.port, state)

    def __del__(self):
        # ``_handle`` may be missing if __init__ failed before assigning it.
        if getattr(self, '_handle', None):
            self.disconnect()

    def mkdir(self, path):
        """ Make directory at path

        Raises IOError if the library reports failure.
        """
        out = _lib.hdfsCreateDirectory(self._handle, ensure_bytes(path))
        if out != 0:
            raise IOError(
                'Create directory failed: {}'.format(self._last_error()))

    def makedirs(self, path, mode=0o711):
        """ Create directory together with any necessary intermediates

        Raises IOError if the library reports failure.
        """
        out = _lib.hdfsCreateDirectoryEx(self._handle, ensure_bytes(path),
                                         ctypes.c_short(mode), 1)
        if out != 0:
            raise IOError(
                'Create directory failed: {}'.format(self._last_error()))

    def set_replication(self, path, replication):
        """ Instruct HDFS to set the replication for the given file.

        If successful, the head-node's table is updated immediately, but
        actual copying will be queued for later. It is acceptable to set
        a replication that cannot be supported (e.g., higher than the
        number of data-nodes).
        """
        if replication < 0:
            raise ValueError('Replication must be positive,'
                             ' or 0 for system default')
        out = _lib.hdfsSetReplication(self._handle, ensure_bytes(path),
                                      ctypes.c_int16(int(replication)))
        if out != 0:
            raise IOError(
                'Set replication failed: {}'.format(self._last_error()))

    def mv(self, path1, path2):
        """ Move file at path1 to path2

        Returns True if the rename succeeded. Raises FileNotFoundError if
        ``path1`` does not exist.
        """
        if not self.exists(path1):
            raise FileNotFoundError(path1)
        out = _lib.hdfsRename(self._handle, ensure_bytes(path1),
                              ensure_bytes(path2))
        return out == 0

    def concat(self, destination, paths):
        """Concatenate inputs to destination

        Source files *should* all have the same block size and replication.
        The destination file must be in the same directory as
        the source files. If the target exists, it will be appended to.

        Some HDFSs impose that the target file must exist and be an exact
        number of blocks long, and that each concated file except the last
        is also a whole number of blocks.

        The source files are deleted on successful
        completion.
        """
        if not self.exists(destination):
            self.touch(destination)
        arr = (ctypes.c_char_p * (len(paths) + 1))()
        arr[:-1] = [ensure_bytes(s) for s in paths]
        arr[-1] = ctypes.c_char_p()  # NULL pointer
        out = _lib.hdfsConcat(self._handle, ensure_bytes(destination), arr)
        if out != 0:
            raise IOError('Concat failed on %s %s'
                          % (destination, self._last_error()))

    def rm(self, path, recursive=True):
        "Use recursive for `rm -r`, i.e., delete directory and contents"
        if not self.exists(path):
            raise FileNotFoundError(path)
        out = _lib.hdfsDelete(self._handle, ensure_bytes(path),
                              bool(recursive))
        if out != 0:
            raise IOError('Remove failed on %s %s'
                          % (path, self._last_error()))

    def exists(self, path):
        """ Is there an entry at path? """
        out = _lib.hdfsExists(self._handle, ensure_bytes(path))
        return out == 0

    def chmod(self, path, mode):
        """Change access control of given path

        Exactly what permissions the file will get depends on HDFS
        configurations.

        Parameters
        ----------
        path : string
            file/directory to change
        mode : integer
            As with the POSIX standard, each octal digit refers to
            user-group-all, in that order, with read-write-execute as the
            bits of each group.

        Examples
        --------
        Make read/writeable to all
        >>> hdfs.chmod('/path/to/file', 0o777)  # doctest: +SKIP

        Make read/writeable only to user
        >>> hdfs.chmod('/path/to/file', 0o700)  # doctest: +SKIP

        Make read-only to user
        >>> hdfs.chmod('/path/to/file', 0o100)  # doctest: +SKIP
        """
        if not self.exists(path):
            raise FileNotFoundError(path)
        out = _lib.hdfsChmod(self._handle, ensure_bytes(path),
                             ctypes.c_short(mode))
        if out != 0:
            raise IOError("chmod failed on %s %s"
                          % (path, self._last_error()))

    def chown(self, path, owner, group):
        """ Change owner/group """
        if not self.exists(path):
            raise FileNotFoundError(path)
        out = _lib.hdfsChown(self._handle, ensure_bytes(path),
                             ensure_bytes(owner), ensure_bytes(group))
        if out != 0:
            raise IOError("chown failed on %s %s"
                          % (path, self._last_error()))

    def cat(self, path):
        """ Return contents of file """
        if not self.exists(path):
            raise FileNotFoundError(path)
        with self.open(path, 'rb') as f:
            result = f.read()
        return result

    def get(self, hdfs_path, local_path, blocksize=2**16):
        """ Copy HDFS file to local """
        # TODO: _lib.hdfsCopy() may do this more efficiently
        if not self.exists(hdfs_path):
            raise FileNotFoundError(hdfs_path)
        with self.open(hdfs_path, 'rb') as f:
            with open(local_path, 'wb') as f2:
                self._copy_stream(f, f2, blocksize)

    def getmerge(self, path, filename, blocksize=2**16):
        """ Concat all files in path (a directory) to local output file """
        files = self.ls(path)
        with open(filename, 'wb') as f2:
            for apath in files:
                with self.open(apath, 'rb') as f:
                    self._copy_stream(f, f2, blocksize)

    def put(self, filename, path, chunk=2**16, replication=0, block_size=0):
        """ Copy local file to path in HDFS """
        with self.open(path, 'wb', replication=replication,
                       block_size=block_size) as target:
            with open(filename, 'rb') as source:
                self._copy_stream(source, target, chunk)

    def tail(self, path, size=1024):
        """ Return last bytes of file

        If ``size`` exceeds the file length, the whole file is returned.
        """
        length = self.du(path)[ensure_trailing_slash(path)]
        if size > length:
            return self.cat(path)
        with self.open(path, 'rb') as f:
            f.seek(length - size)
            return f.read(size)

    def head(self, path, size=1024):
        """ Return first bytes of file """
        with self.open(path, 'rb') as f:
            return f.read(size)

    def touch(self, path):
        """ Create zero-length file """
        self.open(path, 'wb').close()

    def read_block(self, fn, offset, length, delimiter=None):
        """ Read a block of bytes from an HDFS file

        Starting at ``offset`` of the file, read ``length`` bytes.  If
        ``delimiter`` is set then we ensure that the read starts and stops at
        delimiter boundaries that follow the locations ``offset`` and ``offset
        + length``.  If ``offset`` is zero then we start at zero.  The
        bytestring returned will not include the surrounding delimiter strings.

        If offset+length is beyond the eof, reads to eof.

        Parameters
        ----------
        fn: string
            Path to filename on HDFS
        offset: int
            Byte offset to start read
        length: int
            Number of bytes to read
        delimiter: bytes (optional)
            Ensure reading starts and stops at delimiter bytestring

        Examples
        --------
        >>> hdfs.read_block('/data/file.csv', 0, 13)  # doctest: +SKIP
        b'Alice, 100\\nBo'
        >>> hdfs.read_block('/data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
        b'Alice, 100\\nBob, 200'

        See Also
        --------
        hdfs3.utils.read_block
        """
        with self.open(fn, 'rb') as f:
            size = f.info()['size']
            if offset + length > size:
                length = size - offset
            # Inside a method the bare name resolves to the module-level
            # ``read_block`` helper, not to this method.
            data = read_block(f, offset, length, delimiter)
        return data

    def list_encryption_zones(self):
        """Get list of all the encryption zones"""
        x = ctypes.c_int(8)
        out = _lib.hdfsListEncryptionZones(self._handle, x)
        if not out:
            raise IOError("EZ listing failed: %s" % self._last_error())

        res = [struct_to_dict(out[i]) for i in range(x.value)]
        if res:
            _lib.hdfsFreeEncryptionZoneInfo(out, x)
        return res

    def create_encryption_zone(self, path, key_name):
        """ Create an encryption zone at path using the named key

        Raises IOError if the library reports failure.
        """
        out = _lib.hdfsCreateEncryptionZone(self._handle, ensure_bytes(path),
                                            ensure_bytes(key_name))
        if out != 0:
            raise IOError("EZ create failed: %s %s"
                          % (path, self._last_error()))
def get_lib():
    """ Load the C library wrapper into the module-level ``_lib`` on first use

    Later calls are no-ops once ``_lib`` has been populated.
    """
    global _lib
    if _lib is not None:
        return
    from .lib import _lib as loaded
    _lib = loaded


def struct_to_dict(s):
    """ Copy the fields of a simple ctypes record-like structure into a dict

    Keys are the field names passed through ``ensure_string``; values are
    the corresponding attributes of ``s``.
    """
    return {ensure_string(field): getattr(s, field)
            for field, _ctype in s._fields_}


def info_to_dict(s):
    """ Turn a file-info structure into a dict with readable values

    The numeric ``kind`` code is mapped to 'directory' or 'file', and
    ``encryption_info`` is expanded into a dict when present, else None.
    """
    kinds = {68: 'directory', 70: 'file'}
    d = struct_to_dict(s)
    d['kind'] = kinds[d['kind']]
    enc = d['encryption_info']
    d['encryption_info'] = struct_to_dict(enc.contents) if enc else None
    return d


# Flags handed to hdfsOpenFile for each supported mode string.
mode_numbers = dict(w=1, r=0, a=1025, wb=1, rb=0, ab=1025)
class HDFile(object):
    """ File on HDFS

    Matches the standard Python file interface.

    Examples
    --------
    >>> with hdfs.open('/path/to/hdfs/file.txt') as f:  # doctest: +SKIP
    ...     bytes = f.read()  # doctest: +SKIP

    >>> with hdfs.open('/path/to/hdfs/file.csv') as f:  # doctest: +SKIP
    ...     df = pd.read_csv(f, nrows=1000)  # doctest: +SKIP
    """

    def __init__(self, fs, path, mode, replication=0, buff=0, block_size=0):
        """ Called by open on a HDFileSystem

        Raises
        ------
        NotImplementedError
            If ``mode`` contains 't'.
        IOError
            If the library fails to open the file.
        """
        if 't' in mode:
            raise NotImplementedError("Opening a file in text mode is not"
                                      " supported, use ``io.TextIOWrapper``.")
        self.fs = fs
        self.path = path
        self.replication = replication
        self.buff = buff
        self._fs = fs._handle
        self.buffers = []
        self._handle = None
        self.mode = mode
        self.block_size = block_size
        self.lines = deque([])
        self._set_handle()

    @staticmethod
    def _last_error():
        """ First line of the most recent error reported by the C library """
        return ensure_string(_lib.hdfsGetLastError()).split('\n')[0]

    def _set_handle(self):
        """ Open the file through the C library and store its handle """
        out = _lib.hdfsOpenFile(self._fs, ensure_bytes(self.path),
                                mode_numbers[self.mode], self.buff,
                                ctypes.c_short(self.replication),
                                ctypes.c_int64(self.block_size))
        if not out:
            raise IOError("Could not open file: %s, mode: %s %s" %
                          (self.path, self.mode, self._last_error()))
        self._handle = out

    def read(self, length=None):
        """ Read bytes from open file

        Parameters
        ----------
        length : int or None
            Maximum number of bytes to return. If None or negative, read
            until the end of the file.

        Returns
        -------
        bytes; shorter than ``length`` if the end of file is reached.

        Raises
        ------
        IOError
            If the file is not open for reading, or the library reports a
            read error.
        """
        if not _lib.hdfsFileIsOpenForRead(self._handle):
            raise IOError('File not read mode')
        read_all = length is None or length < 0
        buffers = []
        while read_all or length > 0:
            # Requests are capped at 64 kB each.
            bufsize = 2**16 if read_all else min(2**16, length)
            p = ctypes.create_string_buffer(bufsize)
            ret = _lib.hdfsRead(
                self._fs, self._handle, p, ctypes.c_int32(bufsize))
            if ret == 0:
                # end of file
                break
            if ret < 0:
                raise IOError('Read file %s Failed: %s'
                              % (self.path, self._last_error()))
            buffers.append(p.raw[:ret])
            if not read_all:
                length -= ret
        return b''.join(buffers)

    def readline(self, chunksize=2**8, lineterminator='\n'):
        """ Return a line using buffered reading.

        A line is a sequence of bytes between ``'\\n'`` markers (or given
        line-terminator).

        Line iteration uses this method internally.

        Note: this function requires many calls to HDFS and is slow; it is
        in general better to wrap an HDFile with an ``io.TextIOWrapper`` for
        buffering, text decoding and newline support.
        """
        lineterminator = ensure_bytes(lineterminator)
        start = self.tell()
        # seek_delimiter moves the position; where it stops marks the end
        # of the line.
        seek_delimiter(self, lineterminator, chunksize, allow_zero=False)
        end = self.tell()
        # Rewind so the bytes between start and end can be read out.
        self.seek(start)
        return self.read(end - start)

    def _genline(self):
        """ Yield lines until ``readline`` returns an empty result """
        while True:
            out = self.readline()
            if not out:
                # A plain return ends the generator; raising StopIteration
                # inside one is turned into RuntimeError by PEP 479.
                return
            yield out

    def __iter__(self):
        """ Enables `for line in file:` usage """
        return self._genline()

    def __next__(self):
        """ Enables reading a file as a buffer in pandas """
        out = self.readline()
        if out:
            return out
        else:
            raise StopIteration

    # PY2 compatibility
    next = __next__

    def readlines(self):
        """ Return all lines in a file as a list """
        return list(self)

    def tell(self):
        """ Get current byte location in a file """
        out = _lib.hdfsTell(self._fs, self._handle)
        if out == -1:
            raise IOError('Tell Failed on file %s %s'
                          % (self.path, self._last_error()))
        return out

    def seek(self, offset, from_what=0):
        """ Set file read position. Read mode only.

        Attempt to move out of file bounds raises an exception. Note that,
        by the convention in python file seek, offset should be <=0 if
        from_what is 2.

        Parameters
        ----------
        offset : int
            byte location in the file.
        from_what : int 0, 1, 2
            if 0 (default), relative to file start; if 1, relative to current
            location; if 2, relative to file end.

        Returns
        -------
        new position

        Raises
        ------
        ValueError
            If ``from_what`` is invalid or the target lies outside the file.
        IOError
            If the library reports failure.
        """
        if from_what not in {0, 1, 2}:
            raise ValueError('seek mode must be 0, 1 or 2')
        info = self.info()
        if from_what == 1:
            offset = offset + self.tell()
        elif from_what == 2:
            offset = info['size'] + offset
        if offset < 0 or offset > info['size']:
            raise ValueError('Attempt to seek outside file')
        out = _lib.hdfsSeek(self._fs, self._handle, ctypes.c_int64(offset))
        if out == -1:  # pragma: no cover
            raise IOError('Seek Failed on file %s %s'
                          % (self.path, self._last_error()))
        return self.tell()

    def info(self):
        """ Filesystem metadata about this file """
        return self.fs.info(self.path)

    def write(self, data):
        """ Write bytes to open file (which must be in w or a mode)

        Returns the number of bytes written, or None when ``data`` is empty.
        Raises IOError if the file is not open for writing or a write fails.
        """
        data = ensure_bytes(data)
        if not data:
            return
        if not _lib.hdfsFileIsOpenForWrite(self._handle):
            raise IOError('File not write mode: {}'.format(self._last_error()))
        # Hand the data to the library in pieces of at most 64 MB.
        write_block = 64 * 2**20
        for offset in range(0, len(data), write_block):
            d = ensure_bytes(data[offset:offset + write_block])
            if _lib.hdfsWrite(self._fs, self._handle, d, len(d)) != len(d):
                raise IOError('Write failed on file %s, %s'
                              % (self.path, self._last_error()))
        return len(data)

    def flush(self):
        """ Send buffer to the data-node; actual write may happen later """
        _lib.hdfsFlush(self._fs, self._handle)

    def close(self):
        """ Flush and close file, ensuring the data is readable

        Safe to call more than once, and on an object whose open failed:
        when there is no handle nothing is done.
        """
        # __exit__ followed by __del__ calls this twice, and __del__ may run
        # on an instance whose __init__ raised before ``_handle`` existed.
        if getattr(self, '_handle', None) is None:
            return
        self.flush()
        _lib.hdfsCloseFile(self._fs, self._handle)
        self._handle = None  # _libhdfs releases memory
        self.mode = 'closed'

    @property
    def read1(self):
        # Exposes the bound ``read`` method under the name ``read1``.
        return self.read

    @property
    def closed(self):
        return self.mode == 'closed'

    def writable(self):
        return self.mode.startswith('w') or self.mode.startswith('a')

    def seekable(self):
        return self.readable()

    def readable(self):
        return self.mode.startswith('r')

    def __del__(self):
        self.close()

    def __repr__(self):
        return 'hdfs://%s:%s%s, %s' % (self.fs.host, self.fs.port,
                                       self.path, self.mode)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()