Revert "Move more non-AB code to separate files"

This reverts commit 513b86e5c2.

Change-Id: I6aae60642772a052404eb1773966b2e637864bbc
This commit is contained in:
Abhishek Nigam
2023-11-08 02:21:39 +00:00
parent b148ac22f8
commit 1dfca46094
8 changed files with 1237 additions and 1260 deletions

View File

@@ -483,13 +483,8 @@ python_binary_host {
defaults: ["releasetools_binary_defaults"],
srcs: [
"make_recovery_patch.py",
"non_ab_ota.py",
"edify_generator.py",
"check_target_files_vintf.py",
],
libs: [
"ota_utils_lib",
"ota_metadata_proto",
"releasetools_common",
],
}

View File

@@ -31,6 +31,7 @@ import sys
import zipfile
import common
from apex_manifest import ParseApexManifest
logger = logging.getLogger(__name__)

View File

@@ -15,6 +15,7 @@
from __future__ import print_function
import base64
import collections
import copy
import datetime
import errno
@@ -22,6 +23,7 @@ import fnmatch
import getopt
import getpass
import gzip
import imp
import json
import logging
import logging.config
@@ -34,13 +36,17 @@ import subprocess
import stat
import sys
import tempfile
import threading
import time
import zipfile
from dataclasses import dataclass
from genericpath import isdir
from hashlib import sha1, sha256
import images
import rangelib
import sparse_img
from blockimgdiff import BlockImageDiff
logger = logging.getLogger(__name__)
@@ -149,6 +155,35 @@ class AvbChainedPartitionArg:
self.partition, self.rollback_index_location, self.pubkey_path)
class ErrorCode(object):
"""Define error_codes for failures that happen during the actual
update package installation.
Error codes 0-999 are reserved for failures before the package
installation (i.e. low battery, package verification failure).
Detailed code in 'bootable/recovery/error_code.h' """
SYSTEM_VERIFICATION_FAILURE = 1000
SYSTEM_UPDATE_FAILURE = 1001
SYSTEM_UNEXPECTED_CONTENTS = 1002
SYSTEM_NONZERO_CONTENTS = 1003
SYSTEM_RECOVER_FAILURE = 1004
VENDOR_VERIFICATION_FAILURE = 2000
VENDOR_UPDATE_FAILURE = 2001
VENDOR_UNEXPECTED_CONTENTS = 2002
VENDOR_NONZERO_CONTENTS = 2003
VENDOR_RECOVER_FAILURE = 2004
OEM_PROP_MISMATCH = 3000
FINGERPRINT_MISMATCH = 3001
THUMBPRINT_MISMATCH = 3002
OLDER_BUILD = 3003
DEVICE_MISMATCH = 3004
BAD_PATCH_FILE = 3005
INSUFFICIENT_CACHE_SPACE = 3006
TUNE_PARTITION_FAILURE = 3007
APPLY_PATCH_FAILURE = 3008
class ExternalError(RuntimeError):
pass
@@ -3104,6 +3139,107 @@ def ZipClose(zip_file):
zipfile.ZIP64_LIMIT = saved_zip64_limit
class DeviceSpecificParams(object):
module = None
def __init__(self, **kwargs):
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
for k, v in kwargs.items():
setattr(self, k, v)
self.extras = OPTIONS.extras
if self.module is None:
path = OPTIONS.device_specific
if not path:
return
try:
if os.path.isdir(path):
info = imp.find_module("releasetools", [path])
else:
d, f = os.path.split(path)
b, x = os.path.splitext(f)
if x == ".py":
f = b
info = imp.find_module(f, [d])
logger.info("loaded device-specific extensions from %s", path)
self.module = imp.load_module("device_specific", *info)
except ImportError:
logger.info("unable to load device-specific module; assuming none")
def _DoCall(self, function_name, *args, **kwargs):
"""Call the named function in the device-specific module, passing
the given args and kwargs. The first argument to the call will be
the DeviceSpecific object itself. If there is no module, or the
module does not define the function, return the value of the
'default' kwarg (which itself defaults to None)."""
if self.module is None or not hasattr(self.module, function_name):
return kwargs.get("default")
return getattr(self.module, function_name)(*((self,) + args), **kwargs)
def FullOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of a
full OTA package. Implementations can add whatever additional
assertions they like."""
return self._DoCall("FullOTA_Assertions")
def FullOTA_InstallBegin(self):
"""Called at the start of full OTA installation."""
return self._DoCall("FullOTA_InstallBegin")
def FullOTA_GetBlockDifferences(self):
"""Called during full OTA installation and verification.
Implementation should return a list of BlockDifference objects describing
the update on each additional partitions.
"""
return self._DoCall("FullOTA_GetBlockDifferences")
def FullOTA_InstallEnd(self):
"""Called at the end of full OTA installation; typically this is
used to install the image for the device's baseband processor."""
return self._DoCall("FullOTA_InstallEnd")
def IncrementalOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of an
incremental OTA package. Implementations can add whatever
additional assertions they like."""
return self._DoCall("IncrementalOTA_Assertions")
def IncrementalOTA_VerifyBegin(self):
"""Called at the start of the verification phase of incremental
OTA installation; additional checks can be placed here to abort
the script before any changes are made."""
return self._DoCall("IncrementalOTA_VerifyBegin")
def IncrementalOTA_VerifyEnd(self):
"""Called at the end of the verification phase of incremental OTA
installation; additional checks can be placed here to abort the
script before any changes are made."""
return self._DoCall("IncrementalOTA_VerifyEnd")
def IncrementalOTA_InstallBegin(self):
"""Called at the start of incremental OTA installation (after
verification is complete)."""
return self._DoCall("IncrementalOTA_InstallBegin")
def IncrementalOTA_GetBlockDifferences(self):
"""Called during incremental OTA installation and verification.
Implementation should return a list of BlockDifference objects describing
the update on each additional partitions.
"""
return self._DoCall("IncrementalOTA_GetBlockDifferences")
def IncrementalOTA_InstallEnd(self):
"""Called at the end of incremental OTA installation; typically
this is used to install the image for the device's baseband
processor."""
return self._DoCall("IncrementalOTA_InstallEnd")
def VerifyOTA_Assertions(self):
return self._DoCall("VerifyOTA_Assertions")
class File(object):
def __init__(self, name, data, compress_size=None):
self.name = name
@@ -3133,11 +3269,454 @@ class File(object):
ZipWriteStr(z, self.name, self.data, compress_type=compression)
DIFF_PROGRAM_BY_EXT = {
".gz": "imgdiff",
".zip": ["imgdiff", "-z"],
".jar": ["imgdiff", "-z"],
".apk": ["imgdiff", "-z"],
".img": "imgdiff",
}
class Difference(object):
def __init__(self, tf, sf, diff_program=None):
self.tf = tf
self.sf = sf
self.patch = None
self.diff_program = diff_program
def ComputePatch(self):
"""Compute the patch (as a string of data) needed to turn sf into
tf. Returns the same tuple as GetPatch()."""
tf = self.tf
sf = self.sf
if self.diff_program:
diff_program = self.diff_program
else:
ext = os.path.splitext(tf.name)[1]
diff_program = DIFF_PROGRAM_BY_EXT.get(ext, "bsdiff")
ttemp = tf.WriteToTemp()
stemp = sf.WriteToTemp()
ext = os.path.splitext(tf.name)[1]
try:
ptemp = tempfile.NamedTemporaryFile()
if isinstance(diff_program, list):
cmd = copy.copy(diff_program)
else:
cmd = [diff_program]
cmd.append(stemp.name)
cmd.append(ttemp.name)
cmd.append(ptemp.name)
p = Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
err = []
def run():
_, e = p.communicate()
if e:
err.append(e)
th = threading.Thread(target=run)
th.start()
th.join(timeout=300) # 5 mins
if th.is_alive():
logger.warning("diff command timed out")
p.terminate()
th.join(5)
if th.is_alive():
p.kill()
th.join()
if p.returncode != 0:
logger.warning("Failure running %s:\n%s\n", cmd, "".join(err))
self.patch = None
return None, None, None
diff = ptemp.read()
finally:
ptemp.close()
stemp.close()
ttemp.close()
self.patch = diff
return self.tf, self.sf, self.patch
def GetPatch(self):
"""Returns a tuple of (target_file, source_file, patch_data).
patch_data may be None if ComputePatch hasn't been called, or if
computing the patch failed.
"""
return self.tf, self.sf, self.patch
def ComputeDifferences(diffs):
"""Call ComputePatch on all the Difference objects in 'diffs'."""
logger.info("%d diffs to compute", len(diffs))
# Do the largest files first, to try and reduce the long-pole effect.
by_size = [(i.tf.size, i) for i in diffs]
by_size.sort(reverse=True)
by_size = [i[1] for i in by_size]
lock = threading.Lock()
diff_iter = iter(by_size) # accessed under lock
def worker():
try:
lock.acquire()
for d in diff_iter:
lock.release()
start = time.time()
d.ComputePatch()
dur = time.time() - start
lock.acquire()
tf, sf, patch = d.GetPatch()
if sf.name == tf.name:
name = tf.name
else:
name = "%s (%s)" % (tf.name, sf.name)
if patch is None:
logger.error("patching failed! %40s", name)
else:
logger.info(
"%8.2f sec %8d / %8d bytes (%6.2f%%) %s", dur, len(patch),
tf.size, 100.0 * len(patch) / tf.size, name)
lock.release()
except Exception:
logger.exception("Failed to compute diff from worker")
raise
# start worker threads; wait for them all to finish.
threads = [threading.Thread(target=worker)
for i in range(OPTIONS.worker_threads)]
for th in threads:
th.start()
while threads:
threads.pop().join()
class BlockDifference(object):
def __init__(self, partition, tgt, src=None, check_first_block=False,
version=None, disable_imgdiff=False):
self.tgt = tgt
self.src = src
self.partition = partition
self.check_first_block = check_first_block
self.disable_imgdiff = disable_imgdiff
if version is None:
version = max(
int(i) for i in
OPTIONS.info_dict.get("blockimgdiff_versions", "1").split(","))
assert version >= 3
self.version = version
b = BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
version=self.version,
disable_imgdiff=self.disable_imgdiff)
self.path = os.path.join(MakeTempDir(), partition)
b.Compute(self.path)
self._required_cache = b.max_stashed_size
self.touched_src_ranges = b.touched_src_ranges
self.touched_src_sha1 = b.touched_src_sha1
# On devices with dynamic partitions, for new partitions,
# src is None but OPTIONS.source_info_dict is not.
if OPTIONS.source_info_dict is None:
is_dynamic_build = OPTIONS.info_dict.get(
"use_dynamic_partitions") == "true"
is_dynamic_source = False
else:
is_dynamic_build = OPTIONS.source_info_dict.get(
"use_dynamic_partitions") == "true"
is_dynamic_source = partition in shlex.split(
OPTIONS.source_info_dict.get("dynamic_partition_list", "").strip())
is_dynamic_target = partition in shlex.split(
OPTIONS.info_dict.get("dynamic_partition_list", "").strip())
# For dynamic partitions builds, check partition list in both source
# and target build because new partitions may be added, and existing
# partitions may be removed.
is_dynamic = is_dynamic_build and (is_dynamic_source or is_dynamic_target)
if is_dynamic:
self.device = 'map_partition("%s")' % partition
else:
if OPTIONS.source_info_dict is None:
_, device_expr = GetTypeAndDeviceExpr("/" + partition,
OPTIONS.info_dict)
else:
_, device_expr = GetTypeAndDeviceExpr("/" + partition,
OPTIONS.source_info_dict)
self.device = device_expr
@property
def required_cache(self):
return self._required_cache
def WriteScript(self, script, output_zip, progress=None,
write_verify_script=False):
if not self.src:
# write the output unconditionally
script.Print("Patching %s image unconditionally..." % (self.partition,))
else:
script.Print("Patching %s image after verification." % (self.partition,))
if progress:
script.ShowProgress(progress, 0)
self._WriteUpdate(script, output_zip)
if write_verify_script:
self.WritePostInstallVerifyScript(script)
def WriteStrictVerifyScript(self, script):
"""Verify all the blocks in the care_map, including clobbered blocks.
This differs from the WriteVerifyScript() function: a) it prints different
error messages; b) it doesn't allow half-way updated images to pass the
verification."""
partition = self.partition
script.Print("Verifying %s..." % (partition,))
ranges = self.tgt.care_map
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'range_sha1(%s, "%s") == "%s" && ui_print(" Verified.") || '
'ui_print("%s has unexpected contents.");' % (
self.device, ranges_str,
self.tgt.TotalSha1(include_clobbered_blocks=True),
self.partition))
script.AppendExtra("")
def WriteVerifyScript(self, script, touched_blocks_only=False):
partition = self.partition
# full OTA
if not self.src:
script.Print("Image %s will be patched unconditionally." % (partition,))
# incremental OTA
else:
if touched_blocks_only:
ranges = self.touched_src_ranges
expected_sha1 = self.touched_src_sha1
else:
ranges = self.src.care_map.subtract(self.src.clobbered_blocks)
expected_sha1 = self.src.TotalSha1()
# No blocks to be checked, skipping.
if not ranges:
return
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'if (range_sha1(%s, "%s") == "%s" || block_image_verify(%s, '
'package_extract_file("%s.transfer.list"), "%s.new.dat", '
'"%s.patch.dat")) then' % (
self.device, ranges_str, expected_sha1,
self.device, partition, partition, partition))
script.Print('Verified %s image...' % (partition,))
script.AppendExtra('else')
if self.version >= 4:
# Bug: 21124327
# When generating incrementals for the system and vendor partitions in
# version 4 or newer, explicitly check the first block (which contains
# the superblock) of the partition to see if it's what we expect. If
# this check fails, give an explicit log message about the partition
# having been remounted R/W (the most likely explanation).
if self.check_first_block:
script.AppendExtra('check_first_block(%s);' % (self.device,))
# If version >= 4, try block recovery before abort update
if partition == "system":
code = ErrorCode.SYSTEM_RECOVER_FAILURE
else:
code = ErrorCode.VENDOR_RECOVER_FAILURE
script.AppendExtra((
'ifelse (block_image_recover({device}, "{ranges}") && '
'block_image_verify({device}, '
'package_extract_file("{partition}.transfer.list"), '
'"{partition}.new.dat", "{partition}.patch.dat"), '
'ui_print("{partition} recovered successfully."), '
'abort("E{code}: {partition} partition fails to recover"));\n'
'endif;').format(device=self.device, ranges=ranges_str,
partition=partition, code=code))
# Abort the OTA update. Note that the incremental OTA cannot be applied
# even if it may match the checksum of the target partition.
# a) If version < 3, operations like move and erase will make changes
# unconditionally and damage the partition.
# b) If version >= 3, it won't even reach here.
else:
if partition == "system":
code = ErrorCode.SYSTEM_VERIFICATION_FAILURE
else:
code = ErrorCode.VENDOR_VERIFICATION_FAILURE
script.AppendExtra((
'abort("E%d: %s partition has unexpected contents");\n'
'endif;') % (code, partition))
def WritePostInstallVerifyScript(self, script):
partition = self.partition
script.Print('Verifying the updated %s image...' % (partition,))
# Unlike pre-install verification, clobbered_blocks should not be ignored.
ranges = self.tgt.care_map
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'if range_sha1(%s, "%s") == "%s" then' % (
self.device, ranges_str,
self.tgt.TotalSha1(include_clobbered_blocks=True)))
# Bug: 20881595
# Verify that extended blocks are really zeroed out.
if self.tgt.extended:
ranges_str = self.tgt.extended.to_string_raw()
script.AppendExtra(
'if range_sha1(%s, "%s") == "%s" then' % (
self.device, ranges_str,
self._HashZeroBlocks(self.tgt.extended.size())))
script.Print('Verified the updated %s image.' % (partition,))
if partition == "system":
code = ErrorCode.SYSTEM_NONZERO_CONTENTS
else:
code = ErrorCode.VENDOR_NONZERO_CONTENTS
script.AppendExtra(
'else\n'
' abort("E%d: %s partition has unexpected non-zero contents after '
'OTA update");\n'
'endif;' % (code, partition))
else:
script.Print('Verified the updated %s image.' % (partition,))
if partition == "system":
code = ErrorCode.SYSTEM_UNEXPECTED_CONTENTS
else:
code = ErrorCode.VENDOR_UNEXPECTED_CONTENTS
script.AppendExtra(
'else\n'
' abort("E%d: %s partition has unexpected contents after OTA '
'update");\n'
'endif;' % (code, partition))
def _WriteUpdate(self, script, output_zip):
ZipWrite(output_zip,
'{}.transfer.list'.format(self.path),
'{}.transfer.list'.format(self.partition))
# For full OTA, compress the new.dat with brotli with quality 6 to reduce
# its size. Quailty 9 almost triples the compression time but doesn't
# further reduce the size too much. For a typical 1.8G system.new.dat
# zip | brotli(quality 6) | brotli(quality 9)
# compressed_size: 942M | 869M (~8% reduced) | 854M
# compression_time: 75s | 265s | 719s
# decompression_time: 15s | 25s | 25s
if not self.src:
brotli_cmd = ['brotli', '--quality=6',
'--output={}.new.dat.br'.format(self.path),
'{}.new.dat'.format(self.path)]
print("Compressing {}.new.dat with brotli".format(self.partition))
RunAndCheckOutput(brotli_cmd)
new_data_name = '{}.new.dat.br'.format(self.partition)
ZipWrite(output_zip,
'{}.new.dat.br'.format(self.path),
new_data_name,
compress_type=zipfile.ZIP_STORED)
else:
new_data_name = '{}.new.dat'.format(self.partition)
ZipWrite(output_zip, '{}.new.dat'.format(self.path), new_data_name)
ZipWrite(output_zip,
'{}.patch.dat'.format(self.path),
'{}.patch.dat'.format(self.partition),
compress_type=zipfile.ZIP_STORED)
if self.partition == "system":
code = ErrorCode.SYSTEM_UPDATE_FAILURE
else:
code = ErrorCode.VENDOR_UPDATE_FAILURE
call = ('block_image_update({device}, '
'package_extract_file("{partition}.transfer.list"), '
'"{new_data_name}", "{partition}.patch.dat") ||\n'
' abort("E{code}: Failed to update {partition} image.");'.format(
device=self.device, partition=self.partition,
new_data_name=new_data_name, code=code))
script.AppendExtra(script.WordWrap(call))
def _HashBlocks(self, source, ranges): # pylint: disable=no-self-use
data = source.ReadRangeSet(ranges)
ctx = sha1()
for p in data:
ctx.update(p)
return ctx.hexdigest()
def _HashZeroBlocks(self, num_blocks): # pylint: disable=no-self-use
"""Return the hash value for all zero blocks."""
zero_block = '\x00' * 4096
ctx = sha1()
for _ in range(num_blocks):
ctx.update(zero_block)
return ctx.hexdigest()
# Expose these two classes to support vendor-specific scripts
DataImage = images.DataImage
EmptyImage = images.EmptyImage
# map recovery.fstab's fs_types to mount/format "partition types"
PARTITION_TYPES = {
"ext4": "EMMC",
"emmc": "EMMC",
"f2fs": "EMMC",
"squashfs": "EMMC",
"erofs": "EMMC"
}
def GetTypeAndDevice(mount_point, info, check_no_slot=True):
"""
Use GetTypeAndDeviceExpr whenever possible. This function is kept for
backwards compatibility. It aborts if the fstab entry has slotselect option
(unless check_no_slot is explicitly set to False).
"""
fstab = info["fstab"]
if fstab:
if check_no_slot:
assert not fstab[mount_point].slotselect, \
"Use GetTypeAndDeviceExpr instead"
return (PARTITION_TYPES[fstab[mount_point].fs_type],
fstab[mount_point].device)
raise KeyError
def GetTypeAndDeviceExpr(mount_point, info):
"""
Return the filesystem of the partition, and an edify expression that evaluates
to the device at runtime.
"""
fstab = info["fstab"]
if fstab:
p = fstab[mount_point]
device_expr = '"%s"' % fstab[mount_point].device
if p.slotselect:
device_expr = 'add_slot_suffix(%s)' % device_expr
return (PARTITION_TYPES[fstab[mount_point].fs_type], device_expr)
raise KeyError
def GetEntryForDevice(fstab, device):
"""
@@ -3213,6 +3792,349 @@ def ExtractAvbPublicKey(avbtool, key):
return output
def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
info_dict=None):
"""Generates the recovery-from-boot patch and writes the script to output.
Most of the space in the boot and recovery images is just the kernel, which is
identical for the two, so the resulting patch should be efficient. Add it to
the output zip, along with a shell script that is run from init.rc on first
boot to actually do the patching and install the new recovery image.
Args:
input_dir: The top-level input directory of the target-files.zip.
output_sink: The callback function that writes the result.
recovery_img: File object for the recovery image.
boot_img: File objects for the boot image.
info_dict: A dict returned by common.LoadInfoDict() on the input
target_files. Will use OPTIONS.info_dict if None has been given.
"""
if info_dict is None:
info_dict = OPTIONS.info_dict
full_recovery_image = info_dict.get("full_recovery_image") == "true"
board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
if board_uses_vendorimage:
# In this case, the output sink is rooted at VENDOR
recovery_img_path = "etc/recovery.img"
recovery_resource_dat_path = "VENDOR/etc/recovery-resource.dat"
sh_dir = "bin"
else:
# In this case the output sink is rooted at SYSTEM
recovery_img_path = "vendor/etc/recovery.img"
recovery_resource_dat_path = "SYSTEM/vendor/etc/recovery-resource.dat"
sh_dir = "vendor/bin"
if full_recovery_image:
output_sink(recovery_img_path, recovery_img.data)
else:
system_root_image = info_dict.get("system_root_image") == "true"
include_recovery_dtbo = info_dict.get("include_recovery_dtbo") == "true"
include_recovery_acpio = info_dict.get("include_recovery_acpio") == "true"
path = os.path.join(input_dir, recovery_resource_dat_path)
# With system-root-image, boot and recovery images will have mismatching
# entries (only recovery has the ramdisk entry) (Bug: 72731506). Use bsdiff
# to handle such a case.
if system_root_image or include_recovery_dtbo or include_recovery_acpio:
diff_program = ["bsdiff"]
bonus_args = ""
assert not os.path.exists(path)
else:
diff_program = ["imgdiff"]
if os.path.exists(path):
diff_program.append("-b")
diff_program.append(path)
bonus_args = "--bonus /vendor/etc/recovery-resource.dat"
else:
bonus_args = ""
d = Difference(recovery_img, boot_img, diff_program=diff_program)
_, _, patch = d.ComputePatch()
output_sink("recovery-from-boot.p", patch)
try:
# The following GetTypeAndDevice()s need to use the path in the target
# info_dict instead of source_info_dict.
boot_type, boot_device = GetTypeAndDevice("/boot", info_dict,
check_no_slot=False)
recovery_type, recovery_device = GetTypeAndDevice("/recovery", info_dict,
check_no_slot=False)
except KeyError:
return
if full_recovery_image:
# Note that we use /vendor to refer to the recovery resources. This will
# work for a separate vendor partition mounted at /vendor or a
# /system/vendor subdirectory on the system partition, for which init will
# create a symlink from /vendor to /system/vendor.
sh = """#!/vendor/bin/sh
if ! applypatch --check %(type)s:%(device)s:%(size)d:%(sha1)s; then
applypatch \\
--flash /vendor/etc/recovery.img \\
--target %(type)s:%(device)s:%(size)d:%(sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
log -t recovery "Installing new recovery image: failed"
else
log -t recovery "Recovery image already installed"
fi
""" % {'type': recovery_type,
'device': recovery_device,
'sha1': recovery_img.sha1,
'size': recovery_img.size}
else:
sh = """#!/vendor/bin/sh
if ! applypatch --check %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s \\
--patch /vendor/recovery-from-boot.p \\
--source %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s \\
--target %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
log -t recovery "Installing new recovery image: failed"
else
log -t recovery "Recovery image already installed"
fi
""" % {'boot_size': boot_img.size,
'boot_sha1': boot_img.sha1,
'recovery_size': recovery_img.size,
'recovery_sha1': recovery_img.sha1,
'boot_type': boot_type,
'boot_device': boot_device + '$(getprop ro.boot.slot_suffix)',
'recovery_type': recovery_type,
'recovery_device': recovery_device + '$(getprop ro.boot.slot_suffix)',
'bonus_args': bonus_args}
# The install script location moved from /system/etc to /system/bin in the L
# release. In the R release it is in VENDOR/bin or SYSTEM/vendor/bin.
sh_location = os.path.join(sh_dir, "install-recovery.sh")
logger.info("putting script in %s", sh_location)
output_sink(sh_location, sh.encode())
class DynamicPartitionUpdate(object):
def __init__(self, src_group=None, tgt_group=None, progress=None,
block_difference=None):
self.src_group = src_group
self.tgt_group = tgt_group
self.progress = progress
self.block_difference = block_difference
@property
def src_size(self):
if not self.block_difference:
return 0
return DynamicPartitionUpdate._GetSparseImageSize(self.block_difference.src)
@property
def tgt_size(self):
if not self.block_difference:
return 0
return DynamicPartitionUpdate._GetSparseImageSize(self.block_difference.tgt)
@staticmethod
def _GetSparseImageSize(img):
if not img:
return 0
return img.blocksize * img.total_blocks
class DynamicGroupUpdate(object):
def __init__(self, src_size=None, tgt_size=None):
# None: group does not exist. 0: no size limits.
self.src_size = src_size
self.tgt_size = tgt_size
class DynamicPartitionsDifference(object):
def __init__(self, info_dict, block_diffs, progress_dict=None,
source_info_dict=None):
if progress_dict is None:
progress_dict = {}
self._remove_all_before_apply = False
if source_info_dict is None:
self._remove_all_before_apply = True
source_info_dict = {}
block_diff_dict = collections.OrderedDict(
[(e.partition, e) for e in block_diffs])
assert len(block_diff_dict) == len(block_diffs), \
"Duplicated BlockDifference object for {}".format(
[partition for partition, count in
collections.Counter(e.partition for e in block_diffs).items()
if count > 1])
self._partition_updates = collections.OrderedDict()
for p, block_diff in block_diff_dict.items():
self._partition_updates[p] = DynamicPartitionUpdate()
self._partition_updates[p].block_difference = block_diff
for p, progress in progress_dict.items():
if p in self._partition_updates:
self._partition_updates[p].progress = progress
tgt_groups = shlex.split(info_dict.get(
"super_partition_groups", "").strip())
src_groups = shlex.split(source_info_dict.get(
"super_partition_groups", "").strip())
for g in tgt_groups:
for p in shlex.split(info_dict.get(
"super_%s_partition_list" % g, "").strip()):
assert p in self._partition_updates, \
"{} is in target super_{}_partition_list but no BlockDifference " \
"object is provided.".format(p, g)
self._partition_updates[p].tgt_group = g
for g in src_groups:
for p in shlex.split(source_info_dict.get(
"super_%s_partition_list" % g, "").strip()):
assert p in self._partition_updates, \
"{} is in source super_{}_partition_list but no BlockDifference " \
"object is provided.".format(p, g)
self._partition_updates[p].src_group = g
target_dynamic_partitions = set(shlex.split(info_dict.get(
"dynamic_partition_list", "").strip()))
block_diffs_with_target = set(p for p, u in self._partition_updates.items()
if u.tgt_size)
assert block_diffs_with_target == target_dynamic_partitions, \
"Target Dynamic partitions: {}, BlockDifference with target: {}".format(
list(target_dynamic_partitions), list(block_diffs_with_target))
source_dynamic_partitions = set(shlex.split(source_info_dict.get(
"dynamic_partition_list", "").strip()))
block_diffs_with_source = set(p for p, u in self._partition_updates.items()
if u.src_size)
assert block_diffs_with_source == source_dynamic_partitions, \
"Source Dynamic partitions: {}, BlockDifference with source: {}".format(
list(source_dynamic_partitions), list(block_diffs_with_source))
if self._partition_updates:
logger.info("Updating dynamic partitions %s",
self._partition_updates.keys())
self._group_updates = collections.OrderedDict()
for g in tgt_groups:
self._group_updates[g] = DynamicGroupUpdate()
self._group_updates[g].tgt_size = int(info_dict.get(
"super_%s_group_size" % g, "0").strip())
for g in src_groups:
if g not in self._group_updates:
self._group_updates[g] = DynamicGroupUpdate()
self._group_updates[g].src_size = int(source_info_dict.get(
"super_%s_group_size" % g, "0").strip())
self._Compute()
def WriteScript(self, script, output_zip, write_verify_script=False):
script.Comment('--- Start patching dynamic partitions ---')
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
script.Comment('Patch partition %s' % p)
u.block_difference.WriteScript(script, output_zip, progress=u.progress,
write_verify_script=False)
op_list_path = MakeTempFile()
with open(op_list_path, 'w') as f:
for line in self._op_list:
f.write('{}\n'.format(line))
ZipWrite(output_zip, op_list_path, "dynamic_partitions_op_list")
script.Comment('Update dynamic partition metadata')
script.AppendExtra('assert(update_dynamic_partitions('
'package_extract_file("dynamic_partitions_op_list")));')
if write_verify_script:
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
u.block_difference.WritePostInstallVerifyScript(script)
script.AppendExtra('unmap_partition("%s");' % p) # ignore errors
for p, u in self._partition_updates.items():
if u.tgt_size and u.src_size <= u.tgt_size:
script.Comment('Patch partition %s' % p)
u.block_difference.WriteScript(script, output_zip, progress=u.progress,
write_verify_script=write_verify_script)
if write_verify_script:
script.AppendExtra('unmap_partition("%s");' % p) # ignore errors
script.Comment('--- End patching dynamic partitions ---')
def _Compute(self):
self._op_list = list()
def append(line):
self._op_list.append(line)
def comment(line):
self._op_list.append("# %s" % line)
if self._remove_all_before_apply:
comment('Remove all existing dynamic partitions and groups before '
'applying full OTA')
append('remove_all_groups')
for p, u in self._partition_updates.items():
if u.src_group and not u.tgt_group:
append('remove %s' % p)
for p, u in self._partition_updates.items():
if u.src_group and u.tgt_group and u.src_group != u.tgt_group:
comment('Move partition %s from %s to default' % (p, u.src_group))
append('move %s default' % p)
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
comment('Shrink partition %s from %d to %d' %
(p, u.src_size, u.tgt_size))
append('resize %s %s' % (p, u.tgt_size))
for g, u in self._group_updates.items():
if u.src_size is not None and u.tgt_size is None:
append('remove_group %s' % g)
if (u.src_size is not None and u.tgt_size is not None and
u.src_size > u.tgt_size):
comment('Shrink group %s from %d to %d' % (g, u.src_size, u.tgt_size))
append('resize_group %s %d' % (g, u.tgt_size))
for g, u in self._group_updates.items():
if u.src_size is None and u.tgt_size is not None:
comment('Add group %s with maximum size %d' % (g, u.tgt_size))
append('add_group %s %d' % (g, u.tgt_size))
if (u.src_size is not None and u.tgt_size is not None and
u.src_size < u.tgt_size):
comment('Grow group %s from %d to %d' % (g, u.src_size, u.tgt_size))
append('resize_group %s %d' % (g, u.tgt_size))
for p, u in self._partition_updates.items():
if u.tgt_group and not u.src_group:
comment('Add partition %s to group %s' % (p, u.tgt_group))
append('add %s %s' % (p, u.tgt_group))
for p, u in self._partition_updates.items():
if u.tgt_size and u.src_size < u.tgt_size:
comment('Grow partition %s from %d to %d' %
(p, u.src_size, u.tgt_size))
append('resize %s %d' % (p, u.tgt_size))
for p, u in self._partition_updates.items():
if u.src_group and u.tgt_group and u.src_group != u.tgt_group:
comment('Move partition %s from default to %s' %
(p, u.tgt_group))
append('move %s %s' % (p, u.tgt_group))
def GetBootImageBuildProp(boot_img, ramdisk_format=RamdiskFormat.LZ4):
"""
Get build.prop from ramdisk within the boot image

View File

@@ -16,36 +16,6 @@ import re
import common
class ErrorCode(object):
"""Define error_codes for failures that happen during the actual
update package installation.
Error codes 0-999 are reserved for failures before the package
installation (i.e. low battery, package verification failure).
Detailed code in 'bootable/recovery/error_code.h' """
SYSTEM_VERIFICATION_FAILURE = 1000
SYSTEM_UPDATE_FAILURE = 1001
SYSTEM_UNEXPECTED_CONTENTS = 1002
SYSTEM_NONZERO_CONTENTS = 1003
SYSTEM_RECOVER_FAILURE = 1004
VENDOR_VERIFICATION_FAILURE = 2000
VENDOR_UPDATE_FAILURE = 2001
VENDOR_UNEXPECTED_CONTENTS = 2002
VENDOR_NONZERO_CONTENTS = 2003
VENDOR_RECOVER_FAILURE = 2004
OEM_PROP_MISMATCH = 3000
FINGERPRINT_MISMATCH = 3001
THUMBPRINT_MISMATCH = 3002
OLDER_BUILD = 3003
DEVICE_MISMATCH = 3004
BAD_PATCH_FILE = 3005
INSUFFICIENT_CACHE_SPACE = 3006
TUNE_PARTITION_FAILURE = 3007
APPLY_PATCH_FAILURE = 3008
class EdifyGenerator(object):
"""Class to generate scripts in the 'edify' recovery script language
used from donut onwards."""
@@ -118,7 +88,7 @@ class EdifyGenerator(object):
'abort("E{code}: This package expects the value \\"{values}\\" for '
'\\"{name}\\"; this has value \\"" + '
'{get_prop_command} + "\\".");').format(
code=ErrorCode.OEM_PROP_MISMATCH,
code=common.ErrorCode.OEM_PROP_MISMATCH,
get_prop_command=get_prop_command, name=name,
values='\\" or \\"'.join(values))
self.script.append(cmd)
@@ -131,7 +101,7 @@ class EdifyGenerator(object):
for i in fp]) +
' ||\n abort("E%d: Package expects build fingerprint of %s; '
'this device has " + getprop("ro.build.fingerprint") + ".");') % (
ErrorCode.FINGERPRINT_MISMATCH, " or ".join(fp))
common.ErrorCode.FINGERPRINT_MISMATCH, " or ".join(fp))
self.script.append(cmd)
def AssertSomeThumbprint(self, *fp):
@@ -142,7 +112,7 @@ class EdifyGenerator(object):
for i in fp]) +
' ||\n abort("E%d: Package expects build thumbprint of %s; this '
'device has " + getprop("ro.build.thumbprint") + ".");') % (
ErrorCode.THUMBPRINT_MISMATCH, " or ".join(fp))
common.ErrorCode.THUMBPRINT_MISMATCH, " or ".join(fp))
self.script.append(cmd)
def AssertFingerprintOrThumbprint(self, fp, tp):
@@ -163,14 +133,14 @@ class EdifyGenerator(object):
('(!less_than_int(%s, getprop("ro.build.date.utc"))) || '
'abort("E%d: Can\'t install this package (%s) over newer '
'build (" + getprop("ro.build.date") + ").");') % (
timestamp, ErrorCode.OLDER_BUILD, timestamp_text))
timestamp, common.ErrorCode.OLDER_BUILD, timestamp_text))
def AssertDevice(self, device):
"""Assert that the device identifier is the given string."""
cmd = ('getprop("ro.product.device") == "%s" || '
'abort("E%d: This package is for \\"%s\\" devices; '
'this is a \\"" + getprop("ro.product.device") + "\\".");') % (
device, ErrorCode.DEVICE_MISMATCH, device)
device, common.ErrorCode.DEVICE_MISMATCH, device)
self.script.append(cmd)
def AssertSomeBootloader(self, *bootloaders):
@@ -237,7 +207,7 @@ class EdifyGenerator(object):
'unexpected contents."));').format(
target=target_expr,
source=source_expr,
code=ErrorCode.BAD_PATCH_FILE)))
code=common.ErrorCode.BAD_PATCH_FILE)))
def CacheFreeSpaceCheck(self, amount):
"""Check that there's at least 'amount' space that can be made
@@ -246,7 +216,7 @@ class EdifyGenerator(object):
self.script.append(('apply_patch_space(%d) || abort("E%d: Not enough free '
'space on /cache to apply patches.");') % (
amount,
ErrorCode.INSUFFICIENT_CACHE_SPACE))
common.ErrorCode.INSUFFICIENT_CACHE_SPACE))
def Mount(self, mount_point, mount_options_by_format=""):
"""Mount the partition with the given mount_point.
@@ -294,7 +264,7 @@ class EdifyGenerator(object):
'tune2fs(' + "".join(['"%s", ' % (i,) for i in options]) +
'%s) || abort("E%d: Failed to tune partition %s");' % (
self._GetSlotSuffixDeviceForEntry(p),
ErrorCode.TUNE_PARTITION_FAILURE, partition))
common.ErrorCode.TUNE_PARTITION_FAILURE, partition))
def FormatPartition(self, partition):
"""Format the given partition, specified by its mount point (eg,
@@ -384,7 +354,7 @@ class EdifyGenerator(object):
target=target_expr,
source=source_expr,
patch=patch_expr,
code=ErrorCode.APPLY_PATCH_FAILURE)))
code=common.ErrorCode.APPLY_PATCH_FAILURE)))
def _GetSlotSuffixDeviceForEntry(self, entry=None):
"""

View File

@@ -21,7 +21,6 @@ import os
import sys
import common
from non_ab_ota import MakeRecoveryPatch
if sys.hexversion < 0x02070000:
print("Python 2.7 or newer is required.", file=sys.stderr)
@@ -61,7 +60,7 @@ def main(argv):
*fn.split("/")), "wb") as f:
f.write(data)
MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img)
common.MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img)
if __name__ == '__main__':

View File

@@ -13,25 +13,17 @@
# limitations under the License.
import collections
import copy
import imp
import logging
import os
import time
import threading
import tempfile
import zipfile
import subprocess
import shlex
import common
import edify_generator
from edify_generator import ErrorCode
import verity_utils
from check_target_files_vintf import CheckVintfIfTrebleEnabled, HasPartition
from common import OPTIONS, Run, MakeTempDir, RunAndCheckOutput, ZipWrite, MakeTempFile
from common import OPTIONS
from ota_utils import UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata, PropertyFiles
from blockimgdiff import BlockImageDiff
from hashlib import sha1
import subprocess
logger = logging.getLogger(__name__)
@@ -59,7 +51,7 @@ def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
check_first_block = partition_source_info.fs_type == "ext4"
# Disable imgdiff because it relies on zlib to produce stable output
# across different versions, which is often not the case.
return BlockDifference(name, partition_tgt, partition_src,
return common.BlockDifference(name, partition_tgt, partition_src,
check_first_block,
version=blockimgdiff_version,
disable_imgdiff=True)
@@ -84,7 +76,7 @@ def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
tgt = common.GetUserImage(partition, OPTIONS.input_tmp, target_zip,
info_dict=target_info,
reset_file_map=True)
block_diff_dict[partition] = BlockDifference(partition, tgt,
block_diff_dict[partition] = common.BlockDifference(partition, tgt,
src=None)
# Incremental OTA update.
else:
@@ -103,7 +95,7 @@ def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
function_name = "FullOTA_GetBlockDifferences"
if device_specific_diffs:
assert all(isinstance(diff, BlockDifference)
assert all(isinstance(diff, common.BlockDifference)
for diff in device_specific_diffs), \
"{} is not returning a list of BlockDifference objects".format(
function_name)
@@ -139,7 +131,7 @@ def WriteFullOTAPackage(input_zip, output_file):
output_zip = zipfile.ZipFile(
staging_file, "w", compression=zipfile.ZIP_DEFLATED)
device_specific = DeviceSpecificParams(
device_specific = common.DeviceSpecificParams(
input_zip=input_zip,
input_version=target_api_version,
output_zip=output_zip,
@@ -225,7 +217,7 @@ else if get_stage("%(bcb_dev)s") == "3/3" then
if target_info.get('use_dynamic_partitions') == "true":
# Use empty source_info_dict to indicate that all partitions / groups must
# be re-added.
dynamic_partitions_diff = DynamicPartitionsDifference(
dynamic_partitions_diff = common.DynamicPartitionsDifference(
info_dict=OPTIONS.info_dict,
block_diffs=block_diff_dict.values(),
progress_dict=progress_dict)
@@ -317,7 +309,7 @@ def WriteBlockIncrementalOTAPackage(target_zip, source_zip, output_file):
output_zip = zipfile.ZipFile(
staging_file, "w", compression=zipfile.ZIP_DEFLATED)
device_specific = DeviceSpecificParams(
device_specific = common.DeviceSpecificParams(
source_zip=source_zip,
source_version=source_api_version,
source_tmp=OPTIONS.source_tmp,
@@ -412,9 +404,9 @@ else if get_stage("%(bcb_dev)s") != "3/3" then
required_cache_sizes = [diff.required_cache for diff in
block_diff_dict.values()]
if updating_boot:
boot_type, boot_device_expr = GetTypeAndDeviceExpr("/boot",
boot_type, boot_device_expr = common.GetTypeAndDeviceExpr("/boot",
source_info)
d = Difference(target_boot, source_boot, "bsdiff")
d = common.Difference(target_boot, source_boot, "bsdiff")
_, _, d = d.ComputePatch()
if d is None:
include_full_boot = True
@@ -469,7 +461,7 @@ else
if OPTIONS.target_info_dict.get("use_dynamic_partitions") != "true":
raise RuntimeError(
"can't generate incremental that disables dynamic partitions")
dynamic_partitions_diff = DynamicPartitionsDifference(
dynamic_partitions_diff = common.DynamicPartitionsDifference(
info_dict=OPTIONS.target_info_dict,
source_info_dict=OPTIONS.source_info_dict,
block_diffs=block_diff_dict.values(),
@@ -695,891 +687,3 @@ def HasRecoveryPatch(target_files_zip, info_dict):
namelist = target_files_zip.namelist()
return patch in namelist or img in namelist
class DeviceSpecificParams(object):
module = None
def __init__(self, **kwargs):
"""Keyword arguments to the constructor become attributes of this
object, which is passed to all functions in the device-specific
module."""
for k, v in kwargs.items():
setattr(self, k, v)
self.extras = OPTIONS.extras
if self.module is None:
path = OPTIONS.device_specific
if not path:
return
try:
if os.path.isdir(path):
info = imp.find_module("releasetools", [path])
else:
d, f = os.path.split(path)
b, x = os.path.splitext(f)
if x == ".py":
f = b
info = imp.find_module(f, [d])
logger.info("loaded device-specific extensions from %s", path)
self.module = imp.load_module("device_specific", *info)
except ImportError:
logger.info("unable to load device-specific module; assuming none")
def _DoCall(self, function_name, *args, **kwargs):
"""Call the named function in the device-specific module, passing
the given args and kwargs. The first argument to the call will be
the DeviceSpecific object itself. If there is no module, or the
module does not define the function, return the value of the
'default' kwarg (which itself defaults to None)."""
if self.module is None or not hasattr(self.module, function_name):
return kwargs.get("default")
return getattr(self.module, function_name)(*((self,) + args), **kwargs)
def FullOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of a
full OTA package. Implementations can add whatever additional
assertions they like."""
return self._DoCall("FullOTA_Assertions")
def FullOTA_InstallBegin(self):
"""Called at the start of full OTA installation."""
return self._DoCall("FullOTA_InstallBegin")
def FullOTA_GetBlockDifferences(self):
"""Called during full OTA installation and verification.
Implementation should return a list of BlockDifference objects describing
the update on each additional partitions.
"""
return self._DoCall("FullOTA_GetBlockDifferences")
def FullOTA_InstallEnd(self):
"""Called at the end of full OTA installation; typically this is
used to install the image for the device's baseband processor."""
return self._DoCall("FullOTA_InstallEnd")
def IncrementalOTA_Assertions(self):
"""Called after emitting the block of assertions at the top of an
incremental OTA package. Implementations can add whatever
additional assertions they like."""
return self._DoCall("IncrementalOTA_Assertions")
def IncrementalOTA_VerifyBegin(self):
"""Called at the start of the verification phase of incremental
OTA installation; additional checks can be placed here to abort
the script before any changes are made."""
return self._DoCall("IncrementalOTA_VerifyBegin")
def IncrementalOTA_VerifyEnd(self):
"""Called at the end of the verification phase of incremental OTA
installation; additional checks can be placed here to abort the
script before any changes are made."""
return self._DoCall("IncrementalOTA_VerifyEnd")
def IncrementalOTA_InstallBegin(self):
"""Called at the start of incremental OTA installation (after
verification is complete)."""
return self._DoCall("IncrementalOTA_InstallBegin")
def IncrementalOTA_GetBlockDifferences(self):
"""Called during incremental OTA installation and verification.
Implementation should return a list of BlockDifference objects describing
the update on each additional partitions.
"""
return self._DoCall("IncrementalOTA_GetBlockDifferences")
def IncrementalOTA_InstallEnd(self):
"""Called at the end of incremental OTA installation; typically
this is used to install the image for the device's baseband
processor."""
return self._DoCall("IncrementalOTA_InstallEnd")
def VerifyOTA_Assertions(self):
return self._DoCall("VerifyOTA_Assertions")
DIFF_PROGRAM_BY_EXT = {
".gz": "imgdiff",
".zip": ["imgdiff", "-z"],
".jar": ["imgdiff", "-z"],
".apk": ["imgdiff", "-z"],
".img": "imgdiff",
}
class Difference(object):
def __init__(self, tf, sf, diff_program=None):
self.tf = tf
self.sf = sf
self.patch = None
self.diff_program = diff_program
def ComputePatch(self):
"""Compute the patch (as a string of data) needed to turn sf into
tf. Returns the same tuple as GetPatch()."""
tf = self.tf
sf = self.sf
if self.diff_program:
diff_program = self.diff_program
else:
ext = os.path.splitext(tf.name)[1]
diff_program = DIFF_PROGRAM_BY_EXT.get(ext, "bsdiff")
ttemp = tf.WriteToTemp()
stemp = sf.WriteToTemp()
ext = os.path.splitext(tf.name)[1]
try:
ptemp = tempfile.NamedTemporaryFile()
if isinstance(diff_program, list):
cmd = copy.copy(diff_program)
else:
cmd = [diff_program]
cmd.append(stemp.name)
cmd.append(ttemp.name)
cmd.append(ptemp.name)
p = Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
err = []
def run():
_, e = p.communicate()
if e:
err.append(e)
th = threading.Thread(target=run)
th.start()
th.join(timeout=300) # 5 mins
if th.is_alive():
logger.warning("diff command timed out")
p.terminate()
th.join(5)
if th.is_alive():
p.kill()
th.join()
if p.returncode != 0:
logger.warning("Failure running %s:\n%s\n", cmd, "".join(err))
self.patch = None
return None, None, None
diff = ptemp.read()
finally:
ptemp.close()
stemp.close()
ttemp.close()
self.patch = diff
return self.tf, self.sf, self.patch
def GetPatch(self):
"""Returns a tuple of (target_file, source_file, patch_data).
patch_data may be None if ComputePatch hasn't been called, or if
computing the patch failed.
"""
return self.tf, self.sf, self.patch
def ComputeDifferences(diffs):
"""Call ComputePatch on all the Difference objects in 'diffs'."""
logger.info("%d diffs to compute", len(diffs))
# Do the largest files first, to try and reduce the long-pole effect.
by_size = [(i.tf.size, i) for i in diffs]
by_size.sort(reverse=True)
by_size = [i[1] for i in by_size]
lock = threading.Lock()
diff_iter = iter(by_size) # accessed under lock
def worker():
try:
lock.acquire()
for d in diff_iter:
lock.release()
start = time.time()
d.ComputePatch()
dur = time.time() - start
lock.acquire()
tf, sf, patch = d.GetPatch()
if sf.name == tf.name:
name = tf.name
else:
name = "%s (%s)" % (tf.name, sf.name)
if patch is None:
logger.error("patching failed! %40s", name)
else:
logger.info(
"%8.2f sec %8d / %8d bytes (%6.2f%%) %s", dur, len(patch),
tf.size, 100.0 * len(patch) / tf.size, name)
lock.release()
except Exception:
logger.exception("Failed to compute diff from worker")
raise
# start worker threads; wait for them all to finish.
threads = [threading.Thread(target=worker)
for i in range(OPTIONS.worker_threads)]
for th in threads:
th.start()
while threads:
threads.pop().join()
class BlockDifference(object):
def __init__(self, partition, tgt, src=None, check_first_block=False,
version=None, disable_imgdiff=False):
self.tgt = tgt
self.src = src
self.partition = partition
self.check_first_block = check_first_block
self.disable_imgdiff = disable_imgdiff
if version is None:
version = max(
int(i) for i in
OPTIONS.info_dict.get("blockimgdiff_versions", "1").split(","))
assert version >= 3
self.version = version
b = BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
version=self.version,
disable_imgdiff=self.disable_imgdiff)
self.path = os.path.join(MakeTempDir(), partition)
b.Compute(self.path)
self._required_cache = b.max_stashed_size
self.touched_src_ranges = b.touched_src_ranges
self.touched_src_sha1 = b.touched_src_sha1
# On devices with dynamic partitions, for new partitions,
# src is None but OPTIONS.source_info_dict is not.
if OPTIONS.source_info_dict is None:
is_dynamic_build = OPTIONS.info_dict.get(
"use_dynamic_partitions") == "true"
is_dynamic_source = False
else:
is_dynamic_build = OPTIONS.source_info_dict.get(
"use_dynamic_partitions") == "true"
is_dynamic_source = partition in shlex.split(
OPTIONS.source_info_dict.get("dynamic_partition_list", "").strip())
is_dynamic_target = partition in shlex.split(
OPTIONS.info_dict.get("dynamic_partition_list", "").strip())
# For dynamic partitions builds, check partition list in both source
# and target build because new partitions may be added, and existing
# partitions may be removed.
is_dynamic = is_dynamic_build and (is_dynamic_source or is_dynamic_target)
if is_dynamic:
self.device = 'map_partition("%s")' % partition
else:
if OPTIONS.source_info_dict is None:
_, device_expr = GetTypeAndDeviceExpr("/" + partition,
OPTIONS.info_dict)
else:
_, device_expr = GetTypeAndDeviceExpr("/" + partition,
OPTIONS.source_info_dict)
self.device = device_expr
@property
def required_cache(self):
return self._required_cache
def WriteScript(self, script, output_zip, progress=None,
write_verify_script=False):
if not self.src:
# write the output unconditionally
script.Print("Patching %s image unconditionally..." % (self.partition,))
else:
script.Print("Patching %s image after verification." % (self.partition,))
if progress:
script.ShowProgress(progress, 0)
self._WriteUpdate(script, output_zip)
if write_verify_script:
self.WritePostInstallVerifyScript(script)
def WriteStrictVerifyScript(self, script):
"""Verify all the blocks in the care_map, including clobbered blocks.
This differs from the WriteVerifyScript() function: a) it prints different
error messages; b) it doesn't allow half-way updated images to pass the
verification."""
partition = self.partition
script.Print("Verifying %s..." % (partition,))
ranges = self.tgt.care_map
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'range_sha1(%s, "%s") == "%s" && ui_print(" Verified.") || '
'ui_print("%s has unexpected contents.");' % (
self.device, ranges_str,
self.tgt.TotalSha1(include_clobbered_blocks=True),
self.partition))
script.AppendExtra("")
def WriteVerifyScript(self, script, touched_blocks_only=False):
partition = self.partition
# full OTA
if not self.src:
script.Print("Image %s will be patched unconditionally." % (partition,))
# incremental OTA
else:
if touched_blocks_only:
ranges = self.touched_src_ranges
expected_sha1 = self.touched_src_sha1
else:
ranges = self.src.care_map.subtract(self.src.clobbered_blocks)
expected_sha1 = self.src.TotalSha1()
# No blocks to be checked, skipping.
if not ranges:
return
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'if (range_sha1(%s, "%s") == "%s" || block_image_verify(%s, '
'package_extract_file("%s.transfer.list"), "%s.new.dat", '
'"%s.patch.dat")) then' % (
self.device, ranges_str, expected_sha1,
self.device, partition, partition, partition))
script.Print('Verified %s image...' % (partition,))
script.AppendExtra('else')
if self.version >= 4:
# Bug: 21124327
# When generating incrementals for the system and vendor partitions in
# version 4 or newer, explicitly check the first block (which contains
# the superblock) of the partition to see if it's what we expect. If
# this check fails, give an explicit log message about the partition
# having been remounted R/W (the most likely explanation).
if self.check_first_block:
script.AppendExtra('check_first_block(%s);' % (self.device,))
# If version >= 4, try block recovery before abort update
if partition == "system":
code = ErrorCode.SYSTEM_RECOVER_FAILURE
else:
code = ErrorCode.VENDOR_RECOVER_FAILURE
script.AppendExtra((
'ifelse (block_image_recover({device}, "{ranges}") && '
'block_image_verify({device}, '
'package_extract_file("{partition}.transfer.list"), '
'"{partition}.new.dat", "{partition}.patch.dat"), '
'ui_print("{partition} recovered successfully."), '
'abort("E{code}: {partition} partition fails to recover"));\n'
'endif;').format(device=self.device, ranges=ranges_str,
partition=partition, code=code))
# Abort the OTA update. Note that the incremental OTA cannot be applied
# even if it may match the checksum of the target partition.
# a) If version < 3, operations like move and erase will make changes
# unconditionally and damage the partition.
# b) If version >= 3, it won't even reach here.
else:
if partition == "system":
code = ErrorCode.SYSTEM_VERIFICATION_FAILURE
else:
code = ErrorCode.VENDOR_VERIFICATION_FAILURE
script.AppendExtra((
'abort("E%d: %s partition has unexpected contents");\n'
'endif;') % (code, partition))
def WritePostInstallVerifyScript(self, script):
partition = self.partition
script.Print('Verifying the updated %s image...' % (partition,))
# Unlike pre-install verification, clobbered_blocks should not be ignored.
ranges = self.tgt.care_map
ranges_str = ranges.to_string_raw()
script.AppendExtra(
'if range_sha1(%s, "%s") == "%s" then' % (
self.device, ranges_str,
self.tgt.TotalSha1(include_clobbered_blocks=True)))
# Bug: 20881595
# Verify that extended blocks are really zeroed out.
if self.tgt.extended:
ranges_str = self.tgt.extended.to_string_raw()
script.AppendExtra(
'if range_sha1(%s, "%s") == "%s" then' % (
self.device, ranges_str,
self._HashZeroBlocks(self.tgt.extended.size())))
script.Print('Verified the updated %s image.' % (partition,))
if partition == "system":
code = ErrorCode.SYSTEM_NONZERO_CONTENTS
else:
code = ErrorCode.VENDOR_NONZERO_CONTENTS
script.AppendExtra(
'else\n'
' abort("E%d: %s partition has unexpected non-zero contents after '
'OTA update");\n'
'endif;' % (code, partition))
else:
script.Print('Verified the updated %s image.' % (partition,))
if partition == "system":
code = ErrorCode.SYSTEM_UNEXPECTED_CONTENTS
else:
code = ErrorCode.VENDOR_UNEXPECTED_CONTENTS
script.AppendExtra(
'else\n'
' abort("E%d: %s partition has unexpected contents after OTA '
'update");\n'
'endif;' % (code, partition))
def _WriteUpdate(self, script, output_zip):
ZipWrite(output_zip,
'{}.transfer.list'.format(self.path),
'{}.transfer.list'.format(self.partition))
# For full OTA, compress the new.dat with brotli with quality 6 to reduce
# its size. Quailty 9 almost triples the compression time but doesn't
# further reduce the size too much. For a typical 1.8G system.new.dat
# zip | brotli(quality 6) | brotli(quality 9)
# compressed_size: 942M | 869M (~8% reduced) | 854M
# compression_time: 75s | 265s | 719s
# decompression_time: 15s | 25s | 25s
if not self.src:
brotli_cmd = ['brotli', '--quality=6',
'--output={}.new.dat.br'.format(self.path),
'{}.new.dat'.format(self.path)]
print("Compressing {}.new.dat with brotli".format(self.partition))
RunAndCheckOutput(brotli_cmd)
new_data_name = '{}.new.dat.br'.format(self.partition)
ZipWrite(output_zip,
'{}.new.dat.br'.format(self.path),
new_data_name,
compress_type=zipfile.ZIP_STORED)
else:
new_data_name = '{}.new.dat'.format(self.partition)
ZipWrite(output_zip, '{}.new.dat'.format(self.path), new_data_name)
ZipWrite(output_zip,
'{}.patch.dat'.format(self.path),
'{}.patch.dat'.format(self.partition),
compress_type=zipfile.ZIP_STORED)
if self.partition == "system":
code = ErrorCode.SYSTEM_UPDATE_FAILURE
else:
code = ErrorCode.VENDOR_UPDATE_FAILURE
call = ('block_image_update({device}, '
'package_extract_file("{partition}.transfer.list"), '
'"{new_data_name}", "{partition}.patch.dat") ||\n'
' abort("E{code}: Failed to update {partition} image.");'.format(
device=self.device, partition=self.partition,
new_data_name=new_data_name, code=code))
script.AppendExtra(script.WordWrap(call))
def _HashBlocks(self, source, ranges): # pylint: disable=no-self-use
data = source.ReadRangeSet(ranges)
ctx = sha1()
for p in data:
ctx.update(p)
return ctx.hexdigest()
def _HashZeroBlocks(self, num_blocks): # pylint: disable=no-self-use
"""Return the hash value for all zero blocks."""
zero_block = '\x00' * 4096
ctx = sha1()
for _ in range(num_blocks):
ctx.update(zero_block)
return ctx.hexdigest()
def MakeRecoveryPatch(input_dir, output_sink, recovery_img, boot_img,
info_dict=None):
"""Generates the recovery-from-boot patch and writes the script to output.
Most of the space in the boot and recovery images is just the kernel, which is
identical for the two, so the resulting patch should be efficient. Add it to
the output zip, along with a shell script that is run from init.rc on first
boot to actually do the patching and install the new recovery image.
Args:
input_dir: The top-level input directory of the target-files.zip.
output_sink: The callback function that writes the result.
recovery_img: File object for the recovery image.
boot_img: File objects for the boot image.
info_dict: A dict returned by common.LoadInfoDict() on the input
target_files. Will use OPTIONS.info_dict if None has been given.
"""
if info_dict is None:
info_dict = OPTIONS.info_dict
full_recovery_image = info_dict.get("full_recovery_image") == "true"
board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
if board_uses_vendorimage:
# In this case, the output sink is rooted at VENDOR
recovery_img_path = "etc/recovery.img"
recovery_resource_dat_path = "VENDOR/etc/recovery-resource.dat"
sh_dir = "bin"
else:
# In this case the output sink is rooted at SYSTEM
recovery_img_path = "vendor/etc/recovery.img"
recovery_resource_dat_path = "SYSTEM/vendor/etc/recovery-resource.dat"
sh_dir = "vendor/bin"
if full_recovery_image:
output_sink(recovery_img_path, recovery_img.data)
else:
system_root_image = info_dict.get("system_root_image") == "true"
include_recovery_dtbo = info_dict.get("include_recovery_dtbo") == "true"
include_recovery_acpio = info_dict.get("include_recovery_acpio") == "true"
path = os.path.join(input_dir, recovery_resource_dat_path)
# With system-root-image, boot and recovery images will have mismatching
# entries (only recovery has the ramdisk entry) (Bug: 72731506). Use bsdiff
# to handle such a case.
if system_root_image or include_recovery_dtbo or include_recovery_acpio:
diff_program = ["bsdiff"]
bonus_args = ""
assert not os.path.exists(path)
else:
diff_program = ["imgdiff"]
if os.path.exists(path):
diff_program.append("-b")
diff_program.append(path)
bonus_args = "--bonus /vendor/etc/recovery-resource.dat"
else:
bonus_args = ""
d = Difference(recovery_img, boot_img, diff_program=diff_program)
_, _, patch = d.ComputePatch()
output_sink("recovery-from-boot.p", patch)
try:
# The following GetTypeAndDevice()s need to use the path in the target
# info_dict instead of source_info_dict.
boot_type, boot_device = GetTypeAndDevice("/boot", info_dict,
check_no_slot=False)
recovery_type, recovery_device = GetTypeAndDevice("/recovery", info_dict,
check_no_slot=False)
except KeyError:
return
if full_recovery_image:
# Note that we use /vendor to refer to the recovery resources. This will
# work for a separate vendor partition mounted at /vendor or a
# /system/vendor subdirectory on the system partition, for which init will
# create a symlink from /vendor to /system/vendor.
sh = """#!/vendor/bin/sh
if ! applypatch --check %(type)s:%(device)s:%(size)d:%(sha1)s; then
applypatch \\
--flash /vendor/etc/recovery.img \\
--target %(type)s:%(device)s:%(size)d:%(sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
log -t recovery "Installing new recovery image: failed"
else
log -t recovery "Recovery image already installed"
fi
""" % {'type': recovery_type,
'device': recovery_device,
'sha1': recovery_img.sha1,
'size': recovery_img.size}
else:
sh = """#!/vendor/bin/sh
if ! applypatch --check %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s; then
applypatch %(bonus_args)s \\
--patch /vendor/recovery-from-boot.p \\
--source %(boot_type)s:%(boot_device)s:%(boot_size)d:%(boot_sha1)s \\
--target %(recovery_type)s:%(recovery_device)s:%(recovery_size)d:%(recovery_sha1)s && \\
log -t recovery "Installing new recovery image: succeeded" || \\
log -t recovery "Installing new recovery image: failed"
else
log -t recovery "Recovery image already installed"
fi
""" % {'boot_size': boot_img.size,
'boot_sha1': boot_img.sha1,
'recovery_size': recovery_img.size,
'recovery_sha1': recovery_img.sha1,
'boot_type': boot_type,
'boot_device': boot_device + '$(getprop ro.boot.slot_suffix)',
'recovery_type': recovery_type,
'recovery_device': recovery_device + '$(getprop ro.boot.slot_suffix)',
'bonus_args': bonus_args}
# The install script location moved from /system/etc to /system/bin in the L
# release. In the R release it is in VENDOR/bin or SYSTEM/vendor/bin.
sh_location = os.path.join(sh_dir, "install-recovery.sh")
logger.info("putting script in %s", sh_location)
output_sink(sh_location, sh.encode())
class DynamicPartitionUpdate(object):
def __init__(self, src_group=None, tgt_group=None, progress=None,
block_difference=None):
self.src_group = src_group
self.tgt_group = tgt_group
self.progress = progress
self.block_difference = block_difference
@property
def src_size(self):
if not self.block_difference:
return 0
return DynamicPartitionUpdate._GetSparseImageSize(self.block_difference.src)
@property
def tgt_size(self):
if not self.block_difference:
return 0
return DynamicPartitionUpdate._GetSparseImageSize(self.block_difference.tgt)
@staticmethod
def _GetSparseImageSize(img):
if not img:
return 0
return img.blocksize * img.total_blocks
class DynamicGroupUpdate(object):
def __init__(self, src_size=None, tgt_size=None):
# None: group does not exist. 0: no size limits.
self.src_size = src_size
self.tgt_size = tgt_size
class DynamicPartitionsDifference(object):
def __init__(self, info_dict, block_diffs, progress_dict=None,
source_info_dict=None):
if progress_dict is None:
progress_dict = {}
self._remove_all_before_apply = False
if source_info_dict is None:
self._remove_all_before_apply = True
source_info_dict = {}
block_diff_dict = collections.OrderedDict(
[(e.partition, e) for e in block_diffs])
assert len(block_diff_dict) == len(block_diffs), \
"Duplicated BlockDifference object for {}".format(
[partition for partition, count in
collections.Counter(e.partition for e in block_diffs).items()
if count > 1])
self._partition_updates = collections.OrderedDict()
for p, block_diff in block_diff_dict.items():
self._partition_updates[p] = DynamicPartitionUpdate()
self._partition_updates[p].block_difference = block_diff
for p, progress in progress_dict.items():
if p in self._partition_updates:
self._partition_updates[p].progress = progress
tgt_groups = shlex.split(info_dict.get(
"super_partition_groups", "").strip())
src_groups = shlex.split(source_info_dict.get(
"super_partition_groups", "").strip())
for g in tgt_groups:
for p in shlex.split(info_dict.get(
"super_%s_partition_list" % g, "").strip()):
assert p in self._partition_updates, \
"{} is in target super_{}_partition_list but no BlockDifference " \
"object is provided.".format(p, g)
self._partition_updates[p].tgt_group = g
for g in src_groups:
for p in shlex.split(source_info_dict.get(
"super_%s_partition_list" % g, "").strip()):
assert p in self._partition_updates, \
"{} is in source super_{}_partition_list but no BlockDifference " \
"object is provided.".format(p, g)
self._partition_updates[p].src_group = g
target_dynamic_partitions = set(shlex.split(info_dict.get(
"dynamic_partition_list", "").strip()))
block_diffs_with_target = set(p for p, u in self._partition_updates.items()
if u.tgt_size)
assert block_diffs_with_target == target_dynamic_partitions, \
"Target Dynamic partitions: {}, BlockDifference with target: {}".format(
list(target_dynamic_partitions), list(block_diffs_with_target))
source_dynamic_partitions = set(shlex.split(source_info_dict.get(
"dynamic_partition_list", "").strip()))
block_diffs_with_source = set(p for p, u in self._partition_updates.items()
if u.src_size)
assert block_diffs_with_source == source_dynamic_partitions, \
"Source Dynamic partitions: {}, BlockDifference with source: {}".format(
list(source_dynamic_partitions), list(block_diffs_with_source))
if self._partition_updates:
logger.info("Updating dynamic partitions %s",
self._partition_updates.keys())
self._group_updates = collections.OrderedDict()
for g in tgt_groups:
self._group_updates[g] = DynamicGroupUpdate()
self._group_updates[g].tgt_size = int(info_dict.get(
"super_%s_group_size" % g, "0").strip())
for g in src_groups:
if g not in self._group_updates:
self._group_updates[g] = DynamicGroupUpdate()
self._group_updates[g].src_size = int(source_info_dict.get(
"super_%s_group_size" % g, "0").strip())
self._Compute()
def WriteScript(self, script, output_zip, write_verify_script=False):
script.Comment('--- Start patching dynamic partitions ---')
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
script.Comment('Patch partition %s' % p)
u.block_difference.WriteScript(script, output_zip, progress=u.progress,
write_verify_script=False)
op_list_path = MakeTempFile()
with open(op_list_path, 'w') as f:
for line in self._op_list:
f.write('{}\n'.format(line))
ZipWrite(output_zip, op_list_path, "dynamic_partitions_op_list")
script.Comment('Update dynamic partition metadata')
script.AppendExtra('assert(update_dynamic_partitions('
'package_extract_file("dynamic_partitions_op_list")));')
if write_verify_script:
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
u.block_difference.WritePostInstallVerifyScript(script)
script.AppendExtra('unmap_partition("%s");' % p) # ignore errors
for p, u in self._partition_updates.items():
if u.tgt_size and u.src_size <= u.tgt_size:
script.Comment('Patch partition %s' % p)
u.block_difference.WriteScript(script, output_zip, progress=u.progress,
write_verify_script=write_verify_script)
if write_verify_script:
script.AppendExtra('unmap_partition("%s");' % p) # ignore errors
script.Comment('--- End patching dynamic partitions ---')
def _Compute(self):
self._op_list = list()
def append(line):
self._op_list.append(line)
def comment(line):
self._op_list.append("# %s" % line)
if self._remove_all_before_apply:
comment('Remove all existing dynamic partitions and groups before '
'applying full OTA')
append('remove_all_groups')
for p, u in self._partition_updates.items():
if u.src_group and not u.tgt_group:
append('remove %s' % p)
for p, u in self._partition_updates.items():
if u.src_group and u.tgt_group and u.src_group != u.tgt_group:
comment('Move partition %s from %s to default' % (p, u.src_group))
append('move %s default' % p)
for p, u in self._partition_updates.items():
if u.src_size and u.tgt_size and u.src_size > u.tgt_size:
comment('Shrink partition %s from %d to %d' %
(p, u.src_size, u.tgt_size))
append('resize %s %s' % (p, u.tgt_size))
for g, u in self._group_updates.items():
if u.src_size is not None and u.tgt_size is None:
append('remove_group %s' % g)
if (u.src_size is not None and u.tgt_size is not None and
u.src_size > u.tgt_size):
comment('Shrink group %s from %d to %d' % (g, u.src_size, u.tgt_size))
append('resize_group %s %d' % (g, u.tgt_size))
for g, u in self._group_updates.items():
if u.src_size is None and u.tgt_size is not None:
comment('Add group %s with maximum size %d' % (g, u.tgt_size))
append('add_group %s %d' % (g, u.tgt_size))
if (u.src_size is not None and u.tgt_size is not None and
u.src_size < u.tgt_size):
comment('Grow group %s from %d to %d' % (g, u.src_size, u.tgt_size))
append('resize_group %s %d' % (g, u.tgt_size))
for p, u in self._partition_updates.items():
if u.tgt_group and not u.src_group:
comment('Add partition %s to group %s' % (p, u.tgt_group))
append('add %s %s' % (p, u.tgt_group))
for p, u in self._partition_updates.items():
if u.tgt_size and u.src_size < u.tgt_size:
comment('Grow partition %s from %d to %d' %
(p, u.src_size, u.tgt_size))
append('resize %s %d' % (p, u.tgt_size))
for p, u in self._partition_updates.items():
if u.src_group and u.tgt_group and u.src_group != u.tgt_group:
comment('Move partition %s from default to %s' %
(p, u.tgt_group))
append('move %s %s' % (p, u.tgt_group))
# map recovery.fstab's fs_types to mount/format "partition types"
PARTITION_TYPES = {
"ext4": "EMMC",
"emmc": "EMMC",
"f2fs": "EMMC",
"squashfs": "EMMC",
"erofs": "EMMC"
}
def GetTypeAndDevice(mount_point, info, check_no_slot=True):
"""
Use GetTypeAndDeviceExpr whenever possible. This function is kept for
backwards compatibility. It aborts if the fstab entry has slotselect option
(unless check_no_slot is explicitly set to False).
"""
fstab = info["fstab"]
if fstab:
if check_no_slot:
assert not fstab[mount_point].slotselect, \
"Use GetTypeAndDeviceExpr instead"
return (PARTITION_TYPES[fstab[mount_point].fs_type],
fstab[mount_point].device)
raise KeyError
def GetTypeAndDeviceExpr(mount_point, info):
"""
Return the filesystem of the partition, and an edify expression that evaluates
to the device at runtime.
"""
fstab = info["fstab"]
if fstab:
p = fstab[mount_point]
device_expr = '"%s"' % fstab[mount_point].device
if p.slotselect:
device_expr = 'add_slot_suffix(%s)' % device_expr
return (PARTITION_TYPES[fstab[mount_point].fs_type], device_expr)
raise KeyError

