Merge "releasetools: Clean up additional modules for Python 3 compatibility."

This commit is contained in:
Tao Bao
2019-06-26 18:01:57 +00:00
committed by Gerrit Code Review
2 changed files with 37 additions and 48 deletions

View File

@@ -38,8 +38,7 @@ def CertUsesSha256(cert):
"""Check if the cert uses SHA-256 hashing algorithm.""" """Check if the cert uses SHA-256 hashing algorithm."""
cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert] cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert]
p1 = common.Run(cmd, stdout=subprocess.PIPE) cert_dump = common.RunAndCheckOutput(cmd, stdout=subprocess.PIPE)
cert_dump, _ = p1.communicate()
algorithm = re.search(r'Signature Algorithm: ([a-zA-Z0-9]+)', cert_dump) algorithm = re.search(r'Signature Algorithm: ([a-zA-Z0-9]+)', cert_dump)
assert algorithm, "Failed to identify the signature algorithm." assert algorithm, "Failed to identify the signature algorithm."
@@ -69,13 +68,13 @@ def VerifyPackage(cert, package):
print('Certificate: %s' % (cert,)) print('Certificate: %s' % (cert,))
# Read in the package. # Read in the package.
with open(package) as package_file: with open(package, 'rb') as package_file:
package_bytes = package_file.read() package_bytes = package_file.read()
length = len(package_bytes) length = len(package_bytes)
assert length >= 6, "Not big enough to contain footer." assert length >= 6, "Not big enough to contain footer."
footer = [ord(x) for x in package_bytes[-6:]] footer = bytearray(package_bytes[-6:])
assert footer[2] == 0xff and footer[3] == 0xff, "Footer is wrong." assert footer[2] == 0xff and footer[3] == 0xff, "Footer is wrong."
signature_start_from_end = (footer[1] << 8) + footer[0] signature_start_from_end = (footer[1] << 8) + footer[0]
@@ -111,31 +110,25 @@ def VerifyPackage(cert, package):
# Parse the signature and get the hash. # Parse the signature and get the hash.
cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', sig_file] cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', sig_file]
p1 = common.Run(cmd, stdout=subprocess.PIPE) sig = common.RunAndCheckOutput(cmd, stdout=subprocess.PIPE)
sig, _ = p1.communicate()
assert p1.returncode == 0, "Failed to parse the signature."
digest_line = sig.strip().split('\n')[-1] digest_line = sig.rstrip().split('\n')[-1]
digest_string = digest_line.split(':')[3] digest_string = digest_line.split(':')[3]
digest_file = common.MakeTempFile(prefix='digest-') digest_file = common.MakeTempFile(prefix='digest-')
with open(digest_file, 'wb') as f: with open(digest_file, 'wb') as f:
f.write(digest_string.decode('hex')) f.write(bytearray.fromhex(digest_string))
# Verify the digest by outputing the decrypted result in ASN.1 structure. # Verify the digest by outputing the decrypted result in ASN.1 structure.
decrypted_file = common.MakeTempFile(prefix='decrypted-') decrypted_file = common.MakeTempFile(prefix='decrypted-')
cmd = ['openssl', 'rsautl', '-verify', '-certin', '-inkey', cert, cmd = ['openssl', 'rsautl', '-verify', '-certin', '-inkey', cert,
'-in', digest_file, '-out', decrypted_file] '-in', digest_file, '-out', decrypted_file]
p1 = common.Run(cmd, stdout=subprocess.PIPE) common.RunAndCheckOutput(cmd, stdout=subprocess.PIPE)
p1.communicate()
assert p1.returncode == 0, "Failed to run openssl rsautl -verify."
# Parse the output ASN.1 structure. # Parse the output ASN.1 structure.
cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', decrypted_file] cmd = ['openssl', 'asn1parse', '-inform', 'DER', '-in', decrypted_file]
p1 = common.Run(cmd, stdout=subprocess.PIPE) decrypted_output = common.RunAndCheckOutput(cmd, stdout=subprocess.PIPE)
decrypted_output, _ = p1.communicate()
assert p1.returncode == 0, "Failed to parse the output."
digest_line = decrypted_output.strip().split('\n')[-1] digest_line = decrypted_output.rstrip().split('\n')[-1]
digest_string = digest_line.split(':')[3].lower() digest_string = digest_line.split(':')[3].lower()
# Verify that the two digest strings match. # Verify that the two digest strings match.
@@ -156,7 +149,7 @@ def VerifyAbOtaPayload(cert, package):
# Dump pubkey from the certificate. # Dump pubkey from the certificate.
pubkey = common.MakeTempFile(prefix="key-", suffix=".pem") pubkey = common.MakeTempFile(prefix="key-", suffix=".pem")
with open(pubkey, 'wb') as pubkey_fp: with open(pubkey, 'w') as pubkey_fp:
pubkey_fp.write(common.ExtractPublicKey(cert)) pubkey_fp.write(common.ExtractPublicKey(cert))
package_dir = common.MakeTempDir(prefix='package-') package_dir = common.MakeTempDir(prefix='package-')
@@ -166,11 +159,7 @@ def VerifyAbOtaPayload(cert, package):
cmd = ['delta_generator', cmd = ['delta_generator',
'--in_file=' + payload_file, '--in_file=' + payload_file,
'--public_key=' + pubkey] '--public_key=' + pubkey]
proc = common.Run(cmd) common.RunAndCheckOutput(cmd)
stdoutdata, _ = proc.communicate()
assert proc.returncode == 0, \
'Failed to verify payload with delta_generator: {}\n{}'.format(
package, stdoutdata)
common.ZipClose(package_zip) common.ZipClose(package_zip)
# Verified successfully upon reaching here. # Verified successfully upon reaching here.

