Merge changes Icc298256,I9268cb11,I446a0b16,I347447eb,Iaae39e4c, ...

* changes:
  Remove HashTreeInfo from verity_utils.py
  Remove verity keys from info_dict
  Remove replace verity key args
  Remove unused args from common.py
  Clean up unused code for VB in verity_utils.py
  Remove verity related props from build_image.py
This commit is contained in:
Hung-Wei Chen
2022-08-30 07:10:23 +00:00
committed by Gerrit Code Review
12 changed files with 29 additions and 920 deletions

View File

@@ -76,8 +76,6 @@ OPTIONS = common.OPTIONS
OPTIONS.add_missing = False OPTIONS.add_missing = False
OPTIONS.rebuild_recovery = False OPTIONS.rebuild_recovery = False
OPTIONS.replace_updated_files_list = [] OPTIONS.replace_updated_files_list = []
OPTIONS.replace_verity_public_key = False
OPTIONS.replace_verity_private_key = False
OPTIONS.is_signing = False OPTIONS.is_signing = False
# Use a fixed timestamp (01/01/2009 00:00:00 UTC) for files when packaging # Use a fixed timestamp (01/01/2009 00:00:00 UTC) for files when packaging
@@ -457,8 +455,7 @@ def CreateImage(input_dir, info_dict, what, output_file, block_list=None):
# Set the '_image_size' for given image size. # Set the '_image_size' for given image size.
is_verity_partition = "verity_block_device" in image_props is_verity_partition = "verity_block_device" in image_props
verity_supported = (image_props.get("verity") == "true" or verity_supported = (image_props.get("avb_enable") == "true")
image_props.get("avb_enable") == "true")
is_avb_enable = image_props.get("avb_hashtree_enable") == "true" is_avb_enable = image_props.get("avb_hashtree_enable") == "true"
if verity_supported and (is_verity_partition or is_avb_enable): if verity_supported and (is_verity_partition or is_avb_enable):
image_size = image_props.get("image_size") image_size = image_props.get("image_size")
@@ -1064,9 +1061,11 @@ def main(argv):
elif o in ("-r", "--rebuild_recovery",): elif o in ("-r", "--rebuild_recovery",):
OPTIONS.rebuild_recovery = True OPTIONS.rebuild_recovery = True
elif o == "--replace_verity_private_key": elif o == "--replace_verity_private_key":
OPTIONS.replace_verity_private_key = (True, a) raise ValueError("--replace_verity_private_key is no longer supported,"
" please switch to AVB")
elif o == "--replace_verity_public_key": elif o == "--replace_verity_public_key":
OPTIONS.replace_verity_public_key = (True, a) raise ValueError("--replace_verity_public_key is no longer supported,"
" please switch to AVB")
elif o == "--is_signing": elif o == "--is_signing":
OPTIONS.is_signing = True OPTIONS.is_signing = True
else: else:

View File

