generate version 2 blockimgdiff files

Generate version 2 of the block_image_update transfer list format.
This improves patch size by a different strategy for dealing with
out-of-order transfers.  If transfer A must be done before transfer B
due to B overwriting A's source but we want to do B before A, we
resolve the conflict by:

  - before B is executed, we save ("stash") the overlapping region (ie
    the blocks B will overwrite that A wants to read)

  - when A is executed, it will read those parts of source data from
    the stash rather than from the image.

This reverses the ordering constraint; with these additions now B
*must* go before A.  The implementation of the stash is left up to the
code that executes the transfer list to apply the patch; it could hold
stashed data in RAM or on a scratch disk such as /cache, if available.

The code retains the ability to build a version 1 block image patch;
it's needed for processing older target-files.

Change-Id: I7259c1fcd41a0aa7767dab63f086951afa2aa657
This commit is contained in:
Doug Zongker
2014-09-08 08:29:55 -07:00
parent 82ba98de48
commit cf4fda7dc0
3 changed files with 260 additions and 23 deletions

View File

@@ -16,6 +16,7 @@ from __future__ import print_function
from collections import deque, OrderedDict from collections import deque, OrderedDict
from hashlib import sha1 from hashlib import sha1
import heapq
import itertools import itertools
import multiprocessing import multiprocessing
import os import os
@@ -142,9 +143,16 @@ class Transfer(object):
self.goes_before = {} self.goes_before = {}
self.goes_after = {} self.goes_after = {}
self.stash_before = []
self.use_stash = []
self.id = len(by_id) self.id = len(by_id)
by_id.append(self) by_id.append(self)
def NetStashChange(self):
return (sum(sr.size() for (_, sr) in self.stash_before) -
sum(sr.size() for (_, sr) in self.use_stash))
def __str__(self): def __str__(self):
return (str(self.id) + ": <" + str(self.src_ranges) + " " + self.style + return (str(self.id) + ": <" + str(self.src_ranges) + " " + self.style +
" to " + str(self.tgt_ranges) + ">") " to " + str(self.tgt_ranges) + ">")
@@ -182,11 +190,14 @@ class Transfer(object):
# original image. # original image.
class BlockImageDiff(object): class BlockImageDiff(object):
def __init__(self, tgt, src=None, threads=None): def __init__(self, tgt, src=None, threads=None, version=2):
if threads is None: if threads is None:
threads = multiprocessing.cpu_count() // 2 threads = multiprocessing.cpu_count() // 2
if threads == 0: threads = 1 if threads == 0: threads = 1
self.threads = threads self.threads = threads
self.version = version
assert version in (1, 2)
self.tgt = tgt self.tgt = tgt
if src is None: if src is None:
@@ -221,7 +232,12 @@ class BlockImageDiff(object):
self.FindVertexSequence() self.FindVertexSequence()
# Fix up the ordering dependencies that the sequence didn't # Fix up the ordering dependencies that the sequence didn't
# satisfy. # satisfy.
self.RemoveBackwardEdges() if self.version == 1:
self.RemoveBackwardEdges()
else:
self.ReverseBackwardEdges()
self.ImproveVertexSequence()
# Double-check our work. # Double-check our work.
self.AssertSequenceGood() self.AssertSequenceGood()
@@ -231,18 +247,88 @@ class BlockImageDiff(object):
def WriteTransfers(self, prefix): def WriteTransfers(self, prefix):
out = [] out = []
out.append("1\n") # format version number out.append("%d\n" % (self.version,)) # format version number
total = 0 total = 0
performs_read = False performs_read = False
stashes = {}
stashed_blocks = 0
max_stashed_blocks = 0
free_stash_ids = []
next_stash_id = 0
for xf in self.transfers: for xf in self.transfers:
# zero [rangeset] if self.version < 2:
# new [rangeset] assert not xf.stash_before
# bsdiff patchstart patchlen [src rangeset] [tgt rangeset] assert not xf.use_stash
# imgdiff patchstart patchlen [src rangeset] [tgt rangeset]
# move [src rangeset] [tgt rangeset] for s, sr in xf.stash_before:
# erase [rangeset] assert s not in stashes
if free_stash_ids:
sid = heapq.heappop(free_stash_ids)
else:
sid = next_stash_id
next_stash_id += 1
stashes[s] = sid
stashed_blocks += sr.size()
out.append("stash %d %s\n" % (sid, sr.to_string_raw()))
if stashed_blocks > max_stashed_blocks:
max_stashed_blocks = stashed_blocks
if self.version == 1:
src_string = xf.src_ranges.to_string_raw()
elif self.version == 2:
# <# blocks> <src ranges>
# OR
# <# blocks> <src ranges> <src locs> <stash refs...>
# OR
# <# blocks> - <stash refs...>
size = xf.src_ranges.size()
src_string = [str(size)]
unstashed_src_ranges = xf.src_ranges
mapped_stashes = []
for s, sr in xf.use_stash:
sid = stashes.pop(s)
stashed_blocks -= sr.size()
unstashed_src_ranges = unstashed_src_ranges.subtract(sr)
sr = xf.src_ranges.map_within(sr)
mapped_stashes.append(sr)
src_string.append("%d:%s" % (sid, sr.to_string_raw()))
heapq.heappush(free_stash_ids, sid)
if unstashed_src_ranges:
src_string.insert(1, unstashed_src_ranges.to_string_raw())
if xf.use_stash:
mapped_unstashed = xf.src_ranges.map_within(unstashed_src_ranges)
src_string.insert(2, mapped_unstashed.to_string_raw())
mapped_stashes.append(mapped_unstashed)
self.AssertPartition(RangeSet(data=(0, size)), mapped_stashes)
else:
src_string.insert(1, "-")
self.AssertPartition(RangeSet(data=(0, size)), mapped_stashes)
src_string = " ".join(src_string)
# both versions:
# zero <rangeset>
# new <rangeset>
# erase <rangeset>
#
# version 1:
# bsdiff patchstart patchlen <src rangeset> <tgt rangeset>
# imgdiff patchstart patchlen <src rangeset> <tgt rangeset>
# move <src rangeset> <tgt rangeset>
#
# version 2:
# bsdiff patchstart patchlen <tgt rangeset> <src_string>
# imgdiff patchstart patchlen <tgt rangeset> <src_string>
# move <tgt rangeset> <src_string>
tgt_size = xf.tgt_ranges.size() tgt_size = xf.tgt_ranges.size()
@@ -255,17 +341,27 @@ class BlockImageDiff(object):
assert xf.tgt_ranges assert xf.tgt_ranges
assert xf.src_ranges.size() == tgt_size assert xf.src_ranges.size() == tgt_size
if xf.src_ranges != xf.tgt_ranges: if xf.src_ranges != xf.tgt_ranges:
out.append("%s %s %s\n" % ( if self.version == 1:
xf.style, out.append("%s %s %s\n" % (
xf.src_ranges.to_string_raw(), xf.tgt_ranges.to_string_raw())) xf.style,
xf.src_ranges.to_string_raw(), xf.tgt_ranges.to_string_raw()))
elif self.version == 2:
out.append("%s %s %s\n" % (
xf.style,
xf.tgt_ranges.to_string_raw(), src_string))
total += tgt_size total += tgt_size
elif xf.style in ("bsdiff", "imgdiff"): elif xf.style in ("bsdiff", "imgdiff"):
performs_read = True performs_read = True
assert xf.tgt_ranges assert xf.tgt_ranges
assert xf.src_ranges assert xf.src_ranges
out.append("%s %d %d %s %s\n" % ( if self.version == 1:
xf.style, xf.patch_start, xf.patch_len, out.append("%s %d %d %s %s\n" % (
xf.src_ranges.to_string_raw(), xf.tgt_ranges.to_string_raw())) xf.style, xf.patch_start, xf.patch_len,
xf.src_ranges.to_string_raw(), xf.tgt_ranges.to_string_raw()))
elif self.version == 2:
out.append("%s %d %d %s %s\n" % (
xf.style, xf.patch_start, xf.patch_len,
xf.tgt_ranges.to_string_raw(), src_string))
total += tgt_size total += tgt_size
elif xf.style == "zero": elif xf.style == "zero":
assert xf.tgt_ranges assert xf.tgt_ranges
@@ -277,6 +373,15 @@ class BlockImageDiff(object):
raise ValueError, "unknown transfer style '%s'\n" % (xf.style,) raise ValueError, "unknown transfer style '%s'\n" % (xf.style,)
out.insert(1, str(total) + "\n") out.insert(1, str(total) + "\n")
if self.version >= 2:
# version 2 only: after the total block count, we give the number
# of stash slots needed, and the maximum size needed (in blocks)
out.insert(2, str(next_stash_id) + "\n")
out.insert(3, str(max_stashed_blocks) + "\n")
# sanity check: abort if we're going to need more than 512 MB if
# stash space
assert max_stashed_blocks * self.tgt.blocksize < (512 << 20)
all_tgt = RangeSet(data=(0, self.tgt.total_blocks)) all_tgt = RangeSet(data=(0, self.tgt.total_blocks))
if performs_read: if performs_read:
@@ -295,6 +400,10 @@ class BlockImageDiff(object):
for i in out: for i in out:
f.write(i) f.write(i)
if self.version >= 2:
print("max stashed blocks: %d (%d bytes)\n" % (
max_stashed_blocks, max_stashed_blocks * self.tgt.blocksize))
def ComputePatches(self, prefix): def ComputePatches(self, prefix):
print("Reticulating splines...") print("Reticulating splines...")
diff_q = [] diff_q = []
@@ -409,7 +518,13 @@ class BlockImageDiff(object):
# Imagine processing the transfers in order. # Imagine processing the transfers in order.
for xf in self.transfers: for xf in self.transfers:
# Check that the input blocks for this transfer haven't yet been touched. # Check that the input blocks for this transfer haven't yet been touched.
assert not touched.overlaps(xf.src_ranges)
x = xf.src_ranges
if self.version >= 2:
for _, sr in xf.use_stash:
x = x.subtract(sr)
assert not touched.overlaps(x)
# Check that the output blocks for this transfer haven't yet been touched. # Check that the output blocks for this transfer haven't yet been touched.
assert not touched.overlaps(xf.tgt_ranges) assert not touched.overlaps(xf.tgt_ranges)
# Touch all the blocks written by this transfer. # Touch all the blocks written by this transfer.
@@ -418,6 +533,47 @@ class BlockImageDiff(object):
# Check that we've written every target block. # Check that we've written every target block.
assert touched == self.tgt.care_map assert touched == self.tgt.care_map
def ImproveVertexSequence(self):
print("Improving vertex order...")
# At this point our digraph is acyclic; we reversed any edges that
# were backwards in the heuristically-generated sequence. The
# previously-generated order is still acceptable, but we hope to
# find a better order that needs less memory for stashed data.
# Now we do a topological sort to generate a new vertex order,
# using a greedy algorithm to choose which vertex goes next
# whenever we have a choice.
# Make a copy of the edge set; this copy will get destroyed by the
# algorithm.
for xf in self.transfers:
xf.incoming = xf.goes_after.copy()
xf.outgoing = xf.goes_before.copy()
L = [] # the new vertex order
# S is the set of sources in the remaining graph; we always choose
# the one that leaves the least amount of stashed data after it's
# executed.
S = [(u.NetStashChange(), u.order, u) for u in self.transfers
if not u.incoming]
heapq.heapify(S)
while S:
_, _, xf = heapq.heappop(S)
L.append(xf)
for u in xf.outgoing:
del u.incoming[xf]
if not u.incoming:
heapq.heappush(S, (u.NetStashChange(), u.order, u))
# if this fails then our graph had a cycle.
assert len(L) == len(self.transfers)
self.transfers = L
for i, xf in enumerate(L):
xf.order = i
def RemoveBackwardEdges(self): def RemoveBackwardEdges(self):
print("Removing backward edges...") print("Removing backward edges...")
in_order = 0 in_order = 0
@@ -425,19 +581,17 @@ class BlockImageDiff(object):
lost_source = 0 lost_source = 0
for xf in self.transfers: for xf in self.transfers:
io = 0
ooo = 0
lost = 0 lost = 0
size = xf.src_ranges.size() size = xf.src_ranges.size()
for u in xf.goes_before: for u in xf.goes_before:
# xf should go before u # xf should go before u
if xf.order < u.order: if xf.order < u.order:
# it does, hurray! # it does, hurray!
io += 1 in_order += 1
else: else:
# it doesn't, boo. trim the blocks that u writes from xf's # it doesn't, boo. trim the blocks that u writes from xf's
# source, so that xf can go after u. # source, so that xf can go after u.
ooo += 1 out_of_order += 1
assert xf.src_ranges.overlaps(u.tgt_ranges) assert xf.src_ranges.overlaps(u.tgt_ranges)
xf.src_ranges = xf.src_ranges.subtract(u.tgt_ranges) xf.src_ranges = xf.src_ranges.subtract(u.tgt_ranges)
xf.intact = False xf.intact = False
@@ -448,8 +602,6 @@ class BlockImageDiff(object):
lost = size - xf.src_ranges.size() lost = size - xf.src_ranges.size()
lost_source += lost lost_source += lost
in_order += io
out_of_order += ooo
print((" %d/%d dependencies (%.2f%%) were violated; " print((" %d/%d dependencies (%.2f%%) were violated; "
"%d source blocks removed.") % "%d source blocks removed.") %
@@ -458,6 +610,48 @@ class BlockImageDiff(object):
if (in_order + out_of_order) else 0.0, if (in_order + out_of_order) else 0.0,
lost_source)) lost_source))
def ReverseBackwardEdges(self):
print("Reversing backward edges...")
in_order = 0
out_of_order = 0
stashes = 0
stash_size = 0
for xf in self.transfers:
lost = 0
size = xf.src_ranges.size()
for u in xf.goes_before.copy():
# xf should go before u
if xf.order < u.order:
# it does, hurray!
in_order += 1
else:
# it doesn't, boo. modify u to stash the blocks that it
# writes that xf wants to read, and then require u to go
# before xf.
out_of_order += 1
overlap = xf.src_ranges.intersect(u.tgt_ranges)
assert overlap
u.stash_before.append((stashes, overlap))
xf.use_stash.append((stashes, overlap))
stashes += 1
stash_size += overlap.size()
# reverse the edge direction; now xf must go after u
del xf.goes_before[u]
del u.goes_after[xf]
xf.goes_after[u] = None # value doesn't matter
u.goes_before[xf] = None
print((" %d/%d dependencies (%.2f%%) were violated; "
"%d source blocks stashed.") %
(out_of_order, in_order + out_of_order,
(out_of_order * 100.0 / (in_order + out_of_order))
if (in_order + out_of_order) else 0.0,
stash_size))
def FindVertexSequence(self): def FindVertexSequence(self):
print("Finding vertex sequence...") print("Finding vertex sequence...")

