Source code for azblobexplorer.download

import os
from pathlib import Path
from typing import Optional

from .base import BlobBase
from .exceptions import NoBlobsFound

__all__ = ['AzureBlobDownload']


class AzureBlobDownload(BlobBase):
    """
    Download a file or folder.
    """

    @staticmethod
    def _local_destination(download_to: Optional[str], relative_name: str) -> Path:
        """Return the local path ``relative_name`` should be written to."""
        if download_to is None:
            # No target directory given: resolve relative to the current
            # working directory.
            return Path(relative_name)
        return Path(os.path.join(download_to, relative_name))

    @staticmethod
    def _write_local_file(destination: Path, content: bytes) -> None:
        """Write ``content`` to ``destination``, creating parent directories."""
        destination.parent.mkdir(parents=True, exist_ok=True)
        # The context manager closes the handle even if the write fails.
        with open(destination, 'wb') as file:
            file.write(content)

    def download_file(self, blob_name: str, download_to: Optional[str] = None,
                      timeout: int = 10):
        """
        Download a file to a location.

        Only the last path component of the blob name is used as the local
        file name. Missing parent directories of the destination are created.

        :param str blob_name: Give a blob path with file name.
        :param str download_to: Give a local absolute path to download.
            When omitted, the file is written to the current working
            directory.
        :param int timeout: Request timeout in seconds

            .. versionadded:: 2.0
        :raises OSError: If the destination directory cannot be created or
            the file cannot be written.

        >>> from azblobexplorer import AzureBlobDownload
        >>> az = AzureBlobDownload('account name', 'account key', 'container name')
        >>> az.download_file('some/name/file.txt')
        """

        file_dict = self.read_file(blob_name, timeout=timeout)

        # Drop any folder part of the blob name; only the file name is kept.
        file_name = Path(file_dict['file_name']).name

        destination = self._local_destination(download_to, file_name)
        self._write_local_file(destination, file_dict['content'])

    def download_folder(self, blob_folder_name: str,
                        download_to: Optional[str] = None, timeout: int = 10):
        """
        Download a blob folder.

        Every blob returned by ``container_client.list_blobs(blob_folder_name)``
        is written below ``download_to`` (or below the current working
        directory when ``download_to`` is omitted), using the blob name as
        the relative path. Missing directories are created.

        :param str blob_folder_name: Give a folder name.
        :param str download_to: Give a local path to download.
        :param int timeout: Request timeout in seconds

            .. versionadded:: 2.0
        :raises NoBlobsFound: If the blob folder is empty or is not found.
        :raises OSError: If a destination directory cannot be created or a
            file cannot be written.

        >>> from azblobexplorer import AzureBlobDownload
        >>> az = AzureBlobDownload('account name', 'account key', 'container name')
        >>> az.download_folder('some/folder/name')
        """

        # Materialise the listing so emptiness can be checked before any
        # download starts.
        blobs = list(self.container_client.list_blobs(blob_folder_name))

        if not blobs:
            raise NoBlobsFound(
                "There where 0 blobs found with blob path '{}'".format(blob_folder_name))

        for blob in blobs:
            name = blob['name']
            destination = self._local_destination(download_to, name)
            _blob = self.read_file(name, timeout=timeout)
            self._write_local_file(destination, _blob['content'])

    def read_file(self, blob_name: str, timeout: int = 10) -> dict:
        """
        Read a file.

        The whole blob content is read into memory before returning.

        :param str blob_name: Give a file name.
        :param int timeout: Request timeout in seconds. It is applied to both
            the properties request and the download request.

            .. versionadded:: 2.0
        :rtype: dict
        :return: Returns a dictionary of name, content and size. ``file_name``
            is the blob name as reported by the service, which may include
            the folder path.

        >>> from azblobexplorer import AzureBlobDownload
        >>> az = AzureBlobDownload('account name', 'account key', 'container name')
        >>> az.read_file('some/name/file.txt')
        {
            'file_name': 'some/name/file.txt',
            'content': byte content,
            'file_size_bytes': size in bytes
        }
        """

        blob_obj = self.container_client.get_blob_client(blob_name)
        # Pass the timeout to both requests so neither can run unbounded.
        blob_properties = blob_obj.get_blob_properties(timeout=timeout)
        download_blob = blob_obj.download_blob(timeout=timeout)

        return {
            'file_name': blob_properties['name'],
            'content': download_blob.readall(),
            'file_size_bytes': blob_properties['size']
        }