Create a function that can generate ApexInfo using target-files

If an OTA contains compressed APEX inside it, then the device will need
to allocate space on /data partition for their decompression. In order
to calculate how much space the OTA process needs to allocate, the
process needs more information about the APEX contained inside the OTA.

In this CL, we are adding functionality to the OTA generation script
that allows us to gather information about the APEX stored inside the
target-file zip. However, we did not integrate the new functionality
with the ota_from_target_files.py scrip yet. That will be done on follow
up CL.

Bug: 172911822
Test: atest releasetools_py3_test
Change-Id: I2ac42018f628c2c21527b3e086be1f4e7e7247ad
This commit is contained in:
Mohammad Samiul Islam
2021-01-06 13:33:25 +00:00
committed by Kelvin Zhang
parent 3b455ea92e
commit 9fd5886e23
5 changed files with 122 additions and 7 deletions

View File

@@ -122,13 +122,15 @@ python_defaults {
"releasetools_check_target_files_vintf",
"releasetools_common",
"releasetools_verity_utils",
"apex_manifest",
],
required: [
"brillo_update_payload",
"checkvintf",
"lz4",
"toybox",
"unpack_bootimg"
"unpack_bootimg",
"deapexer",
],
target: {
darwin: {
@@ -169,6 +171,8 @@ python_library_host {
"apex_utils.py",
],
libs: [
"apex_manifest",
"ota_metadata_proto",
"releasetools_common",
],
}
@@ -544,6 +548,8 @@ python_defaults {
],
data: [
"testdata/**/*",
":com.android.apex.compressed.v1",
":com.android.apex.compressed.v1_original",
],
target: {
darwin: {
@@ -551,6 +557,9 @@ python_defaults {
enabled: false,
},
},
required: [
"deapexer",
],
}
python_test_host {

View File

@@ -21,7 +21,12 @@ import shlex
import shutil
import zipfile
import apex_manifest
import common
from common import UnzipTemp, RunAndCheckOutput, MakeTempFile, OPTIONS
import ota_metadata_pb2
logger = logging.getLogger(__name__)
@@ -69,7 +74,7 @@ class ApexApkSigner(object):
if not os.path.exists(self.debugfs_path):
raise ApexSigningError(
"Couldn't find location of debugfs_static: " +
"Path {} does not exist. ".format(debugfs_path) +
"Path {} does not exist. ".format(self.debugfs_path) +
"Make sure bin/debugfs_static can be found in -p <path>")
list_cmd = ['deapexer', '--debugfs_path',
self.debugfs_path, 'list', self.apex_path]
@@ -105,7 +110,7 @@ class ApexApkSigner(object):
if not os.path.exists(self.debugfs_path):
raise ApexSigningError(
"Couldn't find location of debugfs_static: " +
"Path {} does not exist. ".format(debugfs_path) +
"Path {} does not exist. ".format(self.debugfs_path) +
"Make sure bin/debugfs_static can be found in -p <path>")
payload_dir = common.MakeTempDir()
extract_cmd = ['deapexer', '--debugfs_path',
@@ -127,7 +132,8 @@ class ApexApkSigner(object):
# signed apk file.
unsigned_apk = common.MakeTempFile()
os.rename(apk_path, unsigned_apk)
common.SignFile(unsigned_apk, apk_path, key_name, self.key_passwords.get(key_name),
common.SignFile(
unsigned_apk, apk_path, key_name, self.key_passwords.get(key_name),
codename_to_api_level_map=self.codename_to_api_level_map)
has_signed_apk = True
return payload_dir, has_signed_apk
@@ -427,4 +433,71 @@ def SignApex(avbtool, apex_data, payload_key, container_key, container_pw,
except common.ExternalError as e:
raise ApexInfoError(
'Failed to get type for {}:\n{}'.format(apex_file))
'Failed to get type for {}:\n{}'.format(apex_file, e))
def GetApexInfoFromTargetFiles(input_file):
"""
Get information about system APEX stored in the input_file zip
Args:
input_file: The filename of the target build target-files zip or directory.
Return:
A list of ota_metadata_pb2.ApexInfo() populated using the APEX stored in
/system partition of the input_file
"""
# Extract the apex files so that we can run checks on them
if not isinstance(input_file, str):
raise RuntimeError("must pass filepath to target-files zip or directory")
if os.path.isdir(input_file):
tmp_dir = input_file
else:
tmp_dir = UnzipTemp(input_file, ["SYSTEM/apex/*"])
target_dir = os.path.join(tmp_dir, "SYSTEM/apex/")
apex_infos = []
for apex_filename in os.listdir(target_dir):
apex_filepath = os.path.join(target_dir, apex_filename)
if not os.path.isfile(apex_filepath) or \
not zipfile.is_zipfile(apex_filepath):
logger.info("Skipping %s because it's not a zipfile", apex_filepath)
continue
apex_info = ota_metadata_pb2.ApexInfo()
# Open the apex file to retrieve information
manifest = apex_manifest.fromApex(apex_filepath)
apex_info.package_name = manifest.name
apex_info.version = manifest.version
# Check if the file is compressed or not
debugfs_path = "debugfs"
if OPTIONS.search_path:
debugfs_path = os.path.join(OPTIONS.search_path, "bin", "debugfs_static")
deapexer = 'deapexer'
if OPTIONS.search_path:
deapexer_path = os.path.join(OPTIONS.search_path, "deapexer")
if os.path.isfile(deapexer_path):
deapexer = deapexer_path
apex_type = RunAndCheckOutput([
deapexer, "--debugfs_path", debugfs_path,
'info', '--print-type', apex_filepath]).rstrip()
if apex_type == 'COMPRESSED':
apex_info.is_compressed = True
elif apex_type == 'UNCOMPRESSED':
apex_info.is_compressed = False
else:
raise RuntimeError('Not an APEX file: ' + apex_type)
# Decompress compressed APEX to determine its size
if apex_info.is_compressed:
decompressed_file_path = MakeTempFile(prefix="decompressed-",
suffix=".apex")
# Decompression target path should not exist
os.remove(decompressed_file_path)
RunAndCheckOutput([deapexer, 'decompress', '--input', apex_filepath,
'--output', decompressed_file_path])
apex_info.decompressed_size = os.path.getsize(decompressed_file_path)
apex_infos.append(apex_info)
return apex_infos

View File

@@ -65,6 +65,13 @@ message DeviceState {
repeated PartitionState partition_state = 7;
}
message ApexInfo {
string package_name = 1;
int64 version = 2;
bool is_compressed = 3;
int64 decompressed_size = 4;
}
// The metadata of an OTA package. It contains the information of the package
// and prerequisite to install the update correctly.
message OtaMetadata {

View File

@@ -33,10 +33,11 @@ from ota_from_target_files import (
GetTargetFilesZipWithoutPostinstallConfig,
Payload, PayloadSigner, POSTINSTALL_CONFIG,
StreamingPropertyFiles, AB_PARTITIONS)
from apex_utils import GetApexInfoFromTargetFiles
from test_utils import PropertyFilesTestCase
def construct_target_files(secondary=False):
def construct_target_files(secondary=False, compressedApex=False):
"""Returns a target-files.zip file for generating OTA packages."""
target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
with zipfile.ZipFile(target_files, 'w', allowZip64=True) as target_files_zip:
@@ -78,6 +79,11 @@ def construct_target_files(secondary=False):
target_files_zip.writestr('IMAGES/system_other.img',
os.urandom(len("system_other")))
if compressedApex:
apex_file_name = 'com.android.apex.compressed.v1.capex'
apex_file = os.path.join(test_utils.get_current_dir(), apex_file_name)
target_files_zip.write(apex_file, 'SYSTEM/apex/' + apex_file_name)
return target_files
@@ -274,6 +280,21 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
},
metadata)
@test_utils.SkipIfExternalToolsUnavailable()
def test_GetApexInfoFromTargetFiles(self):
target_files = construct_target_files(compressedApex=True)
apex_infos = GetApexInfoFromTargetFiles(target_files)
self.assertEqual(len(apex_infos), 1)
self.assertEqual(apex_infos[0].package_name, "com.android.apex.compressed")
self.assertEqual(apex_infos[0].version, 1)
self.assertEqual(apex_infos[0].is_compressed, True)
# Compare the decompressed APEX size with the original uncompressed APEX
original_apex_name = 'com.android.apex.compressed.v1_original.apex'
original_apex_filepath = os.path.join(test_utils.get_current_dir(), original_apex_name)
uncompressed_apex_size = os.path.getsize(original_apex_filepath)
self.assertEqual(apex_infos[0].decompressed_size, uncompressed_apex_size)
def test_GetPackageMetadata_retrofitDynamicPartitions(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
common.OPTIONS.retrofit_dynamic_partitions = True

View File

@@ -55,6 +55,11 @@ def get_testdata_dir():
current_dir = os.path.dirname(os.path.realpath(__file__))
return os.path.join(current_dir, 'testdata')
def get_current_dir():
"""Returns the current dir, relative to the script dir."""
# The script dir is the one we want, which could be different from pwd.
current_dir = os.path.dirname(os.path.realpath(__file__))
return current_dir
def get_search_path():
"""Returns the search path that has 'framework/signapk.jar' under."""