releasetools: Add Payload class.
This breaks down the current WriteABOTAPackageWithBrilloScript() into
smaller and testable units, which also prepares for the work in
b/35724498.
Bug: 35724498
Test: python -m unittest test_ota_from_target_files
Test: Get identical A/B OTA packages w/ and w/o the CL.
Change-Id: I2ea45ce98e2d2baa58e94fb829b7242f6fe685a7
Merged-In: I2ea45ce98e2d2baa58e94fb829b7242f6fe685a7
(cherry picked from commit 036d721812
)
This commit is contained in:
@@ -15,12 +15,14 @@
|
||||
#
|
||||
|
||||
import copy
|
||||
import os
|
||||
import os.path
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
import common
|
||||
from ota_from_target_files import (
|
||||
_LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner,
|
||||
_LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner,
|
||||
WriteFingerprintAssertion)
|
||||
|
||||
|
||||
@@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase):
|
||||
|
||||
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
|
||||
self._assertFilesEqual(verify_file, signed_file)
|
||||
|
||||
|
||||
class PayloadTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdata_dir = get_testdata_dir()
|
||||
self.assertTrue(os.path.exists(self.testdata_dir))
|
||||
|
||||
common.OPTIONS.wipe_user_data = False
|
||||
common.OPTIONS.payload_signer = None
|
||||
common.OPTIONS.payload_signer_args = None
|
||||
common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
|
||||
common.OPTIONS.key_passwords = {
|
||||
common.OPTIONS.package_key : None,
|
||||
}
|
||||
|
||||
def tearDown(self):
|
||||
common.Cleanup()
|
||||
|
||||
@staticmethod
|
||||
def _construct_target_files():
|
||||
target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
|
||||
with zipfile.ZipFile(target_files, 'w') as target_files_zip:
|
||||
# META/update_engine_config.txt
|
||||
target_files_zip.writestr(
|
||||
'META/update_engine_config.txt',
|
||||
"PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
|
||||
|
||||
# META/ab_partitions.txt
|
||||
ab_partitions = ['boot', 'system', 'vendor']
|
||||
target_files_zip.writestr(
|
||||
'META/ab_partitions.txt',
|
||||
'\n'.join(ab_partitions))
|
||||
|
||||
# Create dummy images for each of them.
|
||||
for partition in ab_partitions:
|
||||
target_files_zip.writestr('IMAGES/' + partition + '.img',
|
||||
os.urandom(len(partition)))
|
||||
|
||||
return target_files
|
||||
|
||||
def _create_payload_full(self):
|
||||
target_file = self._construct_target_files()
|
||||
payload = Payload()
|
||||
payload.Generate(target_file)
|
||||
return payload
|
||||
|
||||
def _create_payload_incremental(self):
|
||||
target_file = self._construct_target_files()
|
||||
source_file = self._construct_target_files()
|
||||
payload = Payload()
|
||||
payload.Generate(target_file, source_file)
|
||||
return payload
|
||||
|
||||
def test_Generate_full(self):
|
||||
payload = self._create_payload_full()
|
||||
self.assertTrue(os.path.exists(payload.payload_file))
|
||||
|
||||
def test_Generate_incremental(self):
|
||||
payload = self._create_payload_incremental()
|
||||
self.assertTrue(os.path.exists(payload.payload_file))
|
||||
|
||||
def test_Generate_additionalArgs(self):
|
||||
target_file = self._construct_target_files()
|
||||
source_file = self._construct_target_files()
|
||||
payload = Payload()
|
||||
# This should work the same as calling payload.Generate(target_file,
|
||||
# source_file).
|
||||
payload.Generate(
|
||||
target_file, additional_args=["--source_image", source_file])
|
||||
self.assertTrue(os.path.exists(payload.payload_file))
|
||||
|
||||
def test_Generate_invalidInput(self):
|
||||
target_file = self._construct_target_files()
|
||||
common.ZipDelete(target_file, 'IMAGES/vendor.img')
|
||||
payload = Payload()
|
||||
self.assertRaises(AssertionError, payload.Generate, target_file)
|
||||
|
||||
def test_Sign_full(self):
|
||||
payload = self._create_payload_full()
|
||||
payload.Sign(PayloadSigner())
|
||||
|
||||
output_file = common.MakeTempFile(suffix='.zip')
|
||||
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||
payload.WriteToZip(output_zip)
|
||||
|
||||
import check_ota_package_signature
|
||||
check_ota_package_signature.VerifyAbOtaPayload(
|
||||
os.path.join(self.testdata_dir, 'testkey.x509.pem'),
|
||||
output_file)
|
||||
|
||||
def test_Sign_incremental(self):
|
||||
payload = self._create_payload_incremental()
|
||||
payload.Sign(PayloadSigner())
|
||||
|
||||
output_file = common.MakeTempFile(suffix='.zip')
|
||||
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||
payload.WriteToZip(output_zip)
|
||||
|
||||
import check_ota_package_signature
|
||||
check_ota_package_signature.VerifyAbOtaPayload(
|
||||
os.path.join(self.testdata_dir, 'testkey.x509.pem'),
|
||||
output_file)
|
||||
|
||||
def test_Sign_withDataWipe(self):
|
||||
common.OPTIONS.wipe_user_data = True
|
||||
payload = self._create_payload_full()
|
||||
payload.Sign(PayloadSigner())
|
||||
|
||||
with open(payload.payload_properties) as properties_fp:
|
||||
self.assertIn("POWERWASH=1", properties_fp.read())
|
||||
|
||||
def test_Sign_badSigner(self):
|
||||
"""Tests that signing failure can be captured."""
|
||||
payload = self._create_payload_full()
|
||||
payload_signer = PayloadSigner()
|
||||
payload_signer.signer_args.append('bad-option')
|
||||
self.assertRaises(AssertionError, payload.Sign, payload_signer)
|
||||
|
||||
def test_WriteToZip(self):
|
||||
payload = self._create_payload_full()
|
||||
payload.Sign(PayloadSigner())
|
||||
|
||||
output_file = common.MakeTempFile(suffix='.zip')
|
||||
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||
payload.WriteToZip(output_zip)
|
||||
|
||||
with zipfile.ZipFile(output_file) as verify_zip:
|
||||
# First make sure we have the essential entries.
|
||||
namelist = verify_zip.namelist()
|
||||
self.assertIn(Payload.PAYLOAD_BIN, namelist)
|
||||
self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)
|
||||
|
||||
# Then assert these entries are stored.
|
||||
for entry_info in verify_zip.infolist():
|
||||
if entry_info.filename not in (Payload.PAYLOAD_BIN,
|
||||
Payload.PAYLOAD_PROPERTIES_TXT):
|
||||
continue
|
||||
self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
|
||||
|
||||
def test_WriteToZip_unsignedPayload(self):
|
||||
"""Unsigned payloads should not be allowed to be written to zip."""
|
||||
payload = self._create_payload_full()
|
||||
|
||||
output_file = common.MakeTempFile(suffix='.zip')
|
||||
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||
self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
|
||||
|
||||
# Also test with incremental payload.
|
||||
payload = self._create_payload_incremental()
|
||||
|
||||
output_file = common.MakeTempFile(suffix='.zip')
|
||||
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||
self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
|
||||
|
Reference in New Issue
Block a user