Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 3 additions & 25 deletions sdks/python/apache_beam/io/aws/s3io.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated

try:
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down Expand Up @@ -100,27 +99,6 @@ def open(
else:
raise ValueError('Invalid file open mode: %s.' % mode)

@deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.

``list_prefix`` has been deprecated. Use `list_files` instead, which returns
a generator of file information instead of a dict.

Args:
path: S3 file path pattern in the form s3://<bucket>/[name].
with_metadata: Experimental. Specify whether returns file metadata.

Returns:
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
file_info = {}
for file_metadata in self.list_files(path, with_metadata):
file_info[file_metadata[0]] = file_metadata[1]

return file_info

def list_files(self, path, with_metadata=False):
"""Lists files matching the prefix.

Expand Down Expand Up @@ -186,7 +164,7 @@ def list_files(self, path, with_metadata=False):
break

logging.log(
# do not spam logs when list_prefix is likely used to check empty folder
# do not spam logs when list_files is likely used to check empty folder
logging.INFO if counter > 0 else logging.DEBUG,
"Finished listing %s files in %s seconds.",
counter,
Expand Down Expand Up @@ -288,7 +266,7 @@ def copy_tree(self, src, dest):
assert dest.endswith('/')

results = []
for entry in self.list_prefix(src):
for entry, _ in self.list_files(src):
rel_path = entry[len(src):]
try:
self.copy(entry, dest + rel_path)
Expand Down Expand Up @@ -436,7 +414,7 @@ def delete_tree(self, root):
"""
assert root.endswith('/')

paths = self.list_prefix(root)
paths = [p for p, _ in self.list_files(root)]
return self.delete_files(paths)

def size(self, path):
Expand Down
36 changes: 21 additions & 15 deletions sdks/python/apache_beam/io/aws/s3io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,17 @@ def test_copy(self):
file_size = 1024
self._insert_random_file(self.client, src_file_name, file_size)

self.assertTrue(src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertTrue(
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertFalse(
dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

self.aws.copy(src_file_name, dest_file_name)

self.assertTrue(src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertTrue(dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertTrue(
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertTrue(
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

# Clean up
self.aws.delete_files([src_file_name, dest_file_name])
Expand Down Expand Up @@ -290,9 +293,9 @@ def test_copy_tree(self):
dest_file_name = dest_dir_name + path
self._insert_random_file(self.client, src_file_name, file_size)
self.assertTrue(
src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertFalse(
dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

results = self.aws.copy_tree(src_dir_name, dest_dir_name)

Expand All @@ -303,9 +306,9 @@ def test_copy_tree(self):
self.assertIsNone(err)

self.assertTrue(
src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertTrue(
dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

# Clean up
for path in paths:
Expand All @@ -320,14 +323,17 @@ def test_rename(self):

self._insert_random_file(self.client, src_file_name, file_size)

self.assertTrue(src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertTrue(
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertFalse(
dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

self.aws.rename(src_file_name, dest_file_name)

self.assertFalse(src_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertTrue(dest_file_name in self.aws.list_prefix(self.TEST_DATA_PATH))
self.assertFalse(
src_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))
self.assertTrue(
dest_file_name in dict(self.aws.list_files(self.TEST_DATA_PATH)))

# Clean up
self.aws.delete_files([src_file_name, dest_file_name])
Expand Down Expand Up @@ -439,7 +445,7 @@ def test_delete(self):

# Create the file and check that it was created
self._insert_random_file(self.aws.client, file_name, file_size)
files = self.aws.list_prefix(self.TEST_DATA_PATH)
files = dict(self.aws.list_files(self.TEST_DATA_PATH))
self.assertTrue(file_name in files)

# Delete the file and check that it was deleted
Expand Down Expand Up @@ -748,7 +754,7 @@ def test_context_manager(self):
# Clean up
self.aws.delete(file_name)

def test_list_prefix(self):
def test_list_files(self):

objects = [
('jerry/pigpen/phil', 5),
Expand Down Expand Up @@ -785,7 +791,7 @@ def test_list_prefix(self):
expected_file_names = [(self.TEST_DATA_PATH + object_name, size)
for (object_name, size) in expected_object_names]
self.assertEqual(
set(self.aws.list_prefix(file_pattern).items()),
set(dict(self.aws.list_files(file_pattern)).items()),
set(expected_file_names))

# Clean up
Expand Down
26 changes: 3 additions & 23 deletions sdks/python/apache_beam/io/azure/blobstorageio.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.options.pipeline_options import AzureOptions
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -207,7 +206,7 @@ def copy_tree(self, src, dest):
assert dest.endswith('/')

results = []
for entry in self.list_prefix(src):
for entry, _ in self.list_files(src):
rel_path = entry[len(src):]
try:
self.copy(entry, dest + rel_path)
Expand Down Expand Up @@ -504,7 +503,7 @@ def delete_tree(self, root):
assert root.endswith('/')

# Get the blob under the root directory.
paths_to_delete = self.list_prefix(root)
paths_to_delete = [p for p, _ in self.list_files(root)]

return self.delete_files(paths_to_delete)

Expand Down Expand Up @@ -577,25 +576,6 @@ def _delete_batch(self, container, blobs):

return results

@deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.

Args:
path: Azure Blob Storage file path pattern in the form
azfs://<storage-account>/<container>/[name].
with_metadata: Experimental. Specify whether returns file metadata.

Returns:
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
file_info = {}
for file_metadata in self.list_files(path, with_metadata):
file_info[file_metadata[0]] = file_metadata[1]

return file_info

def list_files(self, path, with_metadata=False):
"""Lists files matching the prefix.

Expand Down Expand Up @@ -644,7 +624,7 @@ def list_files(self, path, with_metadata=False):
yield file_name, item.size

logging.log(
# do not spam logs when list_prefix is likely used to check empty folder
# do not spam logs when list_files is likely used to check empty folder
logging.INFO if counter > 0 else logging.DEBUG,
"Finished listing %s files in %s seconds.",
counter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def assert_iceberg_tables_created(

# List objects in the bucket with the constructed prefix
try:
objects = gcs_io.list_prefix(f"gs://{bucket_name}/{search_prefix}")
objects = gcs_io.list_files(f"gs://{bucket_name}/{search_prefix}")
object_count = len(list(objects))

if object_count < expected_count:
Expand Down
26 changes: 2 additions & 24 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
from apache_beam.metrics.metric import Metrics
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.annotations import deprecated

__all__ = ['GcsIO', 'create_storage_client']

Expand Down Expand Up @@ -459,7 +458,7 @@ def copytree(self, src, dest):
"""
assert src.endswith('/')
assert dest.endswith('/')
for entry in self.list_prefix(src):
for entry, _ in self.list_files(src):
rel_path = entry[len(src):]
self.copy(entry, dest + rel_path)

Expand Down Expand Up @@ -564,27 +563,6 @@ def _gcs_object(self, path):
else:
raise NotFound('Object %s not found', path)

@deprecated(since='2.45.0', current='list_files')
def list_prefix(self, path, with_metadata=False):
"""Lists files matching the prefix.

``list_prefix`` has been deprecated. Use `list_files` instead, which returns
a generator of file information instead of a dict.

Args:
path: GCS file path pattern in the form gs://<bucket>/[name].
with_metadata: Experimental. Specify whether returns file metadata.

Returns:
If ``with_metadata`` is False: dict of file name -> size; if
``with_metadata`` is True: dict of file name -> tuple(size, timestamp).
"""
file_info = {}
for file_metadata in self.list_files(path, with_metadata):
file_info[file_metadata[0]] = file_metadata[1]

return file_info

def list_files(self, path, with_metadata=False):
"""Lists files matching the prefix.

Expand Down Expand Up @@ -627,7 +605,7 @@ def list_files(self, path, with_metadata=False):
yield file_name, item.size

_LOGGER.log(
# do not spam logs when list_prefix is likely used to check empty folder
# do not spam logs when list_files is likely used to check empty folder
logging.INFO if counter > 0 else logging.DEBUG,
"Finished listing %s files in %s seconds.",
counter,
Expand Down
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ def test_file_write_call(self):
self.gcs.open(file_name, 'w')
writer.assert_called()

def test_list_prefix(self):
def test_list_files(self):
bucket_name = 'gcsio-test'
objects = [
('cow/cat/fish', 2),
Expand Down Expand Up @@ -716,8 +716,7 @@ def test_list_prefix(self):
expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
for (object_name, size) in expected_object_names]
self.assertEqual(
set(self.gcs.list_prefix(file_pattern).items()),
set(expected_file_names))
set(self.gcs.list_files(file_pattern)), set(expected_file_names))

def test_downloader_fail_non_existent_object(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def size(self, *labels):
if 'gs://' in matched_path[0]:
from apache_beam.io.gcp import gcsio
return sum(
sum(gcsio.GcsIO().list_prefix(path).values())
sum(s for _, s in gcsio.GcsIO().list_files(path))
for path in matched_path)
return sum(os.path.getsize(path) for path in matched_path)
return 0
Expand Down
Loading