View File

@@ -26,6 +26,7 @@ from typing import BinaryIO
import common
import test_utils
import validate_target_files
from images import EmptyImage, DataImage
from rangelib import RangeSet
@@ -1670,6 +1671,292 @@ class CommonUtilsTest(test_utils.ReleaseToolsTestCase):
test_file.name, 'generic_kernel')
class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
"""Checks the format of install-recovery.sh.
Its format should match between common.py and validate_target_files.py.
"""
def setUp(self):
self._tempdir = common.MakeTempDir()
# Create a fake dict that contains the fstab info for boot&recovery.
self._info = {"fstab": {}}
fake_fstab = [
"/dev/soc.0/by-name/boot /boot emmc defaults defaults",
"/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
# Construct the gzipped recovery.img and boot.img
self.recovery_data = bytearray([
0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
0x08, 0x00, 0x00, 0x00
])
# echo -n "boot" | gzip -f | hd
self.boot_data = bytearray([
0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
])
def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
loc = os.path.join(self._tempdir, prefix, name)
if not os.path.exists(os.path.dirname(loc)):
os.makedirs(os.path.dirname(loc))
with open(loc, "wb") as f:
f.write(data)
def test_full_recovery(self):
recovery_image = common.File("recovery.img", self.recovery_data)
boot_image = common.File("boot.img", self.boot_data)
self._info["full_recovery_image"] = "true"
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)
@test_utils.SkipIfExternalToolsUnavailable()
def test_recovery_from_boot(self):
recovery_image = common.File("recovery.img", self.recovery_data)
self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
boot_image = common.File("boot.img", self.boot_data)
self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)
# Validate 'recovery-from-boot' with bonus argument.
self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)
class MockBlockDifference(object):
def __init__(self, partition, tgt, src=None):
self.partition = partition
self.tgt = tgt
self.src = src
def WriteScript(self, script, _, progress=None,
write_verify_script=False):
if progress:
script.AppendExtra("progress({})".format(progress))
script.AppendExtra("patch({});".format(self.partition))
if write_verify_script:
self.WritePostInstallVerifyScript(script)
def WritePostInstallVerifyScript(self, script):
script.AppendExtra("verify({});".format(self.partition))
class FakeSparseImage(object):
def __init__(self, size):
self.blocksize = 4096
self.total_blocks = size // 4096
assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
@staticmethod
def get_op_list(output_path):
with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
with output_zip.open('dynamic_partitions_op_list') as op_list:
return [line.decode().strip() for line in op_list.readlines()
if not line.startswith(b'#')]
def setUp(self):
self.script = test_utils.MockScriptWriter()
self.output_path = common.MakeTempFile(suffix='.zip')
def test_full(self):
target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(system);
verify(system);
unmap_partition("system");
patch(vendor);
verify(vendor);
unmap_partition("vendor");
""".strip())
lines = self.get_op_list(self.output_path)
remove_all_groups = lines.index("remove_all_groups")
add_group = lines.index("add_group group_foo 4294967296")
add_vendor = lines.index("add vendor group_foo")
add_system = lines.index("add system group_foo")
resize_vendor = lines.index("resize vendor 1073741824")
resize_system = lines.index("resize system 3221225472")
self.assertLess(remove_all_groups, add_group,
"Should add groups after removing all groups")
self.assertLess(add_group, min(add_vendor, add_system),
"Should add partitions after adding group")
self.assertLess(add_system, resize_system,
"Should resize system after adding it")
self.assertLess(add_vendor, resize_vendor,
"Should resize vendor after adding it")
def test_inc_groups(self):
source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
group_qux_size=1 * GiB).split("\n"))
dp_diff = common.DynamicPartitionsDifference(target_info,
block_diffs=[],
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
lines = self.get_op_list(self.output_path)
removed = lines.index("remove_group group_bar")
shrunk = lines.index("resize_group group_foo 3221225472")
grown = lines.index("resize_group group_baz 4294967296")
added = lines.index("add_group group_qux 1073741824")
self.assertLess(max(removed, shrunk),
min(grown, added),
"ops that remove / shrink partitions must precede ops that "
"grow / add partitions")
def test_incremental(self):
source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product system_ext
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product system_ext
""".format(group_foo_size=4 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("product", FakeSparseImage(1024 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("system_ext", None,
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
src=None)]
dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
metadata_idx = self.script.lines.index(
'assert(update_dynamic_partitions(package_extract_file('
'"dynamic_partitions_op_list")));')
self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
for p in ("product", "system", "odm"):
patch_idx = self.script.lines.index("patch({});".format(p))
verify_idx = self.script.lines.index("verify({});".format(p))
self.assertLess(metadata_idx, patch_idx,
"Should patch {} after updating metadata".format(p))
self.assertLess(patch_idx, verify_idx,
"Should verify {} after patching".format(p))
self.assertNotIn("patch(system_ext);", self.script.lines)
lines = self.get_op_list(self.output_path)
remove = lines.index("remove system_ext")
move_product_out = lines.index("move product default")
shrink = lines.index("resize vendor 536870912")
shrink_group = lines.index("resize_group group_foo 3221225472")
add_group_bar = lines.index("add_group group_bar 1073741824")
add_odm = lines.index("add odm group_foo")
grow_existing = lines.index("resize system 1610612736")
grow_added = lines.index("resize odm 1073741824")
move_product_in = lines.index("move product group_bar")
max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
self.assertLess(max_idx_move_partition_out_foo, shrink_group,
"Must shrink group after partitions inside group are shrunk"
" / removed")
self.assertLess(add_group_bar, move_product_in,
"Must add partitions to group after group is added")
self.assertLess(max_idx_move_partition_out_foo,
min_idx_move_partition_in_foo,
"Must shrink partitions / remove partitions from group"
"before adding / moving partitions into group")
def test_remove_partition(self):
source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))
common.OPTIONS.info_dict = target_info
common.OPTIONS.target_info_dict = target_info
common.OPTIONS.source_info_dict = source_info
common.OPTIONS.cache_size = 4 * 4096
block_diffs = [common.BlockDifference("foo", EmptyImage(),
src=DataImage("source", pad=True))]
dp_diff = common.DynamicPartitionsDifference(target_info, block_diffs,
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
self.assertNotIn("block_image_update", str(self.script),
"Removed partition should not be patched.")
lines = self.get_op_list(self.output_path)
self.assertEqual(lines, ["remove foo"])
class PartitionBuildPropsTest(test_utils.ReleaseToolsTestCase):
def setUp(self):
self.odm_build_prop = [

View File

@@ -15,24 +15,19 @@
#
import copy
import os
import zipfile
import common
import test_utils
import validate_target_files
from images import EmptyImage, DataImage
from non_ab_ota import NonAbOtaPropertyFiles, WriteFingerprintAssertion, BlockDifference, DynamicPartitionsDifference, MakeRecoveryPatch
from non_ab_ota import NonAbOtaPropertyFiles, WriteFingerprintAssertion
from test_utils import PropertyFilesTestCase
class NonAbOtaPropertyFilesTest(PropertyFilesTestCase):
"""Additional validity checks specialized for NonAbOtaPropertyFiles."""
def setUp(self):
common.OPTIONS.no_signing = False
common.OPTIONS.no_signing = False
def test_init(self):
property_files = NonAbOtaPropertyFiles()
self.assertEqual('ota-property-files', property_files.name)
@@ -60,8 +55,7 @@ class NonAbOtaPropertyFilesTest(PropertyFilesTestCase):
with zipfile.ZipFile(zip_file) as zip_fp:
raw_metadata = property_files.GetPropertyFilesString(
zip_fp, reserve_space=False)
property_files_string = property_files.Finalize(
zip_fp, len(raw_metadata))
property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))
tokens = self._parse_property_files_string(property_files_string)
self.assertEqual(2, len(tokens))
@@ -83,7 +77,6 @@ class NonAbOtaPropertyFilesTest(PropertyFilesTestCase):
property_files.Verify(zip_fp, raw_metadata)
class NonAbOTATest(test_utils.ReleaseToolsTestCase):
TEST_TARGET_INFO_DICT = {
'build.prop': common.PartitionBuildProps.FromDictionary(
@@ -105,7 +98,7 @@ class NonAbOTATest(test_utils.ReleaseToolsTestCase):
),
'vendor.build.prop': common.PartitionBuildProps.FromDictionary(
'vendor', {
'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
'ro.vendor.build.fingerprint': 'vendor-build-fingerprint'}
),
'property1': 'value1',
'property2': 4096,
@@ -125,7 +118,6 @@ class NonAbOTATest(test_utils.ReleaseToolsTestCase):
'ro.product.device': 'device3',
},
]
def test_WriteFingerprintAssertion_without_oem_props(self):
target_info = common.BuildInfo(self.TEST_TARGET_INFO_DICT, None)
source_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
@@ -178,296 +170,3 @@ class NonAbOTATest(test_utils.ReleaseToolsTestCase):
[('AssertSomeThumbprint', 'build-thumbprint',
'source-build-thumbprint')],
script_writer.lines)
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
class MockBlockDifference(object):
def __init__(self, partition, tgt, src=None):
self.partition = partition
self.tgt = tgt
self.src = src
def WriteScript(self, script, _, progress=None,
write_verify_script=False):
if progress:
script.AppendExtra("progress({})".format(progress))
script.AppendExtra("patch({});".format(self.partition))
if write_verify_script:
self.WritePostInstallVerifyScript(script)
def WritePostInstallVerifyScript(self, script):
script.AppendExtra("verify({});".format(self.partition))
class FakeSparseImage(object):
def __init__(self, size):
self.blocksize = 4096
self.total_blocks = size // 4096
assert size % 4096 == 0, "{} is not a multiple of 4096".format(size)
class DynamicPartitionsDifferenceTest(test_utils.ReleaseToolsTestCase):
@staticmethod
def get_op_list(output_path):
with zipfile.ZipFile(output_path, allowZip64=True) as output_zip:
with output_zip.open('dynamic_partitions_op_list') as op_list:
return [line.decode().strip() for line in op_list.readlines()
if not line.startswith(b'#')]
def setUp(self):
self.script = test_utils.MockScriptWriter()
self.output_path = common.MakeTempFile(suffix='.zip')
def test_full(self):
target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor
super_partition_groups=group_foo
super_group_foo_group_size={group_size}
super_group_foo_partition_list=system vendor
""".format(group_size=4 * GiB).split("\n"))
block_diffs = [MockBlockDifference("system", FakeSparseImage(3 * GiB)),
MockBlockDifference("vendor", FakeSparseImage(1 * GiB))]
dp_diff = DynamicPartitionsDifference(target_info, block_diffs)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
self.assertEqual(str(self.script).strip(), """
assert(update_dynamic_partitions(package_extract_file("dynamic_partitions_op_list")));
patch(system);
verify(system);
unmap_partition("system");
patch(vendor);
verify(vendor);
unmap_partition("vendor");
""".strip())
lines = self.get_op_list(self.output_path)
remove_all_groups = lines.index("remove_all_groups")
add_group = lines.index("add_group group_foo 4294967296")
add_vendor = lines.index("add vendor group_foo")
add_system = lines.index("add system group_foo")
resize_vendor = lines.index("resize vendor 1073741824")
resize_system = lines.index("resize system 3221225472")
self.assertLess(remove_all_groups, add_group,
"Should add groups after removing all groups")
self.assertLess(add_group, min(add_vendor, add_system),
"Should add partitions after adding group")
self.assertLess(add_system, resize_system,
"Should resize system after adding it")
self.assertLess(add_vendor, resize_vendor,
"Should resize vendor after adding it")
def test_inc_groups(self):
source_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_bar group_baz
super_group_foo_group_size={group_foo_size}
super_group_bar_group_size={group_bar_size}
""".format(group_foo_size=4 * GiB, group_bar_size=3 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
super_partition_groups=group_foo group_baz group_qux
super_group_foo_group_size={group_foo_size}
super_group_baz_group_size={group_baz_size}
super_group_qux_group_size={group_qux_size}
""".format(group_foo_size=3 * GiB, group_baz_size=4 * GiB,
group_qux_size=1 * GiB).split("\n"))
dp_diff = DynamicPartitionsDifference(target_info,
block_diffs=[],
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
lines = self.get_op_list(self.output_path)
removed = lines.index("remove_group group_bar")
shrunk = lines.index("resize_group group_foo 3221225472")
grown = lines.index("resize_group group_baz 4294967296")
added = lines.index("add_group group_qux 1073741824")
self.assertLess(max(removed, shrunk),
min(grown, added),
"ops that remove / shrink partitions must precede ops that "
"grow / add partitions")
def test_incremental(self):
source_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product system_ext
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor product system_ext
""".format(group_foo_size=4 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
dynamic_partition_list=system vendor product odm
super_partition_groups=group_foo group_bar
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=system vendor odm
super_group_bar_group_size={group_bar_size}
super_group_bar_partition_list=product
""".format(group_foo_size=3 * GiB, group_bar_size=1 * GiB).split("\n"))
block_diffs = [MockBlockDifference("system", FakeSparseImage(1536 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("vendor", FakeSparseImage(512 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("product", FakeSparseImage(1024 * MiB),
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("system_ext", None,
src=FakeSparseImage(1024 * MiB)),
MockBlockDifference("odm", FakeSparseImage(1024 * MiB),
src=None)]
dp_diff = DynamicPartitionsDifference(target_info, block_diffs,
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
metadata_idx = self.script.lines.index(
'assert(update_dynamic_partitions(package_extract_file('
'"dynamic_partitions_op_list")));')
self.assertLess(self.script.lines.index('patch(vendor);'), metadata_idx)
self.assertLess(metadata_idx, self.script.lines.index('verify(vendor);'))
for p in ("product", "system", "odm"):
patch_idx = self.script.lines.index("patch({});".format(p))
verify_idx = self.script.lines.index("verify({});".format(p))
self.assertLess(metadata_idx, patch_idx,
"Should patch {} after updating metadata".format(p))
self.assertLess(patch_idx, verify_idx,
"Should verify {} after patching".format(p))
self.assertNotIn("patch(system_ext);", self.script.lines)
lines = self.get_op_list(self.output_path)
remove = lines.index("remove system_ext")
move_product_out = lines.index("move product default")
shrink = lines.index("resize vendor 536870912")
shrink_group = lines.index("resize_group group_foo 3221225472")
add_group_bar = lines.index("add_group group_bar 1073741824")
add_odm = lines.index("add odm group_foo")
grow_existing = lines.index("resize system 1610612736")
grow_added = lines.index("resize odm 1073741824")
move_product_in = lines.index("move product group_bar")
max_idx_move_partition_out_foo = max(remove, move_product_out, shrink)
min_idx_move_partition_in_foo = min(add_odm, grow_existing, grow_added)
self.assertLess(max_idx_move_partition_out_foo, shrink_group,
"Must shrink group after partitions inside group are shrunk"
" / removed")
self.assertLess(add_group_bar, move_product_in,
"Must add partitions to group after group is added")
self.assertLess(max_idx_move_partition_out_foo,
min_idx_move_partition_in_foo,
"Must shrink partitions / remove partitions from group"
"before adding / moving partitions into group")
def test_remove_partition(self):
source_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
dynamic_partition_list=foo
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
super_group_foo_partition_list=foo
""".format(group_foo_size=4 * GiB).split("\n"))
target_info = common.LoadDictionaryFromLines("""
blockimgdiff_versions=3,4
use_dynamic_partitions=true
super_partition_groups=group_foo
super_group_foo_group_size={group_foo_size}
""".format(group_foo_size=4 * GiB).split("\n"))
common.OPTIONS.info_dict = target_info
common.OPTIONS.target_info_dict = target_info
common.OPTIONS.source_info_dict = source_info
common.OPTIONS.cache_size = 4 * 4096
block_diffs = [BlockDifference("foo", EmptyImage(),
src=DataImage("source", pad=True))]
dp_diff = DynamicPartitionsDifference(target_info, block_diffs,
source_info_dict=source_info)
with zipfile.ZipFile(self.output_path, 'w', allowZip64=True) as output_zip:
dp_diff.WriteScript(self.script, output_zip, write_verify_script=True)
self.assertNotIn("block_image_update", str(self.script),
"Removed partition should not be patched.")
lines = self.get_op_list(self.output_path)
self.assertEqual(lines, ["remove foo"])
class InstallRecoveryScriptFormatTest(test_utils.ReleaseToolsTestCase):
"""Checks the format of install-recovery.sh.
Its format should match between common.py and validate_target_files.py.
"""
def setUp(self):
self._tempdir = common.MakeTempDir()
# Create a fake dict that contains the fstab info for boot&recovery.
self._info = {"fstab": {}}
fake_fstab = [
"/dev/soc.0/by-name/boot /boot emmc defaults defaults",
"/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, fake_fstab)
# Construct the gzipped recovery.img and boot.img
self.recovery_data = bytearray([
0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3,
0x08, 0x00, 0x00, 0x00
])
# echo -n "boot" | gzip -f | hd
self.boot_data = bytearray([
0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca,
0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00
])
def _out_tmp_sink(self, name, data, prefix="SYSTEM"):
loc = os.path.join(self._tempdir, prefix, name)
if not os.path.exists(os.path.dirname(loc)):
os.makedirs(os.path.dirname(loc))
with open(loc, "wb") as f:
f.write(data)
def test_full_recovery(self):
recovery_image = common.File("recovery.img", self.recovery_data)
boot_image = common.File("boot.img", self.boot_data)
self._info["full_recovery_image"] = "true"
MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)
@test_utils.SkipIfExternalToolsUnavailable()
def test_recovery_from_boot(self):
recovery_image = common.File("recovery.img", self.recovery_data)
self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
boot_image = common.File("boot.img", self.boot_data)
self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)
# Validate 'recovery-from-boot' with bonus argument.
self._out_tmp_sink("etc/recovery-resource.dat", b"bonus", "SYSTEM")
MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
recovery_image, boot_image, self._info)
validate_target_files.ValidateInstallRecoveryScript(self._tempdir,
self._info)