@@ -537,14 +537,6 @@ class BlockImageDiff(object):
self.touched_src_sha1 = self.src.RangeSha1(self.touched_src_ranges) self.touched_src_sha1 = self.src.RangeSha1(self.touched_src_ranges)
if self.tgt.hashtree_info:
out.append("compute_hash_tree {} {} {} {} {}\n".format(
self.tgt.hashtree_info.hashtree_range.to_string_raw(),
self.tgt.hashtree_info.filesystem_range.to_string_raw(),
self.tgt.hashtree_info.hash_algorithm,
self.tgt.hashtree_info.salt,
self.tgt.hashtree_info.root_hash))
# Zero out extended blocks as a workaround for bug 20881595. # Zero out extended blocks as a workaround for bug 20881595.
if self.tgt.extended: if self.tgt.extended:
assert (WriteSplitTransfers(out, "zero", self.tgt.extended) == assert (WriteSplitTransfers(out, "zero", self.tgt.extended) ==
@@ -830,12 +822,6 @@ class BlockImageDiff(object):
assert touched[i] == 0 assert touched[i] == 0
touched[i] = 1 touched[i] = 1
if self.tgt.hashtree_info:
for s, e in self.tgt.hashtree_info.hashtree_range:
for i in range(s, e):
assert touched[i] == 0
touched[i] = 1
# Check that we've written every target block. # Check that we've written every target block.
for s, e in self.tgt.care_map: for s, e in self.tgt.care_map:
for i in range(s, e): for i in range(s, e):

View File

@@ -671,11 +671,6 @@ def ImagePropFromGlobalDict(glob_dict, mount_point):
"f2fs_sparse_flag", "f2fs_sparse_flag",
"skip_fsck", "skip_fsck",
"ext_mkuserimg", "ext_mkuserimg",
"verity",
"verity_key",
"verity_signer_cmd",
"verity_fec",
"verity_disable",
"avb_enable", "avb_enable",
"avb_avbtool", "avb_avbtool",
"use_dynamic_partition_size", "use_dynamic_partition_size",

View File

@@ -82,10 +82,6 @@ class Options(object):
self.public_key_suffix = ".x509.pem" self.public_key_suffix = ".x509.pem"
self.private_key_suffix = ".pk8" self.private_key_suffix = ".pk8"
# use otatools built boot_signer by default # use otatools built boot_signer by default
self.boot_signer_path = "boot_signer"
self.boot_signer_args = []
self.verity_signer_path = None
self.verity_signer_args = []
self.verbose = False self.verbose = False
self.tempfiles = [] self.tempfiles = []
self.device_specific = None self.device_specific = None
@@ -1686,23 +1682,9 @@ def _BuildBootableImage(image_name, sourcedir, fs_config_file, info_dict=None,
with open(img.name, 'ab') as f: with open(img.name, 'ab') as f:
f.write(boot_signature_bytes) f.write(boot_signature_bytes)
if (info_dict.get("boot_signer") == "true" and
info_dict.get("verity_key")):
# Hard-code the path as "/boot" for two-step special recovery image (which
# will be loaded into /boot during the two-step OTA).
if two_step_image:
path = "/boot"
else:
path = "/" + partition_name
cmd = [OPTIONS.boot_signer_path]
cmd.extend(OPTIONS.boot_signer_args)
cmd.extend([path, img.name,
info_dict["verity_key"] + ".pk8",
info_dict["verity_key"] + ".x509.pem", img.name])
RunAndCheckOutput(cmd)
# Sign the image if vboot is non-empty. # Sign the image if vboot is non-empty.
elif info_dict.get("vboot"): if info_dict.get("vboot"):
path = "/" + partition_name path = "/" + partition_name
img_keyblock = tempfile.NamedTemporaryFile() img_keyblock = tempfile.NamedTemporaryFile()
# We have switched from the prebuilt futility binary to using the tool # We have switched from the prebuilt futility binary to using the tool
@@ -2077,7 +2059,6 @@ def UnzipTemp(filename, patterns=None):
def GetUserImage(which, tmpdir, input_zip, def GetUserImage(which, tmpdir, input_zip,
info_dict=None, info_dict=None,
allow_shared_blocks=None, allow_shared_blocks=None,
hashtree_info_generator=None,
reset_file_map=False): reset_file_map=False):
"""Returns an Image object suitable for passing to BlockImageDiff. """Returns an Image object suitable for passing to BlockImageDiff.
@@ -2094,8 +2075,6 @@ def GetUserImage(which, tmpdir, input_zip,
info_dict: The dict to be looked up for relevant info. info_dict: The dict to be looked up for relevant info.
allow_shared_blocks: If image is sparse, whether having shared blocks is allow_shared_blocks: If image is sparse, whether having shared blocks is
allowed. If none, it is looked up from info_dict. allowed. If none, it is looked up from info_dict.
hashtree_info_generator: If present and image is sparse, generates the
hashtree_info for this sparse image.
reset_file_map: If true and image is sparse, reset file map before returning reset_file_map: If true and image is sparse, reset file map before returning
the image. the image.
Returns: Returns:
@@ -2117,15 +2096,14 @@ def GetUserImage(which, tmpdir, input_zip,
allow_shared_blocks = info_dict.get("ext4_share_dup_blocks") == "true" allow_shared_blocks = info_dict.get("ext4_share_dup_blocks") == "true"
if is_sparse: if is_sparse:
img = GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks, img = GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks)
hashtree_info_generator)
if reset_file_map: if reset_file_map:
img.ResetFileMap() img.ResetFileMap()
return img return img
return GetNonSparseImage(which, tmpdir, hashtree_info_generator) return GetNonSparseImage(which, tmpdir)
def GetNonSparseImage(which, tmpdir, hashtree_info_generator=None): def GetNonSparseImage(which, tmpdir):
"""Returns a Image object suitable for passing to BlockImageDiff. """Returns a Image object suitable for passing to BlockImageDiff.
This function loads the specified non-sparse image from the given path. This function loads the specified non-sparse image from the given path.
@@ -2143,11 +2121,10 @@ def GetNonSparseImage(which, tmpdir, hashtree_info_generator=None):
# ota_from_target_files.py (since LMP). # ota_from_target_files.py (since LMP).
assert os.path.exists(path) and os.path.exists(mappath) assert os.path.exists(path) and os.path.exists(mappath)
return images.FileImage(path, hashtree_info_generator=hashtree_info_generator) return images.FileImage(path)
def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks, def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks):
hashtree_info_generator=None):
"""Returns a SparseImage object suitable for passing to BlockImageDiff. """Returns a SparseImage object suitable for passing to BlockImageDiff.
This function loads the specified sparse image from the given path, and This function loads the specified sparse image from the given path, and
@@ -2160,8 +2137,6 @@ def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks,
tmpdir: The directory that contains the prebuilt image and block map file. tmpdir: The directory that contains the prebuilt image and block map file.
input_zip: The target-files ZIP archive. input_zip: The target-files ZIP archive.
allow_shared_blocks: Whether having shared blocks is allowed. allow_shared_blocks: Whether having shared blocks is allowed.
hashtree_info_generator: If present, generates the hashtree_info for this
sparse image.
Returns: Returns:
A SparseImage object, with file_map info loaded. A SparseImage object, with file_map info loaded.
""" """
@@ -2178,8 +2153,7 @@ def GetSparseImage(which, tmpdir, input_zip, allow_shared_blocks,
clobbered_blocks = "0" clobbered_blocks = "0"
image = sparse_img.SparseImage( image = sparse_img.SparseImage(
path, mappath, clobbered_blocks, allow_shared_blocks=allow_shared_blocks, path, mappath, clobbered_blocks, allow_shared_blocks=allow_shared_blocks)
hashtree_info_generator=hashtree_info_generator)
# block.map may contain less blocks, because mke2fs may skip allocating blocks # block.map may contain less blocks, because mke2fs may skip allocating blocks
# if they contain all zeros. We can't reconstruct such a file from its block # if they contain all zeros. We can't reconstruct such a file from its block
@@ -2634,13 +2608,13 @@ def ParseOptions(argv,
elif o in ("--private_key_suffix",): elif o in ("--private_key_suffix",):
OPTIONS.private_key_suffix = a OPTIONS.private_key_suffix = a
elif o in ("--boot_signer_path",): elif o in ("--boot_signer_path",):
OPTIONS.boot_signer_path = a raise ValueError("--boot_signer_path is no longer supported, please switch to AVB")
elif o in ("--boot_signer_args",): elif o in ("--boot_signer_args",):
OPTIONS.boot_signer_args = shlex.split(a) raise ValueError("--boot_signer_args is no longer supported, please switch to AVB")
elif o in ("--verity_signer_path",): elif o in ("--verity_signer_path",):
OPTIONS.verity_signer_path = a raise ValueError("--verity_signer_path is no longer supported, please switch to AVB")
elif o in ("--verity_signer_args",): elif o in ("--verity_signer_args",):
OPTIONS.verity_signer_args = shlex.split(a) raise ValueError("--verity_signer_args is no longer supported, please switch to AVB")
elif o in ("-s", "--device_specific"): elif o in ("-s", "--device_specific"):
OPTIONS.device_specific = a OPTIONS.device_specific = a
elif o in ("-x", "--extra"): elif o in ("-x", "--extra"):

View File

@@ -149,7 +149,7 @@ class DataImage(Image):
class FileImage(Image): class FileImage(Image):
"""An image wrapped around a raw image file.""" """An image wrapped around a raw image file."""
def __init__(self, path, hashtree_info_generator=None): def __init__(self, path):
self.path = path self.path = path
self.blocksize = 4096 self.blocksize = 4096
self._file_size = os.path.getsize(self.path) self._file_size = os.path.getsize(self.path)
@@ -166,10 +166,6 @@ class FileImage(Image):
self.generator_lock = threading.Lock() self.generator_lock = threading.Lock()
self.hashtree_info = None
if hashtree_info_generator:
self.hashtree_info = hashtree_info_generator.Generate(self)
zero_blocks = [] zero_blocks = []
nonzero_blocks = [] nonzero_blocks = []
reference = '\0' * self.blocksize reference = '\0' * self.blocksize
@@ -190,8 +186,6 @@ class FileImage(Image):
self.file_map["__ZERO"] = RangeSet(data=zero_blocks) self.file_map["__ZERO"] = RangeSet(data=zero_blocks)
if nonzero_blocks: if nonzero_blocks:
self.file_map["__NONZERO"] = RangeSet(data=nonzero_blocks) self.file_map["__NONZERO"] = RangeSet(data=nonzero_blocks)
if self.hashtree_info:
self.file_map["__HASHTREE"] = self.hashtree_info.hashtree_range
def __del__(self): def __del__(self):
self._file.close() self._file.close()

View File

@@ -40,12 +40,9 @@ def GetBlockDifferences(target_zip, source_zip, target_info, source_info,
info_dict=source_info, info_dict=source_info,
allow_shared_blocks=allow_shared_blocks) allow_shared_blocks=allow_shared_blocks)
hashtree_info_generator = verity_utils.CreateHashtreeInfoGenerator(
name, 4096, target_info)
partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip, partition_tgt = common.GetUserImage(name, OPTIONS.target_tmp, target_zip,
info_dict=target_info, info_dict=target_info,
allow_shared_blocks=allow_shared_blocks, allow_shared_blocks=allow_shared_blocks)
hashtree_info_generator=hashtree_info_generator)
# Check the first block of the source system partition for remount R/W only # Check the first block of the source system partition for remount R/W only
# if the filesystem is ext4. # if the filesystem is ext4.

View File

@@ -1227,8 +1227,7 @@ def GenerateAbOtaPackage(target_file, output_file, source_file=None):
# If dm-verity is supported for the device, copy contents of care_map # If dm-verity is supported for the device, copy contents of care_map
# into A/B OTA package. # into A/B OTA package.
target_zip = zipfile.ZipFile(target_file, "r", allowZip64=True) target_zip = zipfile.ZipFile(target_file, "r", allowZip64=True)
if (target_info.get("verity") == "true" or if target_info.get("avb_enable") == "true":
target_info.get("avb_enable") == "true"):
care_map_list = [x for x in ["care_map.pb", "care_map.txt"] if care_map_list = [x for x in ["care_map.pb", "care_map.txt"] if
"META/" + x in target_zip.namelist()] "META/" + x in target_zip.namelist()]

View File

@@ -188,9 +188,6 @@ OPTIONS.skip_apks_with_path_prefix = set()
OPTIONS.key_map = {} OPTIONS.key_map = {}
OPTIONS.rebuild_recovery = False OPTIONS.rebuild_recovery = False
OPTIONS.replace_ota_keys = False OPTIONS.replace_ota_keys = False
OPTIONS.replace_verity_public_key = False
OPTIONS.replace_verity_private_key = False
OPTIONS.replace_verity_keyid = False
OPTIONS.remove_avb_public_keys = None OPTIONS.remove_avb_public_keys = None
OPTIONS.tag_changes = ("-test-keys", "-dev-keys", "+release-keys") OPTIONS.tag_changes = ("-test-keys", "-dev-keys", "+release-keys")
OPTIONS.avb_keys = {} OPTIONS.avb_keys = {}
@@ -663,11 +660,6 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
elif filename == "META/misc_info.txt": elif filename == "META/misc_info.txt":
pass pass
# Skip verity public key if we will replace it.
elif (OPTIONS.replace_verity_public_key and
filename in ("BOOT/RAMDISK/verity_key",
"ROOT/verity_key")):
pass
elif (OPTIONS.remove_avb_public_keys and elif (OPTIONS.remove_avb_public_keys and
(filename.startswith("BOOT/RAMDISK/avb/") or (filename.startswith("BOOT/RAMDISK/avb/") or
filename.startswith("BOOT/RAMDISK/first_stage_ramdisk/avb/"))): filename.startswith("BOOT/RAMDISK/first_stage_ramdisk/avb/"))):
@@ -681,10 +673,6 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
# Copy it verbatim if we don't want to remove it. # Copy it verbatim if we don't want to remove it.
common.ZipWriteStr(output_tf_zip, out_info, data) common.ZipWriteStr(output_tf_zip, out_info, data)
# Skip verity keyid (for system_root_image use) if we will replace it.
elif OPTIONS.replace_verity_keyid and filename == "BOOT/cmdline":
pass
# Skip the vbmeta digest as we will recalculate it. # Skip the vbmeta digest as we will recalculate it.
elif filename == "META/vbmeta_digest.txt": elif filename == "META/vbmeta_digest.txt":
pass pass
@@ -766,27 +754,6 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
if OPTIONS.replace_ota_keys: if OPTIONS.replace_ota_keys:
ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info) ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)
# Replace the keyid string in misc_info dict.
if OPTIONS.replace_verity_private_key:
ReplaceVerityPrivateKey(misc_info, OPTIONS.replace_verity_private_key[1])
if OPTIONS.replace_verity_public_key:
# Replace the one in root dir in system.img.
ReplaceVerityPublicKey(
output_tf_zip, 'ROOT/verity_key', OPTIONS.replace_verity_public_key[1])
if not system_root_image:
# Additionally replace the copy in ramdisk if not using system-as-root.
ReplaceVerityPublicKey(
output_tf_zip,
'BOOT/RAMDISK/verity_key',
OPTIONS.replace_verity_public_key[1])
# Replace the keyid string in BOOT/cmdline.
if OPTIONS.replace_verity_keyid:
ReplaceVerityKeyId(input_tf_zip, output_tf_zip,
OPTIONS.replace_verity_keyid[1])
# Replace the AVB signing keys, if any. # Replace the AVB signing keys, if any.
ReplaceAvbSigningKeys(misc_info) ReplaceAvbSigningKeys(misc_info)
@@ -1003,64 +970,6 @@ def ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info):
WriteOtacerts(output_tf_zip, info.filename, mapped_keys + extra_keys) WriteOtacerts(output_tf_zip, info.filename, mapped_keys + extra_keys)
def ReplaceVerityPublicKey(output_zip, filename, key_path):
"""Replaces the verity public key at the given path in the given zip.
Args:
output_zip: The output target_files zip.
filename: The archive name in the output zip.
key_path: The path to the public key.
"""
print("Replacing verity public key with %s" % (key_path,))
common.ZipWrite(output_zip, key_path, arcname=filename)
def ReplaceVerityPrivateKey(misc_info, key_path):
"""Replaces the verity private key in misc_info dict.
Args:
misc_info: The info dict.
key_path: The path to the private key in PKCS#8 format.
"""
print("Replacing verity private key with %s" % (key_path,))
misc_info["verity_key"] = key_path
def ReplaceVerityKeyId(input_zip, output_zip, key_path):
"""Replaces the veritykeyid parameter in BOOT/cmdline.
Args:
input_zip: The input target_files zip, which should be already open.
output_zip: The output target_files zip, which should be already open and
writable.
key_path: The path to the PEM encoded X.509 certificate.
"""
in_cmdline = input_zip.read("BOOT/cmdline").decode()
# Copy in_cmdline to output_zip if veritykeyid is not present.
if "veritykeyid" not in in_cmdline:
common.ZipWriteStr(output_zip, "BOOT/cmdline", in_cmdline)
return
out_buffer = []
for param in in_cmdline.split():
if "veritykeyid" not in param:
out_buffer.append(param)
continue
# Extract keyid using openssl command.
p = common.Run(["openssl", "x509", "-in", key_path, "-text"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
keyid, stderr = p.communicate()
assert p.returncode == 0, "Failed to dump certificate: {}".format(stderr)
keyid = re.search(
r'keyid:([0-9a-fA-F:]*)', keyid).group(1).replace(':', '').lower()
print("Replacing verity keyid with {}".format(keyid))
out_buffer.append("veritykeyid=id:%s" % (keyid,))
out_cmdline = ' '.join(out_buffer).strip() + '\n'
common.ZipWriteStr(output_zip, "BOOT/cmdline", out_cmdline)
def ReplaceMiscInfoTxt(input_zip, output_zip, misc_info): def ReplaceMiscInfoTxt(input_zip, output_zip, misc_info):
"""Replaces META/misc_info.txt. """Replaces META/misc_info.txt.
@@ -1425,11 +1334,14 @@ def main(argv):
new.append(i[0] + i[1:].strip()) new.append(i[0] + i[1:].strip())
OPTIONS.tag_changes = tuple(new) OPTIONS.tag_changes = tuple(new)
elif o == "--replace_verity_public_key": elif o == "--replace_verity_public_key":
OPTIONS.replace_verity_public_key = (True, a) raise ValueError("--replace_verity_public_key is no longer supported,"
" please switch to AVB")
elif o == "--replace_verity_private_key": elif o == "--replace_verity_private_key":
OPTIONS.replace_verity_private_key = (True, a) raise ValueError("--replace_verity_private_key is no longer supported,"
" please switch to AVB")
elif o == "--replace_verity_keyid": elif o == "--replace_verity_keyid":
OPTIONS.replace_verity_keyid = (True, a) raise ValueError("--replace_verity_keyid is no longer supported, please"
" switch to AVB")
elif o == "--remove_avb_public_keys": elif o == "--remove_avb_public_keys":
OPTIONS.remove_avb_public_keys = a.split(",") OPTIONS.remove_avb_public_keys = a.split(",")
elif o == "--avb_vbmeta_key": elif o == "--avb_vbmeta_key":

View File

@@ -41,8 +41,7 @@ class SparseImage(object):
""" """
def __init__(self, simg_fn, file_map_fn=None, clobbered_blocks=None, def __init__(self, simg_fn, file_map_fn=None, clobbered_blocks=None,
mode="rb", build_map=True, allow_shared_blocks=False, mode="rb", build_map=True, allow_shared_blocks=False):
hashtree_info_generator=None):
self.simg_f = f = open(simg_fn, mode) self.simg_f = f = open(simg_fn, mode)
header_bin = f.read(28) header_bin = f.read(28)
@@ -74,8 +73,6 @@ class SparseImage(object):
blk_sz, total_chunks) blk_sz, total_chunks)
if not build_map: if not build_map:
assert not hashtree_info_generator, \
"Cannot generate the hashtree info without building the offset map."
return return
pos = 0 # in blocks pos = 0 # in blocks
@@ -114,16 +111,6 @@ class SparseImage(object):
if data_sz != 0: if data_sz != 0:
raise ValueError("Don't care chunk input size is non-zero (%u)" % raise ValueError("Don't care chunk input size is non-zero (%u)" %
(data_sz)) (data_sz))
# Fills the don't care data ranges with zeros.
# TODO(xunchang) pass the care_map to hashtree info generator.
if hashtree_info_generator:
fill_data = '\x00' * 4
# In order to compute verity hashtree on device, we need to write
# zeros explicitly to the don't care ranges. Because these ranges may
# contain non-zero data from the previous build.
care_data.append(pos)
care_data.append(pos + chunk_sz)
offset_map.append((pos, chunk_sz, None, fill_data))
pos += chunk_sz pos += chunk_sz
@@ -150,10 +137,6 @@ class SparseImage(object):
extended = extended.intersect(all_blocks).subtract(self.care_map) extended = extended.intersect(all_blocks).subtract(self.care_map)
self.extended = extended self.extended = extended
self.hashtree_info = None
if hashtree_info_generator:
self.hashtree_info = hashtree_info_generator.Generate(self)
if file_map_fn: if file_map_fn:
self.LoadFileBlockMap(file_map_fn, self.clobbered_blocks, self.LoadFileBlockMap(file_map_fn, self.clobbered_blocks,
allow_shared_blocks) allow_shared_blocks)
@@ -286,8 +269,6 @@ class SparseImage(object):
remaining = remaining.subtract(ranges) remaining = remaining.subtract(ranges)
remaining = remaining.subtract(clobbered_blocks) remaining = remaining.subtract(clobbered_blocks)
if self.hashtree_info:
remaining = remaining.subtract(self.hashtree_info.hashtree_range)
# For all the remaining blocks in the care_map (ie, those that # For all the remaining blocks in the care_map (ie, those that
# aren't part of the data for any file nor part of the clobbered_blocks), # aren't part of the data for any file nor part of the clobbered_blocks),
@@ -350,8 +331,6 @@ class SparseImage(object):
out["__NONZERO-%d" % i] = rangelib.RangeSet(data=blocks) out["__NONZERO-%d" % i] = rangelib.RangeSet(data=blocks)
if clobbered_blocks: if clobbered_blocks:
out["__COPY"] = clobbered_blocks out["__COPY"] = clobbered_blocks
if self.hashtree_info:
out["__HASHTREE"] = self.hashtree_info.hashtree_range
def ResetFileMap(self): def ResetFileMap(self):
"""Throw away the file map and treat the entire image as """Throw away the file map and treat the entire image as

