Generates the care_map.txt in protobuf format

Call the host binary care_map_generator to generate the care_map in
protobuf format. The parsing part of proto messages has already been
supported in the update_verifier.

Bug: 77867897
Test: unittests pass; run add_image_to_target_files
Change-Id: I40d3184b4b5c48a6dd55203afc84cca73d5765e9
This commit is contained in:
Tianjie Xu
2018-08-15 14:28:27 -07:00
parent 075435035f
commit ccbae48ac5
3 changed files with 62 additions and 49 deletions

View File

@@ -2934,6 +2934,7 @@ OTATOOLS := $(HOST_OUT_EXECUTABLES)/minigzip \
$(HOST_OUT_EXECUTABLES)/brillo_update_payload \
$(HOST_OUT_EXECUTABLES)/lib/shflags/shflags \
$(HOST_OUT_EXECUTABLES)/delta_generator \
$(HOST_OUT_EXECUTABLES)/care_map_generator \
$(AVBTOOL) \
$(BLK_ALLOC_TO_BASE_FS) \
$(BROTLI) \

View File

@@ -545,7 +545,7 @@ def AddCareMapTxtForAbOta(output_zip, ab_partitions, image_paths):
Args:
output_zip: The output zip file (needs to be already open), or None to
write images to OPTIONS.input_tmp/.
write care_map.txt to OPTIONS.input_tmp/.
ab_partitions: The list of A/B partitions.
image_paths: A map from the partition name to the image path.
"""
@@ -563,15 +563,32 @@ def AddCareMapTxtForAbOta(output_zip, ab_partitions, image_paths):
assert os.path.exists(image_path)
care_map_list += GetCareMap(partition, image_path)
if care_map_list:
care_map_path = "META/care_map.txt"
if output_zip and care_map_path not in output_zip.namelist():
common.ZipWriteStr(output_zip, care_map_path, '\n'.join(care_map_list))
else:
with open(os.path.join(OPTIONS.input_tmp, care_map_path), 'w') as fp:
fp.write('\n'.join(care_map_list))
if output_zip:
OPTIONS.replace_updated_files_list.append(care_map_path)
if not care_map_list:
return
# Converts the list into proto buf message by calling care_map_generator; and
# writes the result to a temp file.
temp_care_map_text = common.MakeTempFile(prefix="caremap_text-",
suffix=".txt")
with open(temp_care_map_text, 'w') as text_file:
text_file.write('\n'.join(care_map_list))
temp_care_map = common.MakeTempFile(prefix="caremap-", suffix=".txt")
care_map_gen_cmd = (["care_map_generator", temp_care_map_text, temp_care_map])
p = common.Run(care_map_gen_cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = p.communicate()
assert p.returncode == 0, "Failed to generate the care_map proto message."
if OPTIONS.verbose:
print(output.rstrip())
care_map_path = "META/care_map.txt"
if output_zip and care_map_path not in output_zip.namelist():
common.ZipWrite(output_zip, temp_care_map, arcname=care_map_path)
else:
shutil.copy(temp_care_map, os.path.join(OPTIONS.input_tmp, care_map_path))
if output_zip:
OPTIONS.replace_updated_files_list.append(care_map_path)
def AddPackRadioImages(output_zip, images):

View File

@@ -16,6 +16,7 @@
import os
import os.path
import subprocess
import unittest
import zipfile
@@ -38,6 +39,20 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
def tearDown(self):
common.Cleanup()
def _verifyCareMap(self, expected, file_name):
"""Parses the care_map proto; and checks the content in plain text."""
text_file = common.MakeTempFile(prefix="caremap-", suffix=".txt")
# Calls an external binary to convert the proto message.
cmd = ["care_map_generator", "--parse_proto", file_name, text_file]
p = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output, _ = p.communicate()
self.assertEqual(0, p.returncode)
with open(text_file, 'r') as verify_fp:
plain_text = verify_fp.read()
self.assertEqual('\n'.join(expected), plain_text)
@staticmethod
def _create_images(images, prefix):
"""Creates images under OPTIONS.input_tmp/prefix."""
@@ -155,15 +170,10 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
AddCareMapTxtForAbOta(None, ['system', 'vendor'], image_paths)
care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
with open(care_map_file, 'r') as verify_fp:
care_map = verify_fp.read()
expected = ['system', RangeSet("0-5 10-15").to_string_raw(), 'vendor',
RangeSet("0-9").to_string_raw()]
lines = care_map.split('\n')
self.assertEqual(4, len(lines))
self.assertEqual('system', lines[0])
self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
self.assertEqual('vendor', lines[2])
self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
self._verifyCareMap(expected, care_map_file)
def test_AddCareMapTxtForAbOta_withNonCareMapPartitions(self):
"""Partitions without care_map should be ignored."""
@@ -173,15 +183,10 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
None, ['boot', 'system', 'vendor', 'vbmeta'], image_paths)
care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
with open(care_map_file, 'r') as verify_fp:
care_map = verify_fp.read()
expected = ['system', RangeSet("0-5 10-15").to_string_raw(), 'vendor',
RangeSet("0-9").to_string_raw()]
lines = care_map.split('\n')
self.assertEqual(4, len(lines))
self.assertEqual('system', lines[0])
self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
self.assertEqual('vendor', lines[2])
self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
self._verifyCareMap(expected, care_map_file)
def test_AddCareMapTxtForAbOta_withAvb(self):
"""Tests the case for device using AVB."""
@@ -194,15 +199,10 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
AddCareMapTxtForAbOta(None, ['system', 'vendor'], image_paths)
care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
with open(care_map_file, 'r') as verify_fp:
care_map = verify_fp.read()
expected = ['system', RangeSet("0-5 10-15").to_string_raw(), 'vendor',
RangeSet("0-9").to_string_raw()]
lines = care_map.split('\n')
self.assertEqual(4, len(lines))
self.assertEqual('system', lines[0])
self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
self.assertEqual('vendor', lines[2])
self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
self._verifyCareMap(expected, care_map_file)
def test_AddCareMapTxtForAbOta_verityNotEnabled(self):
"""No care_map.txt should be generated if verity not enabled."""
@@ -228,15 +228,15 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
with zipfile.ZipFile(output_file, 'w') as output_zip:
AddCareMapTxtForAbOta(output_zip, ['system', 'vendor'], image_paths)
care_map_name = "META/care_map.txt"
temp_dir = common.MakeTempDir()
with zipfile.ZipFile(output_file, 'r') as verify_zip:
care_map = verify_zip.read('META/care_map.txt').decode('ascii')
self.assertTrue(care_map_name in verify_zip.namelist())
verify_zip.extract(care_map_name, path=temp_dir)
lines = care_map.split('\n')
self.assertEqual(4, len(lines))
self.assertEqual('system', lines[0])
self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
self.assertEqual('vendor', lines[2])
self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
expected = ['system', RangeSet("0-5 10-15").to_string_raw(), 'vendor',
RangeSet("0-9").to_string_raw()]
self._verifyCareMap(expected, os.path.join(temp_dir, care_map_name))
def test_AddCareMapTxtForAbOta_zipOutput_careMapEntryExists(self):
"""Tests the case with ZIP output which already has care_map entry."""
@@ -252,15 +252,10 @@ class AddImagesToTargetFilesTest(unittest.TestCase):
# The one under OPTIONS.input_tmp must have been replaced.
care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
with open(care_map_file, 'r') as verify_fp:
care_map = verify_fp.read()
expected = ['system', RangeSet("0-5 10-15").to_string_raw(), 'vendor',
RangeSet("0-9").to_string_raw()]
lines = care_map.split('\n')
self.assertEqual(4, len(lines))
self.assertEqual('system', lines[0])
self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
self.assertEqual('vendor', lines[2])
self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
self._verifyCareMap(expected, care_map_file)
# The existing entry should be scheduled to be replaced.
self.assertIn('META/care_map.txt', OPTIONS.replace_updated_files_list)