
Commit

[0.6.1] Fix paging issue in filedatalake (#44)
* update filedatalake

* version

* changelog
evelyn-ys authored Mar 23, 2021
1 parent 529e0e9 commit d813fa2
Showing 19 changed files with 364 additions and 149 deletions.
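For orientation, here is a minimal usage sketch of the behavior this commit fixes: iterating get_paths on a filesystem with more entries than one response page. The connection string, filesystem name, and directory path below are placeholders, and the import follows the public azure.storage.filedatalake surface that this multi-API package mirrors.

from azure.storage.filedatalake import DataLakeServiceClient

# Placeholders: substitute a real connection string and an existing filesystem.
service = DataLakeServiceClient.from_connection_string("<connection-string>")
file_system = service.get_file_system_client("my-filesystem")

# get_paths returns an auto-paging iterable; with this fix, iteration keeps
# following the x-ms-continuation token instead of stopping after the first page.
for path in file_system.get_paths(path="some/directory", recursive=True):
    print(path.name, path.is_directory)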
5 changes: 5 additions & 0 deletions README.rst
@@ -17,6 +17,11 @@ Handles multi-API versions of Azure Storage Data Plane originally from https://g

Change Log
----------
0.6.1
+++++
* Support undelete filesystem
* Fix minor issues in filedatalake

0.6.0
+++++
* blob(12.7.1): Support v2020-04-08
@@ -3,6 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import Any

try:
from urllib.parse import quote, unquote
except ImportError:
@@ -11,7 +13,7 @@
from ._deserialize import deserialize_dir_properties
from ._shared.base_client import TransportWrapper, parse_connection_str
from ._data_lake_file_client import DataLakeFileClient
from ._models import DirectoryProperties
from ._models import DirectoryProperties, FileProperties
from ._path_client import PathClient


@@ -241,9 +243,19 @@ def get_directory_properties(self, **kwargs):
"""
return self._get_path_properties(cls=deserialize_dir_properties, **kwargs) # pylint: disable=protected-access

def rename_directory(self, new_name, # type: str
**kwargs):
# type: (**Any) -> DataLakeDirectoryClient
def exists(self, **kwargs):
# type: (**Any) -> bool
"""
Returns True if a directory exists and returns False otherwise.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: boolean
"""
return self._exists(**kwargs)

def rename_directory(self, new_name, **kwargs):
# type: (str, **Any) -> DataLakeDirectoryClient
"""
Rename the source directory.
@@ -502,12 +514,12 @@ def get_file_client(self, file # type: Union[FileProperties, str]
or an instance of FileProperties. eg. directory/subdirectory/file
:type file: str or ~azure.storage.filedatalake.FileProperties
:returns: A DataLakeFileClient.
:rtype: ~azure.storage.filedatalake..DataLakeFileClient
:rtype: ~azure.storage.filedatalake.DataLakeFileClient
"""
try:
file_path = file.name
file_path = file.get('name')
except AttributeError:
file_path = self.path_name + '/' + file
file_path = self.path_name + '/' + str(file)

_pipeline = Pipeline(
transport=TransportWrapper(self._pipeline._transport), # pylint: disable = protected-access
@@ -535,9 +547,9 @@ def get_sub_directory_client(self, sub_directory # type: Union[DirectoryProperties, str]
:rtype: ~azure.storage.filedatalake.DataLakeDirectoryClient
"""
try:
subdir_path = sub_directory.name
subdir_path = sub_directory.get('name')
except AttributeError:
subdir_path = self.path_name + '/' + sub_directory
subdir_path = self.path_name + '/' + str(sub_directory)

_pipeline = Pipeline(
transport=TransportWrapper(self._pipeline._transport), # pylint: disable = protected-access
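As a quick illustration of the new DataLakeDirectoryClient.exists method next to the reworked rename_directory signature, here is a hedged sketch; the connection string, filesystem, and directory names are placeholders.

from azure.storage.filedatalake import DataLakeDirectoryClient

directory = DataLakeDirectoryClient.from_connection_string(
    "<connection-string>", file_system_name="my-filesystem", directory_name="input/raw")

if directory.exists(timeout=10):
    # new_name uses the "<filesystem>/<full path>" form described in the SDK docs.
    directory.rename_directory("my-filesystem/input/archived")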
@@ -4,6 +4,7 @@
# license information.
# --------------------------------------------------------------------------
from io import BytesIO
from typing import Any

try:
from urllib.parse import quote, unquote
@@ -596,9 +597,19 @@ def download_file(self, offset=None, length=None, **kwargs):
downloader = self._blob_client.download_blob(offset=offset, length=length, **kwargs)
return StorageStreamDownloader(downloader)

def rename_file(self, new_name, # type: str
**kwargs):
# type: (**Any) -> DataLakeFileClient
def exists(self, **kwargs):
# type: (**Any) -> bool
"""
Returns True if a file exists and returns False otherwise.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: boolean
"""
return self._exists(**kwargs)

def rename_file(self, new_name, **kwargs):
# type: (str, **Any) -> DataLakeFileClient
"""
Rename the source file.
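The file client gains the same exists helper. A short sketch (placeholder names) of probing for a file before downloading it, instead of catching ResourceNotFoundError:

from azure.storage.filedatalake import DataLakeFileClient

file_client = DataLakeFileClient.from_connection_string(
    "<connection-string>", file_system_name="my-filesystem", file_path="input/raw/data.csv")

if file_client.exists():
    # download_file returns a StorageStreamDownloader; readall pulls the full content.
    data = file_client.download_file().readall()
    print(len(data))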
@@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from typing import Any

try:
from urllib.parse import urlparse
@@ -92,17 +93,19 @@ def __init__(
# ADLS doesn't support secondary endpoint, make sure it's empty
self._hosts[LocationMode.SECONDARY] = ""

def __enter__(self):
self._blob_service_client.__enter__()
return self

def __exit__(self, *args):
self._blob_service_client.close()
super(DataLakeServiceClient, self).__exit__(*args)

def close(self):
# type: () -> None
""" This method is to close the sockets opened by the client.
It need not be used when using with a context manager.
"""
self._blob_service_client.close()
self.__exit__()

def _format_url(self, hostname):
"""Format the endpoint URL according to hostname
@@ -195,6 +198,10 @@ def list_file_systems(self, name_starts_with=None, # type: Optional[str]
call. If the request does not specify the server will return up to 5,000 items per page.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:keyword bool include_deleted:
Specifies that deleted file systems be returned in the response. This is for a file system restore enabled
account. The default value is `False`.
.. versionadded:: 12.3.0
:returns: An iterable (auto-paging) of FileSystemProperties.
:rtype: ~azure.core.paging.ItemPaged[~azure.storage.filedatalake.FileSystemProperties]
@@ -250,6 +257,55 @@ def create_file_system(self, file_system, # type: Union[FileSystemProperties, str]
file_system_client.create_file_system(metadata=metadata, public_access=public_access, **kwargs)
return file_system_client

def _rename_file_system(self, name, new_name, **kwargs):
# type: (str, str, **Any) -> FileSystemClient
"""Renames a filesystem.
Operation is successful only if the source filesystem exists.
:param str name:
The name of the filesystem to rename.
:param str new_name:
The new filesystem name the user wants to rename to.
:keyword lease:
Specify this to perform only if the lease ID given
matches the active lease ID of the source filesystem.
:paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: ~azure.storage.filedatalake.FileSystemClient
"""
self._blob_service_client._rename_container(name, new_name, **kwargs) # pylint: disable=protected-access
renamed_file_system = self.get_file_system_client(new_name)
return renamed_file_system

def undelete_file_system(self, name, deleted_version, **kwargs):
# type: (str, str, **Any) -> FileSystemClient
"""Restores soft-deleted filesystem.
Operation will only be successful if used within the specified number of days
set in the delete retention policy.
.. versionadded:: 12.3.0
This operation was introduced in API version '2019-12-12'.
:param str name:
Specifies the name of the deleted filesystem to restore.
:param str deleted_version:
Specifies the version of the deleted filesystem to restore.
:keyword str new_name:
The new name for the deleted filesystem to be restored to.
If not specified "name" will be used as the restored filesystem name.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: ~azure.storage.filedatalake.FileSystemClient
"""
new_name = kwargs.pop('new_name', None)
file_system = self.get_file_system_client(new_name or name)
self._blob_service_client.undelete_container(
name, deleted_version, new_name=new_name, **kwargs) # pylint: disable=protected-access
return file_system

def delete_file_system(self, file_system, # type: Union[FileSystemProperties, str]
**kwargs):
# type: (...) -> FileSystemClient
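A sketch of the service-client additions taken together: context-manager support, listing deleted filesystems, and restoring one. It assumes container soft delete is enabled on the account; the connection string and the deleted-version value are placeholders that would normally come from your configuration and the listing.

from azure.storage.filedatalake import DataLakeServiceClient

with DataLakeServiceClient.from_connection_string("<connection-string>") as service:
    # include_deleted only returns results on accounts with soft delete enabled.
    for fs in service.list_file_systems(include_deleted=True):
        print(fs.name)

    # Placeholder version string; restores the filesystem under its original name.
    restored = service.undelete_file_system("my-filesystem", "<deleted-version>")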
@@ -12,7 +12,7 @@
from azure.core.pipeline.policies import ContentDecodePolicy
from azure.core.exceptions import HttpResponseError, DecodeError, ResourceModifiedError, ClientAuthenticationError, \
ResourceNotFoundError, ResourceExistsError
from ._models import FileProperties, DirectoryProperties, LeaseProperties
from ._models import FileProperties, DirectoryProperties, LeaseProperties, PathProperties
from ._shared.models import StorageErrorCode

if TYPE_CHECKING:
@@ -44,6 +44,10 @@ def deserialize_file_properties(response, obj, headers):
return file_properties


def deserialize_path_properties(path_list):
return [PathProperties._from_generated(path) for path in path_list] # pylint: disable=protected-access


def from_blob_properties(blob_properties):
file_props = FileProperties()
file_props.name = blob_properties.name
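The new deserialize_path_properties helper is handed to the generated list_paths call as its cls callback, so each page of generated Path models is converted to public PathProperties objects before reaching the caller. The snippet below only illustrates that callback shape with stand-in classes; it is not the SDK's actual models.

class GeneratedPath:
    # Stand-in for the autorest-generated Path model.
    def __init__(self, name, is_directory):
        self.name = name
        self.is_directory = is_directory

class PathProperties:
    # Stand-in for the public model; mirrors the _from_generated pattern above.
    @classmethod
    def _from_generated(cls, generated):
        props = cls()
        props.name = generated.name
        props.is_directory = generated.is_directory
        return props

def deserialize_path_properties(path_list):
    # Same shape as the helper added in this commit: convert every model in a page.
    return [PathProperties._from_generated(path) for path in path_list]

page = [GeneratedPath("dir/a.txt", False), GeneratedPath("dir/sub", True)]
print([p.name for p in deserialize_path_properties(page)])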
@@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import functools
from typing import Optional, Any

try:
from urllib.parse import urlparse, quote
@@ -17,12 +17,12 @@
from azure.multiapi.storagev2.blob.v2020_02_10 import ContainerClient
from ._shared.base_client import TransportWrapper, StorageAccountHostsMixin, parse_query, parse_connection_str
from ._serialize import convert_dfs_url_to_blob_url
from ._models import LocationMode, FileSystemProperties, PublicAccess
from ._list_paths_helper import PathPropertiesPaged
from ._models import LocationMode, FileSystemProperties, PublicAccess, FileProperties, DirectoryProperties
from ._data_lake_file_client import DataLakeFileClient
from ._data_lake_directory_client import DataLakeDirectoryClient
from ._data_lake_lease import DataLakeLeaseClient
from ._generated import AzureDataLakeStorageRESTAPI
from ._deserialize import deserialize_path_properties


class FileSystemClient(StorageAccountHostsMixin):
@@ -247,6 +247,42 @@ def create_file_system(self, metadata=None, # type: Optional[Dict[str, str]]
public_access=public_access,
**kwargs)

def exists(self, **kwargs):
# type: (**Any) -> bool
"""
Returns True if a file system exists and returns False otherwise.
:keyword int timeout:
The timeout parameter is expressed in seconds.
:returns: boolean
"""
return self._container_client.exists(**kwargs)

def _rename_file_system(self, new_name, **kwargs):
# type: (str, **Any) -> FileSystemClient
"""Renames a filesystem.
Operation is successful only if the source filesystem exists.
:param str new_name:
The new filesystem name the user wants to rename to.
:keyword lease:
Specify this to perform only if the lease ID given
matches the active lease ID of the source filesystem.
:paramtype lease: ~azure.storage.filedatalake.DataLakeLeaseClient or str
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: ~azure.storage.filedatalake.FileSystemClient
"""
self._container_client._rename_container(new_name, **kwargs) # pylint: disable=protected-access
renamed_file_system = FileSystemClient(
"{}://{}".format(self.scheme, self.primary_hostname), file_system_name=new_name,
credential=self._raw_credential, api_version=self.api_version, _configuration=self._config,
_pipeline=self._pipeline, _location_mode=self._location_mode, _hosts=self._hosts,
require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key,
key_resolver_function=self.key_resolver_function)
return renamed_file_system

def delete_file_system(self, **kwargs):
# type: (Any) -> None
"""Marks the specified file system for deletion.
@@ -463,14 +499,13 @@ def get_paths(self, path=None, # type: Optional[str]
:caption: List the paths in the file system.
"""
timeout = kwargs.pop('timeout', None)
command = functools.partial(
self._client.file_system.list_paths,
return self._client.file_system.list_paths(
recursive=recursive,
max_results=max_results,
path=path,
timeout=timeout,
cls=deserialize_path_properties,
**kwargs)
return ItemPaged(
command, recursive, path=path, max_results=max_results,
page_iterator_class=PathPropertiesPaged, **kwargs)

def create_directory(self, directory, # type: Union[DirectoryProperties, str]
metadata=None, # type: Optional[Dict[str, str]]
@@ -738,9 +773,9 @@ def get_directory_client(self, directory # type: Union[DirectoryProperties, str]
:caption: Getting the directory client to interact with a specific directory.
"""
try:
directory_name = directory.name
directory_name = directory.get('name')
except AttributeError:
directory_name = directory
directory_name = str(directory)
_pipeline = Pipeline(
transport=TransportWrapper(self._pipeline._transport), # pylint: disable = protected-access
policies=self._pipeline._impl_policies # pylint: disable = protected-access
@@ -778,9 +813,9 @@ def get_file_client(self, file_path # type: Union[FileProperties, str]
:caption: Getting the file client to interact with a specific file.
"""
try:
file_path = file_path.name
file_path = file_path.get('name')
except AttributeError:
pass
file_path = str(file_path)
_pipeline = Pipeline(
transport=TransportWrapper(self._pipeline._transport), # pylint: disable = protected-access
policies=self._pipeline._impl_policies # pylint: disable = protected-access
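Tying the FileSystemClient changes together, a placeholder-name sketch: exists, the simplified get_paths, and handing each listed path name back to get_directory_client or get_file_client.

from azure.storage.filedatalake import FileSystemClient

fs = FileSystemClient.from_connection_string(
    "<connection-string>", file_system_name="my-filesystem")

if fs.exists():
    for path in fs.get_paths(recursive=False):
        # Each item is a PathProperties; its name can be passed straight back to
        # the sub-client factories (a properties object or plain string also works).
        if path.is_directory:
            client = fs.get_directory_client(path.name)
        else:
            client = fs.get_file_client(path.name)
        print(client.path_name)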
@@ -438,7 +438,8 @@ def list_paths(
error_map.update(kwargs.pop('error_map', {}))
accept = "application/json"

def prepare_request(next_link=None):
# TODO: change this once continuation/next_link autorest PR is merged
def prepare_request(next_link=None, cont_token=None):
# Construct headers
header_parameters = {} # type: Dict[str, Any]
if request_id_parameter is not None:
@@ -458,8 +459,9 @@ def prepare_request(next_link=None):
query_parameters['resource'] = self._serialize.query("self._config.resource", self._config.resource, 'str')
if timeout is not None:
query_parameters['timeout'] = self._serialize.query("timeout", timeout, 'int', minimum=0)
if continuation is not None:
query_parameters['continuation'] = self._serialize.query("continuation", continuation, 'str')
# TODO: change this once continuation/next_link autorest PR is merged
if cont_token is not None:
query_parameters['continuation'] = self._serialize.query("continuation", cont_token, 'str')
if path is not None:
query_parameters['directory'] = self._serialize.query("path", path, 'str')
query_parameters['recursive'] = self._serialize.query("recursive", recursive, 'bool')
@@ -480,14 +482,21 @@ def prepare_request(next_link=None):
return request

async def extract_data(pipeline_response):
# TODO: change this once continuation/next_link autorest PR is merged
try:
cont_token = pipeline_response.http_response.headers['x-ms-continuation']
except KeyError:
cont_token = None
deserialized = self._deserialize('PathList', pipeline_response)
list_of_elem = deserialized.paths
if cls:
list_of_elem = cls(list_of_elem)
return None, AsyncList(list_of_elem)
return cont_token, AsyncList(list_of_elem)

async def get_next(next_link=None):
request = prepare_request(next_link)
# TODO: change this once continuation/next_link autorest PR is merged
async def get_next(cont_token=None):
cont_token = cont_token if not continuation else continuation
request = prepare_request(cont_token=cont_token)

pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
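The generated async change above switches paging to the x-ms-continuation response header rather than a next link. The standalone sketch below exercises the same get_next/extract_data contract with azure.core's AsyncItemPaged against faked in-memory pages; only AsyncItemPaged and AsyncList are real, everything else is made up for illustration.

import asyncio
from azure.core.async_paging import AsyncItemPaged, AsyncList

# Fake service: three pages chained by a continuation token, mimicking the
# x-ms-continuation header the ADLS list_paths operation returns.
FAKE_PAGES = {
    None: (["a", "b"], "tok1"),
    "tok1": (["c"], "tok2"),
    "tok2": (["d"], None),
}

def list_paths_paged():
    async def get_next(cont_token=None):
        # One "request" per page; the token selects which page is fetched.
        return FAKE_PAGES[cont_token]

    async def extract_data(response):
        items, cont_token = response
        # Return (token, items); iteration stops once the token is falsy.
        return cont_token, AsyncList(items)

    return AsyncItemPaged(get_next, extract_data)

async def main():
    async for name in list_paths_paged():
        print(name)

asyncio.run(main())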
