am 44158945
: Merge "Wrap zipfile.write(), writestr() and close()"
* commit '4415894579186c0c4f8d006ed625c0379b1b9d40': Wrap zipfile.write(), writestr() and close()
This commit is contained in:
@@ -33,10 +33,6 @@ import os
|
||||
import tempfile
|
||||
import zipfile
|
||||
|
||||
# missing in Python 2.4 and before
|
||||
if not hasattr(os, "SEEK_SET"):
|
||||
os.SEEK_SET = 0
|
||||
|
||||
import build_image
|
||||
import common
|
||||
|
||||
@@ -189,7 +185,7 @@ def AddUserdata(output_zip, prefix="IMAGES/"):
|
||||
assert succ, "build userdata.img image failed"
|
||||
|
||||
common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict)
|
||||
output_zip.write(img.name, prefix + "userdata.img")
|
||||
common.ZipWrite(output_zip, img.name, prefix + "userdata.img")
|
||||
img.close()
|
||||
os.rmdir(user_dir)
|
||||
os.rmdir(temp_dir)
|
||||
@@ -226,7 +222,7 @@ def AddCache(output_zip, prefix="IMAGES/"):
|
||||
assert succ, "build cache.img image failed"
|
||||
|
||||
common.CheckSize(img.name, "cache.img", OPTIONS.info_dict)
|
||||
output_zip.write(img.name, prefix + "cache.img")
|
||||
common.ZipWrite(output_zip, img.name, prefix + "cache.img")
|
||||
img.close()
|
||||
os.rmdir(user_dir)
|
||||
os.rmdir(temp_dir)
|
||||
@@ -252,7 +248,7 @@ def AddImagesToTargetFiles(filename):
|
||||
OPTIONS.info_dict["selinux_fc"] = os.path.join(
|
||||
OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts")
|
||||
|
||||
input_zip.close()
|
||||
common.ZipClose(input_zip)
|
||||
output_zip = zipfile.ZipFile(filename, "a",
|
||||
compression=zipfile.ZIP_DEFLATED)
|
||||
|
||||
@@ -297,7 +293,7 @@ def AddImagesToTargetFiles(filename):
|
||||
banner("cache")
|
||||
AddCache(output_zip)
|
||||
|
||||
output_zip.close()
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
def main(argv):
|
||||
def option_handler(o, _):
|
||||
|
@@ -205,8 +205,8 @@ def BuildImage(in_dir, prop_dict, out_file):
|
||||
Returns:
|
||||
True iff the image is built successfully.
|
||||
"""
|
||||
# system_root_image=true: build a system.img that combines the contents of /system
|
||||
# and the ramdisk, and can be mounted at the root of the file system.
|
||||
# system_root_image=true: build a system.img that combines the contents of
|
||||
# /system and the ramdisk, and can be mounted at the root of the file system.
|
||||
origin_in = in_dir
|
||||
fs_config = prop_dict.get("fs_config")
|
||||
if (prop_dict.get("system_root_image") == "true"
|
||||
@@ -375,8 +375,8 @@ def ImagePropFromGlobalDict(glob_dict, mount_point):
|
||||
copy_prop("system_size", "partition_size")
|
||||
copy_prop("system_journal_size", "journal_size")
|
||||
copy_prop("system_verity_block_device", "verity_block_device")
|
||||
copy_prop("system_root_image","system_root_image")
|
||||
copy_prop("ramdisk_dir","ramdisk_dir")
|
||||
copy_prop("system_root_image", "system_root_image")
|
||||
copy_prop("ramdisk_dir", "ramdisk_dir")
|
||||
elif mount_point == "data":
|
||||
# Copy the generic fs type first, override with specific one if available.
|
||||
copy_prop("fs_type", "fs_type")
|
||||
|
@@ -32,10 +32,7 @@ import zipfile
|
||||
import blockimgdiff
|
||||
import rangelib
|
||||
|
||||
try:
|
||||
from hashlib import sha1 as sha1
|
||||
except ImportError:
|
||||
from sha import sha as sha1
|
||||
from hashlib import sha1 as sha1
|
||||
|
||||
|
||||
class Options(object):
|
||||
@@ -380,6 +377,10 @@ def BuildBootableImage(sourcedir, fs_config_file, info_dict=None):
|
||||
p.communicate()
|
||||
assert p.returncode == 0, "vboot_signer of %s image failed" % path
|
||||
|
||||
# Clean up the temp files.
|
||||
img_unsigned.close()
|
||||
img_keyblock.close()
|
||||
|
||||
img.seek(os.SEEK_SET, 0)
|
||||
data = img.read()
|
||||
|
||||
@@ -854,16 +855,50 @@ def ZipWrite(zip_file, filename, arcname=None, perms=0o644,
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
def ZipWriteStr(zip_file, filename, data, perms=0o644, compression=None):
|
||||
# use a fixed timestamp so the output is repeatable.
|
||||
zinfo = zipfile.ZipInfo(filename=filename,
|
||||
date_time=(2009, 1, 1, 0, 0, 0))
|
||||
if compression is None:
|
||||
def ZipWriteStr(zip_file, zinfo_or_arcname, data, perms=0o644,
|
||||
compress_type=None):
|
||||
"""Wrap zipfile.writestr() function to work around the zip64 limit.
|
||||
|
||||
Even with the ZIP64_LIMIT workaround, it won't allow writing a string
|
||||
longer than 2GiB. It gives 'OverflowError: size does not fit in an int'
|
||||
when calling crc32(bytes).
|
||||
|
||||
But it still works fine to write a shorter string into a large zip file.
|
||||
We should use ZipWrite() whenever possible, and only use ZipWriteStr()
|
||||
when we know the string won't be too long.
|
||||
"""
|
||||
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
|
||||
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
|
||||
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
|
||||
zinfo.compress_type = zip_file.compression
|
||||
else:
|
||||
zinfo.compress_type = compression
|
||||
zinfo = zinfo_or_arcname
|
||||
|
||||
# If compress_type is given, it overrides the value in zinfo.
|
||||
if compress_type is not None:
|
||||
zinfo.compress_type = compress_type
|
||||
|
||||
# Use a fixed timestamp so the output is repeatable.
|
||||
zinfo.external_attr = perms << 16
|
||||
zinfo.date_time = (2009, 1, 1, 0, 0, 0)
|
||||
|
||||
zip_file.writestr(zinfo, data)
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
def ZipClose(zip_file):
|
||||
# http://b/18015246
|
||||
# zipfile also refers to ZIP64_LIMIT during close() when it writes out the
|
||||
# central directory.
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
|
||||
zip_file.close()
|
||||
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
class DeviceSpecificParams(object):
|
||||
@@ -969,7 +1004,7 @@ class File(object):
|
||||
return t
|
||||
|
||||
def AddToZip(self, z, compression=None):
|
||||
ZipWriteStr(z, self.name, self.data, compression=compression)
|
||||
ZipWriteStr(z, self.name, self.data, compress_type=compression)
|
||||
|
||||
DIFF_PROGRAM_BY_EXT = {
|
||||
".gz" : "imgdiff",
|
||||
|
@@ -43,8 +43,9 @@ OPTIONS = common.OPTIONS
|
||||
|
||||
def CopyInfo(output_zip):
|
||||
"""Copy the android-info.txt file from the input to the output."""
|
||||
output_zip.write(os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"),
|
||||
"android-info.txt")
|
||||
common.ZipWrite(
|
||||
output_zip, os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"),
|
||||
"android-info.txt")
|
||||
|
||||
|
||||
def main(argv):
|
||||
@@ -133,13 +134,7 @@ def main(argv):
|
||||
|
||||
finally:
|
||||
print "cleaning up..."
|
||||
# http://b/18015246
|
||||
# See common.py for context. zipfile also refers to ZIP64_LIMIT during
|
||||
# close() when it writes out the central directory.
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
output_zip.close()
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
common.ZipClose(output_zip)
|
||||
shutil.rmtree(OPTIONS.input_tmp)
|
||||
|
||||
print "done."
|
||||
|
@@ -92,7 +92,6 @@ if sys.hexversion < 0x02070000:
|
||||
print >> sys.stderr, "Python 2.7 or newer is required."
|
||||
sys.exit(1)
|
||||
|
||||
import copy
|
||||
import multiprocessing
|
||||
import os
|
||||
import tempfile
|
||||
@@ -371,6 +370,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
|
||||
symlinks.append((input_zip.read(info.filename),
|
||||
"/" + partition + "/" + basefilename))
|
||||
else:
|
||||
import copy
|
||||
info2 = copy.copy(info)
|
||||
fn = info2.filename = partition + "/" + basefilename
|
||||
if substitute and fn in substitute and substitute[fn] is None:
|
||||
@@ -380,7 +380,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
|
||||
data = substitute[fn]
|
||||
else:
|
||||
data = input_zip.read(info.filename)
|
||||
output_zip.writestr(info2, data)
|
||||
common.ZipWriteStr(output_zip, info2, data)
|
||||
if fn.endswith("/"):
|
||||
itemset.Get(fn[:-1], is_dir=True)
|
||||
else:
|
||||
@@ -1581,6 +1581,7 @@ def main(argv):
|
||||
OPTIONS.package_key = OPTIONS.info_dict.get(
|
||||
"default_system_dev_certificate",
|
||||
"build/target/product/security/testkey")
|
||||
common.ZipClose(output_zip)
|
||||
break
|
||||
|
||||
else:
|
||||
@@ -1601,15 +1602,14 @@ def main(argv):
|
||||
common.DumpInfoDict(OPTIONS.source_info_dict)
|
||||
try:
|
||||
WriteIncrementalOTAPackage(input_zip, source_zip, output_zip)
|
||||
common.ZipClose(output_zip)
|
||||
break
|
||||
except ValueError:
|
||||
if not OPTIONS.fallback_to_full:
|
||||
raise
|
||||
print "--- failed to build incremental; falling back to full ---"
|
||||
OPTIONS.incremental_source = None
|
||||
output_zip.close()
|
||||
|
||||
output_zip.close()
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
if not OPTIONS.no_signing:
|
||||
SignOutput(temp_zip_file.name, args[1])
|
||||
|
@@ -196,23 +196,23 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
||||
if key not in common.SPECIAL_CERT_STRINGS:
|
||||
print " signing: %-*s (%s)" % (maxsize, name, key)
|
||||
signed_data = SignApk(data, key, key_passwords[key])
|
||||
output_tf_zip.writestr(out_info, signed_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, signed_data)
|
||||
else:
|
||||
# an APK we're not supposed to sign.
|
||||
print "NOT signing: %s" % (name,)
|
||||
output_tf_zip.writestr(out_info, data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, data)
|
||||
elif info.filename in ("SYSTEM/build.prop",
|
||||
"VENDOR/build.prop",
|
||||
"RECOVERY/RAMDISK/default.prop"):
|
||||
print "rewriting %s:" % (info.filename,)
|
||||
new_data = RewriteProps(data, misc_info)
|
||||
output_tf_zip.writestr(out_info, new_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, new_data)
|
||||
if info.filename == "RECOVERY/RAMDISK/default.prop":
|
||||
write_to_temp(info.filename, info.external_attr, new_data)
|
||||
elif info.filename.endswith("mac_permissions.xml"):
|
||||
print "rewriting %s with new keys." % (info.filename,)
|
||||
new_data = ReplaceCerts(data)
|
||||
output_tf_zip.writestr(out_info, new_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, new_data)
|
||||
elif info.filename in ("SYSTEM/recovery-from-boot.p",
|
||||
"SYSTEM/bin/install-recovery.sh"):
|
||||
rebuild_recovery = True
|
||||
@@ -229,7 +229,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
||||
pass
|
||||
else:
|
||||
# a non-APK file; copy it verbatim
|
||||
output_tf_zip.writestr(out_info, data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, data)
|
||||
|
||||
if OPTIONS.replace_ota_keys:
|
||||
new_recovery_keys = ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)
|
||||
@@ -243,7 +243,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
||||
"boot.img", "boot.img", tmpdir, "BOOT", info_dict=misc_info)
|
||||
|
||||
def output_sink(fn, data):
|
||||
output_tf_zip.writestr("SYSTEM/"+fn, data)
|
||||
common.ZipWriteStr(output_tf_zip, "SYSTEM/" + fn, data)
|
||||
|
||||
common.MakeRecoveryPatch(tmpdir, output_sink, recovery_img, boot_img,
|
||||
info_dict=misc_info)
|
||||
@@ -488,8 +488,8 @@ def main(argv):
|
||||
ProcessTargetFiles(input_zip, output_zip, misc_info,
|
||||
apk_key_map, key_passwords)
|
||||
|
||||
input_zip.close()
|
||||
output_zip.close()
|
||||
common.ZipClose(input_zip)
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
add_img_to_target_files.AddImagesToTargetFiles(args[1])
|
||||
|
||||
|
@@ -29,15 +29,54 @@ def random_string_with_holes(size, block_size, step_size):
|
||||
data[begin:end] = os.urandom(block_size)
|
||||
return "".join(data)
|
||||
|
||||
def get_2gb_string():
|
||||
kilobytes = 1024
|
||||
megabytes = 1024 * kilobytes
|
||||
gigabytes = 1024 * megabytes
|
||||
|
||||
size = int(2 * gigabytes + 1)
|
||||
block_size = 4 * kilobytes
|
||||
step_size = 4 * megabytes
|
||||
two_gb_string = random_string_with_holes(
|
||||
size, block_size, step_size)
|
||||
return two_gb_string
|
||||
|
||||
|
||||
class CommonZipTest(unittest.TestCase):
|
||||
def _verify(self, zip_file, zip_file_name, arcname, contents,
|
||||
test_file_name=None, expected_stat=None, expected_mode=0o644,
|
||||
expected_compress_type=zipfile.ZIP_STORED):
|
||||
# Verify the stat if present.
|
||||
if test_file_name is not None:
|
||||
new_stat = os.stat(test_file_name)
|
||||
self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
|
||||
self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
|
||||
|
||||
# Reopen the zip file to verify.
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "r")
|
||||
|
||||
# Verify the timestamp.
|
||||
info = zip_file.getinfo(arcname)
|
||||
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
|
||||
|
||||
# Verify the file mode.
|
||||
mode = (info.external_attr >> 16) & 0o777
|
||||
self.assertEqual(mode, expected_mode)
|
||||
|
||||
# Verify the compress type.
|
||||
self.assertEqual(info.compress_type, expected_compress_type)
|
||||
|
||||
# Verify the zip contents.
|
||||
self.assertEqual(zip_file.read(arcname), contents)
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
|
||||
def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
|
||||
extra_zipwrite_args = dict(extra_zipwrite_args or {})
|
||||
|
||||
test_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
|
||||
test_file_name = test_file.name
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
|
||||
# File names within an archive strip the leading slash.
|
||||
@@ -52,31 +91,100 @@ class CommonZipTest(unittest.TestCase):
|
||||
test_file.write(contents)
|
||||
test_file.close()
|
||||
|
||||
old_stat = os.stat(test_file_name)
|
||||
expected_stat = os.stat(test_file_name)
|
||||
expected_mode = extra_zipwrite_args.get("perms", 0o644)
|
||||
|
||||
expected_compress_type = extra_zipwrite_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
new_stat = os.stat(test_file_name)
|
||||
self.assertEqual(int(old_stat.st_mode), int(new_stat.st_mode))
|
||||
self.assertEqual(int(old_stat.st_mtime), int(new_stat.st_mtime))
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
|
||||
zip_file.close()
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "r")
|
||||
info = zip_file.getinfo(arcname)
|
||||
|
||||
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
|
||||
mode = (info.external_attr >> 16) & 0o777
|
||||
self.assertEqual(mode, expected_mode)
|
||||
self.assertEqual(zip_file.read(arcname), contents)
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
|
||||
expected_stat, expected_mode, expected_compress_type)
|
||||
finally:
|
||||
os.remove(test_file_name)
|
||||
os.remove(zip_file_name)
|
||||
|
||||
def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
|
||||
extra_args = dict(extra_args or {})
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
zip_file.close()
|
||||
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "w")
|
||||
|
||||
try:
|
||||
expected_compress_type = extra_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
|
||||
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
|
||||
else:
|
||||
zinfo = zinfo_or_arcname
|
||||
arcname = zinfo.filename
|
||||
|
||||
common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
self._verify(zip_file, zip_file_name, arcname, contents,
|
||||
expected_compress_type=expected_compress_type)
|
||||
finally:
|
||||
os.remove(zip_file_name)
|
||||
|
||||
def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
|
||||
extra_args = dict(extra_args or {})
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
|
||||
test_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
test_file_name = test_file.name
|
||||
|
||||
arcname_large = test_file_name
|
||||
arcname_small = "bar"
|
||||
|
||||
# File names within an archive strip the leading slash.
|
||||
if arcname_large[0] == "/":
|
||||
arcname_large = arcname_large[1:]
|
||||
|
||||
zip_file.close()
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "w")
|
||||
|
||||
try:
|
||||
test_file.write(large)
|
||||
test_file.close()
|
||||
|
||||
expected_stat = os.stat(test_file_name)
|
||||
expected_mode = 0o644
|
||||
expected_compress_type = extra_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
common.ZipWrite(zip_file, test_file_name, **extra_args)
|
||||
common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
# Verify the contents written by ZipWrite().
|
||||
self._verify(zip_file, zip_file_name, arcname_large, large,
|
||||
test_file_name, expected_stat, expected_mode,
|
||||
expected_compress_type)
|
||||
|
||||
# Verify the contents written by ZipWriteStr().
|
||||
self._verify(zip_file, zip_file_name, arcname_small, small,
|
||||
expected_compress_type=expected_compress_type)
|
||||
finally:
|
||||
os.remove(zip_file_name)
|
||||
os.remove(test_file_name)
|
||||
|
||||
def _test_reset_ZIP64_LIMIT(self, func, *args):
|
||||
default_limit = (1 << 31) - 1
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
func(*args)
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
|
||||
def test_ZipWrite(self):
|
||||
file_contents = os.urandom(1024)
|
||||
self._test_ZipWrite(file_contents)
|
||||
@@ -88,23 +196,64 @@ class CommonZipTest(unittest.TestCase):
|
||||
"perms": 0o777,
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWrite(file_contents, {
|
||||
"arcname": "foobar",
|
||||
"perms": 0o700,
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
def test_ZipWrite_large_file(self):
|
||||
kilobytes = 1024
|
||||
megabytes = 1024 * kilobytes
|
||||
gigabytes = 1024 * megabytes
|
||||
|
||||
size = int(2 * gigabytes + 1)
|
||||
block_size = 4 * kilobytes
|
||||
step_size = 4 * megabytes
|
||||
file_contents = random_string_with_holes(
|
||||
size, block_size, step_size)
|
||||
file_contents = get_2gb_string()
|
||||
self._test_ZipWrite(file_contents, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
|
||||
def test_ZipWrite_resets_ZIP64_LIMIT(self):
|
||||
default_limit = (1 << 31) - 1
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
self._test_ZipWrite('')
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
|
||||
|
||||
def test_ZipWriteStr(self):
|
||||
random_string = os.urandom(1024)
|
||||
# Passing arcname
|
||||
self._test_ZipWriteStr("foo", random_string)
|
||||
|
||||
# Passing zinfo
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_ZipWriteStr(zinfo, random_string)
|
||||
|
||||
# Timestamp in the zinfo should be overwritten.
|
||||
zinfo.date_time = (2015, 3, 1, 15, 30, 0)
|
||||
self._test_ZipWriteStr(zinfo, random_string)
|
||||
|
||||
def test_ZipWriteStr_with_opts(self):
|
||||
random_string = os.urandom(1024)
|
||||
# Passing arcname
|
||||
self._test_ZipWriteStr("foo", random_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWriteStr("foo", random_string, {
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
# Passing zinfo
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_ZipWriteStr(zinfo, random_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWriteStr(zinfo, random_string, {
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
def test_ZipWriteStr_large_file(self):
|
||||
# zipfile.writestr() doesn't work when the str size is over 2GiB even with
|
||||
# the workaround. We will only test the case of writing a string into a
|
||||
# large archive.
|
||||
long_string = get_2gb_string()
|
||||
short_string = os.urandom(1024)
|
||||
self._test_ZipWriteStr_large_file(long_string, short_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
|
||||
def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
|
||||
|
Reference in New Issue
Block a user