Merge "Adding flags and logic to sign updateable SEPolicy in APEX" am: 77c1dfa6d9 am: 5a0d81a0be am: 54e08307dc

Original change: https://android-review.googlesource.com/c/platform/build/+/1982226

Change-Id: Ia474f3f805e762a5021868848646c5ea553d9c19
This commit is contained in:
Melisa Carranza Zúñiga
2022-02-23 17:36:01 +00:00
committed by Automerger Merge Worker
4 changed files with 132 additions and 18 deletions

View File

@@ -54,7 +54,7 @@ class ApexSigningError(Exception):
class ApexApkSigner(object): class ApexApkSigner(object):
"""Class to sign the apk files and other files in an apex payload image and repack the apex""" """Class to sign the apk files and other files in an apex payload image and repack the apex"""
def __init__(self, apex_path, key_passwords, codename_to_api_level_map, avbtool=None, sign_tool=None): def __init__(self, apex_path, key_passwords, codename_to_api_level_map, avbtool=None, sign_tool=None, fsverity_tool=None):
self.apex_path = apex_path self.apex_path = apex_path
if not key_passwords: if not key_passwords:
self.key_passwords = dict() self.key_passwords = dict()
@@ -65,8 +65,9 @@ class ApexApkSigner(object):
OPTIONS.search_path, "bin", "debugfs_static") OPTIONS.search_path, "bin", "debugfs_static")
self.avbtool = avbtool if avbtool else "avbtool" self.avbtool = avbtool if avbtool else "avbtool"
self.sign_tool = sign_tool self.sign_tool = sign_tool
self.fsverity_tool = fsverity_tool if fsverity_tool else "fsverity"
def ProcessApexFile(self, apk_keys, payload_key, signing_args=None): def ProcessApexFile(self, apk_keys, payload_key, signing_args=None, is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None):
"""Scans and signs the payload files and repack the apex """Scans and signs the payload files and repack the apex
Args: Args:
@@ -84,10 +85,14 @@ class ApexApkSigner(object):
self.debugfs_path, 'list', self.apex_path] self.debugfs_path, 'list', self.apex_path]
entries_names = common.RunAndCheckOutput(list_cmd).split() entries_names = common.RunAndCheckOutput(list_cmd).split()
apk_entries = [name for name in entries_names if name.endswith('.apk')] apk_entries = [name for name in entries_names if name.endswith('.apk')]
sepolicy_entries = []
if is_sepolicy:
sepolicy_entries = [name for name in entries_names if
name.startswith('./etc/SEPolicy') and name.endswith('.zip')]
# No need to sign and repack, return the original apex path. # No need to sign and repack, return the original apex path.
if not apk_entries and self.sign_tool is None: if not apk_entries and not sepolicy_entries and self.sign_tool is None:
logger.info('No apk file to sign in %s', self.apex_path) logger.info('No payload (apk or zip) file to sign in %s', self.apex_path)
return self.apex_path return self.apex_path
for entry in apk_entries: for entry in apk_entries:
@@ -101,15 +106,16 @@ class ApexApkSigner(object):
logger.warning('Apk path does not contain the intended directory name:' logger.warning('Apk path does not contain the intended directory name:'
' %s', entry) ' %s', entry)
payload_dir, has_signed_content = self.ExtractApexPayloadAndSignContents( payload_dir, has_signed_content = self.ExtractApexPayloadAndSignContents(apk_entries,
apk_entries, apk_keys, payload_key, signing_args) apk_keys, payload_key, sepolicy_entries, sepolicy_key, sepolicy_cert, signing_args)
if not has_signed_content: if not has_signed_content:
logger.info('No contents has been signed in %s', self.apex_path) logger.info('No contents has been signed in %s', self.apex_path)
return self.apex_path return self.apex_path
return self.RepackApexPayload(payload_dir, payload_key, signing_args) return self.RepackApexPayload(payload_dir, payload_key, signing_args)
def ExtractApexPayloadAndSignContents(self, apk_entries, apk_keys, payload_key, signing_args): def ExtractApexPayloadAndSignContents(self, apk_entries, apk_keys, payload_key,
sepolicy_entries, sepolicy_key, sepolicy_cert, signing_args):
"""Extracts the payload image and signs the containing apk files.""" """Extracts the payload image and signs the containing apk files."""
if not os.path.exists(self.debugfs_path): if not os.path.exists(self.debugfs_path):
raise ApexSigningError( raise ApexSigningError(
@@ -141,6 +147,11 @@ class ApexApkSigner(object):
codename_to_api_level_map=self.codename_to_api_level_map) codename_to_api_level_map=self.codename_to_api_level_map)
has_signed_content = True has_signed_content = True
for entry in sepolicy_entries:
sepolicy_key = sepolicy_key if sepolicy_key else payload_key
self.SignSePolicy(payload_dir, entry, sepolicy_key, sepolicy_cert)
has_signed_content = True
if self.sign_tool: if self.sign_tool:
logger.info('Signing payload contents in apex %s with %s', self.apex_path, self.sign_tool) logger.info('Signing payload contents in apex %s with %s', self.apex_path, self.sign_tool)
# Pass avbtool to the custom signing tool # Pass avbtool to the custom signing tool
@@ -154,6 +165,36 @@ class ApexApkSigner(object):
return payload_dir, has_signed_content return payload_dir, has_signed_content
def SignSePolicy(self, payload_dir, sepolicy_zip, sepolicy_key, sepolicy_cert):
sepolicy_sig = sepolicy_zip + '.sig'
sepolicy_fsv_sig = sepolicy_zip + '.fsv_sig'
policy_zip_path = os.path.join(payload_dir, sepolicy_zip)
sig_out_path = os.path.join(payload_dir, sepolicy_sig)
sig_old = sig_out_path + '.old'
if os.path.exists(sig_out_path):
os.rename(sig_out_path, sig_old)
sign_cmd = ['openssl', 'dgst', '-sign', sepolicy_key, '-keyform', 'PEM', '-sha256',
'-out', sig_out_path, '-binary', policy_zip_path]
common.RunAndCheckOutput(sign_cmd)
if os.path.exists(sig_old):
os.remove(sig_old)
if not sepolicy_cert:
logger.info('No cert provided for SEPolicy, skipping fsverity sign')
return
fsv_sig_out_path = os.path.join(payload_dir, sepolicy_fsv_sig)
fsv_sig_old = fsv_sig_out_path + '.old'
if os.path.exists(fsv_sig_out_path):
os.rename(fsv_sig_out_path, fsv_sig_old)
fsverity_cmd = [self.fsverity_tool, 'sign', policy_zip_path, fsv_sig_out_path,
'--key=' + sepolicy_key, '--cert=' + sepolicy_cert]
common.RunAndCheckOutput(fsverity_cmd)
if os.path.exists(fsv_sig_old):
os.remove(fsv_sig_old)
def RepackApexPayload(self, payload_dir, payload_key, signing_args=None): def RepackApexPayload(self, payload_dir, payload_key, signing_args=None):
"""Rebuilds the apex file with the updated payload directory.""" """Rebuilds the apex file with the updated payload directory."""
apex_dir = common.MakeTempDir() apex_dir = common.MakeTempDir()
@@ -324,7 +365,9 @@ def ParseApexPayloadInfo(avbtool, payload_path):
def SignUncompressedApex(avbtool, apex_file, payload_key, container_key, def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
container_pw, apk_keys, codename_to_api_level_map, container_pw, apk_keys, codename_to_api_level_map,
no_hashtree, signing_args=None, sign_tool=None): no_hashtree, signing_args=None, sign_tool=None,
is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None,
fsverity_tool=None):
"""Signs the current uncompressed APEX with the given payload/container keys. """Signs the current uncompressed APEX with the given payload/container keys.
Args: Args:
@@ -337,6 +380,10 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
no_hashtree: Don't include hashtree in the signed APEX. no_hashtree: Don't include hashtree in the signed APEX.
signing_args: Additional args to be passed to the payload signer. signing_args: Additional args to be passed to the payload signer.
sign_tool: A tool to sign the contents of the APEX. sign_tool: A tool to sign the contents of the APEX.
is_sepolicy: Indicates if the apex is a sepolicy.apex
sepolicy_key: Key to sign a sepolicy zip.
sepolicy_cert: Cert to sign a sepolicy zip.
fsverity_tool: fsverity path to sign sepolicy zip.
Returns: Returns:
The path to the signed APEX file. The path to the signed APEX file.
@@ -345,8 +392,9 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
# the apex file after signing. # the apex file after signing.
apk_signer = ApexApkSigner(apex_file, container_pw, apk_signer = ApexApkSigner(apex_file, container_pw,
codename_to_api_level_map, codename_to_api_level_map,
avbtool, sign_tool) avbtool, sign_tool, fsverity_tool)
apex_file = apk_signer.ProcessApexFile(apk_keys, payload_key, signing_args) apex_file = apk_signer.ProcessApexFile(
apk_keys, payload_key, signing_args, is_sepolicy, sepolicy_key, sepolicy_cert)
# 2a. Extract and sign the APEX_PAYLOAD_IMAGE entry with the given # 2a. Extract and sign the APEX_PAYLOAD_IMAGE entry with the given
# payload_key. # payload_key.
@@ -400,7 +448,9 @@ def SignUncompressedApex(avbtool, apex_file, payload_key, container_key,
def SignCompressedApex(avbtool, apex_file, payload_key, container_key, def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
container_pw, apk_keys, codename_to_api_level_map, container_pw, apk_keys, codename_to_api_level_map,
no_hashtree, signing_args=None, sign_tool=None): no_hashtree, signing_args=None, sign_tool=None,
is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None,
fsverity_tool=None):
"""Signs the current compressed APEX with the given payload/container keys. """Signs the current compressed APEX with the given payload/container keys.
Args: Args:
@@ -412,6 +462,10 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
codename_to_api_level_map: A dict that maps from codename to API level. codename_to_api_level_map: A dict that maps from codename to API level.
no_hashtree: Don't include hashtree in the signed APEX. no_hashtree: Don't include hashtree in the signed APEX.
signing_args: Additional args to be passed to the payload signer. signing_args: Additional args to be passed to the payload signer.
is_sepolicy: Indicates if the apex is a sepolicy.apex
sepolicy_key: Key to sign a sepolicy zip.
sepolicy_cert: Cert to sign a sepolicy zip.
fsverity_tool: fsverity path to sign sepolicy zip.
Returns: Returns:
The path to the signed APEX file. The path to the signed APEX file.
@@ -438,7 +492,11 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
codename_to_api_level_map, codename_to_api_level_map,
no_hashtree, no_hashtree,
signing_args, signing_args,
sign_tool) sign_tool,
is_sepolicy,
sepolicy_key,
sepolicy_cert,
fsverity_tool)
# 3. Compress signed original apex. # 3. Compress signed original apex.
compressed_apex_file = common.MakeTempFile(prefix='apex-container-', compressed_apex_file = common.MakeTempFile(prefix='apex-container-',
@@ -466,7 +524,8 @@ def SignCompressedApex(avbtool, apex_file, payload_key, container_key,
def SignApex(avbtool, apex_data, payload_key, container_key, container_pw, def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
apk_keys, codename_to_api_level_map, apk_keys, codename_to_api_level_map,
no_hashtree, signing_args=None, sign_tool=None): no_hashtree, signing_args=None, sign_tool=None,
is_sepolicy=False, sepolicy_key=None, sepolicy_cert=None, fsverity_tool=None):
"""Signs the current APEX with the given payload/container keys. """Signs the current APEX with the given payload/container keys.
Args: Args:
@@ -478,6 +537,9 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
codename_to_api_level_map: A dict that maps from codename to API level. codename_to_api_level_map: A dict that maps from codename to API level.
no_hashtree: Don't include hashtree in the signed APEX. no_hashtree: Don't include hashtree in the signed APEX.
signing_args: Additional args to be passed to the payload signer. signing_args: Additional args to be passed to the payload signer.
sepolicy_key: Key to sign a sepolicy zip.
sepolicy_cert: Cert to sign a sepolicy zip.
fsverity_tool: fsverity path to sign sepolicy zip.
Returns: Returns:
The path to the signed APEX file. The path to the signed APEX file.
@@ -503,7 +565,11 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
no_hashtree=no_hashtree, no_hashtree=no_hashtree,
apk_keys=apk_keys, apk_keys=apk_keys,
signing_args=signing_args, signing_args=signing_args,
sign_tool=sign_tool) sign_tool=sign_tool,
is_sepolicy=is_sepolicy,
sepolicy_key=sepolicy_key,
sepolicy_cert=sepolicy_cert,
fsverity_tool=fsverity_tool)
elif apex_type == 'COMPRESSED': elif apex_type == 'COMPRESSED':
return SignCompressedApex( return SignCompressedApex(
avbtool, avbtool,
@@ -515,7 +581,11 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
no_hashtree=no_hashtree, no_hashtree=no_hashtree,
apk_keys=apk_keys, apk_keys=apk_keys,
signing_args=signing_args, signing_args=signing_args,
sign_tool=sign_tool) sign_tool=sign_tool,
is_sepolicy=is_sepolicy,
sepolicy_key=sepolicy_key,
sepolicy_cert=sepolicy_cert,
fsverity_tool=fsverity_tool)
else: else:
# TODO(b/172912232): support signing compressed apex # TODO(b/172912232): support signing compressed apex
raise ApexInfoError('Unsupported apex type {}'.format(apex_type)) raise ApexInfoError('Unsupported apex type {}'.format(apex_type))