View File

@@ -38,8 +38,8 @@ Common options that apply to both of non-A/B and A/B OTAs
-k (--package_key) <key> -k (--package_key) <key>
Key to use to sign the package (default is the value of Key to use to sign the package (default is the value of
default_system_dev_certificate from the input target-files's default_system_dev_certificate from the input target-files's
META/misc_info.txt, or "build/make/target/product/security/testkey" if that META/misc_info.txt, or "build/make/target/product/security/testkey" if
value is not specified). that value is not specified).
For incremental OTAs, the default value is based on the source For incremental OTAs, the default value is based on the source
target-file, not the target build. target-file, not the target build.
@@ -384,10 +384,10 @@ class BuildInfo(object):
"Invalid ro.product.property_source_order '{}'".format(source_order)) "Invalid ro.product.property_source_order '{}'".format(source_order))
for source in source_order: for source in source_order:
source_prop = prop.replace("ro.product", "ro.product.{}".format(source), source_prop = prop.replace(
1) "ro.product", "ro.product.{}".format(source), 1)
prop_val = self.info_dict.get("{}.build.prop".format(source), {}).get( prop_val = self.info_dict.get(
source_prop) "{}.build.prop".format(source), {}).get(source_prop)
if prop_val: if prop_val:
return prop_val return prop_val
@@ -508,7 +508,7 @@ class PayloadSigner(object):
MODULUS_PREFIX = "Modulus=" MODULUS_PREFIX = "Modulus="
assert modulus_string.startswith(MODULUS_PREFIX) assert modulus_string.startswith(MODULUS_PREFIX)
modulus_string = modulus_string[len(MODULUS_PREFIX):] modulus_string = modulus_string[len(MODULUS_PREFIX):]
key_size = len(modulus_string) / 2 key_size = len(modulus_string) // 2
assert key_size == 256 or key_size == 512, \ assert key_size == 256 or key_size == 512, \
"Unsupported key size {}".format(key_size) "Unsupported key size {}".format(key_size)
return key_size return key_size
@@ -1051,7 +1051,7 @@ def WriteMetadata(metadata, output):
output: A ZipFile object or a string of the output file path. output: A ZipFile object or a string of the output file path.
""" """
value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.iteritems())]) value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())])
if isinstance(output, zipfile.ZipFile): if isinstance(output, zipfile.ZipFile):
common.ZipWriteStr(output, METADATA_NAME, value, common.ZipWriteStr(output, METADATA_NAME, value,
compress_type=zipfile.ZIP_STORED) compress_type=zipfile.ZIP_STORED)
@@ -1067,7 +1067,7 @@ def HandleDowngradeMetadata(metadata, target_info, source_info):
post_timestamp = target_info.GetBuildProp("ro.build.date.utc") post_timestamp = target_info.GetBuildProp("ro.build.date.utc")
pre_timestamp = source_info.GetBuildProp("ro.build.date.utc") pre_timestamp = source_info.GetBuildProp("ro.build.date.utc")
is_downgrade = long(post_timestamp) < long(pre_timestamp) is_downgrade = int(post_timestamp) < int(pre_timestamp)
if OPTIONS.downgrade: if OPTIONS.downgrade:
if not is_downgrade: if not is_downgrade:
@@ -1392,7 +1392,7 @@ class AbOtaPropertyFiles(StreamingPropertyFiles):
payload_offset += len(payload_info.extra) + len(payload_info.filename) payload_offset += len(payload_info.extra) + len(payload_info.filename)
payload_size = payload_info.file_size payload_size = payload_info.file_size
with input_zip.open('payload.bin', 'r') as payload_fp: with input_zip.open('payload.bin') as payload_fp:
header_bin = payload_fp.read(24) header_bin = payload_fp.read(24)
# network byte order (big-endian) # network byte order (big-endian)
@@ -1864,7 +1864,6 @@ def GetTargetFilesZipForSecondaryImages(input_file, skip_postinstall=False):
with zipfile.ZipFile(input_file, 'r') as input_zip: with zipfile.ZipFile(input_file, 'r') as input_zip:
infolist = input_zip.infolist() infolist = input_zip.infolist()
namelist = input_zip.namelist()
input_tmp = common.UnzipTemp(input_file, UNZIP_PATTERN) input_tmp = common.UnzipTemp(input_file, UNZIP_PATTERN)
for info in infolist: for info in infolist:
@@ -1976,7 +1975,7 @@ def GetTargetFilesZipForRetrofitDynamicPartitions(input_file,
to_delete += [DYNAMIC_PARTITION_INFO] to_delete += [DYNAMIC_PARTITION_INFO]
# Remove the existing partition images as well as the map files. # Remove the existing partition images as well as the map files.
to_delete += replace.values() to_delete += list(replace.values())
to_delete += ['IMAGES/{}.map'.format(dev) for dev in super_block_devices] to_delete += ['IMAGES/{}.map'.format(dev) for dev in super_block_devices]
common.ZipDelete(target_file, to_delete) common.ZipDelete(target_file, to_delete)
@@ -2291,7 +2290,8 @@ def main(argv):
OPTIONS.cache_size = cache_size OPTIONS.cache_size = cache_size
if OPTIONS.extra_script is not None: if OPTIONS.extra_script is not None:
OPTIONS.extra_script = open(OPTIONS.extra_script).read() with open(OPTIONS.extra_script) as fp:
OPTIONS.extra_script = fp.read()
if OPTIONS.extracted_input is not None: if OPTIONS.extracted_input is not None:
OPTIONS.input_tmp = OPTIONS.extracted_input OPTIONS.input_tmp = OPTIONS.extracted_input