releasetools: Make common Python 3 compatible.

Bug: 131631303
Test: TreeHugger
Test: `python -m unittest test_common`
Test: `python3 -m unittest test_common`
Change-Id: I409fe30a5d71975c1d7b66e5e749843de530f1f9
This commit is contained in:
Tao Bao
2017-12-01 16:19:46 -08:00
parent 9392fb3905
commit da30cfae96
3 changed files with 38 additions and 23 deletions

View File

@@ -14,6 +14,7 @@
from __future__ import print_function from __future__ import print_function
import base64
import collections import collections
import copy import copy
import errno import errno
@@ -30,7 +31,6 @@ import platform
import re import re
import shlex import shlex
import shutil import shutil
import string
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
@@ -190,6 +190,8 @@ def Run(args, verbose=None, **kwargs):
kwargs: Any additional args to be passed to subprocess.Popen(), such as env, kwargs: Any additional args to be passed to subprocess.Popen(), such as env,
stdin, etc. stdout and stderr will default to subprocess.PIPE and stdin, etc. stdout and stderr will default to subprocess.PIPE and
subprocess.STDOUT respectively unless caller specifies any of them. subprocess.STDOUT respectively unless caller specifies any of them.
universal_newlines will default to True, as most of the users in
releasetools expect string output.
Returns: Returns:
A subprocess.Popen object. A subprocess.Popen object.
@@ -197,6 +199,8 @@ def Run(args, verbose=None, **kwargs):
if 'stdout' not in kwargs and 'stderr' not in kwargs: if 'stdout' not in kwargs and 'stderr' not in kwargs:
kwargs['stdout'] = subprocess.PIPE kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.STDOUT kwargs['stderr'] = subprocess.STDOUT
if 'universal_newlines' not in kwargs:
kwargs['universal_newlines'] = True
# Don't log any if caller explicitly says so. # Don't log any if caller explicitly says so.
if verbose != False: if verbose != False:
logger.info(" Running: \"%s\"", " ".join(args)) logger.info(" Running: \"%s\"", " ".join(args))
@@ -314,7 +318,7 @@ def LoadInfoDict(input_file, repacking=False):
def read_helper(fn): def read_helper(fn):
if isinstance(input_file, zipfile.ZipFile): if isinstance(input_file, zipfile.ZipFile):
return input_file.read(fn) return input_file.read(fn).decode()
else: else:
path = os.path.join(input_file, *fn.split("/")) path = os.path.join(input_file, *fn.split("/"))
try: try:
@@ -526,7 +530,7 @@ def LoadRecoveryFSTab(read_helper, fstab_version, recovery_fstab_path,
# system. Other areas assume system is always at "/system" so point /system # system. Other areas assume system is always at "/system" so point /system
# at /. # at /.
if system_root_image: if system_root_image:
assert not d.has_key("/system") and d.has_key("/") assert '/system' not in d and '/' in d
d["/system"] = d["/"] d["/system"] = d["/"]
return d return d
@@ -953,7 +957,7 @@ def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks,
# filename listed in system.map may contain an additional leading slash # filename listed in system.map may contain an additional leading slash
# (i.e. "//system/framework/am.jar"). Using lstrip to get consistent # (i.e. "//system/framework/am.jar"). Using lstrip to get consistent
# results. # results.
arcname = string.replace(entry, which, which.upper(), 1).lstrip('/') arcname = entry.replace(which, which.upper(), 1).lstrip('/')
# Special handling another case, where files not under /system # Special handling another case, where files not under /system
# (e.g. "/sbin/charger") are packed under ROOT/ in a target_files.zip. # (e.g. "/sbin/charger") are packed under ROOT/ in a target_files.zip.
@@ -1223,7 +1227,7 @@ def ReadApkCerts(tf_zip):
if basename: if basename:
installed_files.add(basename) installed_files.add(basename)
for line in tf_zip.read("META/apkcerts.txt").split("\n"): for line in tf_zip.read('META/apkcerts.txt').decode().split('\n'):
line = line.strip() line = line.strip()
if not line: if not line:
continue continue
@@ -1433,6 +1437,8 @@ class PasswordManager(object):
if not first: if not first:
print("key file %s still missing some passwords." % (self.pwfile,)) print("key file %s still missing some passwords." % (self.pwfile,))
if sys.version_info[0] >= 3:
raw_input = input # pylint: disable=redefined-builtin
answer = raw_input("try to edit again? [y]> ").strip() answer = raw_input("try to edit again? [y]> ").strip()
if answer and answer[0] not in 'yY': if answer and answer[0] not in 'yY':
raise RuntimeError("key passwords unavailable") raise RuntimeError("key passwords unavailable")
@@ -2185,7 +2191,7 @@ def ParseCertificate(data):
This gives the same result as `openssl x509 -in <filename> -outform DER`. This gives the same result as `openssl x509 -in <filename> -outform DER`.
Returns: Returns:
The decoded certificate string. The decoded certificate bytes.
""" """
cert_buffer = [] cert_buffer = []
save = False save = False
@@ -2196,7 +2202,7 @@ def ParseCertificate(data):
cert_buffer.append(line) cert_buffer.append(line)
if "--BEGIN CERTIFICATE--" in line: if "--BEGIN CERTIFICATE--" in line:
save = True save = True
cert = "".join(cert_buffer).decode('base64') cert = base64.b64decode("".join(cert_buffer))
return cert return cert
@@ -2338,7 +2344,7 @@ fi
logger.info("putting script in %s", sh_location) logger.info("putting script in %s", sh_location)
output_sink(sh_location, sh) output_sink(sh_location, sh.encode())
class DynamicPartitionUpdate(object): class DynamicPartitionUpdate(object):

View File

@@ -579,7 +579,7 @@ class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
def test_ExtractPublicKey(self): def test_ExtractPublicKey(self):
cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
with open(pubkey, 'rb') as pubkey_fp: with open(pubkey) as pubkey_fp:
self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert)) self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert))
def test_ExtractPublicKey_invalidInput(self): def test_ExtractPublicKey_invalidInput(self):
@@ -590,15 +590,16 @@ class CommonApkUtilsTest(test_utils.ReleaseToolsTestCase):
def test_ExtractAvbPublicKey(self): def test_ExtractAvbPublicKey(self):
privkey = os.path.join(self.testdata_dir, 'testkey.key') privkey = os.path.join(self.testdata_dir, 'testkey.key')
pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem')
with open(common.ExtractAvbPublicKey(privkey)) as privkey_fp, \ with open(common.ExtractAvbPublicKey(privkey), 'rb') as privkey_fp, \
open(common.ExtractAvbPublicKey(pubkey)) as pubkey_fp: open(common.ExtractAvbPublicKey(pubkey), 'rb') as pubkey_fp:
self.assertEqual(privkey_fp.read(), pubkey_fp.read()) self.assertEqual(privkey_fp.read(), pubkey_fp.read())
def test_ParseCertificate(self): def test_ParseCertificate(self):
cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER'] cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER']
proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=False)
expected, _ = proc.communicate() expected, _ = proc.communicate()
self.assertEqual(0, proc.returncode) self.assertEqual(0, proc.returncode)
@@ -914,7 +915,7 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
with zipfile.ZipFile(target_files, 'w') as target_files_zip: with zipfile.ZipFile(target_files, 'w') as target_files_zip:
info_values = ''.join( info_values = ''.join(
['{}={}\n'.format(k, v) for k, v in sorted(info_dict.iteritems())]) ['{}={}\n'.format(k, v) for k, v in sorted(info_dict.items())])
common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values) common.ZipWriteStr(target_files_zip, 'META/misc_info.txt', info_values)
FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults" FSTAB_TEMPLATE = "/dev/block/system {} ext4 ro,barrier=1 defaults"
@@ -1085,7 +1086,7 @@ class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
loc = os.path.join(self._tempdir, prefix, name) loc = os.path.join(self._tempdir, prefix, name)
if not os.path.exists(os.path.dirname(loc)): if not os.path.exists(os.path.dirname(loc)):
os.makedirs(os.path.dirname(loc)) os.makedirs(os.path.dirname(loc))
with open(loc, "w+") as f: with open(loc, "wb") as f:
f.write(data) f.write(data)
def test_full_recovery(self): def test_full_recovery(self):
@@ -1110,7 +1111,7 @@ class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
validate_target_files.ValidateInstallRecoveryScript(self._tempdir, validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info) self._info)
# Validate 'recovery-from-boot' with bonus argument. # Validate 'recovery-from-boot' with bonus argument.
self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM") self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info) recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir, validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
@@ -1118,25 +1119,30 @@ class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
class MockScriptWriter(object): class MockScriptWriter(object):
"""A class that mocks edify_generator.EdifyGenerator. """A class that mocks edify_generator.EdifyGenerator."""
"""
def __init__(self, enable_comments=False): def __init__(self, enable_comments=False):
self.lines = [] self.lines = []
self.enable_comments = enable_comments self.enable_comments = enable_comments
def Comment(self, comment): def Comment(self, comment):
if self.enable_comments: if self.enable_comments:
self.lines.append("# {}".format(comment)) self.lines.append('# {}'.format(comment))
def AppendExtra(self, extra): def AppendExtra(self, extra):
self.lines.append(extra) self.lines.append(extra)
def __str__(self): def __str__(self):
return "\n".join(self.lines) return '\n'.join(self.lines)
class MockBlockDifference(object): class MockBlockDifference(object):
def __init__(self, partition, tgt, src=None): def __init__(self, partition, tgt, src=None):
self.partition = partition self.partition = partition
self.tgt = tgt self.tgt = tgt
self.src = src self.src = src
def WriteScript(self, script, _, progress=None, def WriteScript(self, script, _, progress=None,
write_verify_script=False): write_verify_script=False):
if progress: if progress:
@@ -1144,11 +1150,13 @@ class MockBlockDifference(object):
script.AppendExtra("patch({});".format(self.partition)) script.AppendExtra("patch({});".format(self.partition))
if write_verify_script: if write_verify_script:
self.WritePostInstallVerifyScript(script) self.WritePostInstallVerifyScript(script)
def WritePostInstallVerifyScript(self, script): def WritePostInstallVerifyScript(self, script):
script.AppendExtra("verify({});".format(self.partition)) script.AppendExtra("verify({});".format(self.partition))
class FakeSparseImage(object): class FakeSparseImage(object):
def __init__(self, size): def __init__(self, size):
self.blocksize = 4096 self.blocksize = 4096
self.total_blocks = size // 4096 self.total_blocks = size // 4096
@@ -1156,12 +1164,13 @@ class FakeSparseImage(object):
class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase): class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
@staticmethod @staticmethod
def get_op_list(output_path): def get_op_list(output_path):
with zipfile.ZipFile(output_path) as output_zip: with zipfile.ZipFile(output_path) as output_zip:
with output_zip.open("dynamic_partitions_op_list") as op_list: with output_zip.open('dynamic_partitions_op_list') as op_list:
return [line.strip() for line in op_list.readlines() return [line.decode().strip() for line in op_list.readlines()
if not line.startswith("#")] if not line.startswith(b'#')]
def setUp(self): def setUp(self):
self.script = MockScriptWriter() self.script = MockScriptWriter()

View File

@@ -44,7 +44,7 @@ def _ReadFile(file_name, unpacked_name, round_up=False):
"""Constructs and returns a File object. Rounds up its size if needed.""" """Constructs and returns a File object. Rounds up its size if needed."""
assert os.path.exists(unpacked_name) assert os.path.exists(unpacked_name)
with open(unpacked_name, 'r') as f: with open(unpacked_name, 'rb') as f:
file_data = f.read() file_data = f.read()
file_size = len(file_data) file_size = len(file_data)
if round_up: if round_up: