Add multithread support to call imgdiff with block-limit

With the new implementation for handling large APKs, we need to call
imgdiff with --block-limit to split the APK and generate the patch at
the same time. These imgdiff calls would significantly increase the
running time of the "FindTransfers" function, which used to run
sequentially. This CL addresses the issue and speeds up the process by
making the imgdiff calls run in parallel.

Bug: 34220646
Test: Create and sideload an incremental package for angler
Change-Id: Id62e348418fc1d22e32ea6c8ac16d9ab3ec92d7b
This commit is contained in:
Tianjie Xu
2017-09-08 17:19:02 -07:00
parent 44cb0db6a7
commit 2536607d90

View File

@@ -16,6 +16,7 @@ from __future__ import print_function
import array
import common
import copy
import functools
import heapq
import itertools
@@ -204,6 +205,18 @@ class Transfer(object):
self.id = len(by_id)
by_id.append(self)
self._patch = None
@property
def patch(self):
  """Return the patch data stored on this transfer (None when unset)."""
  return self._patch

@patch.setter
def patch(self, patch):
  """Store patch data on this transfer.

  A truthy patch may only be attached while the transfer's style is
  "diff"; falsy values (e.g. None) are accepted for any style.
  """
  assert not patch or self.style == "diff"
  self._patch = patch
def NetStashChange(self):
  """Return total size in stash_before minus total size in use_stash.

  Both attributes are iterated as (key, ranges) pairs; only the ranges'
  size() is used.
  """
  stashed = sum(ranges.size() for (_, ranges) in self.stash_before)
  consumed = sum(ranges.size() for (_, ranges) in self.use_stash)
  return stashed - consumed