View File

@@ -23,8 +23,8 @@ import common
import test_utils import test_utils
from sign_target_files_apks import ( from sign_target_files_apks import (
CheckApkAndApexKeysAvailable, EditTags, GetApkFileInfo, ReadApexKeysInfo, CheckApkAndApexKeysAvailable, EditTags, GetApkFileInfo, ReadApexKeysInfo,
ReplaceCerts, ReplaceGkiSigningKey, ReplaceVerityKeyId, RewriteAvbProps, ReplaceCerts, ReplaceGkiSigningKey, RewriteAvbProps, RewriteProps,
RewriteProps, WriteOtacerts) WriteOtacerts)
class SignTargetFilesApksTest(test_utils.ReleaseToolsTestCase): class SignTargetFilesApksTest(test_utils.ReleaseToolsTestCase):
@@ -154,64 +154,6 @@ name="apex.apexd_test_different_app.apex" public_key="system/apex/apexd/apexd_te
'\n'.join([prop[1] for prop in props]) + '\n', '\n'.join([prop[1] for prop in props]) + '\n',
RewriteProps('\n'.join([prop[0] for prop in props]))) RewriteProps('\n'.join([prop[0] for prop in props])))
def test_ReplaceVerityKeyId(self):
BOOT_CMDLINE1 = (
"console=ttyHSL0,115200,n8 androidboot.console=ttyHSL0 "
"androidboot.hardware=marlin user_debug=31 ehci-hcd.park=3 "
"lpm_levels.sleep_disabled=1 cma=32M@0-0xffffffff loop.max_part=7 "
"buildvariant=userdebug "
"veritykeyid=id:7e4333f9bba00adfe0ede979e28ed1920492b40f\n")
BOOT_CMDLINE2 = (
"console=ttyHSL0,115200,n8 androidboot.console=ttyHSL0 "
"androidboot.hardware=marlin user_debug=31 ehci-hcd.park=3 "
"lpm_levels.sleep_disabled=1 cma=32M@0-0xffffffff loop.max_part=7 "
"buildvariant=userdebug "
"veritykeyid=id:d24f2590e9abab5cff5f59da4c4f0366e3f43e94\n")
input_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
input_zip.writestr('BOOT/cmdline', BOOT_CMDLINE1)
# Test with the first certificate.
cert_file = os.path.join(self.testdata_dir, 'verity.x509.pem')
output_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip, \
zipfile.ZipFile(output_file, 'w', allowZip64=True) as output_zip:
ReplaceVerityKeyId(input_zip, output_zip, cert_file)
with zipfile.ZipFile(output_file) as output_zip:
self.assertEqual(BOOT_CMDLINE1, output_zip.read('BOOT/cmdline').decode())
# Test with the second certificate.
cert_file = os.path.join(self.testdata_dir, 'testkey.x509.pem')
with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip, \
zipfile.ZipFile(output_file, 'w', allowZip64=True) as output_zip:
ReplaceVerityKeyId(input_zip, output_zip, cert_file)
with zipfile.ZipFile(output_file) as output_zip:
self.assertEqual(BOOT_CMDLINE2, output_zip.read('BOOT/cmdline').decode())
def test_ReplaceVerityKeyId_no_veritykeyid(self):
BOOT_CMDLINE = (
"console=ttyHSL0,115200,n8 androidboot.hardware=bullhead boot_cpus=0-5 "
"lpm_levels.sleep_disabled=1 msm_poweroff.download_mode=0 "
"loop.max_part=7\n")
input_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(input_file, 'w', allowZip64=True) as input_zip:
input_zip.writestr('BOOT/cmdline', BOOT_CMDLINE)
output_file = common.MakeTempFile(suffix='.zip')
with zipfile.ZipFile(input_file, 'r', allowZip64=True) as input_zip, \
zipfile.ZipFile(output_file, 'w', allowZip64=True) as output_zip:
ReplaceVerityKeyId(input_zip, output_zip, None)
with zipfile.ZipFile(output_file) as output_zip:
self.assertEqual(BOOT_CMDLINE, output_zip.read('BOOT/cmdline').decode())
def test_ReplaceCerts(self): def test_ReplaceCerts(self):
cert1_path = os.path.join(self.testdata_dir, 'platform.x509.pem') cert1_path = os.path.join(self.testdata_dir, 'platform.x509.pem')
with open(cert1_path) as cert1_fp: with open(cert1_path) as cert1_fp:

