Move non-AB OTA generation code to a separate file

Test: Generate a non-AB OTA, apply it
Change-Id: I2f1afbe70d17356fcbf4d59901d201a76a3d6c4f
This commit is contained in:
Kelvin Zhang
2020-07-29 16:37:51 -04:00
parent 3443517fcb
commit cff4d7606d
9 changed files with 1384 additions and 1280 deletions

View File

@@ -93,7 +93,9 @@ python_defaults {
srcs: [
"edify_generator.py",
"ota_from_target_files.py",
"non_ab_ota.py",
"target_files_diff.py",
"ota_utils.py",
],
libs: [
"releasetools_check_target_files_vintf",

View File

@@ -220,6 +220,52 @@ def CheckVintf(inp, info_dict=None):
raise ValueError('{} is not a valid directory or zip file'.format(inp))
def CheckVintfIfTrebleEnabled(target_files, target_info):
"""Checks compatibility info of the input target files.
Metadata used for compatibility verification is retrieved from target_zip.
Compatibility should only be checked for devices that have enabled
Treble support.
Args:
target_files: Path to zip file containing the source files to be included
for OTA. Can also be the path to extracted directory.
target_info: The BuildInfo instance that holds the target build info.
"""
# Will only proceed if the target has enabled the Treble support (as well as
# having a /vendor partition).
if not HasTrebleEnabled(target_files, target_info):
return
# Skip adding the compatibility package as a workaround for b/114240221. The
# compatibility will always fail on devices without qualified kernels.
if OPTIONS.skip_compatibility_check:
return
if not CheckVintf(target_files, target_info):
raise RuntimeError("VINTF compatibility check failed")
def HasTrebleEnabled(target_files, target_info):
def HasVendorPartition(target_files):
if os.path.isdir(target_files):
return os.path.isdir(os.path.join(target_files, "VENDOR"))
if zipfile.is_zipfile(target_files):
return HasPartition(zipfile.ZipFile(target_files), "vendor")
raise ValueError("Unknown target_files argument")
return (HasVendorPartition(target_files) and
target_info.GetBuildProp("ro.treble.enabled") == "true")
def HasPartition(target_files_zip, partition):
try:
target_files_zip.getinfo(partition.upper() + "/")
return True
except KeyError:
return False
def main(argv):
args = common.ParseOptions(argv, __doc__)

View File

@@ -1227,7 +1227,7 @@ def _MakeRamdisk(sourcedir, fs_config_file=None, lz4_ramdisks=False):
cmd = ["mkbootfs", os.path.join(sourcedir, "RAMDISK")]
p1 = Run(cmd, stdout=subprocess.PIPE)
if lz4_ramdisks:
p2 = Run(["lz4", "-l", "-12" , "--favor-decSpeed"], stdin=p1.stdout,
p2 = Run(["lz4", "-l", "-12", "--favor-decSpeed"], stdin=p1.stdout,
stdout=ramdisk_img.file.fileno())
else:
p2 = Run(["minigzip"], stdin=p1.stdout, stdout=ramdisk_img.file.fileno())

View File

@@ -0,0 +1,684 @@
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import os
import zipfile
import common
import edify_generator
import verity_utils
from check_target_files_vintf import CheckVintfIfTrebleEnabled, HasPartition
from common import OPTIONS
from ota_utils import UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PropertyFiles
logger = logging.getLogger(__name__)
def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
device_specific):
"""Returns a ordered dict of block differences with partition name as key."""
def GetIncrementalBlockDifferenceForPartition(name):
if not HasPartition(source_zip, name):
raise RuntimeError(
"can't generate incremental that adds {}".format(name))
partition_src = common.GetUserImage(name, OPTIONS.source_tmp, source_zip,
info_dict=source_info,
allow_shared_blocks=allow_shared_blocks)
hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator(
name, 4096, target_info)
partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip,
info_dict=target_info,
allow_shared_blocks=allow_shared_blocks,
hashtree_info_generator=hashtree_info_generator)
# Check the first block of the source system partition for remount R/W only
# if the filesystem is ext4.
partition_source_info = source_info["fstab"]["/" + name]
check_first_block = partition_source_info.fs_type == "ext4"
# Disable using imgdiff for squashfs. 'imgdiff -z' expects input files to be
# in zip formats. However with squashfs, a) all files are compressed in LZ4;
# b) the blocks listed in block map may not contain all the bytes for a
# given file (because they're rounded to be 4K-aligned).
partition_target_info = target_info["fstab"]["/" + name]
disable_imgdiff = (partition_source_info.fs_type == "squashfs" or
partition_target_info.fs_type == "squashfs")
return common.BlockDifference(name, partition_tgt, partition_src,
check_first_block,
version=blockimgdiff_version,
disable_imgdiff=disable_imgdiff)
if source_zip:
# See notes in common.GetUserImage()
allow_shared_blocks = (source_info.get('ext4_share_dup_blocks') == "true" or
target_info.get('ext4_share_dup_blocks') == "true")
blockimgdiff_version = max(
int(i) for i in target_info.get(
"blockimgdiff_versions", "1").split(","))
assert blockimgdiff_version >= 3
block_diff_dict = collections.OrderedDict()
partition_names = ["system", "vendor", "product", "odm", "system_ext",
"vendor_dlkm", "odm_dlkm"]
for partition in partition_names:
if not HasPartition(target_zip, partition):
continue
# Full OTA update.
if not source_zip:
tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip,
info_dict=target_info,
reset_file_map=True)
block_diff_dict[partition] = common.BlockDifference(partition, tgt,
src=None)
# Incremental OTA update.
else:
block_diff_dict[partition] = GetIncrementalBlockDifferenceForPartition(
partition)
assert "system" in block_diff_dict
# Get the block diffs from the device specific script. If there is a
# duplicate block diff for a partition, ignore the diff in the generic script
# and use the one in the device specific script instead.
if source_zip:
device_specific_diffs = device_specific.IncrementalOTA_GetBlockDifferences()
function_name = "IncrementalOTA_GetBlockDifferences"
else:
device_specific_diffs = device_specific.FullOTA_GetBlockDifferences()
function_name = "FullOTA_GetBlockDifferences"
if device_specific_diffs:
assert all(isinstance(diff, common.BlockDifference)
for diff in device_specific_diffs), \
"{} is not returning a list of BlockDifference objects".format(
function_name)
for diff in device_specific_diffs:
if diff.partition in block_diff_dict:
logger.warning("Duplicate block difference found. Device specific block"
" diff for partition '%s' overrides the one in generic"
" script.", diff.partition)
block_diff_dict[diff.partition] = diff
return block_diff_dict
def WriteFullOTAPackage(input_zip, output_file):
target_info = common.BuildInfo(OPTIONS.info_dict, OPTIONS.oem_dicts)
# We don't know what version it will be installed on top of. We expect the API
# just won't change very often. Similarly for fstab, it might have changed in
# the target build.
target_api_version = target_info["recovery_api_version"]
script = edify_generator.EdifyGenerator(target_api_version, target_info)
if target_info.oem_props and not OPTIONS.oem_no_mount:
target_info.WriteMountOemScript(script)
metadata = GetPackageMetadata(target_info)
if not OPTIONS.no_signing:
staging_file = common.MakeTempFile(suffix='.zip')
else:
staging_file = output_file
output_zip = zipfile.ZipFile(
staging_file, "w", compression=zipfile.ZIP_DEFLATED)
device_specific = common.DeviceSpecificParams(
input_zip=input_zip,
input_version=target_api_version,
output_zip=output_zip,
script=script,
input_tmp=OPTIONS.input_tmp,
metadata=metadata,
info_dict=OPTIONS.info_dict)
assert HasRecoveryPatch(input_zip, info_dict=OPTIONS.info_dict)
# Assertions (e.g. downgrade check, device properties check).
ts = target_info.GetBuildProp("ro.build.date.utc")
ts_text = target_info.GetBuildProp("ro.build.date")
script.AssertOlderBuild(ts, ts_text)
target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
device_specific.FullOTA_Assertions()
block_diff_dict = GetBlockDifferences(target_zip=input_zip, source_zip=None,
target_info=target_info,
source_info=None,
device_specific=device_specific)
# Two-step package strategy (in chronological order, which is *not*
# the order in which the generated script has things):
#
# if stage is not "2/3" or "3/3":
# write recovery image to boot partition
# set stage to "2/3"
# reboot to boot partition and restart recovery
# else if stage is "2/3":
# write recovery image to recovery partition
# set stage to "3/3"
# reboot to recovery partition and restart recovery
# else:
# (stage must be "3/3")
# set stage to ""
# do normal full package installation:
# wipe and install system, boot image, etc.
# set up system to update recovery partition on first boot
# complete script normally
# (allow recovery to mark itself finished and reboot)
recovery_img = common.GetBootableImage("recovery.img", "recovery.img",
OPTIONS.input_tmp, "RECOVERY")
if OPTIONS.two_step:
if not target_info.get("multistage_support"):
assert False, "two-step packages not supported by this build"
fs = target_info["fstab"]["/misc"]
assert fs.fs_type.upper() == "EMMC", \
"two-step packages only supported on devices with EMMC /misc partitions"
bcb_dev = {"bcb_dev": fs.device}
common.ZipWriteStr(output_zip, "recovery.img", recovery_img.data)
script.AppendExtra("""
if get_stage("%(bcb_dev)s") == "2/3" then
""" % bcb_dev)
# Stage 2/3: Write recovery image to /recovery (currently running /boot).
script.Comment("Stage 2/3")
script.WriteRawImage("/recovery", "recovery.img")
script.AppendExtra("""
set_stage("%(bcb_dev)s", "3/3");
reboot_now("%(bcb_dev)s", "recovery");
else if get_stage("%(bcb_dev)s") == "3/3" then
""" % bcb_dev)
# Stage 3/3: Make changes.
script.Comment("Stage 3/3")
# Dump fingerprints
script.Print("Target: {}".format(target_info.fingerprint))
device_specific.FullOTA_InstallBegin()
# All other partitions as well as the data wipe use 10% of the progress, and
# the update of the system partition takes the remaining progress.
system_progress = 0.9 - (len(block_diff_dict) - 1) * 0.1
if OPTIONS.wipe_user_data:
system_progress -= 0.1
progress_dict = {partition: 0.1 for partition in block_diff_dict}
progress_dict["system"] = system_progress
if target_info.get('use_dynamic_partitions') == "true":
# Use empty source_info_dict to indicate that all partitions / groups must
# be re-added.
dynamic_partitions_diff = common.DynamicPartitionsDifference(
info_dict=OPTIONS.info_dict,
block_diffs=block_diff_dict.values(),
progress_dict=progress_dict)
dynamic_partitions_diff.WriteScript(script, output_zip,
write_verify_script=OPTIONS.verify)
else:
for block_diff in block_diff_dict.values():
block_diff.WriteScript(script, output_zip,
progress=progress_dict.get(block_diff.partition),
write_verify_script=OPTIONS.verify)
CheckVintfIfTrebleEnabled(OPTIONS.input_tmp, target_info)
boot_img = common.GetBootableImage(
"boot.img", "boot.img", OPTIONS.input_tmp, "BOOT")
common.CheckSize(boot_img.data, "boot.img", target_info)
common.ZipWriteStr(output_zip, "boot.img", boot_img.data)
script.WriteRawImage("/boot", "boot.img")
script.ShowProgress(0.1, 10)
device_specific.FullOTA_InstallEnd()
if OPTIONS.extra_script is not None:
script.AppendExtra(OPTIONS.extra_script)
script.UnmountAll()
if OPTIONS.wipe_user_data:
script.ShowProgress(0.1, 10)
script.FormatPartition("/data")
if OPTIONS.two_step:
script.AppendExtra("""
set_stage("%(bcb_dev)s", "");
""" % bcb_dev)
script.AppendExtra("else\n")
# Stage 1/3: Nothing to verify for full OTA. Write recovery image to /boot.
script.Comment("Stage 1/3")
_WriteRecoveryImageToBoot(script, output_zip)
script.AppendExtra("""
set_stage("%(bcb_dev)s", "2/3");
reboot_now("%(bcb_dev)s", "");
endif;
endif;
""" % bcb_dev)
script.SetProgress(1)
script.AddToZip(input_zip, output_zip, input_path=OPTIONS.updater_binary)
metadata["ota-required-cache"] = str(script.required_cache)
# We haven't written the metadata entry, which will be done in
# FinalizeMetadata.
common.ZipClose(output_zip)
needed_property_files = (
NonAbOtaPropertyFiles(),
)
FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file):
target_info = common.BuildInfo(OPTIONS.target_info_dict, OPTIONS.oem_dicts)
source_info = common.BuildInfo(OPTIONS.source_info_dict, OPTIONS.oem_dicts)
target_api_version = target_info["recovery_api_version"]
source_api_version = source_info["recovery_api_version"]
if source_api_version == 0:
logger.warning(
"Generating edify script for a source that can't install it.")
script = edify_generator.EdifyGenerator(
source_api_version, target_info, fstab=source_info["fstab"])
if target_info.oem_props or source_info.oem_props:
if not OPTIONS.oem_no_mount:
source_info.WriteMountOemScript(script)
metadata = GetPackageMetadata(target_info, source_info)
if not OPTIONS.no_signing:
staging_file = common.MakeTempFile(suffix='.zip')
else:
staging_file = output_file
output_zip = zipfile.ZipFile(
staging_file, "w", compression=zipfile.ZIP_DEFLATED)
device_specific = common.DeviceSpecificParams(
source_zip=source_zip,
source_version=source_api_version,
source_tmp=OPTIONS.source_tmp,
target_zip=target_zip,
target_version=target_api_version,
target_tmp=OPTIONS.target_tmp,
output_zip=output_zip,
script=script,
metadata=metadata,
info_dict=source_info)
source_boot = common.GetBootableImage(
"/tmp/boot.img", "boot.img", OPTIONS.source_tmp, "BOOT", source_info)
target_boot = common.GetBootableImage(
"/tmp/boot.img", "boot.img", OPTIONS.target_tmp, "BOOT", target_info)
updating_boot = (not OPTIONS.two_step and
(source_boot.data != target_boot.data))
target_recovery = common.GetBootableImage(
"/tmp/recovery.img", "recovery.img", OPTIONS.target_tmp, "RECOVERY")
block_diff_dict = GetBlockDifferences(target_zip=target_zip,
source_zip=source_zip,
target_info=target_info,
source_info=source_info,
device_specific=device_specific)
CheckVintfIfTrebleEnabled(OPTIONS.target_tmp, target_info)
# Assertions (e.g. device properties check).
target_info.WriteDeviceAssertions(script, OPTIONS.oem_no_mount)
device_specific.IncrementalOTA_Assertions()
# Two-step incremental package strategy (in chronological order,
# which is *not* the order in which the generated script has
# things):
#
# if stage is not "2/3" or "3/3":
# do verification on current system
# write recovery image to boot partition
# set stage to "2/3"
# reboot to boot partition and restart recovery
# else if stage is "2/3":
# write recovery image to recovery partition
# set stage to "3/3"
# reboot to recovery partition and restart recovery
# else:
# (stage must be "3/3")
# perform update:
# patch system files, etc.
# force full install of new boot image
# set up system to update recovery partition on first boot
# complete script normally
# (allow recovery to mark itself finished and reboot)
if OPTIONS.two_step:
if not source_info.get("multistage_support"):
assert False, "two-step packages not supported by this build"
fs = source_info["fstab"]["/misc"]
assert fs.fs_type.upper() == "EMMC", \
"two-step packages only supported on devices with EMMC /misc partitions"
bcb_dev = {"bcb_dev": fs.device}
common.ZipWriteStr(output_zip, "recovery.img", target_recovery.data)
script.AppendExtra("""
if get_stage("%(bcb_dev)s") == "2/3" then
""" % bcb_dev)
# Stage 2/3: Write recovery image to /recovery (currently running /boot).
script.Comment("Stage 2/3")
script.AppendExtra("sleep(20);\n")
script.WriteRawImage("/recovery", "recovery.img")
script.AppendExtra("""
set_stage("%(bcb_dev)s", "3/3");
reboot_now("%(bcb_dev)s", "recovery");
else if get_stage("%(bcb_dev)s") != "3/3" then
""" % bcb_dev)
# Stage 1/3: (a) Verify the current system.
script.Comment("Stage 1/3")
# Dump fingerprints
script.Print("Source: {}".format(source_info.fingerprint))
script.Print("Target: {}".format(target_info.fingerprint))
script.Print("Verifying current system...")
device_specific.IncrementalOTA_VerifyBegin()
WriteFingerprintAssertion(script, target_info, source_info)
# Check the required cache size (i.e. stashed blocks).
required_cache_sizes = [diff.required_cache for diff in
block_diff_dict.values()]
if updating_boot:
boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot",
source_info)
d = common.Difference(target_boot, source_boot)
_, _, d = d.ComputePatch()
if d is None:
include_full_boot = True
common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
else:
include_full_boot = False
logger.info(
"boot target: %d source: %d diff: %d", target_boot.size,
source_boot.size, len(d))
common.ZipWriteStr(output_zip, "boot.img.p", d)
target_expr = 'concat("{}:",{},":{}:{}")'.format(
boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
source_expr = 'concat("{}:",{},":{}:{}")'.format(
boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
script.PatchPartitionExprCheck(target_expr, source_expr)
required_cache_sizes.append(target_boot.size)
if required_cache_sizes:
script.CacheFreeSpaceCheck(max(required_cache_sizes))
# Verify the existing partitions.
for diff in block_diff_dict.values():
diff.WriteVerifyScript(script, touched_blocks_only=True)
device_specific.IncrementalOTA_VerifyEnd()
if OPTIONS.two_step:
# Stage 1/3: (b) Write recovery image to /boot.
_WriteRecoveryImageToBoot(script, output_zip)
script.AppendExtra("""
set_stage("%(bcb_dev)s", "2/3");
reboot_now("%(bcb_dev)s", "");
else
""" % bcb_dev)
# Stage 3/3: Make changes.
script.Comment("Stage 3/3")
script.Comment("---- start making changes here ----")
device_specific.IncrementalOTA_InstallBegin()
progress_dict = {partition: 0.1 for partition in block_diff_dict}
progress_dict["system"] = 1 - len(block_diff_dict) * 0.1
if OPTIONS.source_info_dict.get("use_dynamic_partitions") == "true":
if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true":
raise RuntimeError(
"can't generate incremental that disables dynamic partitions")
dynamic_partitions_diff = common.DynamicPartitionsDifference(
info_dict=OPTIONS.target_info_dict,
source_info_dict=OPTIONS.source_info_dict,
block_diffs=block_diff_dict.values(),
progress_dict=progress_dict)
dynamic_partitions_diff.WriteScript(
script, output_zip, write_verify_script=OPTIONS.verify)
else:
for block_diff in block_diff_dict.values():
block_diff.WriteScript(script, output_zip,
progress=progress_dict.get(block_diff.partition),
write_verify_script=OPTIONS.verify)
if OPTIONS.two_step:
common.ZipWriteStr(output_zip, "boot.img", target_boot.data)
script.WriteRawImage("/boot", "boot.img")
logger.info("writing full boot image (forced by two-step mode)")
if not OPTIONS.two_step:
if updating_boot:
if include_full_boot:
logger.info("boot image changed; including full.")
script.Print("Installing boot image...")
script.WriteRawImage("/boot", "boot.img")
else:
# Produce the boot image by applying a patch to the current
# contents of the boot partition, and write it back to the
# partition.
logger.info("boot image changed; including patch.")
script.Print("Patching boot image...")
script.ShowProgress(0.1, 10)
target_expr = 'concat("{}:",{},":{}:{}")'.format(
boot_type, boot_device_expr, target_boot.size, target_boot.sha1)
source_expr = 'concat("{}:",{},":{}:{}")'.format(
boot_type, boot_device_expr, source_boot.size, source_boot.sha1)
script.PatchPartitionExpr(target_expr, source_expr, '"boot.img.p"')
else:
logger.info("boot image unchanged; skipping.")
# Do device-specific installation (eg, write radio image).
device_specific.IncrementalOTA_InstallEnd()
if OPTIONS.extra_script is not None:
script.AppendExtra(OPTIONS.extra_script)
if OPTIONS.wipe_user_data:
script.Print("Erasing user data...")
script.FormatPartition("/data")
if OPTIONS.two_step:
script.AppendExtra("""
set_stage("%(bcb_dev)s", "");
endif;
endif;
""" % bcb_dev)
script.SetProgress(1)
# For downgrade OTAs, we prefer to use the update-binary in the source
# build that is actually newer than the one in the target build.
if OPTIONS.downgrade:
script.AddToZip(source_zip, output_zip, input_path=OPTIONS.updater_binary)
else:
script.AddToZip(target_zip, output_zip, input_path=OPTIONS.updater_binary)
metadata["ota-required-cache"] = str(script.required_cache)
# We haven't written the metadata entry yet, which will be handled in
# FinalizeMetadata().
common.ZipClose(output_zip)
# Sign the generated zip package unless no_signing is specified.
needed_property_files = (
NonAbOtaPropertyFiles(),
)
FinalizeMetadata(metadata, staging_file, output_file, needed_property_files)
def GenerateNonAbOtaPackage(target_file, output_file, source_file=None):
"""Generates a non-A/B OTA package."""
# Check the loaded info dicts first.
if OPTIONS.info_dict.get("no_recovery") == "true":
raise common.ExternalError(
"--- target build has specified no recovery ---")
# Non-A/B OTAs rely on /cache partition to store temporary files.
cache_size = OPTIONS.info_dict.get("cache_size")
if cache_size is None:
logger.warning("--- can't determine the cache partition size ---")
OPTIONS.cache_size = cache_size
if OPTIONS.extra_script is not None:
with open(OPTIONS.extra_script) as fp:
OPTIONS.extra_script = fp.read()
if OPTIONS.extracted_input is not None:
OPTIONS.input_tmp = OPTIONS.extracted_input
else:
logger.info("unzipping target target-files...")
OPTIONS.input_tmp = common.UnzipTemp(target_file, UNZIP_PATTERN)
OPTIONS.target_tmp = OPTIONS.input_tmp
# If the caller explicitly specified the device-specific extensions path via
# -s / --device_specific, use that. Otherwise, use META/releasetools.py if it
# is present in the target target_files. Otherwise, take the path of the file
# from 'tool_extensions' in the info dict and look for that in the local
# filesystem, relative to the current directory.
if OPTIONS.device_specific is None:
from_input = os.path.join(OPTIONS.input_tmp, "META", "releasetools.py")
if os.path.exists(from_input):
logger.info("(using device-specific extensions from target_files)")
OPTIONS.device_specific = from_input
else:
OPTIONS.device_specific = OPTIONS.info_dict.get("tool_extensions")
if OPTIONS.device_specific is not None:
OPTIONS.device_specific = os.path.abspath(OPTIONS.device_specific)
# Generate a full OTA.
if source_file is None:
with zipfile.ZipFile(target_file) as input_zip:
WriteFullOTAPackage(
input_zip,
output_file)
# Generate an incremental OTA.
else:
logger.info("unzipping source target-files...")
OPTIONS.source_tmp = common.UnzipTemp(
OPTIONS.incremental_source, UNZIP_PATTERN)
with zipfile.ZipFile(target_file) as input_zip, \
zipfile.ZipFile(source_file) as source_zip:
WriteBlockIncrementalOTAPackage(
input_zip,
source_zip,
output_file)
def WriteFingerprintAssertion(script, target_info, source_info):
source_oem_props = source_info.oem_props
target_oem_props = target_info.oem_props
if source_oem_props is None and target_oem_props is None:
script.AssertSomeFingerprint(
source_info.fingerprint, target_info.fingerprint)
elif source_oem_props is not None and target_oem_props is not None:
script.AssertSomeThumbprint(
target_info.GetBuildProp("ro.build.thumbprint"),
source_info.GetBuildProp("ro.build.thumbprint"))
elif source_oem_props is None and target_oem_props is not None:
script.AssertFingerprintOrThumbprint(
source_info.fingerprint,
target_info.GetBuildProp("ro.build.thumbprint"))
else:
script.AssertFingerprintOrThumbprint(
target_info.fingerprint,
source_info.GetBuildProp("ro.build.thumbprint"))
class NonAbOtaPropertyFiles(PropertyFiles):
"""The property-files for non-A/B OTA.
For non-A/B OTA, the property-files string contains the info for METADATA
entry, with which a system updater can be fetched the package metadata prior
to downloading the entire package.
"""
def __init__(self):
super(NonAbOtaPropertyFiles, self).__init__()
self.name = 'ota-property-files'
def _WriteRecoveryImageToBoot(script, output_zip):
"""Find and write recovery image to /boot in two-step OTA.
In two-step OTAs, we write recovery image to /boot as the first step so that
we can reboot to there and install a new recovery image to /recovery.
A special "recovery-two-step.img" will be preferred, which encodes the correct
path of "/boot". Otherwise the device may show "device is corrupt" message
when booting into /boot.
Fall back to using the regular recovery.img if the two-step recovery image
doesn't exist. Note that rebuilding the special image at this point may be
infeasible, because we don't have the desired boot signer and keys when
calling ota_from_target_files.py.
"""
recovery_two_step_img_name = "recovery-two-step.img"
recovery_two_step_img_path = os.path.join(
OPTIONS.input_tmp, "OTA", recovery_two_step_img_name)
if os.path.exists(recovery_two_step_img_path):
common.ZipWrite(
output_zip,
recovery_two_step_img_path,
arcname=recovery_two_step_img_name)
logger.info(
"two-step package: using %s in stage 1/3", recovery_two_step_img_name)
script.WriteRawImage("/boot", recovery_two_step_img_name)
else:
logger.info("two-step package: using recovery.img in stage 1/3")
# The "recovery.img" entry has been written into package earlier.
script.WriteRawImage("/boot", "recovery.img")
def HasRecoveryPatch(target_files_zip, info_dict):
board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
if board_uses_vendorimage:
target_files_dir = "VENDOR"
else:
target_files_dir = "SYSTEM/vendor"
patch = "%s/recovery-from-boot.p" % target_files_dir
img = "%s/etc/recovery.img" % target_files_dir
namelist = target_files_zip.namelist()
return patch in namelist or img in namelist

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,433 @@
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import itertools
import os
import zipfile
from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile,
ZipWriteStr, BuildInfo, LoadDictionaryFromFile,
SignFile, PARTITIONS_WITH_CARE_MAP, PartitionBuildProps)
METADATA_NAME = 'META-INF/com/android/metadata'
UNZIP_PATTERN = ['IMAGES/*', 'META/*', 'OTA/*', 'RADIO/*']
def FinalizeMetadata(metadata, input_file, output_file, needed_property_files):
"""Finalizes the metadata and signs an A/B OTA package.
In order to stream an A/B OTA package, we need 'ota-streaming-property-files'
that contains the offsets and sizes for the ZIP entries. An example
property-files string is as follows.
"payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379"
OTA server can pass down this string, in addition to the package URL, to the
system update client. System update client can then fetch individual ZIP
entries (ZIP_STORED) directly at the given offset of the URL.
Args:
metadata: The metadata dict for the package.
input_file: The input ZIP filename that doesn't contain the package METADATA
entry yet.
output_file: The final output ZIP filename.
needed_property_files: The list of PropertyFiles' to be generated.
"""
def ComputeAllPropertyFiles(input_file, needed_property_files):
# Write the current metadata entry with placeholders.
with zipfile.ZipFile(input_file) as input_zip:
for property_files in needed_property_files:
metadata[property_files.name] = property_files.Compute(input_zip)
namelist = input_zip.namelist()
if METADATA_NAME in namelist:
ZipDelete(input_file, METADATA_NAME)
output_zip = zipfile.ZipFile(input_file, 'a')
WriteMetadata(metadata, output_zip)
ZipClose(output_zip)
if OPTIONS.no_signing:
return input_file
prelim_signing = MakeTempFile(suffix='.zip')
SignOutput(input_file, prelim_signing)
return prelim_signing
def FinalizeAllPropertyFiles(prelim_signing, needed_property_files):
with zipfile.ZipFile(prelim_signing) as prelim_signing_zip:
for property_files in needed_property_files:
metadata[property_files.name] = property_files.Finalize(
prelim_signing_zip, len(metadata[property_files.name]))
# SignOutput(), which in turn calls signapk.jar, will possibly reorder the ZIP
# entries, as well as padding the entry headers. We do a preliminary signing
# (with an incomplete metadata entry) to allow that to happen. Then compute
# the ZIP entry offsets, write back the final metadata and do the final
# signing.
prelim_signing = ComputeAllPropertyFiles(input_file, needed_property_files)
try:
FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
except PropertyFiles.InsufficientSpaceException:
# Even with the preliminary signing, the entry orders may change
# dramatically, which leads to insufficiently reserved space during the
# first call to ComputeAllPropertyFiles(). In that case, we redo all the
# preliminary signing works, based on the already ordered ZIP entries, to
# address the issue.
prelim_signing = ComputeAllPropertyFiles(
prelim_signing, needed_property_files)
FinalizeAllPropertyFiles(prelim_signing, needed_property_files)
# Replace the METADATA entry.
ZipDelete(prelim_signing, METADATA_NAME)
output_zip = zipfile.ZipFile(prelim_signing, 'a')
WriteMetadata(metadata, output_zip)
ZipClose(output_zip)
# Re-sign the package after updating the metadata entry.
if OPTIONS.no_signing:
output_file = prelim_signing
else:
SignOutput(prelim_signing, output_file)
# Reopen the final signed zip to double check the streaming metadata.
with zipfile.ZipFile(output_file) as output_zip:
for property_files in needed_property_files:
property_files.Verify(output_zip, metadata[property_files.name].strip())
# If requested, dump the metadata to a separate file.
output_metadata_path = OPTIONS.output_metadata_path
if output_metadata_path:
WriteMetadata(metadata, output_metadata_path)
def WriteMetadata(metadata, output):
"""Writes the metadata to the zip archive or a file.
Args:
metadata: The metadata dict for the package.
output: A ZipFile object or a string of the output file path.
"""
value = "".join(["%s=%s\n" % kv for kv in sorted(metadata.items())])
if isinstance(output, zipfile.ZipFile):
ZipWriteStr(output, METADATA_NAME, value,
compress_type=zipfile.ZIP_STORED)
return
with open(output, 'w') as f:
f.write(value)
def GetPackageMetadata(target_info, source_info=None):
"""Generates and returns the metadata dict.
It generates a dict() that contains the info to be written into an OTA
package (META-INF/com/android/metadata). It also handles the detection of
downgrade / data wipe based on the global options.
Args:
target_info: The BuildInfo instance that holds the target build info.
source_info: The BuildInfo instance that holds the source build info, or
None if generating full OTA.
Returns:
A dict to be written into package metadata entry.
"""
assert isinstance(target_info, BuildInfo)
assert source_info is None or isinstance(source_info, BuildInfo)
separator = '|'
boot_variable_values = {}
if OPTIONS.boot_variable_file:
d = LoadDictionaryFromFile(OPTIONS.boot_variable_file)
for key, values in d.items():
boot_variable_values[key] = [val.strip() for val in values.split(',')]
post_build_devices, post_build_fingerprints = \
CalculateRuntimeDevicesAndFingerprints(target_info, boot_variable_values)
metadata = {
'post-build': separator.join(sorted(post_build_fingerprints)),
'post-build-incremental': target_info.GetBuildProp(
'ro.build.version.incremental'),
'post-sdk-level': target_info.GetBuildProp(
'ro.build.version.sdk'),
'post-security-patch-level': target_info.GetBuildProp(
'ro.build.version.security_patch'),
}
if target_info.is_ab and not OPTIONS.force_non_ab:
metadata['ota-type'] = 'AB'
metadata['ota-required-cache'] = '0'
else:
metadata['ota-type'] = 'BLOCK'
if OPTIONS.wipe_user_data:
metadata['ota-wipe'] = 'yes'
if OPTIONS.retrofit_dynamic_partitions:
metadata['ota-retrofit-dynamic-partitions'] = 'yes'
is_incremental = source_info is not None
if is_incremental:
pre_build_devices, pre_build_fingerprints = \
CalculateRuntimeDevicesAndFingerprints(source_info,
boot_variable_values)
metadata['pre-build'] = separator.join(sorted(pre_build_fingerprints))
metadata['pre-build-incremental'] = source_info.GetBuildProp(
'ro.build.version.incremental')
metadata['pre-device'] = separator.join(sorted(pre_build_devices))
else:
metadata['pre-device'] = separator.join(sorted(post_build_devices))
# Use the actual post-timestamp, even for a downgrade case.
metadata['post-timestamp'] = target_info.GetBuildProp('ro.build.date.utc')
# Detect downgrades and set up downgrade flags accordingly.
if is_incremental:
HandleDowngradeMetadata(metadata, target_info, source_info)
return metadata
def HandleDowngradeMetadata(metadata, target_info, source_info):
# Only incremental OTAs are allowed to reach here.
assert OPTIONS.incremental_source is not None
post_timestamp = target_info.GetBuildProp("ro.build.date.utc")
pre_timestamp = source_info.GetBuildProp("ro.build.date.utc")
is_downgrade = int(post_timestamp) < int(pre_timestamp)
if OPTIONS.downgrade:
if not is_downgrade:
raise RuntimeError(
"--downgrade or --override_timestamp specified but no downgrade "
"detected: pre: %s, post: %s" % (pre_timestamp, post_timestamp))
metadata["ota-downgrade"] = "yes"
else:
if is_downgrade:
raise RuntimeError(
"Downgrade detected based on timestamp check: pre: %s, post: %s. "
"Need to specify --override_timestamp OR --downgrade to allow "
"building the incremental." % (pre_timestamp, post_timestamp))
def CalculateRuntimeDevicesAndFingerprints(build_info, boot_variable_values):
"""Returns a tuple of sets for runtime devices and fingerprints"""
device_names = {build_info.device}
fingerprints = {build_info.fingerprint}
if not boot_variable_values:
return device_names, fingerprints
# Calculate all possible combinations of the values for the boot variables.
keys = boot_variable_values.keys()
value_list = boot_variable_values.values()
combinations = [dict(zip(keys, values))
for values in itertools.product(*value_list)]
for placeholder_values in combinations:
# Reload the info_dict as some build properties may change their values
# based on the value of ro.boot* properties.
info_dict = copy.deepcopy(build_info.info_dict)
for partition in PARTITIONS_WITH_CARE_MAP:
partition_prop_key = "{}.build.prop".format(partition)
input_file = info_dict[partition_prop_key].input_file
if isinstance(input_file, zipfile.ZipFile):
with zipfile.ZipFile(input_file.filename) as input_zip:
info_dict[partition_prop_key] = \
PartitionBuildProps.FromInputFile(input_zip, partition,
placeholder_values)
else:
info_dict[partition_prop_key] = \
PartitionBuildProps.FromInputFile(input_file, partition,
placeholder_values)
info_dict["build.prop"] = info_dict["system.build.prop"]
new_build_info = BuildInfo(info_dict, build_info.oem_dicts)
device_names.add(new_build_info.device)
fingerprints.add(new_build_info.fingerprint)
return device_names, fingerprints
class PropertyFiles(object):
"""A class that computes the property-files string for an OTA package.
A property-files string is a comma-separated string that contains the
offset/size info for an OTA package. The entries, which must be ZIP_STORED,
can be fetched directly with the package URL along with the offset/size info.
These strings can be used for streaming A/B OTAs, or allowing an updater to
download package metadata entry directly, without paying the cost of
downloading entire package.
Computing the final property-files string requires two passes. Because doing
the whole package signing (with signapk.jar) will possibly reorder the ZIP
entries, which may in turn invalidate earlier computed ZIP entry offset/size
values.
This class provides functions to be called for each pass. The general flow is
as follows.
property_files = PropertyFiles()
# The first pass, which writes placeholders before doing initial signing.
property_files.Compute()
SignOutput()
# The second pass, by replacing the placeholders with actual data.
property_files.Finalize()
SignOutput()
And the caller can additionally verify the final result.
property_files.Verify()
"""
def __init__(self):
self.name = None
self.required = ()
self.optional = ()
def Compute(self, input_zip):
"""Computes and returns a property-files string with placeholders.
We reserve extra space for the offset and size of the metadata entry itself,
although we don't know the final values until the package gets signed.
Args:
input_zip: The input ZIP file.
Returns:
A string with placeholders for the metadata offset/size info, e.g.
"payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
"""
return self.GetPropertyFilesString(input_zip, reserve_space=True)
class InsufficientSpaceException(Exception):
pass
def Finalize(self, input_zip, reserved_length):
"""Finalizes a property-files string with actual METADATA offset/size info.
The input ZIP file has been signed, with the ZIP entries in the desired
place (signapk.jar will possibly reorder the ZIP entries). Now we compute
the ZIP entry offsets and construct the property-files string with actual
data. Note that during this process, we must pad the property-files string
to the reserved length, so that the METADATA entry size remains the same.
Otherwise the entries' offsets and sizes may change again.
Args:
input_zip: The input ZIP file.
reserved_length: The reserved length of the property-files string during
the call to Compute(). The final string must be no more than this
size.
Returns:
A property-files string including the metadata offset/size info, e.g.
"payload.bin:679:343,payload_properties.txt:378:45,metadata:69:379 ".
Raises:
InsufficientSpaceException: If the reserved length is insufficient to hold
the final string.
"""
result = self.GetPropertyFilesString(input_zip, reserve_space=False)
if len(result) > reserved_length:
raise self.InsufficientSpaceException(
'Insufficient reserved space: reserved={}, actual={}'.format(
reserved_length, len(result)))
result += ' ' * (reserved_length - len(result))
return result
def Verify(self, input_zip, expected):
"""Verifies the input ZIP file contains the expected property-files string.
Args:
input_zip: The input ZIP file.
expected: The property-files string that's computed from Finalize().
Raises:
AssertionError: On finding a mismatch.
"""
actual = self.GetPropertyFilesString(input_zip)
assert actual == expected, \
"Mismatching streaming metadata: {} vs {}.".format(actual, expected)
def GetPropertyFilesString(self, zip_file, reserve_space=False):
"""
Constructs the property-files string per request.
Args:
zip_file: The input ZIP file.
reserved_length: The reserved length of the property-files string.
Returns:
A property-files string including the metadata offset/size info, e.g.
"payload.bin:679:343,payload_properties.txt:378:45,metadata: ".
"""
def ComputeEntryOffsetSize(name):
"""Computes the zip entry offset and size."""
info = zip_file.getinfo(name)
offset = info.header_offset
offset += zipfile.sizeFileHeader
offset += len(info.extra) + len(info.filename)
size = info.file_size
return '%s:%d:%d' % (os.path.basename(name), offset, size)
tokens = []
tokens.extend(self._GetPrecomputed(zip_file))
for entry in self.required:
tokens.append(ComputeEntryOffsetSize(entry))
for entry in self.optional:
if entry in zip_file.namelist():
tokens.append(ComputeEntryOffsetSize(entry))
# 'META-INF/com/android/metadata' is required. We don't know its actual
# offset and length (as well as the values for other entries). So we reserve
# 15-byte as a placeholder ('offset:length'), which is sufficient to cover
# the space for metadata entry. Because 'offset' allows a max of 10-digit
# (i.e. ~9 GiB), with a max of 4-digit for the length. Note that all the
# reserved space serves the metadata entry only.
if reserve_space:
tokens.append('metadata:' + ' ' * 15)
else:
tokens.append(ComputeEntryOffsetSize(METADATA_NAME))
return ','.join(tokens)
def _GetPrecomputed(self, input_zip):
"""Computes the additional tokens to be included into the property-files.
This applies to tokens without actual ZIP entries, such as
payload_metadata.bin. We want to expose the offset/size to updaters, so
that they can download the payload metadata directly with the info.
Args:
input_zip: The input zip file.
Returns:
A list of strings (tokens) to be added to the property-files string.
"""
# pylint: disable=no-self-use
# pylint: disable=unused-argument
return []
def SignOutput(temp_zip_name, output_zip_name):
pw = OPTIONS.key_passwords[OPTIONS.package_key]
SignFile(temp_zip_name, output_zip_name, OPTIONS.package_key, pw,
whole_file=True)

View File

@@ -0,0 +1,169 @@
#
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import zipfile
import common
import test_utils
from non_ab_ota import NonAbOtaPropertyFiles, WriteFingerprintAssertion
from test_utils import PropertyFilesTestCase
class NonAbOtaPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for NonAbOtaPropertyFiles."""
def setUp(self):
common.OPTIONS.no_signing = False
def test_init(self):
property_files = NonAbOtaPropertyFiles()
self.assertEqual('ota-property-files', property_files.name)
self.assertEqual((), property_files.required)
self.assertEqual((), property_files.optional)
def test_Compute(self):
entries = ()
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
property_files_string = property_files.Compute(zip_fp)
tokens = self._parse_property_files_string(property_files_string)
self.assertEqual(1, len(tokens))
self._verify_entries(zip_file, tokens, entries)
def test_Finalize(self):
entries = [
'META-INF/com/android/metadata',
]
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
raw_metadata = property_files.GetPropertyFilesString(
zip_fp, reserve_space=False)
property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
tokens = self._parse_property_files_string(property_files_string)
self.assertEqual(1, len(tokens))
# 'META-INF/com/android/metadata' will be key'd as 'metadata'.
entries[0] = 'metadata'
self._verify_entries(zip_file, tokens, entries)
def test_Verify(self):
entries = (
'META-INF/com/android/metadata',
)
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
raw_metadata = property_files.GetPropertyFilesString(
zip_fp, reserve_space=False)
property_files.Verify(zip_fp, raw_metadata)
class NonAbOTATest(test_utils.ReleaseToolsTestCase):
TEST_TARGET_INFO_DICT = {
'build.prop': common.PartitionBuildProps.FromDictionary(
'system', {
'ro.product.device': 'product-device',
'ro.build.fingerprint': 'build-fingerprint-target',
'ro.build.version.incremental': 'build-version-incremental-target',
'ro.build.version.sdk': '27',
'ro.build.version.security_patch': '2017-12-01',
'ro.build.date.utc': '1500000000'}
)
}
TEST_INFO_DICT_USES_OEM_PROPS = {
'build.prop': common.PartitionBuildProps.FromDictionary(
'system', {
'ro.product.name': 'product-name',
'ro.build.thumbprint': 'build-thumbprint',
'ro.build.bar': 'build-bar'}
),
'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
'vendor', {
'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
),
'property1': 'value1',
'property2': 4096,
'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
}
TEST_OEM_DICTS = [
{
'ro.product.brand': 'brand1',
'ro.product.device': 'device1',
},
{
'ro.product.brand': 'brand2',
'ro.product.device': 'device2',
},
{
'ro.product.brand': 'brand3',
'ro.product.device': 'device3',
},
]
def test_WriteFingerprintAssertion_without_oem_props(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = (
'source-build-fingerprint')
source_info = common.BuildInfo(source_info_dict, None)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertSomeFingerprint', 'source-build-fingerprint',
'build-fingerprint-target')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_source_oem_props(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
'build-thumbprint')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_target_oem_props(self):
target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
'build-thumbprint')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_both_oem_props(self):
target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = (
'source-build-thumbprint')
source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertSomeThumbprint', 'build-thumbprint',
'source-build-thumbprint')],
script_writer.lines)

View File

@@ -21,14 +21,15 @@ import zipfile
import common
import test_utils
from ota_utils import CalculateRuntimeDevicesAndFingerprints
from ota_from_target_files import (
_LoadOemDicts, AbOtaPropertyFiles, FinalizeMetadata,
GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
GetTargetFilesZipWithoutPostinstallConfig,
Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
StreamingPropertyFiles, WriteFingerprintAssertion,
CalculateRuntimeDevicesAndFingerprints)
StreamingPropertyFiles)
from non_ab_ota import NonAbOtaPropertyFiles
from test_utils import PropertyFilesTestCase
def construct_target_files(secondary=False):
"""Returns a target-files.zip file for generating OTA packages."""
@@ -149,20 +150,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
}
TEST_OEM_DICTS = [
{
'ro.product.brand': 'brand1',
'ro.product.device': 'device1',
},
{
'ro.product.brand': 'brand2',
'ro.product.device': 'device2',
},
{
'ro.product.brand': 'brand3',
'ro.product.device': 'device3',
},
]
def setUp(self):
self.testdata_dir = test_utils.get_testdata_dir()
@@ -529,59 +516,6 @@ class OtaFromTargetFilesTest(test_utils.ReleaseToolsTestCase):
FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
self.assertIn('ota-test-property-files', metadata)
def test_WriteFingerprintAssertion_without_oem_props(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
source_info_dict['build.prop'].build_props['ro.build.fingerprint'] = (
'source-build-fingerprint')
source_info = common.BuildInfo(source_info_dict, None)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertSomeFingerprint', 'source-build-fingerprint',
'build-fingerprint-target')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_source_oem_props(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
source_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
'build-thumbprint')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_target_oem_props(self):
target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
source_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertFingerprintOrThumbprint', 'build-fingerprint-target',
'build-thumbprint')],
script_writer.lines)
def test_WriteFingerprintAssertion_with_both_oem_props(self):
target_info = common.BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
self.TEST_OEM_DICTS)
source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
source_info_dict['build.prop'].build_props['ro.build.thumbprint'] = (
'source-build-thumbprint')
source_info = common.BuildInfo(source_info_dict, self.TEST_OEM_DICTS)
script_writer = test_utils.MockScriptWriter()
WriteFingerprintAssertion(script_writer, target_info, source_info)
self.assertEqual(
[('AssertSomeThumbprint', 'build-thumbprint',
'source-build-thumbprint')],
script_writer.lines)
class TestPropertyFiles(PropertyFiles):
"""A class that extends PropertyFiles for testing purpose."""
@@ -598,41 +532,8 @@ class TestPropertyFiles(PropertyFiles):
'optional-entry2',
)
class PropertyFilesTest(PropertyFilesTestCase):
class PropertyFilesTest(test_utils.ReleaseToolsTestCase):
def setUp(self):
common.OPTIONS.no_signing = False
@staticmethod
def construct_zip_package(entries):
zip_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(zip_file, 'w') as zip_fp:
for entry in entries:
zip_fp.writestr(
entry,
entry.replace('.', '-').upper(),
zipfile.ZIP_STORED)
return zip_file
@staticmethod
def _parse_property_files_string(data):
result = {}
for token in data.split(','):
name, info = token.split(':', 1)
result[name] = info
return result
def _verify_entries(self, input_file, tokens, entries):
for entry in entries:
offset, size = map(int, tokens[entry].split(':'))
with open(input_file, 'rb') as input_fp:
input_fp.seek(offset)
if entry == 'metadata':
expected = b'META-INF/COM/ANDROID/METADATA'
else:
expected = entry.replace('.', '-').upper().encode()
self.assertEqual(expected, input_fp.read(size))
@test_utils.SkipIfExternalToolsUnavailable()
def test_Compute(self):
@@ -753,7 +654,7 @@ class PropertyFilesTest(test_utils.ReleaseToolsTestCase):
AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
class StreamingPropertyFilesTest(PropertyFilesTest):
class StreamingPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for StreamingPropertyFiles."""
def test_init(self):
@@ -834,7 +735,7 @@ class StreamingPropertyFilesTest(PropertyFilesTest):
AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')
class AbOtaPropertyFilesTest(PropertyFilesTest):
class AbOtaPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for AbOtaPropertyFiles."""
# The size for payload and metadata signature size.
@@ -1002,56 +903,6 @@ class AbOtaPropertyFilesTest(PropertyFilesTest):
property_files.Verify(zip_fp, raw_metadata)
class NonAbOtaPropertyFilesTest(PropertyFilesTest):
"""Additional validity checks specialized for NonAbOtaPropertyFiles."""
def test_init(self):
property_files = NonAbOtaPropertyFiles()
self.assertEqual('ota-property-files', property_files.name)
self.assertEqual((), property_files.required)
self.assertEqual((), property_files.optional)
def test_Compute(self):
entries = ()
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
property_files_string = property_files.Compute(zip_fp)
tokens = self._parse_property_files_string(property_files_string)
self.assertEqual(1, len(tokens))
self._verify_entries(zip_file, tokens, entries)
def test_Finalize(self):
entries = [
'META-INF/com/android/metadata',
]
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
raw_metadata = property_files.GetPropertyFilesString(
zip_fp, reserve_space=False)
property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
tokens = self._parse_property_files_string(property_files_string)
self.assertEqual(1, len(tokens))
# 'META-INF/com/android/metadata' will be key'd as 'metadata'.
entries[0] = 'metadata'
self._verify_entries(zip_file, tokens, entries)
def test_Verify(self):
entries = (
'META-INF/com/android/metadata',
)
zip_file = self.construct_zip_package(entries)
property_files = NonAbOtaPropertyFiles()
with zipfile.ZipFile(zip_file) as zip_fp:
raw_metadata = property_files.GetPropertyFilesString(
zip_fp, reserve_space=False)
property_files.Verify(zip_fp, raw_metadata)
class PayloadSignerTest(test_utils.ReleaseToolsTestCase):
SIGFILE = 'sigfile.bin'

View File

@@ -25,6 +25,7 @@ import os.path
import struct
import sys
import unittest
import zipfile
import common
@@ -192,6 +193,41 @@ class ReleaseToolsTestCase(unittest.TestCase):
def tearDown(self):
common.Cleanup()
class PropertyFilesTestCase(ReleaseToolsTestCase):
@staticmethod
def construct_zip_package(entries):
zip_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(zip_file, 'w') as zip_fp:
for entry in entries:
zip_fp.writestr(
entry,
entry.replace('.', '-').upper(),
zipfile.ZIP_STORED)
return zip_file
@staticmethod
def _parse_property_files_string(data):
result = {}
for token in data.split(','):
name, info = token.split(':', 1)
result[name] = info
return result
def setUp(self):
common.OPTIONS.no_signing = False
def _verify_entries(self, input_file, tokens, entries):
for entry in entries:
offset, size = map(int, tokens[entry].split(':'))
with open(input_file, 'rb') as input_fp:
input_fp.seek(offset)
if entry == 'metadata':
expected = b'META-INF/COM/ANDROID/METADATA'
else:
expected = entry.replace('.', '-').upper().encode()
self.assertEqual(expected, input_fp.read(size))
if __name__ == '__main__':
testsuite = unittest.TestLoader().discover(