Move Payload/StreamingPropertyFiles classes to ota_utils.py

This allows other modules to import these classes without bringing in a ton of
dependencies. No functional changes.

Test: th
Bug: 227848550
Change-Id: I98139b45c02eddefa8a26d032e759fa11cc4c694
Author: Kelvin Zhang
Date:   2022-08-30 17:41:29 +00:00
parent d5c36e1fd7
commit 62a7f6e08e
2 changed files with 248 additions and 246 deletions
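
The practical effect of the move is that a consumer script can pull these classes straight from ota_utils without dragging in the heavier imports of the module they previously lived in (care_map_pb2, target_files_diff, etc., visible in the first hunk below). A minimal sketch of such a consumer; the module and helper function here are hypothetical, not part of this change:

# Hypothetical consumer module: it only needs the payload helpers, so after
# this change it can import them from ota_utils instead of the full OTA
# generation script (and its care_map_pb2 / target_files_diff imports).
from ota_utils import Payload, StreamingPropertyFiles, AbOtaPropertyFiles

def build_unsigned_payload(target_file, source_file=None):
  """Generates (but does not sign) an A/B payload from target-files zips."""
  payload = Payload()
  payload.Generate(target_file, source_file)
  return payload
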


@@ -255,7 +255,6 @@ import os.path
import re
import shlex
import shutil
import struct
import subprocess
import sys
import zipfile
@@ -264,7 +263,7 @@ import care_map_pb2
import common
import ota_utils
from ota_utils import (UNZIP_PATTERN, FinalizeMetadata, GetPackageMetadata,
PropertyFiles, SECURITY_PATCH_LEVEL_PROP_NAME, GetZipEntryOffset)
Payload, SECURITY_PATCH_LEVEL_PROP_NAME, StreamingPropertyFiles, AbOtaPropertyFiles)
from common import IsSparseImage
import target_files_diff
from check_target_files_vintf import CheckVintfIfTrebleEnabled
@@ -336,143 +335,6 @@ SECONDARY_PAYLOAD_SKIPPED_IMAGES = [
'vendor', 'vendor_boot']
class Payload(object):
"""Manages the creation and the signing of an A/B OTA Payload."""
PAYLOAD_BIN = 'payload.bin'
PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'
SECONDARY_PAYLOAD_BIN = 'secondary/payload.bin'
SECONDARY_PAYLOAD_PROPERTIES_TXT = 'secondary/payload_properties.txt'
def __init__(self, secondary=False):
"""Initializes a Payload instance.
Args:
secondary: Whether it's generating a secondary payload (default: False).
"""
self.payload_file = None
self.payload_properties = None
self.secondary = secondary
def _Run(self, cmd): # pylint: disable=no-self-use
# Don't pipe (buffer) the output if verbose is set. Let
# brillo_update_payload write to stdout/stderr directly, so its progress can
# be monitored.
if OPTIONS.verbose:
common.RunAndCheckOutput(cmd, stdout=None, stderr=None)
else:
common.RunAndCheckOutput(cmd)
def Generate(self, target_file, source_file=None, additional_args=None):
"""Generates a payload from the given target-files zip(s).
Args:
target_file: The filename of the target build target-files zip.
source_file: The filename of the source build target-files zip; or None if
generating a full OTA.
additional_args: A list of additional args that should be passed to
brillo_update_payload script; or None.
"""
if additional_args is None:
additional_args = []
payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
cmd = ["brillo_update_payload", "generate",
"--payload", payload_file,
"--target_image", target_file]
if source_file is not None:
cmd.extend(["--source_image", source_file])
if OPTIONS.disable_fec_computation:
cmd.extend(["--disable_fec_computation", "true"])
if OPTIONS.disable_verity_computation:
cmd.extend(["--disable_verity_computation", "true"])
cmd.extend(additional_args)
self._Run(cmd)
self.payload_file = payload_file
self.payload_properties = None
def Sign(self, payload_signer):
"""Generates and signs the hashes of the payload and metadata.
Args:
payload_signer: A PayloadSigner() instance that serves the signing work.
Raises:
AssertionError: On any failure when calling brillo_update_payload script.
"""
assert isinstance(payload_signer, PayloadSigner)
# 1. Generate hashes of the payload and metadata files.
payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
cmd = ["brillo_update_payload", "hash",
"--unsigned_payload", self.payload_file,
"--signature_size", str(payload_signer.maximum_signature_size),
"--metadata_hash_file", metadata_sig_file,
"--payload_hash_file", payload_sig_file]
self._Run(cmd)
# 2. Sign the hashes.
signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
# 3. Insert the signatures back into the payload file.
signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
suffix=".bin")
cmd = ["brillo_update_payload", "sign",
"--unsigned_payload", self.payload_file,
"--payload", signed_payload_file,
"--signature_size", str(payload_signer.maximum_signature_size),
"--metadata_signature_file", signed_metadata_sig_file,
"--payload_signature_file", signed_payload_sig_file]
self._Run(cmd)
# 4. Dump the signed payload properties.
properties_file = common.MakeTempFile(prefix="payload-properties-",
suffix=".txt")
cmd = ["brillo_update_payload", "properties",
"--payload", signed_payload_file,
"--properties_file", properties_file]
self._Run(cmd)
if self.secondary:
with open(properties_file, "a") as f:
f.write("SWITCH_SLOT_ON_REBOOT=0\n")
if OPTIONS.wipe_user_data:
with open(properties_file, "a") as f:
f.write("POWERWASH=1\n")
self.payload_file = signed_payload_file
self.payload_properties = properties_file
def WriteToZip(self, output_zip):
"""Writes the payload to the given zip.
Args:
output_zip: The output ZipFile instance.
"""
assert self.payload_file is not None
assert self.payload_properties is not None
if self.secondary:
payload_arcname = Payload.SECONDARY_PAYLOAD_BIN
payload_properties_arcname = Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT
else:
payload_arcname = Payload.PAYLOAD_BIN
payload_properties_arcname = Payload.PAYLOAD_PROPERTIES_TXT
# Add the signed payload file and properties into the zip. In order to
# support streaming, we pack them as ZIP_STORED. So these entries can be
# read directly with the offset and length pairs.
common.ZipWrite(output_zip, self.payload_file, arcname=payload_arcname,
compress_type=zipfile.ZIP_STORED)
common.ZipWrite(output_zip, self.payload_properties,
arcname=payload_properties_arcname,
compress_type=zipfile.ZIP_STORED)
def _LoadOemDicts(oem_source):
"""Returns the list of loaded OEM properties dict."""
if not oem_source:
@@ -484,113 +346,6 @@ def _LoadOemDicts(oem_source):
return oem_dicts
class StreamingPropertyFiles(PropertyFiles):
"""A subclass for computing the property-files for streaming A/B OTAs."""
def __init__(self):
super(StreamingPropertyFiles, self).__init__()
self.name = 'ota-streaming-property-files'
self.required = (
# payload.bin and payload_properties.txt must exist.
'payload.bin',
'payload_properties.txt',
)
self.optional = (
# apex_info.pb isn't directly used in the update flow
'apex_info.pb',
# care_map is available only if dm-verity is enabled.
'care_map.pb',
'care_map.txt',
# compatibility.zip is available only if target supports Treble.
'compatibility.zip',
)
class AbOtaPropertyFiles(StreamingPropertyFiles):
"""The property-files for A/B OTA that includes payload_metadata.bin info.
Since P, we expose one more token (aka property-file), in addition to the ones
for streaming A/B OTA, for a virtual entry of 'payload_metadata.bin'.
'payload_metadata.bin' is the header part of a payload ('payload.bin'), which
doesn't exist as a separate ZIP entry, but can be used to verify if the
payload can be applied on the given device.
For backward compatibility, we keep both of the 'ota-streaming-property-files'
and the newly added 'ota-property-files' in P. The new token will only be
available in 'ota-property-files'.
"""
def __init__(self):
super(AbOtaPropertyFiles, self).__init__()
self.name = 'ota-property-files'
def _GetPrecomputed(self, input_zip):
offset, size = self._GetPayloadMetadataOffsetAndSize(input_zip)
return ['payload_metadata.bin:{}:{}'.format(offset, size)]
@staticmethod
def _GetPayloadMetadataOffsetAndSize(input_zip):
"""Computes the offset and size of the payload metadata for a given package.
(From system/update_engine/update_metadata.proto)
A delta update file contains all the deltas needed to update a system from
one specific version to another specific version. The update format is
represented by this struct pseudocode:
struct delta_update_file {
char magic[4] = "CrAU";
uint64 file_format_version;
uint64 manifest_size; // Size of protobuf DeltaArchiveManifest
// Only present if format_version > 1:
uint32 metadata_signature_size;
// The Bzip2 compressed DeltaArchiveManifest
char manifest[metadata_signature_size];
// The signature of the metadata (from the beginning of the payload up to
// this location, not including the signature itself). This is a
// serialized Signatures message.
      char metadata_signature_message[metadata_signature_size];
// Data blobs for files, no specific format. The specific offset
// and length of each data blob is recorded in the DeltaArchiveManifest.
struct {
char data[];
} blobs[];
// These two are not signed:
uint64 payload_signatures_message_size;
char payload_signatures_message[];
};
'payload_metadata.bin' contains all the bytes from the beginning of the
payload, up to the end of 'metadata_signature_message'.
"""
payload_info = input_zip.getinfo('payload.bin')
(payload_offset, payload_size) = GetZipEntryOffset(input_zip, payload_info)
# Read the underlying raw zipfile at specified offset
payload_fp = input_zip.fp
payload_fp.seek(payload_offset)
header_bin = payload_fp.read(24)
# network byte order (big-endian)
header = struct.unpack("!IQQL", header_bin)
# 'CrAU'
magic = header[0]
assert magic == 0x43724155, "Invalid magic: {:x}, computed offset {}" \
.format(magic, payload_offset)
manifest_size = header[2]
metadata_signature_size = header[3]
metadata_total = 24 + manifest_size + metadata_signature_size
assert metadata_total < payload_size
return (payload_offset, metadata_total)
def ModifyVABCCompressionParam(content, algo):
""" Update update VABC Compression Param in dynamic_partitions_info.txt
Args:

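For orientation, callers drive the Payload class removed from the first file above in three steps: Generate, Sign, WriteToZip. A simplified sketch of that sequence, assuming a default-constructed PayloadSigner picks up the signing key from OPTIONS; the output path and wrapper function are illustrative only:

import zipfile

from ota_utils import Payload
from payload_signer import PayloadSigner

def write_ab_payload(target_file, source_file, output_path):
  # 1. Generate the unsigned payload via brillo_update_payload.
  payload = Payload()
  payload.Generate(target_file, source_file)

  # 2. Hash, sign, and re-insert the payload/metadata signatures.
  payload.Sign(PayloadSigner())

  # 3. payload.bin and payload_properties.txt are written as ZIP_STORED
  #    entries so they can be streamed directly by offset and length.
  with zipfile.ZipFile(output_path, 'w', allowZip64=True) as output_zip:
    payload.WriteToZip(output_zip)
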

@@ -21,10 +21,13 @@ import struct
import zipfile
import ota_metadata_pb2
import common
from common import (ZipDelete, ZipClose, OPTIONS, MakeTempFile,
ZipWriteStr, BuildInfo, LoadDictionaryFromFile,
SignFile, PARTITIONS_WITH_BUILD_PROP, PartitionBuildProps,
GetRamdiskFormat)
from payload_signer import PayloadSigner
logger = logging.getLogger(__name__)
@@ -702,3 +705,247 @@ def IsZucchiniCompatible(source_file: str, target_file: str):
sourceEntry = ReadEntry(source_file, _ZUCCHINI_CONFIG_ENTRY_NAME)
targetEntry = ReadEntry(target_file, _ZUCCHINI_CONFIG_ENTRY_NAME)
return sourceEntry and targetEntry and sourceEntry == targetEntry
class Payload(object):
"""Manages the creation and the signing of an A/B OTA Payload."""
PAYLOAD_BIN = 'payload.bin'
PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'
SECONDARY_PAYLOAD_BIN = 'secondary/payload.bin'
SECONDARY_PAYLOAD_PROPERTIES_TXT = 'secondary/payload_properties.txt'
def __init__(self, secondary=False):
"""Initializes a Payload instance.
Args:
secondary: Whether it's generating a secondary payload (default: False).
"""
self.payload_file = None
self.payload_properties = None
self.secondary = secondary
def _Run(self, cmd): # pylint: disable=no-self-use
# Don't pipe (buffer) the output if verbose is set. Let
# brillo_update_payload write to stdout/stderr directly, so its progress can
# be monitored.
if OPTIONS.verbose:
common.RunAndCheckOutput(cmd, stdout=None, stderr=None)
else:
common.RunAndCheckOutput(cmd)
def Generate(self, target_file, source_file=None, additional_args=None):
"""Generates a payload from the given target-files zip(s).
Args:
target_file: The filename of the target build target-files zip.
source_file: The filename of the source build target-files zip; or None if
generating a full OTA.
additional_args: A list of additional args that should be passed to
brillo_update_payload script; or None.
"""
if additional_args is None:
additional_args = []
payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
cmd = ["brillo_update_payload", "generate",
"--payload", payload_file,
"--target_image", target_file]
if source_file is not None:
cmd.extend(["--source_image", source_file])
if OPTIONS.disable_fec_computation:
cmd.extend(["--disable_fec_computation", "true"])
if OPTIONS.disable_verity_computation:
cmd.extend(["--disable_verity_computation", "true"])
cmd.extend(additional_args)
self._Run(cmd)
self.payload_file = payload_file
self.payload_properties = None
def Sign(self, payload_signer):
"""Generates and signs the hashes of the payload and metadata.
Args:
payload_signer: A PayloadSigner() instance that serves the signing work.
Raises:
AssertionError: On any failure when calling brillo_update_payload script.
"""
assert isinstance(payload_signer, PayloadSigner)
# 1. Generate hashes of the payload and metadata files.
payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
cmd = ["brillo_update_payload", "hash",
"--unsigned_payload", self.payload_file,
"--signature_size", str(payload_signer.maximum_signature_size),
"--metadata_hash_file", metadata_sig_file,
"--payload_hash_file", payload_sig_file]
self._Run(cmd)
# 2. Sign the hashes.
signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
# 3. Insert the signatures back into the payload file.
signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
suffix=".bin")
cmd = ["brillo_update_payload", "sign",
"--unsigned_payload", self.payload_file,
"--payload", signed_payload_file,
"--signature_size", str(payload_signer.maximum_signature_size),
"--metadata_signature_file", signed_metadata_sig_file,
"--payload_signature_file", signed_payload_sig_file]
self._Run(cmd)
# 4. Dump the signed payload properties.
properties_file = common.MakeTempFile(prefix="payload-properties-",
suffix=".txt")
cmd = ["brillo_update_payload", "properties",
"--payload", signed_payload_file,
"--properties_file", properties_file]
self._Run(cmd)
if self.secondary:
with open(properties_file, "a") as f:
f.write("SWITCH_SLOT_ON_REBOOT=0\n")
if OPTIONS.wipe_user_data:
with open(properties_file, "a") as f:
f.write("POWERWASH=1\n")
self.payload_file = signed_payload_file
self.payload_properties = properties_file
def WriteToZip(self, output_zip):
"""Writes the payload to the given zip.
Args:
output_zip: The output ZipFile instance.
"""
assert self.payload_file is not None
assert self.payload_properties is not None
if self.secondary:
payload_arcname = Payload.SECONDARY_PAYLOAD_BIN
payload_properties_arcname = Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT
else:
payload_arcname = Payload.PAYLOAD_BIN
payload_properties_arcname = Payload.PAYLOAD_PROPERTIES_TXT
# Add the signed payload file and properties into the zip. In order to
# support streaming, we pack them as ZIP_STORED. So these entries can be
# read directly with the offset and length pairs.
common.ZipWrite(output_zip, self.payload_file, arcname=payload_arcname,
compress_type=zipfile.ZIP_STORED)
common.ZipWrite(output_zip, self.payload_properties,
arcname=payload_properties_arcname,
compress_type=zipfile.ZIP_STORED)
class StreamingPropertyFiles(PropertyFiles):
"""A subclass for computing the property-files for streaming A/B OTAs."""
def __init__(self):
super(StreamingPropertyFiles, self).__init__()
self.name = 'ota-streaming-property-files'
self.required = (
# payload.bin and payload_properties.txt must exist.
'payload.bin',
'payload_properties.txt',
)
self.optional = (
# apex_info.pb isn't directly used in the update flow
'apex_info.pb',
# care_map is available only if dm-verity is enabled.
'care_map.pb',
'care_map.txt',
# compatibility.zip is available only if target supports Treble.
'compatibility.zip',
)
class AbOtaPropertyFiles(StreamingPropertyFiles):
"""The property-files for A/B OTA that includes payload_metadata.bin info.
Since P, we expose one more token (aka property-file), in addition to the ones
for streaming A/B OTA, for a virtual entry of 'payload_metadata.bin'.
'payload_metadata.bin' is the header part of a payload ('payload.bin'), which
doesn't exist as a separate ZIP entry, but can be used to verify if the
payload can be applied on the given device.
For backward compatibility, we keep both of the 'ota-streaming-property-files'
and the newly added 'ota-property-files' in P. The new token will only be
available in 'ota-property-files'.
"""
def __init__(self):
super(AbOtaPropertyFiles, self).__init__()
self.name = 'ota-property-files'
def _GetPrecomputed(self, input_zip):
offset, size = self._GetPayloadMetadataOffsetAndSize(input_zip)
return ['payload_metadata.bin:{}:{}'.format(offset, size)]
@staticmethod
def _GetPayloadMetadataOffsetAndSize(input_zip):
"""Computes the offset and size of the payload metadata for a given package.
(From system/update_engine/update_metadata.proto)
A delta update file contains all the deltas needed to update a system from
one specific version to another specific version. The update format is
represented by this struct pseudocode:
struct delta_update_file {
char magic[4] = "CrAU";
uint64 file_format_version;
uint64 manifest_size; // Size of protobuf DeltaArchiveManifest
// Only present if format_version > 1:
uint32 metadata_signature_size;
// The Bzip2 compressed DeltaArchiveManifest
char manifest[metadata_signature_size];
// The signature of the metadata (from the beginning of the payload up to
// this location, not including the signature itself). This is a
// serialized Signatures message.
      char metadata_signature_message[metadata_signature_size];
// Data blobs for files, no specific format. The specific offset
// and length of each data blob is recorded in the DeltaArchiveManifest.
struct {
char data[];
} blobs[];
// These two are not signed:
uint64 payload_signatures_message_size;
char payload_signatures_message[];
};
'payload_metadata.bin' contains all the bytes from the beginning of the
payload, up to the end of 'metadata_signature_message'.
"""
payload_info = input_zip.getinfo('payload.bin')
(payload_offset, payload_size) = GetZipEntryOffset(input_zip, payload_info)
# Read the underlying raw zipfile at specified offset
payload_fp = input_zip.fp
payload_fp.seek(payload_offset)
header_bin = payload_fp.read(24)
# network byte order (big-endian)
header = struct.unpack("!IQQL", header_bin)
# 'CrAU'
magic = header[0]
assert magic == 0x43724155, "Invalid magic: {:x}, computed offset {}" \
.format(magic, payload_offset)
manifest_size = header[2]
metadata_signature_size = header[3]
metadata_total = 24 + manifest_size + metadata_signature_size
assert metadata_total < payload_size
return (payload_offset, metadata_total)
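
The header parsing done by _GetPayloadMetadataOffsetAndSize() above can also be exercised against a standalone payload.bin. A small sketch that mirrors the same struct layout quoted in the docstring; the function name and file path are illustrative:

import struct

def payload_metadata_size(payload_path):
  """Returns the metadata size (fixed header + manifest + metadata signature)
  of a payload, mirroring _GetPayloadMetadataOffsetAndSize()."""
  with open(payload_path, 'rb') as f:
    # 24-byte fixed header: magic, file_format_version, manifest_size,
    # metadata_signature_size, all big-endian.
    magic, _version, manifest_size, metadata_signature_size = struct.unpack(
        "!IQQL", f.read(24))
  assert magic == 0x43724155, "Not a CrAU payload, magic: {:x}".format(magic)
  return 24 + manifest_size + metadata_signature_size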