Revert "Refactor rate_limit.go for more clarify"
This reverts commit 69f3b3e946
.
Caused rare deadlocks.
Bug: 64536066
Bug: 64813447
Change-Id: Ieb1b931bb2c0afdd8bf8edbfc32c373df3c08d8d
This commit is contained in:
@@ -163,8 +163,7 @@ type zipWriter struct {
|
||||
errors chan error
|
||||
writeOps chan chan *zipEntry
|
||||
|
||||
cpuRateLimiter *CPURateLimiter
|
||||
memoryRateLimiter *MemoryRateLimiter
|
||||
rateLimit *RateLimit
|
||||
|
||||
compressorPool sync.Pool
|
||||
compLevel int
|
||||
@@ -296,12 +295,8 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
|
||||
// The RateLimit object will put the upper bounds on the number of
|
||||
// parallel compressions and outstanding buffers.
|
||||
z.writeOps = make(chan chan *zipEntry, 1000)
|
||||
z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
|
||||
z.memoryRateLimiter = NewMemoryRateLimiter(0)
|
||||
defer func() {
|
||||
z.cpuRateLimiter.Stop()
|
||||
z.memoryRateLimiter.Stop()
|
||||
}()
|
||||
z.rateLimit = NewRateLimit(*parallelJobs, 0)
|
||||
defer z.rateLimit.Stop()
|
||||
|
||||
go func() {
|
||||
var err error
|
||||
@@ -391,7 +386,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
z.memoryRateLimiter.Finish(count)
|
||||
z.rateLimit.Release(int(count))
|
||||
|
||||
currentReader = nil
|
||||
|
||||
@@ -461,7 +456,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
||||
return err
|
||||
}
|
||||
|
||||
z.cpuRateLimiter.Request()
|
||||
exec := z.rateLimit.RequestExecution()
|
||||
|
||||
if method == zip.Deflate && fileSize >= minParallelFileSize {
|
||||
wg := new(sync.WaitGroup)
|
||||
@@ -478,14 +473,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
||||
// know the result before we can begin writing the compressed
|
||||
// data out to the zipfile.
|
||||
wg.Add(1)
|
||||
go z.crcFile(r, ze, compressChan, wg)
|
||||
go z.crcFile(r, ze, exec, compressChan, wg)
|
||||
|
||||
for start := int64(0); start < fileSize; start += parallelBlockSize {
|
||||
sr := io.NewSectionReader(r, start, parallelBlockSize)
|
||||
resultChan := make(chan io.Reader, 1)
|
||||
ze.futureReaders <- resultChan
|
||||
|
||||
z.cpuRateLimiter.Request()
|
||||
exec := z.rateLimit.RequestExecution()
|
||||
|
||||
last := !(start+parallelBlockSize < fileSize)
|
||||
var dict []byte
|
||||
@@ -494,7 +489,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go z.compressPartialFile(sr, dict, last, resultChan, wg)
|
||||
go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
|
||||
}
|
||||
|
||||
close(ze.futureReaders)
|
||||
@@ -505,15 +500,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
||||
f.Close()
|
||||
}(wg, r)
|
||||
} else {
|
||||
go z.compressWholeFile(ze, r, compressChan)
|
||||
go z.compressWholeFile(ze, r, exec, compressChan)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
|
||||
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
defer z.cpuRateLimiter.Finish()
|
||||
defer exec.Finish(0)
|
||||
|
||||
crc := crc32.NewIEEE()
|
||||
_, err := io.Copy(crc, r)
|
||||
@@ -527,7 +522,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry
|
||||
close(resultChan)
|
||||
}
|
||||
|
||||
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
|
||||
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
result, err := z.compressBlock(r, dict, last)
|
||||
@@ -536,9 +531,7 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, res
|
||||
return
|
||||
}
|
||||
|
||||
z.memoryRateLimiter.Request(int64(result.Len()))
|
||||
z.cpuRateLimiter.Finish()
|
||||
|
||||
exec.Finish(result.Len())
|
||||
resultChan <- result
|
||||
}
|
||||
|
||||
@@ -576,7 +569,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.B
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
|
||||
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
|
||||
var bufSize int
|
||||
|
||||
defer r.Close()
|
||||
@@ -645,9 +638,7 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan cha
|
||||
bufSize = int(ze.fh.UncompressedSize64)
|
||||
}
|
||||
|
||||
z.memoryRateLimiter.Request(int64(bufSize))
|
||||
z.cpuRateLimiter.Finish()
|
||||
|
||||
exec.Finish(bufSize)
|
||||
close(futureReader)
|
||||
|
||||
compressChan <- ze
|
||||
@@ -718,7 +709,7 @@ func (z *zipWriter) writeSymlink(rel, file string) error {
|
||||
// We didn't ask permission to execute, since this should be very short
|
||||
// but we still need to increment the outstanding buffer sizes, since
|
||||
// the read will decrement the buffer size.
|
||||
z.memoryRateLimiter.Finish(int64(-len(dest)))
|
||||
z.rateLimit.Release(-len(dest))
|
||||
|
||||
ze <- &zipEntry{
|
||||
fh: fileHeader,
|
||||
|
Reference in New Issue
Block a user