diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 04102b73c..9e95bc14b 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,54 +15,71 @@ package main
 
 import (
-	"fmt"
 	"runtime"
 )
 
 type RateLimit struct {
-	requests    chan request
-	completions chan int64
-
-	stop chan struct{}
+	requests chan struct{}
+	finished chan int
+	released chan int
+	stop     chan struct{}
 }
 
-type request struct {
-	size     int64
-	serviced chan struct{}
-}
-
-// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
-// except when no capacity is in use, in which case the first caller is always permitted
-func NewRateLimit(capacity int64) *RateLimit {
-	ret := &RateLimit{
-		requests:    make(chan request),
-		completions: make(chan int64),
-
-		stop: make(chan struct{}),
+// NewRateLimit starts a new rate limiter with maxExecs number of executions
+// allowed to happen at a time. If maxExecs is <= 0, it will default to the
+// number of logical CPUs on the system.
+//
+// With Finish and Release, we'll keep track of outstanding buffer sizes to be
+// written. If that size goes above maxMem, we'll prevent starting new
+// executions.
+//
+// The total memory use may be higher due to current executions. This just
+// prevents runaway memory use due to slower writes.
+func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
+	if maxExecs <= 0 {
+		maxExecs = runtime.NumCPU()
+	}
+	if maxMem <= 0 {
+		// Default to 512MB
+		maxMem = 512 * 1024 * 1024
 	}
 
-	go ret.monitorChannels(capacity)
+	ret := &RateLimit{
+		requests: make(chan struct{}),
+
+		// Let all of the pending executions mark themselves as finished,
+		// even if our goroutine isn't processing input.
+		finished: make(chan int, maxExecs),
+
+		released: make(chan int),
+		stop:     make(chan struct{}),
+	}
+
+	go ret.goFunc(maxExecs, maxMem)
 
 	return ret
 }
 
-// RequestExecution blocks until another execution of size can be allowed to run.
-func (r *RateLimit) Request(size int64) {
-	request := request{
-		size:     size,
-		serviced: make(chan struct{}, 1),
-	}
-
-	// wait for the request to be received
-	r.requests <- request
-
-	// wait for the request to be accepted
-	<-request.serviced
+// RequestExecution blocks until another execution can be allowed to run.
+func (r *RateLimit) RequestExecution() Execution {
+	<-r.requests
+	return r.finished
 }
 
-// Finish declares the completion of an execution of size
-func (r *RateLimit) Finish(size int64) {
-	r.completions <- size
+type Execution chan<- int
+
+// Finish will mark your execution as finished, and allow another request to be
+// approved.
+//
+// bufferSize may be specified to count memory buffer sizes, and must be
+// matched with calls to RateLimit.Release to mark the buffers as released.
+func (e Execution) Finish(bufferSize int) {
+	e <- bufferSize
+}
+
+// Call Release when finished with a buffer recorded with Finish.
+func (r *RateLimit) Release(bufferSize int) {
+	r.released <- bufferSize
 }
 
 // Stop the background goroutine
@@ -70,83 +87,29 @@ func (r *RateLimit) Stop() {
 	close(r.stop)
 }
 
-// monitorChannels processes incoming requests from channels
-func (r *RateLimit) monitorChannels(capacity int64) {
-	var usedCapacity int64
-	var currentRequest *request
+func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
+	var curExecs int
+	var curMemory int64
 
 	for {
-		var requests chan request
-		if currentRequest == nil {
-			// If we don't already have a queued request, then we should check for a new request
+		var requests chan struct{}
+		if curExecs < maxExecs && curMemory < maxMem {
 			requests = r.requests
 		}
 
 		select {
-		case newRequest := <-requests:
-			currentRequest = &newRequest
-			break
-		case amountCompleted := <-r.completions:
-			usedCapacity -= amountCompleted
-			if usedCapacity < 0 {
-				panic(fmt.Sprintf("usedCapacity < 0: %v", usedCapacity))
+		case requests <- struct{}{}:
+			curExecs++
+		case amount := <-r.finished:
+			curExecs--
+			curMemory += int64(amount)
+			if curExecs < 0 {
+				panic("curExecs < 0")
 			}
+		case amount := <-r.released:
+			curMemory -= int64(amount)
 		case <-r.stop:
 			return
 		}
-
-		if currentRequest != nil {
-			accepted := false
-			if usedCapacity == 0 {
-				accepted = true
-			} else {
-				if capacity >= usedCapacity+currentRequest.size {
-					accepted = true
-				}
-			}
-			if accepted {
-				usedCapacity += currentRequest.size
-				currentRequest.serviced <- struct{}{}
-				currentRequest = nil
-			}
-		}
 	}
 }
-
-// A CPURateLimiter limits the number of active calls based on CPU requirements
-type CPURateLimiter struct {
-	impl *RateLimit
-}
-
-func NewCPURateLimiter(capacity int64) *CPURateLimiter {
-	if capacity <= 0 {
-		capacity = int64(runtime.NumCPU())
-	}
-	impl := NewRateLimit(capacity)
-	return &CPURateLimiter{impl: impl}
-}
-
-func (e CPURateLimiter) Request() {
-	e.impl.Request(1)
-}
-
-func (e CPURateLimiter) Finish() {
-	e.impl.Finish(1)
-}
-
-func (e CPURateLimiter) Stop() {
-	e.impl.Stop()
-}
-
-// A MemoryRateLimiter limits the number of active calls based on Memory requirements
-type MemoryRateLimiter struct {
-	*RateLimit
-}
-
-func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
-	if capacity <= 0 {
-		capacity = 512 * 1024 * 1024 // 512MB
-	}
-	impl := NewRateLimit(capacity)
-	return &MemoryRateLimiter{RateLimit: impl}
-}
diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go
index 3d47f2c6a..d634dda3c 100644
--- a/cmd/soong_zip/soong_zip.go
+++ b/cmd/soong_zip/soong_zip.go
@@ -163,8 +163,7 @@ type zipWriter struct {
 	errors   chan error
 	writeOps chan chan *zipEntry
 
-	cpuRateLimiter    *CPURateLimiter
-	memoryRateLimiter *MemoryRateLimiter
+	rateLimit *RateLimit
 
 	compressorPool sync.Pool
 	compLevel      int
@@ -296,12 +295,8 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
 	// The RateLimit object will put the upper bounds on the number of
 	// parallel compressions and outstanding buffers.
 	z.writeOps = make(chan chan *zipEntry, 1000)
-	z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
-	z.memoryRateLimiter = NewMemoryRateLimiter(0)
-	defer func() {
-		z.cpuRateLimiter.Stop()
-		z.memoryRateLimiter.Stop()
-	}()
+	z.rateLimit = NewRateLimit(*parallelJobs, 0)
+	defer z.rateLimit.Stop()
 
 	go func() {
 		var err error
@@ -391,7 +386,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
 			if err != nil {
 				return err
 			}
-			z.memoryRateLimiter.Finish(count)
+			z.rateLimit.Release(int(count))
 
 			currentReader = nil
 
@@ -461,7 +456,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
 		return err
 	}
 
-	z.cpuRateLimiter.Request()
+	exec := z.rateLimit.RequestExecution()
 
 	if method == zip.Deflate && fileSize >= minParallelFileSize {
 		wg := new(sync.WaitGroup)
@@ -478,14 +473,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
 		// know the result before we can begin writing the compressed
 		// data out to the zipfile.
 		wg.Add(1)
-		go z.crcFile(r, ze, compressChan, wg)
+		go z.crcFile(r, ze, exec, compressChan, wg)
 
 		for start := int64(0); start < fileSize; start += parallelBlockSize {
 			sr := io.NewSectionReader(r, start, parallelBlockSize)
 			resultChan := make(chan io.Reader, 1)
 			ze.futureReaders <- resultChan
 
-			z.cpuRateLimiter.Request()
+			exec := z.rateLimit.RequestExecution()
 
 			last := !(start+parallelBlockSize < fileSize)
 			var dict []byte
@@ -494,7 +489,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
 			}
 
 			wg.Add(1)
-			go z.compressPartialFile(sr, dict, last, resultChan, wg)
+			go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
 		}
 
 		close(ze.futureReaders)
@@ -505,15 +500,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
 			f.Close()
 		}(wg, r)
 	} else {
-		go z.compressWholeFile(ze, r, compressChan)
+		go z.compressWholeFile(ze, r, exec, compressChan)
 	}
 
 	return nil
 }
 
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
 	defer wg.Done()
-	defer z.cpuRateLimiter.Finish()
+	defer exec.Finish(0)
 
 	crc := crc32.NewIEEE()
 	_, err := io.Copy(crc, r)
@@ -527,7 +522,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry
 	close(resultChan)
 }
 
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
 	defer wg.Done()
 
 	result, err := z.compressBlock(r, dict, last)
@@ -536,9 +531,7 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, res
 		return
 	}
 
-	z.memoryRateLimiter.Request(int64(result.Len()))
-	z.cpuRateLimiter.Finish()
-
+	exec.Finish(result.Len())
 	resultChan <- result
 }
 
@@ -576,7 +569,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.B
 	return buf, nil
 }
 
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
 	var bufSize int
 
 	defer r.Close()
@@ -645,9 +638,7 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan cha
 		bufSize = int(ze.fh.UncompressedSize64)
 	}
 
-	z.memoryRateLimiter.Request(int64(bufSize))
-	z.cpuRateLimiter.Finish()
-
+	exec.Finish(bufSize)
 	close(futureReader)
 
 	compressChan <- ze
@@ -718,7 +709,7 @@ func (z *zipWriter) writeSymlink(rel, file string) error {
 	// We didn't ask permission to execute, since this should be very short
 	// but we still need to increment the outstanding buffer sizes, since
 	// the read will decrement the buffer size.
-	z.memoryRateLimiter.Finish(int64(-len(dest)))
+	z.rateLimit.Release(-len(dest))
 
 	ze <- &zipEntry{
 		fh: fileHeader,
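
For context, the reworked API is used as a request/finish/release cycle: RequestExecution blocks until a slot is free, Finish(bufferSize) ends the execution and records its output buffer as outstanding memory, and Release(bufferSize) returns that memory once the buffer has been written out. Below is a minimal usage sketch, not part of the patch: it assumes the patched rate_limit.go above is in the same package, and compressFile/writeOutput are hypothetical stand-ins for soong_zip's compression and writing stages.

package main

import "bytes"

// Hypothetical stand-ins for soong_zip's compress and write stages.
func compressFile(i int) *bytes.Buffer { return bytes.NewBufferString("compressed") }
func writeOutput(buf *bytes.Buffer)    {}

func run() {
	// Up to 4 parallel executions; a maxMem of 0 selects the 512MB default.
	limit := NewRateLimit(4, 0)
	defer limit.Stop()

	results := make(chan *bytes.Buffer)

	for i := 0; i < 16; i++ {
		// Blocks while maxExecs executions are running, or while too much
		// finished-but-unwritten buffer memory is outstanding.
		exec := limit.RequestExecution()
		go func(i int) {
			buf := compressFile(i)
			// Mark this execution finished and record buf.Len() bytes of
			// outstanding buffer, freeing a slot for the next request.
			exec.Finish(buf.Len())
			results <- buf
		}(i)
	}

	for i := 0; i < 16; i++ {
		buf := <-results
		writeOutput(buf)
		// The buffer has been written; release its recorded size so that
		// memory-bound requests can proceed again.
		limit.Release(buf.Len())
	}
}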