@@ -213,6 +226,7 @@ class Transfer(object):
self.use_stash = []
self.style = "new"
self.src_ranges = RangeSet()
self.patch = None
def __str__(self):
return (str(self.id) + ": <" + str(self.src_ranges) + " " + self.style +
@@ -675,6 +689,7 @@ class BlockImageDiff(object):
# These are identical; we don't need to generate a patch,
# just issue copy commands on the device.
xf.style = "move"
xf.patch = None
tgt_size = xf.tgt_ranges.size() * self.tgt.blocksize
if xf.src_ranges != xf.tgt_ranges:
print("%10d %10d (%6.2f%%) %7s %s %s (from %s)" % (
@@ -683,24 +698,33 @@ class BlockImageDiff(object):
xf.tgt_name + " (from " + xf.src_name + ")"),
str(xf.tgt_ranges), str(xf.src_ranges)))
else:
# For files in zip format (eg, APKs, JARs, etc.) we would
# like to use imgdiff -z if possible (because it usually
# produces significantly smaller patches than bsdiff).
# This is permissible if:
#
# - imgdiff is not disabled, and
# - the source and target files are monotonic (ie, the
# data is stored with blocks in increasing order), and
# - we haven't removed any blocks from the source set.
#
# If these conditions are satisfied then appending all the
# blocks in the set together in order will produce a valid
# zip file (plus possibly extra zeros in the last block),
# which is what imgdiff needs to operate. (imgdiff is
# fine with extra zeros at the end of the file.)
imgdiff = (not self.disable_imgdiff and xf.intact and
xf.tgt_name.split(".")[-1].lower()
in ("apk", "jar", "zip"))
if xf.patch:
# We have already generated the patch with imgdiff. Check if the
# transfer is intact.
assert not self.disable_imgdiff
imgdiff = True
if not xf.intact:
imgdiff = False
xf.patch = None
else:
# For files in zip format (eg, APKs, JARs, etc.) we would
# like to use imgdiff -z if possible (because it usually
# produces significantly smaller patches than bsdiff).
# This is permissible if:
#
# - imgdiff is not disabled, and
# - the source and target files are monotonic (ie, the
# data is stored with blocks in increasing order), and
# - we haven't removed any blocks from the source set.
#
# If these conditions are satisfied then appending all the
# blocks in the set together in order will produce a valid
# zip file (plus possibly extra zeros in the last block),
# which is what imgdiff needs to operate. (imgdiff is
# fine with extra zeros at the end of the file.)
imgdiff = (not self.disable_imgdiff and xf.intact and
xf.tgt_name.split(".")[-1].lower()
in ("apk", "jar", "zip"))
xf.style = "imgdiff" if imgdiff else "bsdiff"
diff_queue.append((index, imgdiff, patch_num))
patch_num += 1
@@ -738,48 +762,51 @@ class BlockImageDiff(object):
xf_index, imgdiff, patch_index = diff_queue.pop()
xf = self.transfers[xf_index]
src_ranges = xf.src_ranges
tgt_ranges = xf.tgt_ranges
patch = xf.patch
if not patch:
src_ranges = xf.src_ranges
tgt_ranges = xf.tgt_ranges
# Needs lock since WriteRangeDataToFd() is stateful (calling seek).
with lock:
src_file = common.MakeTempFile(prefix="src-")
with open(src_file, "wb") as fd:
self.src.WriteRangeDataToFd(src_ranges, fd)
tgt_file = common.MakeTempFile(prefix="tgt-")
with open(tgt_file, "wb") as fd:
self.tgt.WriteRangeDataToFd(tgt_ranges, fd)
message = []
try:
patch = compute_patch(src_file, tgt_file, imgdiff)
except ValueError as e:
message.append(
"Failed to generate %s for %s: tgt=%s, src=%s:\n%s" % (
"imgdiff" if imgdiff else "bsdiff",
xf.tgt_name if xf.tgt_name == xf.src_name else
xf.tgt_name + " (from " + xf.src_name + ")",
xf.tgt_ranges, xf.src_ranges, e.message))
# TODO(b/68016761): Better handle the holes in mke2fs created images.
if imgdiff:
try:
patch = compute_patch(src_file, tgt_file, imgdiff=False)
message.append(
"Fell back and generated with bsdiff instead for %s" % (
xf.tgt_name,))
xf.style = "bsdiff"
with lock:
warning_messages.extend(message)
del message[:]
except ValueError as e:
message.append(
"Also failed to generate with bsdiff for %s:\n%s" % (
xf.tgt_name, e.message))
if message:
# Needs lock since WriteRangeDataToFd() is stateful (calling seek).
with lock:
error_messages.extend(message)
src_file = common.MakeTempFile(prefix="src-")
with open(src_file, "wb") as fd:
self.src.WriteRangeDataToFd(src_ranges, fd)
tgt_file = common.MakeTempFile(prefix="tgt-")
with open(tgt_file, "wb") as fd:
self.tgt.WriteRangeDataToFd(tgt_ranges, fd)
message = []
try:
patch = compute_patch(src_file, tgt_file, imgdiff)
except ValueError as e:
message.append(
"Failed to generate %s for %s: tgt=%s, src=%s:\n%s" % (
"imgdiff" if imgdiff else "bsdiff",
xf.tgt_name if xf.tgt_name == xf.src_name else
xf.tgt_name + " (from " + xf.src_name + ")",
xf.tgt_ranges, xf.src_ranges, e.message))
# TODO(b/68016761): Better handle the holes in mke2fs created
# images.
if imgdiff:
try:
patch = compute_patch(src_file, tgt_file, imgdiff=False)
message.append(
"Fell back and generated with bsdiff instead for %s" % (
xf.tgt_name,))
xf.style = "bsdiff"
with lock:
warning_messages.extend(message)
del message[:]
except ValueError as e:
message.append(
"Also failed to generate with bsdiff for %s:\n%s" % (
xf.tgt_name, e.message))
if message:
with lock:
error_messages.extend(message)
with lock:
patches[patch_index] = (xf_index, patch)
@@ -1167,12 +1194,9 @@ class BlockImageDiff(object):
Compared to the fixed 1024-block limit, it reduces the overall package
size by 30% for volantis, and 20% for angler and bullhead."""
assert style == "diff"
# Possibly split large files into smaller chunks.
pieces = 0
cache_size = common.OPTIONS.cache_size
split_threshold = 0.125
max_blocks_per_transfer = int(cache_size * split_threshold /
self.tgt.blocksize)
# Change nothing for small files.
if (tgt_ranges.size() <= max_blocks_per_transfer and
@@ -1182,6 +1206,14 @@ class BlockImageDiff(object):
style, by_id)
return
if tgt_name.split(".")[-1].lower() in ("apk", "jar", "zip"):
split_enable = (not self.disable_imgdiff and src_ranges.monotonic and
tgt_ranges.monotonic)
if split_enable and (self.tgt.RangeSha1(tgt_ranges) !=
self.src.RangeSha1(src_ranges)):
large_apks.append((tgt_name, src_name, tgt_ranges, src_ranges))
return
while (tgt_ranges.size() > max_blocks_per_transfer and
src_ranges.size() > max_blocks_per_transfer):
tgt_split_name = "%s-%d" % (tgt_name, pieces)
@@ -1275,8 +1307,136 @@ class BlockImageDiff(object):
AddSplitTransfers(
tgt_name, src_name, tgt_ranges, src_ranges, style, by_id)
def ParseAndValidateSplitInfo(patch_size, tgt_ranges, src_ranges,
                              split_info):
  """Parse the split_info and return a list of info tuples.

  Args:
    patch_size: total size of the patch file.
    tgt_ranges: Ranges of the target file within the original image.
    src_ranges: Ranges of the source file within the original image.
    split_info: lines of the split info file, in the format:
      imgdiff version#
      count of pieces
      <patch_size_1> <tgt_size_1> <src_ranges_1>
      ...
      <patch_size_n> <tgt_size_n> <src_ranges_n>

  Returns:
    A list with one tuple per piece:
    (patch_start, patch_len, split_tgt_ranges, split_src_ranges)

  Raises:
    AssertionError: if the split info is malformed, or the pieces do not
      add up to the whole patch and target.
  """
  # Target piece sizes are reported in bytes and must be a whole number
  # of blocks of this size.
  block_size = 4096

  version = int(split_info[0])
  assert version == 2
  count = int(split_info[1])
  assert len(split_info) - 2 == count

  split_info_list = []
  patch_start = 0
  tgt_remain = copy.deepcopy(tgt_ranges)
  # each line has the format <patch_size>, <tgt_size>, <src_ranges>
  for line in split_info[2:]:
    info = line.split()
    assert len(info) == 3
    patch_length = int(info[0])

    split_tgt_size = int(info[1])
    assert split_tgt_size % block_size == 0
    # Floor division keeps the block count an int on Python 3 as well;
    # "/" would produce a float there.
    split_tgt_blocks = split_tgt_size // block_size
    assert split_tgt_blocks <= tgt_remain.size()
    split_tgt_ranges = tgt_remain.first(split_tgt_blocks)
    tgt_remain = tgt_remain.subtract(split_tgt_ranges)

    # Find the split_src_ranges within the image file from its relative
    # position in file.
    split_src_indices = RangeSet.parse_raw(info[2])
    split_src_ranges = RangeSet()
    for r in split_src_indices:
      curr_range = src_ranges.first(r[1]).subtract(src_ranges.first(r[0]))
      assert not split_src_ranges.overlaps(curr_range)
      split_src_ranges = split_src_ranges.union(curr_range)

    split_info_list.append((patch_start, patch_length,
                            split_tgt_ranges, split_src_ranges))
    patch_start += patch_length

  # Check that the sizes of all the split pieces add up to the final file
  # size for patch and target.
  assert tgt_remain.size() == 0
  assert patch_start == patch_size
  return split_info_list
def AddSplitTransferForLargeApks():
  """Create split transfers for large apk files.

  Example: Chrome.apk will be split into
    src-0: Chrome.apk-0, tgt-0: Chrome.apk-0
    src-1: Chrome.apk-1, tgt-1: Chrome.apk-1
    ...

  After the split, the target pieces are continuous and block aligned; and
  the source pieces are mutually exclusive. During the split, we also
  generate and save the image patch between src-X & tgt-X. This patch will
  be valid because the block ranges of src-X & tgt-X will always stay the
  same afterwards; but there's a chance we don't use the patch if we
  convert the "diff" command into "new" or "move" later.

  This is a worker loop meant to run in several threads at once: each
  iteration takes one entry off the shared large_apks list and returns
  once the list is empty.

  Raises:
    ValueError: if imgdiff exits with a non-zero status.
  """
  while True:
    with transfer_lock:
      if not large_apks:
        return
      tgt_name, src_name, tgt_ranges, src_ranges = large_apks.pop(0)

    src_file = common.MakeTempFile(prefix="src-")
    tgt_file = common.MakeTempFile(prefix="tgt-")
    # Needs lock since WriteRangeDataToFd() is stateful (calling seek).
    with transfer_lock:
      with open(src_file, "wb") as src_fd:
        self.src.WriteRangeDataToFd(src_ranges, src_fd)
      with open(tgt_file, "wb") as tgt_fd:
        self.tgt.WriteRangeDataToFd(tgt_ranges, tgt_fd)

    patch_file = common.MakeTempFile(prefix="patch-")
    patch_info_file = common.MakeTempFile(prefix="split_info-")
    cmd = ["imgdiff", "-z",
           "--block-limit={}".format(max_blocks_per_transfer),
           "--split-info=" + patch_info_file,
           src_file, tgt_file, patch_file]
    p = common.Run(cmd, stdout=subprocess.PIPE)
    p.communicate()
    # TODO(xunchang) fall back to the normal split if imgdiff fails.
    if p.returncode != 0:
      raise ValueError("Failed to create patch between {} and {}".format(
          src_name, tgt_name))

    with open(patch_info_file) as patch_info:
      lines = patch_info.readlines()

    patch_size_total = os.path.getsize(patch_file)
    split_info_list = ParseAndValidateSplitInfo(patch_size_total,
                                                tgt_ranges, src_ranges,
                                                lines)
    # The patch is binary data: open it once, in binary mode, and slice
    # out each piece by its offset and length.
    with open(patch_file, "rb") as f:
      for index, (patch_start, patch_length, split_tgt_ranges,
                  split_src_ranges) in enumerate(split_info_list):
        f.seek(patch_start)
        patch_content = f.read(patch_length)

        split_src_name = "{}-{}".format(src_name, index)
        split_tgt_name = "{}-{}".format(tgt_name, index)
        # Transfer() assigns its id from len(by_id) and then appends itself
        # to the shared list, so concurrent workers must not interleave
        # here. RangeSha1() is kept under the lock too, on the assumption
        # that it reads the image through the same stateful path as
        # WriteRangeDataToFd() -- TODO confirm.
        with transfer_lock:
          transfer_split = Transfer(split_tgt_name, split_src_name,
                                    split_tgt_ranges, split_src_ranges,
                                    self.tgt.RangeSha1(split_tgt_ranges),
                                    self.src.RangeSha1(split_src_ranges),
                                    "diff", self.transfers)
          transfer_split.patch = patch_content
print("Finding transfers...")
large_apks = []
cache_size = common.OPTIONS.cache_size
split_threshold = 0.125
max_blocks_per_transfer = int(cache_size * split_threshold /
self.tgt.blocksize)
empty = RangeSet()
for tgt_fn, tgt_ranges in self.tgt.file_map.items():
if tgt_fn == "__ZERO":
@@ -1321,6 +1481,14 @@ class BlockImageDiff(object):
AddTransfer(tgt_fn, None, tgt_ranges, empty, "new", self.transfers)
transfer_lock = threading.Lock()
threads = [threading.Thread(target=AddSplitTransferForLargeApks)
for _ in range(self.threads)]
for th in threads:
th.start()
while threads:
threads.pop().join()
def AbbreviateSourceNames(self):
for k in self.src.file_map.keys():
b = os.path.basename(k)