View File

@@ -27,249 +27,11 @@ from rangelib import RangeSet
from test_utils import ( from test_utils import (
get_testdata_dir, ReleaseToolsTestCase, SkipIfExternalToolsUnavailable) get_testdata_dir, ReleaseToolsTestCase, SkipIfExternalToolsUnavailable)
from verity_utils import ( from verity_utils import (
CalculateVbmetaDigest, CreateHashtreeInfoGenerator, CalculateVbmetaDigest, CreateVerityImageBuilder)
CreateVerityImageBuilder, HashtreeInfo,
VerifiedBootVersion1HashtreeInfoGenerator)
BLOCK_SIZE = common.BLOCK_SIZE BLOCK_SIZE = common.BLOCK_SIZE
class VerifiedBootVersion1HashtreeInfoGeneratorTest(ReleaseToolsTestCase):
def setUp(self):
self.testdata_dir = get_testdata_dir()
self.partition_size = 1024 * 1024
self.prop_dict = {
'verity': 'true',
'verity_fec': 'true',
'system_verity_block_device': '/dev/block/system',
'system_size': self.partition_size
}
self.hash_algorithm = "sha256"
self.fixed_salt = (
"aee087a5be3b982978c923f566a94613496b417f2af592639bc80d141e34dfe7")
self.expected_root_hash = (
"0b7c4565e87b1026e11fbab91c0bc29e185c847a5b44d40e6e86e461e8adf80d")
def _CreateSimg(self, raw_data): # pylint: disable=no-self-use
output_file = common.MakeTempFile()
raw_image = common.MakeTempFile()
with open(raw_image, 'wb') as f:
f.write(raw_data)
cmd = ["img2simg", raw_image, output_file, '4096']
common.RunAndCheckOutput(cmd)
return output_file
def _GenerateImage(self):
partition_size = 1024 * 1024
prop_dict = {
'partition_size': str(partition_size),
'verity': 'true',
'verity_block_device': '/dev/block/system',
'verity_key': os.path.join(self.testdata_dir, 'testkey'),
'verity_fec': 'true',
'verity_signer_cmd': 'verity_signer',
}
verity_image_builder = CreateVerityImageBuilder(prop_dict)
self.assertIsNotNone(verity_image_builder)
adjusted_size = verity_image_builder.CalculateMaxImageSize()
raw_image = bytearray(adjusted_size)
for i in range(adjusted_size):
raw_image[i] = ord('0') + i % 10
output_file = self._CreateSimg(raw_image)
# Append the verity metadata.
verity_image_builder.Build(output_file)
return output_file
@SkipIfExternalToolsUnavailable()
def test_CreateHashtreeInfoGenerator(self):
image_file = sparse_img.SparseImage(self._GenerateImage())
generator = CreateHashtreeInfoGenerator(
'system', image_file, self.prop_dict)
self.assertEqual(
VerifiedBootVersion1HashtreeInfoGenerator, type(generator))
self.assertEqual(self.partition_size, generator.partition_size)
self.assertTrue(generator.fec_supported)
@SkipIfExternalToolsUnavailable()
def test_DecomposeSparseImage(self):
image_file = sparse_img.SparseImage(self._GenerateImage())
generator = VerifiedBootVersion1HashtreeInfoGenerator(
self.partition_size, 4096, True)
generator.DecomposeSparseImage(image_file)
self.assertEqual(991232, generator.filesystem_size)
self.assertEqual(12288, generator.hashtree_size)
self.assertEqual(32768, generator.metadata_size)
@SkipIfExternalToolsUnavailable()
def test_ParseHashtreeMetadata(self):
image_file = sparse_img.SparseImage(self._GenerateImage())
generator = VerifiedBootVersion1HashtreeInfoGenerator(
self.partition_size, 4096, True)
generator.DecomposeSparseImage(image_file)
# pylint: disable=protected-access
generator._ParseHashtreeMetadata()
self.assertEqual(
self.hash_algorithm, generator.hashtree_info.hash_algorithm)
self.assertEqual(self.fixed_salt, generator.hashtree_info.salt)
self.assertEqual(self.expected_root_hash, generator.hashtree_info.root_hash)
@SkipIfExternalToolsUnavailable()
def test_ValidateHashtree_smoke(self):
generator = VerifiedBootVersion1HashtreeInfoGenerator(
self.partition_size, 4096, True)
generator.image = sparse_img.SparseImage(self._GenerateImage())
generator.hashtree_info = info = HashtreeInfo()
info.filesystem_range = RangeSet(data=[0, 991232 // 4096])
info.hashtree_range = RangeSet(
data=[991232 // 4096, (991232 + 12288) // 4096])
info.hash_algorithm = self.hash_algorithm
info.salt = self.fixed_salt
info.root_hash = self.expected_root_hash
self.assertTrue(generator.ValidateHashtree())
@SkipIfExternalToolsUnavailable()
def test_ValidateHashtree_failure(self):
generator = VerifiedBootVersion1HashtreeInfoGenerator(
self.partition_size, 4096, True)
generator.image = sparse_img.SparseImage(self._GenerateImage())
generator.hashtree_info = info = HashtreeInfo()
info.filesystem_range = RangeSet(data=[0, 991232 // 4096])
info.hashtree_range = RangeSet(
data=[991232 // 4096, (991232 + 12288) // 4096])
info.hash_algorithm = self.hash_algorithm
info.salt = self.fixed_salt
info.root_hash = "a" + self.expected_root_hash[1:]
self.assertFalse(generator.ValidateHashtree())
@SkipIfExternalToolsUnavailable()
def test_Generate(self):
  """End-to-end: generated info matches the image layout and metadata."""
  sparse_image = sparse_img.SparseImage(self._GenerateImage())
  info_generator = CreateHashtreeInfoGenerator('system', 4096, self.prop_dict)
  info = info_generator.Generate(sparse_image)

  fs_blocks = 991232 // 4096
  tree_end_blocks = (991232 + 12288) // 4096
  self.assertEqual(RangeSet(data=[0, fs_blocks]), info.filesystem_range)
  self.assertEqual(RangeSet(data=[fs_blocks, tree_end_blocks]),
                   info.hashtree_range)
  self.assertEqual(self.hash_algorithm, info.hash_algorithm)
  self.assertEqual(self.fixed_salt, info.salt)
  self.assertEqual(self.expected_root_hash, info.root_hash)
class VerifiedBootVersion1VerityImageBuilderTest(ReleaseToolsTestCase):
  """Tests for the Verified Boot 1.0 verity image builder."""

  DEFAULT_PARTITION_SIZE = 4096 * 1024
  DEFAULT_PROP_DICT = {
      'partition_size': str(DEFAULT_PARTITION_SIZE),
      'verity': 'true',
      'verity_block_device': '/dev/block/system',
      'verity_key': os.path.join(get_testdata_dir(), 'testkey'),
      'verity_fec': 'true',
      'verity_signer_cmd': 'verity_signer',
  }

  def test_init(self):
    """A full property dict yields a version-1 builder."""
    props = copy.deepcopy(self.DEFAULT_PROP_DICT)
    builder = CreateVerityImageBuilder(props)
    self.assertIsNotNone(builder)
    self.assertEqual(1, builder.version)

  def test_init_MissingProps(self):
    """Dropping any required verity property disables the builder."""
    for required_key in ('verity', 'verity_block_device'):
      props = copy.deepcopy(self.DEFAULT_PROP_DICT)
      del props[required_key]
      self.assertIsNone(CreateVerityImageBuilder(props))

  @SkipIfExternalToolsUnavailable()
  def test_CalculateMaxImageSize(self):
    """Max image size is below the partition size and stable."""
    builder = CreateVerityImageBuilder(self.DEFAULT_PROP_DICT)
    self.assertLess(builder.CalculateMaxImageSize(),
                    self.DEFAULT_PARTITION_SIZE)
    # Passing the partition size explicitly must give the same answer.
    self.assertEqual(
        builder.CalculateMaxImageSize(),
        builder.CalculateMaxImageSize(self.DEFAULT_PARTITION_SIZE))

  @staticmethod
  def _BuildAndVerify(prop, verify_key):
    """Builds a verity-enabled ext4 image and verifies it with verify_key."""
    builder = CreateVerityImageBuilder(prop)
    max_image_size = builder.CalculateMaxImageSize()

    # Create a sparse ext4 image and attach the verity metadata.
    source_dir = common.MakeTempDir()
    image_path = common.MakeTempFile(suffix='.img')
    common.RunAndCheckOutput(
        ['mkuserimg_mke2fs', source_dir, image_path, 'ext4', '/system',
         str(max_image_size), '-j', '0', '-s'])
    builder.Build(image_path)

    # Check the verity metadata against the given key.
    common.RunAndCheckOutput(
        ['verity_verifier', image_path, '-mincrypt', verify_key])

  @SkipIfExternalToolsUnavailable()
  def test_Build(self):
    self._BuildAndVerify(
        self.DEFAULT_PROP_DICT,
        os.path.join(get_testdata_dir(), 'testkey_mincrypt'))

  @SkipIfExternalToolsUnavailable()
  def test_Build_ValidationCheck(self):
    # A validity check for the test itself: the image shouldn't be verifiable
    # with wrong key.
    self.assertRaises(
        common.ExternalError,
        self._BuildAndVerify,
        self.DEFAULT_PROP_DICT,
        os.path.join(get_testdata_dir(), 'verity_mincrypt'))

  @SkipIfExternalToolsUnavailable()
  def test_Build_FecDisabled(self):
    props = copy.deepcopy(self.DEFAULT_PROP_DICT)
    del props['verity_fec']
    self._BuildAndVerify(
        props,
        os.path.join(get_testdata_dir(), 'testkey_mincrypt'))

  @SkipIfExternalToolsUnavailable()
  def test_Build_SquashFs(self):
    builder = CreateVerityImageBuilder(self.DEFAULT_PROP_DICT)
    builder.CalculateMaxImageSize()

    # Build a squashfs sparse image, then pad it and add verity metadata.
    source_dir = common.MakeTempDir()
    image_path = common.MakeTempFile(suffix='.img')
    common.RunAndCheckOutput(['mksquashfsimage.sh', source_dir, image_path,
                              '-s'])
    builder.PadSparseImage(image_path)
    builder.Build(image_path)

    # Verify the verity metadata.
    common.RunAndCheckOutput(
        ['verity_verifier', image_path, '-mincrypt',
         os.path.join(get_testdata_dir(), 'testkey_mincrypt')])
class VerifiedBootVersion2VerityImageBuilderTest(ReleaseToolsTestCase): class VerifiedBootVersion2VerityImageBuilderTest(ReleaseToolsTestCase):
DEFAULT_PROP_DICT = { DEFAULT_PROP_DICT = {

View File

@@ -49,107 +49,6 @@ class BuildVerityImageError(Exception):
Exception.__init__(self, message) Exception.__init__(self, message)
def GetVerityFECSize(image_size):
  """Returns the size in bytes of FEC data for an image of image_size bytes.

  Asks the `fec` host tool via its -s (size) mode.
  """
  output = common.RunAndCheckOutput(
      ["fec", "-s", str(image_size)], verbose=False)
  return int(output)
def GetVerityTreeSize(image_size):
  """Returns the size in bytes of the verity hash tree for the given image.

  Asks the `build_verity_tree` host tool via its -s (size) mode.
  """
  output = common.RunAndCheckOutput(
      ["build_verity_tree", "-s", str(image_size)], verbose=False)
  return int(output)
def GetVerityMetadataSize(image_size):
  """Returns the size in bytes of the verity metadata block for the image.

  Asks the `build_verity_metadata` host tool via its `size` subcommand.
  """
  output = common.RunAndCheckOutput(
      ["build_verity_metadata", "size", str(image_size)], verbose=False)
  return int(output)
def GetVeritySize(image_size, fec_supported):
  """Returns the total verity overhead (tree + metadata [+ FEC]) in bytes.

  Args:
    image_size: Size in bytes of the filesystem image.
    fec_supported: Whether forward-error-correction data is included; FEC
        covers the image plus the tree and metadata, so its size is computed
        over the sum.
  """
  overhead = GetVerityTreeSize(image_size) + GetVerityMetadataSize(image_size)
  if not fec_supported:
    return overhead
  return overhead + GetVerityFECSize(image_size + overhead)
def GetSimgSize(image_file):
  """Returns the raw byte size represented by a sparse image file."""
  simg = sparse_img.SparseImage(image_file, build_map=False)
  return simg.total_blocks * simg.blocksize
def ZeroPadSimg(image_file, pad_size):
  """Appends pad_size bytes worth of zero-fill blocks to a sparse image.

  pad_size is assumed to be a multiple of BLOCK_SIZE; any remainder is
  silently dropped by the integer division.
  """
  num_blocks = pad_size // BLOCK_SIZE
  logger.info("Padding %d blocks (%d bytes)", num_blocks, pad_size)
  simg = sparse_img.SparseImage(image_file, mode="r+b", build_map=False)
  simg.AppendFillChunk(0, num_blocks)
def BuildVerityFEC(sparse_image_path, verity_path, verity_fec_path,
                   padding_size):
  """Generates forward-error-correction data covering image plus verity."""
  common.RunAndCheckOutput(
      ["fec", "-e", "-p", str(padding_size), sparse_image_path, verity_path,
       verity_fec_path])
def BuildVerityTree(sparse_image_path, verity_image_path):
  """Builds the verity hash tree for the image using the fixed salt.

  Returns:
    A (root_hash, salt) tuple as reported by `build_verity_tree`.
  """
  output = common.RunAndCheckOutput(
      ["build_verity_tree", "-A", FIXED_SALT, sparse_image_path,
       verity_image_path])
  root_hash, salt = output.split()
  return root_hash, salt
def BuildVerityMetadata(image_size, verity_metadata_path, root_hash, salt,
                        block_device, signer_path, key, signer_args,
                        verity_disable):
  """Writes the (optionally signed, optionally disabled) verity metadata."""
  cmd = ["build_verity_metadata", "build", str(image_size),
         verity_metadata_path, root_hash, salt, block_device, signer_path, key]
  optional_flags = []
  if signer_args:
    optional_flags.append("--signer_args=\"%s\"" % (' '.join(signer_args),))
  if verity_disable:
    optional_flags.append("--verity_disable")
  common.RunAndCheckOutput(cmd + optional_flags)
def Append2Simg(sparse_image_path, unsparse_image_path, error_message):
  """Appends the unsparse image to the given sparse image.

  Args:
    sparse_image_path: the path to the (sparse) image
    unsparse_image_path: the path to the (unsparse) image
    error_message: the message logged and raised on failure

  Raises:
    BuildVerityImageError: On error.
  """
  cmd = ["append2simg", sparse_image_path, unsparse_image_path]
  try:
    common.RunAndCheckOutput(cmd)
  except Exception:
    # Was a bare `except:`, which would also swallow SystemExit and
    # KeyboardInterrupt; catch Exception so those still propagate.
    logger.exception(error_message)
    raise BuildVerityImageError(error_message)
def Append(target, file_to_append, error_message):
  """Appends the contents of file_to_append to target.

  Args:
    target: Path of the file to append to (opened in binary append mode).
    file_to_append: Path of the file whose bytes are appended.
    error_message: Message logged and raised on I/O failure.

  Raises:
    BuildVerityImageError: On error.
  """
  try:
    with open(target, 'ab') as out_file, \
        open(file_to_append, 'rb') as input_file:
      # Copy in fixed-size chunks. Iterating "lines" of a binary file can
      # pull the entire file into memory at once when it has no newlines.
      while True:
        chunk = input_file.read(1024 * 1024)
        if not chunk:
          break
        out_file.write(chunk)
  except IOError:
    logger.exception(error_message)
    raise BuildVerityImageError(error_message)
def CreateVerityImageBuilder(prop_dict): def CreateVerityImageBuilder(prop_dict):
"""Returns a verity image builder based on the given build properties. """Returns a verity image builder based on the given build properties.
@@ -166,23 +65,6 @@ def CreateVerityImageBuilder(prop_dict):
if partition_size: if partition_size:
partition_size = int(partition_size) partition_size = int(partition_size)
# Verified Boot 1.0
verity_supported = prop_dict.get("verity") == "true"
is_verity_partition = "verity_block_device" in prop_dict
if verity_supported and is_verity_partition:
if OPTIONS.verity_signer_path is not None:
signer_path = OPTIONS.verity_signer_path
else:
signer_path = prop_dict["verity_signer_cmd"]
return Version1VerityImageBuilder(
partition_size,
prop_dict["verity_block_device"],
prop_dict.get("verity_fec") == "true",
signer_path,
prop_dict["verity_key"] + ".pk8",
OPTIONS.verity_signer_args,
"verity_disable" in prop_dict)
# Verified Boot 2.0 # Verified Boot 2.0
if (prop_dict.get("avb_hash_enable") == "true" or if (prop_dict.get("avb_hash_enable") == "true" or
prop_dict.get("avb_hashtree_enable") == "true"): prop_dict.get("avb_hashtree_enable") == "true"):
@@ -245,125 +127,6 @@ class VerityImageBuilder(object):
raise NotImplementedError raise NotImplementedError
class Version1VerityImageBuilder(VerityImageBuilder):
  """A VerityImageBuilder for Verified Boot 1.0."""

  def __init__(self, partition_size, block_dev, fec_supported, signer_path,
               signer_key, signer_args, verity_disable):
    self.version = 1
    self.partition_size = partition_size
    self.block_device = block_dev
    self.fec_supported = fec_supported
    self.signer_path = signer_path
    self.signer_key = signer_key
    self.signer_args = signer_args
    self.verity_disable = verity_disable
    # image_size and verity_size are computed by CalculateMaxImageSize();
    # Build() and PadSparseImage() rely on that call having happened first.
    self.image_size = None
    self.verity_size = None

  def CalculateDynamicPartitionSize(self, image_size):
    # This needs to be implemented. Note that returning the given image size as
    # the partition size doesn't make sense, as it will fail later.
    raise NotImplementedError

  def CalculateMaxImageSize(self, partition_size=None):
    """Calculates the max image size by accounting for the verity metadata.

    Args:
      partition_size: The partition size, which defaults to self.partition_size
          if unspecified.

    Returns:
      The size of the image adjusted for verity metadata.
    """
    if partition_size is None:
      partition_size = self.partition_size
    assert partition_size > 0, \
        "Invalid partition size: {}".format(partition_size)

    # Round the upper bound down to a whole number of blocks.
    hi = partition_size
    if hi % BLOCK_SIZE != 0:
      hi = (hi // BLOCK_SIZE) * BLOCK_SIZE

    # verity tree and fec sizes depend on the partition size, which
    # means this estimate is always going to be unnecessarily small
    verity_size = GetVeritySize(hi, self.fec_supported)
    lo = partition_size - verity_size
    result = lo

    # do a binary search for the optimal size, i.e. the largest block-aligned
    # image size whose image + verity overhead still fits in the partition
    while lo < hi:
      i = ((lo + hi) // (2 * BLOCK_SIZE)) * BLOCK_SIZE
      v = GetVeritySize(i, self.fec_supported)
      if i + v <= partition_size:
        if result < i:
          result = i
          verity_size = v
        lo = i + BLOCK_SIZE
      else:
        hi = i

    self.image_size = result
    self.verity_size = verity_size

    logger.info(
        "Calculated image size for verity: partition_size %d, image_size %d, "
        "verity_size %d", partition_size, result, verity_size)
    return result

  def Build(self, out_file):
    """Creates an image that is verifiable using dm-verity.

    Requires CalculateMaxImageSize() to have been called to populate
    self.image_size and self.verity_size.

    Args:
      out_file: the output image.

    Raises:
      AssertionError: On invalid partition sizes.
      BuildVerityImageError: On other errors.
    """
    image_size = int(self.image_size)
    tempdir_name = common.MakeTempDir(suffix="_verity_images")

    # Get partial image paths.
    verity_image_path = os.path.join(tempdir_name, "verity.img")
    verity_metadata_path = os.path.join(tempdir_name, "verity_metadata.img")

    # Build the verity tree and get the root hash and salt.
    root_hash, salt = BuildVerityTree(out_file, verity_image_path)

    # Build the metadata blocks.
    BuildVerityMetadata(
        image_size, verity_metadata_path, root_hash, salt, self.block_device,
        self.signer_path, self.signer_key, self.signer_args,
        self.verity_disable)

    padding_size = self.partition_size - self.image_size - self.verity_size
    assert padding_size >= 0

    # Build the full verified image: [filesystem, hashtree, metadata, fec].
    Append(
        verity_image_path, verity_metadata_path,
        "Failed to append verity metadata")

    if self.fec_supported:
      # Build FEC for the entire partition, including metadata.
      verity_fec_path = os.path.join(tempdir_name, "verity_fec.img")
      BuildVerityFEC(
          out_file, verity_image_path, verity_fec_path, padding_size)
      Append(verity_image_path, verity_fec_path, "Failed to append FEC")

    Append2Simg(
        out_file, verity_image_path, "Failed to append verity data")

  def PadSparseImage(self, out_file):
    # Zero-pads the sparse image up to self.image_size so the verity data
    # lands at the expected offset.
    sparse_image_size = GetSimgSize(out_file)
    if sparse_image_size > self.image_size:
      raise BuildVerityImageError(
          "Error: image size of {} is larger than partition size of "
          "{}".format(sparse_image_size, self.image_size))
    ZeroPadSimg(out_file, self.image_size - sparse_image_size)
class VerifiedBootVersion2VerityImageBuilder(VerityImageBuilder): class VerifiedBootVersion2VerityImageBuilder(VerityImageBuilder):
"""A VerityImageBuilder for Verified Boot 2.0.""" """A VerityImageBuilder for Verified Boot 2.0."""
@@ -519,199 +282,6 @@ class VerifiedBootVersion2VerityImageBuilder(VerityImageBuilder):
raise BuildVerityImageError("Failed to add AVB footer: {}".format(output)) raise BuildVerityImageError("Failed to add AVB footer: {}".format(output))
class HashtreeInfoGenerationError(Exception):
  """An Exception raised during hashtree info generation."""

  def __init__(self, message):
    super().__init__(message)
class HashtreeInfo(object):
  """Plain holder for the pieces needed to reconstruct a verity hashtree."""

  def __init__(self):
    # All fields start unset; the hashtree info generator fills them in.
    self.filesystem_range = None
    self.hashtree_range = None
    self.hash_algorithm = None
    self.root_hash = None
    self.salt = None
def CreateHashtreeInfoGenerator(partition_name, block_size, info_dict):
  """Returns a hashtree info generator for the partition, or None.

  A generator is only created for VB 1.0 verity-enabled partitions, i.e. when
  "verity" is "true" and the partition declares a verity block device.
  """
  if info_dict.get("verity") != "true":
    return None
  if not info_dict.get("{}_verity_block_device".format(partition_name)):
    return None
  return VerifiedBootVersion1HashtreeInfoGenerator(
      info_dict["{}_size".format(partition_name)],
      block_size,
      info_dict.get("verity_fec") == "true")
class HashtreeInfoGenerator(object):
  """Abstract interface for parsing hashtree info out of a partition image."""

  def Generate(self, image):
    """Parses and validates the image; returns the hashtree info."""
    raise NotImplementedError

  def DecomposeSparseImage(self, image):
    """Computes the filesystem/hashtree/metadata layout of the image."""
    raise NotImplementedError

  def ValidateHashtree(self):
    """Checks that the hashtree can be reconstructed from the image."""
    raise NotImplementedError
class VerifiedBootVersion1HashtreeInfoGenerator(HashtreeInfoGenerator):
  """A class that parses the metadata of hashtree for a given partition."""

  def __init__(self, partition_size, block_size, fec_supported):
    """Initialize VerityTreeInfo with the sparse image and input property.

    Arguments:
      partition_size: The whole size in bytes of a partition, including the
          filesystem size, padding size, and verity size.
      block_size: Expected size in bytes of each block for the sparse image.
      fec_supported: True if the verity section contains fec data.
    """
    self.block_size = block_size
    self.partition_size = partition_size
    self.fec_supported = fec_supported

    # Populated by DecomposeSparseImage().
    self.image = None
    self.filesystem_size = None
    self.hashtree_size = None
    self.metadata_size = None

    prop_dict = {
        'partition_size': str(partition_size),
        'verity': 'true',
        'verity_fec': 'true' if fec_supported else None,
        # 'verity_block_device' needs to be present to indicate a verity-enabled
        # partition.
        'verity_block_device': '',
        # We don't need the following properties that are needed for signing the
        # verity metadata.
        'verity_key': '',
        'verity_signer_cmd': None,
    }
    self.verity_image_builder = CreateVerityImageBuilder(prop_dict)

    self.hashtree_info = HashtreeInfo()

  def DecomposeSparseImage(self, image):
    """Calculate the verity size based on the size of the input image.

    Since we already know the structure of a verity enabled image to be:
    [filesystem, verity_hashtree, verity_metadata, fec_data]. We can then
    calculate the size and offset of each section.
    """
    self.image = image
    assert self.block_size == image.blocksize
    assert self.partition_size == image.total_blocks * self.block_size, \
        "partition size {} doesn't match with the calculated image size." \
        " total_blocks: {}".format(self.partition_size, image.total_blocks)

    adjusted_size = self.verity_image_builder.CalculateMaxImageSize()
    assert adjusted_size % self.block_size == 0

    verity_tree_size = GetVerityTreeSize(adjusted_size)
    assert verity_tree_size % self.block_size == 0

    metadata_size = GetVerityMetadataSize(adjusted_size)
    assert metadata_size % self.block_size == 0

    self.filesystem_size = adjusted_size
    self.hashtree_size = verity_tree_size
    self.metadata_size = metadata_size

    # Record the block ranges of the filesystem and hashtree sections.
    self.hashtree_info.filesystem_range = RangeSet(
        data=[0, adjusted_size // self.block_size])
    self.hashtree_info.hashtree_range = RangeSet(
        data=[adjusted_size // self.block_size,
              (adjusted_size + verity_tree_size) // self.block_size])

  def _ParseHashtreeMetadata(self):
    """Parses the hash_algorithm, root_hash, salt from the metadata block."""

    metadata_start = self.filesystem_size + self.hashtree_size
    metadata_range = RangeSet(
        data=[metadata_start // self.block_size,
              (metadata_start + self.metadata_size) // self.block_size])
    meta_data = b''.join(self.image.ReadRangeSet(metadata_range))

    # More info about the metadata structure available in:
    # system/extras/verity/build_verity_metadata.py
    META_HEADER_SIZE = 268
    header_bin = meta_data[0:META_HEADER_SIZE]
    header = struct.unpack("II256sI", header_bin)

    # header: magic_number, version, signature, table_len
    assert header[0] == 0xb001b001, header[0]
    table_len = header[3]
    verity_table = meta_data[META_HEADER_SIZE: META_HEADER_SIZE + table_len]
    table_entries = verity_table.rstrip().split()

    # Expected verity table format: "1 block_device block_device block_size
    # block_size data_blocks data_blocks hash_algorithm root_hash salt"
    assert len(table_entries) == 10, "Unexpected verity table size {}".format(
        len(table_entries))
    assert (int(table_entries[3]) == self.block_size and
            int(table_entries[4]) == self.block_size)
    assert (int(table_entries[5]) * self.block_size == self.filesystem_size and
            int(table_entries[6]) * self.block_size == self.filesystem_size)

    self.hashtree_info.hash_algorithm = table_entries[7].decode()
    self.hashtree_info.root_hash = table_entries[8].decode()
    self.hashtree_info.salt = table_entries[9].decode()

  def ValidateHashtree(self):
    """Checks that we can reconstruct the verity hash tree."""

    # Writes the filesystem section to a temp file; and calls the executable
    # build_verity_tree to construct the hash tree.
    adjusted_partition = common.MakeTempFile(prefix="adjusted_partition")
    with open(adjusted_partition, "wb") as fd:
      self.image.WriteRangeDataToFd(self.hashtree_info.filesystem_range, fd)

    generated_verity_tree = common.MakeTempFile(prefix="verity")
    root_hash, salt = BuildVerityTree(adjusted_partition, generated_verity_tree)

    # The salt should be always identical, as we use fixed value.
    assert salt == self.hashtree_info.salt, \
        "Calculated salt {} doesn't match the one in metadata {}".format(
            salt, self.hashtree_info.salt)

    if root_hash != self.hashtree_info.root_hash:
      logger.warning(
          "Calculated root hash %s doesn't match the one in metadata %s",
          root_hash, self.hashtree_info.root_hash)
      return False

    # Reads the generated hash tree and checks if it has the exact same bytes
    # as the one in the sparse image.
    with open(generated_verity_tree, 'rb') as fd:
      return fd.read() == b''.join(self.image.ReadRangeSet(
          self.hashtree_info.hashtree_range))

  def Generate(self, image):
    """Parses and validates the hashtree info in a sparse image.

    Returns:
      hashtree_info: The information needed to reconstruct the hashtree.

    Raises:
      HashtreeInfoGenerationError: If we fail to generate the exact bytes of
          the hashtree.
    """

    self.DecomposeSparseImage(image)
    self._ParseHashtreeMetadata()

    if not self.ValidateHashtree():
      raise HashtreeInfoGenerationError("Failed to reconstruct the verity tree")

    return self.hashtree_info
def CreateCustomImageBuilder(info_dict, partition_name, partition_size, def CreateCustomImageBuilder(info_dict, partition_name, partition_size,
key_path, algorithm, signing_args): key_path, algorithm, signing_args):
builder = None builder = None