Skip to content
17 changes: 17 additions & 0 deletions Doc/library/zipfile.rst
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,23 @@ ZipFile Objects
.. versionadded:: 3.11


.. method:: ZipFile.remove(zinfo_or_arcname)

Removes a member from the archive. *zinfo_or_arcname* is either the full
path of the member, or a :class:`ZipInfo` instance.

The archive must be opened with mode ``'a'``.

Calling :meth:`remove` on a closed ZipFile will raise a :exc:`ValueError`.

.. note::

Removing a member from an archive may require moving many of its internal data
records, which can be I/O intensive for a large ZIP file.

.. versionadded:: next


The following data attributes are also available:

.. attribute:: ZipFile.filename
Expand Down
223 changes: 223 additions & 0 deletions Lib/test/test_zipfile/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,229 @@ class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
class ZstdWriterTests(AbstractWriterTests, unittest.TestCase):
    """Run the AbstractWriterTests suite with ZIP_ZSTANDARD compression."""

    compression = zipfile.ZIP_ZSTANDARD

class AbstractRemoveTests:
    """Shared tests for removing members from a ZIP archive.

    Concrete subclasses mix this in with ``unittest.TestCase`` and set the
    ``compression`` class attribute, which is passed to every ``ZipFile``
    the tests create.
    """

    # (arcname, payload) pairs used as the default archive content.
    _SAMPLE_FILES = (
        ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'),
        ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'),
        ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'),
    )

    def tearDown(self):
        # Every test writes TESTFN; do not leave it behind for other tests.
        try:
            os.remove(TESTFN)
        except FileNotFoundError:
            pass

    def _create_archive(self, file, data):
        """(Re)create TESTFN so that it holds the single member *file*."""
        with zipfile.ZipFile(TESTFN, 'w', self.compression) as zh:
            zh.writestr(file, data)

    def _assert_remove_fails(self, zh, target, exc_type):
        """Assert that ``zh.remove(target)`` raises *exc_type* and never
        reaches the underlying _remove_members()."""
        with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
            with self.assertRaises(exc_type):
                zh.remove(target)
            mock_fn.assert_not_called()

    def _test_removing_members(self, test_files, indexes, force_zip64=False):
        """Test underlying _remove_members() for removing members at given
        indexes.

        *test_files* is a sequence of ``(arcname, data)`` pairs written to
        TESTFN in order, *indexes* holds the positions to remove, and
        *force_zip64* is forwarded to ``ZipFile.open()`` for every member.
        """
        # Calculate the expected results by writing only the kept members.
        expected_files = []
        with zipfile.ZipFile(TESTFN, 'w', self.compression) as zh:
            for i, (file, data) in enumerate(test_files):
                if i not in indexes:
                    with zh.open(file, 'w', force_zip64=force_zip64) as fh:
                        fh.write(data)
                    expected_files.append(file)
        expected_size = os.path.getsize(TESTFN)

        # Prepare the test zip with every member.
        with zipfile.ZipFile(TESTFN, 'w', self.compression) as zh:
            for file, data in test_files:
                with zh.open(file, 'w', force_zip64=force_zip64) as fh:
                    fh.write(data)

        # Do the removal and check the result.
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            infolist = zh.infolist()
            members = {infolist[i] for i in indexes}
            zh._remove_members(members)

            # Make sure internal caches have reflected the change
            # and are consistent.
            remaining = zh.namelist()
            self.assertEqual(remaining, expected_files)
            for file, _ in test_files:
                if file in remaining:
                    self.assertEqual(zh.getinfo(file).filename, file)
                else:
                    with self.assertRaises(KeyError):
                        zh.getinfo(file)

            self.assertIsNone(zh.testzip())

        # The size is compared after the archive has been closed.
        self.assertEqual(os.path.getsize(TESTFN), expected_size)

    def _test_removing_combinations(self, test_files, n=None):
        """Test underlying _remove_members() for removing combinations of
        members.

        With *n* given, only combinations of exactly *n* members are tried;
        otherwise every size from 1 up to ``len(test_files)`` is tried.
        """
        total = len(test_files)
        sizes = range(1, total + 1) if n is None else (n,)
        for size in sizes:
            for indexes in itertools.combinations(range(total), size):
                with self.subTest(remove=indexes):
                    self._test_removing_members(test_files, indexes)

    def test_basic(self):
        """Test removing every non-empty combination of members."""
        self._test_removing_combinations(self._SAMPLE_FILES)

    def test_duplicated_arcname(self):
        """Test underlying _remove_members() for removing any one of the
        duplicated members."""
        # Imported once per test rather than once per innermost iteration.
        import warnings

        dupl_file = 'file.txt'
        test_files = self._SAMPLE_FILES

        ln = len(test_files)
        for n in range(2, ln + 1):
            for dups in itertools.combinations(range(ln), n):
                # Give every member selected by *dups* the same arcname.
                files = [
                    (dupl_file if i in dups else file, data)
                    for i, (file, data) in enumerate(test_files)
                ]

                for index in dups:
                    indexes = [index]
                    with self.subTest(dups=dups, remove=indexes):
                        # Warnings raised while building these archives
                        # are not what is being tested here.
                        with warnings.catch_warnings():
                            warnings.simplefilter("ignore")
                            self._test_removing_members(files, indexes)

    def test_non_physical(self):
        """Test underlying _remove_members() for non-physical removing."""
        test_files = self._SAMPLE_FILES

        ln = len(test_files)
        for n in range(1, ln + 1):
            for indexes in itertools.combinations(range(ln), n):
                with self.subTest(remove=indexes):
                    # Prepare the test zip, recording where each kept
                    # member's header was written.
                    expected = {}
                    with zipfile.ZipFile(TESTFN, 'w', self.compression) as zh:
                        for i, (file, data) in enumerate(test_files):
                            zh.writestr(file, data)
                            if i not in indexes:
                                expected[file] = zh.getinfo(file).header_offset

                    # Do the removal and check that the kept members still
                    # report the header offsets recorded above.
                    with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
                        infolist = zh.infolist()
                        members = {infolist[i] for i in indexes}
                        zh._remove_members(members, remove_physical=False)
                        self.assertEqual(zh.namelist(), list(expected))
                        for file, offset in expected.items():
                            self.assertEqual(zh.getinfo(file).header_offset, offset)
                        self.assertIsNone(zh.testzip())

    def test_verify(self):
        """Test if params are passed to underlying _remove_members() correctly,
        or never passed if conditions not met."""
        file0 = 'file0.txt'
        file = 'datafile.txt'
        data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'

        # closed: error and do nothing
        self._create_archive(file, data)
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            zh.close()
            self._assert_remove_fails(zh, file, ValueError)

        # writing: error and do nothing
        self._create_archive(file, data)
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            with zh.open(file0, 'w'):
                self._assert_remove_fails(zh, file, ValueError)

        # mode 'r': error and do nothing
        with zipfile.ZipFile(TESTFN, 'r', self.compression) as zh:
            self._assert_remove_fails(zh, file, ValueError)

        # mode 'w': error and do nothing
        with zipfile.ZipFile(TESTFN, 'w', self.compression) as zh:
            zh.writestr(file, data)
            self._assert_remove_fails(zh, file, ValueError)

        # mode 'x': error and do nothing
        os.remove(TESTFN)  # mode 'x' requires that the file does not exist
        with zipfile.ZipFile(TESTFN, 'x', self.compression) as zh:
            zh.writestr(file, data)
            self._assert_remove_fails(zh, file, ValueError)

        # mode 'a': the most general use case
        self._create_archive(file, data)
        # -- remove with arcname
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
                zh.remove(file)
                mock_fn.assert_called_once_with({zh.getinfo(file)})
        # -- remove with zinfo
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            with mock.patch('zipfile.ZipFile._remove_members') as mock_fn:
                zinfo = zh.getinfo(file)
                zh.remove(zinfo)
                mock_fn.assert_called_once_with({zinfo})
        # -- remove with nonexist arcname
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            self._assert_remove_fails(zh, 'nonexist.file', KeyError)
        # -- remove with nonexist zinfo (even if same name)
        with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
            self._assert_remove_fails(zh, zipfile.ZipInfo(file), KeyError)

    def test_zip64(self):
        """Test if members use zip64."""
        test_files = [
            ('pre.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'),
            ('datafile', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'),
            ('post.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'),
        ]

        self._test_removing_members(test_files, [1], force_zip64=True)

class StoredRemoveTests(AbstractRemoveTests, unittest.TestCase):
    """Run the AbstractRemoveTests suite with ZIP_STORED compression."""

    compression = zipfile.ZIP_STORED

@requires_zlib()
class DeflateRemoveTests(AbstractRemoveTests, unittest.TestCase):
    """Run the AbstractRemoveTests suite with ZIP_DEFLATED compression."""

    compression = zipfile.ZIP_DEFLATED

@requires_bz2()
class Bzip2RemoveTests(AbstractRemoveTests, unittest.TestCase):
    """Run the AbstractRemoveTests suite with ZIP_BZIP2 compression."""

    compression = zipfile.ZIP_BZIP2

@requires_lzma()
class LzmaRemoveTests(AbstractRemoveTests, unittest.TestCase):
    """Run the AbstractRemoveTests suite with ZIP_LZMA compression."""

    compression = zipfile.ZIP_LZMA

@requires_zstd()
class ZstdRemoveTests(AbstractRemoveTests, unittest.TestCase):
    """Run the AbstractRemoveTests suite with ZIP_ZSTANDARD compression."""

    compression = zipfile.ZIP_ZSTANDARD


class PyZipFileTests(unittest.TestCase):
def assertCompiledIn(self, name, namelist):
if name + 'o' not in namelist:
Expand Down
63 changes: 63 additions & 0 deletions Lib/test/test_zipfile64.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,69 @@ def tearDown(self):
os_helper.unlink(TESTFN2)


class TestRemove(unittest.TestCase):
def setUp(self):
# Create test data.
line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
self.data = '\n'.join(line_gen).encode('ascii')

def _write_large_file(self, fh):
# It will contain enough copies of self.data to reach about 8 GiB.
filecount = 8*1024**3 // len(self.data)

next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
for num in range(filecount):
fh.write(self.data)
# Print still working message since this test can be really slow
if next_time <= time.monotonic():
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
print((
' writing %d of %d, be patient...' %
(num, filecount)), file=sys.__stdout__)
sys.__stdout__.flush()

def test_remove_large_file(self):
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
self._test_remove_large_file(f)
self.assertFalse(f.closed)

def _test_remove_large_file(self, f):
file = 'datafile.txt'
file1 = 'dummy.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)

with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
self.assertIsNone(zh.testzip())

def test_remove_before_large_file(self):
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
self._test_remove_before_large_file(f)
self.assertFalse(f.closed)

def _test_remove_before_large_file(self, f):
file = 'datafile.txt'
file1 = 'dummy.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
zh.writestr(file, data)
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
expected_size = zh.getinfo(file1).file_size

with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file)
self.assertIsNone(zh.testzip())


class OtherTests(unittest.TestCase):
def testMoreThan64kFiles(self):
# This test checks that more than 64k files can be added to an archive,
Expand Down
Loading
Loading