View File

@@ -1021,7 +1021,14 @@ class BlockDifference:
self.src = src self.src = src
self.partition = partition self.partition = partition
b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads) version = 1
if OPTIONS.info_dict:
version = max(
int(i) for i in
OPTIONS.info_dict.get("blockimgdiff_versions", "1").split(","))
b = blockimgdiff.BlockImageDiff(tgt, src, threads=OPTIONS.worker_threads,
version=version)
tmpdir = tempfile.mkdtemp() tmpdir = tempfile.mkdtemp()
OPTIONS.tempfiles.append(tmpdir) OPTIONS.tempfiles.append(tmpdir)
self.path = os.path.join(tmpdir, partition) self.path = os.path.join(tmpdir, partition)

View File

@@ -173,3 +173,39 @@ class RangeSet(object):
else: else:
total -= p total -= p
return total return total
def map_within(self, other):
"""'other' should be a subset of 'self'. Returns a RangeSet
representing what 'other' would get translated to if the integers
of 'self' were translated down to be contiguous starting at zero.
>>> RangeSet.parse("0-9").map_within(RangeSet.parse("3-4")).to_string()
'3-4'
>>> RangeSet.parse("10-19").map_within(RangeSet.parse("13-14")).to_string()
'3-4'
>>> RangeSet.parse("10-19 30-39").map_within(
... RangeSet.parse("17-19 30-32")).to_string()
'7-12'
>>> RangeSet.parse("10-19 30-39").map_within(
... RangeSet.parse("12-13 17-19 30-32")).to_string()
'2-3 7-12'
"""
out = []
offset = 0
start = None
for p, d in heapq.merge(zip(self.data, itertools.cycle((-5, +5))),
zip(other.data, itertools.cycle((-1, +1)))):
if d == -5:
start = p
elif d == +5:
offset += p-start
start = None
else:
out.append(offset + p - start)
return RangeSet(data=out)
if __name__ == "__main__":
import doctest
doctest.testmod()