View File

@@ -42,6 +42,15 @@ Usage: sign_apex [flags] input_apex_file output_apex_file
--sign_tool <sign_tool> --sign_tool <sign_tool>
Optional flag that specifies a custom signing tool for the contents of the apex. Optional flag that specifies a custom signing tool for the contents of the apex.
--sepolicy_key <key>
Optional flag that specifies the sepolicy signing key, defaults to payload_key.
--sepolicy_cert <cert>
Optional flag that specifies the sepolicy signing cert.
--fsverity_tool <path>
Optional flag that specifies the path to fsverity tool to sign SEPolicy, defaults to fsverity.
""" """
import logging import logging
@@ -55,7 +64,8 @@ logger = logging.getLogger(__name__)
def SignApexFile(avbtool, apex_file, payload_key, container_key, no_hashtree, def SignApexFile(avbtool, apex_file, payload_key, container_key, no_hashtree,
apk_keys=None, signing_args=None, codename_to_api_level_map=None, sign_tool=None): apk_keys=None, signing_args=None, codename_to_api_level_map=None, sign_tool=None,
sepolicy_key=None, sepolicy_cert=None, fsverity_tool=None):
"""Signs the given apex file.""" """Signs the given apex file."""
with open(apex_file, 'rb') as input_fp: with open(apex_file, 'rb') as input_fp:
apex_data = input_fp.read() apex_data = input_fp.read()
@@ -70,7 +80,11 @@ def SignApexFile(avbtool, apex_file, payload_key, container_key, no_hashtree,
no_hashtree=no_hashtree, no_hashtree=no_hashtree,
apk_keys=apk_keys, apk_keys=apk_keys,
signing_args=signing_args, signing_args=signing_args,
sign_tool=sign_tool) sign_tool=sign_tool,
is_sepolicy=apex_file.endswith("sepolicy.apex"),
sepolicy_key=sepolicy_key,
sepolicy_cert=sepolicy_cert,
fsverity_tool=fsverity_tool)
def main(argv): def main(argv):
@@ -106,6 +120,12 @@ def main(argv):
options['extra_apks'].update({n: key}) options['extra_apks'].update({n: key})
elif o == '--sign_tool': elif o == '--sign_tool':
options['sign_tool'] = a options['sign_tool'] = a
elif o == '--sepolicy_key':
options['sepolicy_key'] = a
elif o == '--sepolicy_cert':
options['sepolicy_cert'] = a
elif o == '--fsverity_tool':
options['fsverity_tool'] = a
else: else:
return False return False
return True return True
@@ -121,6 +141,9 @@ def main(argv):
'payload_key=', 'payload_key=',
'extra_apks=', 'extra_apks=',
'sign_tool=', 'sign_tool=',
'sepolicy_key=',
'sepolicy_cert=',
'fsverity_tool='
], ],
extra_option_handler=option_handler) extra_option_handler=option_handler)
@@ -141,7 +164,10 @@ def main(argv):
signing_args=options.get('payload_extra_args'), signing_args=options.get('payload_extra_args'),
codename_to_api_level_map=options.get( codename_to_api_level_map=options.get(
'codename_to_api_level_map', {}), 'codename_to_api_level_map', {}),
sign_tool=options.get('sign_tool', None)) sign_tool=options.get('sign_tool', None),
sepolicy_key=options.get('sepolicy_key', None),
sepolicy_cert=options.get('sepolicy_cert', None),
fsverity_tool=options.get('fsverity_tool', None))
shutil.copyfile(signed_apex, args[1]) shutil.copyfile(signed_apex, args[1])
logger.info("done.") logger.info("done.")

View File

@@ -71,3 +71,21 @@ class SignApexTest(test_utils.ReleaseToolsTestCase):
False, False,
codename_to_api_level_map={'S': 31, 'Tiramisu' : 32}) codename_to_api_level_map={'S': 31, 'Tiramisu' : 32})
self.assertTrue(os.path.exists(signed_apex)) self.assertTrue(os.path.exists(signed_apex))
@test_utils.SkipIfExternalToolsUnavailable()
def test_SignApexWithSepolicy(self):
test_apex = os.path.join(self.testdata_dir, 'sepolicy.apex')
payload_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
container_key = os.path.join(self.testdata_dir, 'testkey')
sepolicy_key = os.path.join(self.testdata_dir, 'testkey_RSA4096.key')
sepolicy_cert = os.path.join(self.testdata_dir, 'testkey.x509.pem')
signed_test_apex = sign_apex.SignApexFile(
'avbtool',
test_apex,
payload_key,
container_key,
False,
None,
sepolicy_key=sepolicy_key,
sepolicy_cert=sepolicy_cert)
self.assertTrue(os.path.exists(signed_test_apex))

Binary file not shown.