Merge "Revert "Revert "Refactor rate_limit.go for more clarify"""

Jeff Gaston
2017-08-24 21:41:14 +00:00
committed by Gerrit Code Review
2 changed files with 128 additions and 90 deletions

rate_limit.go

@@ -15,71 +15,54 @@
 package main
 
 import (
+    "fmt"
     "runtime"
 )
 
 type RateLimit struct {
-    requests chan struct{}
-    finished chan int
-    released chan int
-    stop     chan struct{}
+    requests    chan request
+    completions chan int64
+    stop        chan struct{}
 }
 
-// NewRateLimit starts a new rate limiter with maxExecs number of executions
-// allowed to happen at a time. If maxExecs is <= 0, it will default to the
-// number of logical CPUs on the system.
-//
-// With Finish and Release, we'll keep track of outstanding buffer sizes to be
-// written. If that size goes above maxMem, we'll prevent starting new
-// executions.
-//
-// The total memory use may be higher due to current executions. This just
-// prevents runaway memory use due to slower writes.
-func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
-    if maxExecs <= 0 {
-        maxExecs = runtime.NumCPU()
-    }
-    if maxMem <= 0 {
-        // Default to 512MB
-        maxMem = 512 * 1024 * 1024
-    }
+type request struct {
+    size     int64
+    serviced chan struct{}
+}
 
+// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
+// except when no capacity is in use, in which case the first caller is always permitted
+func NewRateLimit(capacity int64) *RateLimit {
     ret := &RateLimit{
-        requests: make(chan struct{}),
-
-        // Let all of the pending executions mark themselves as finished,
-        // even if our goroutine isn't processing input.
-        finished: make(chan int, maxExecs),
-
-        released: make(chan int),
-        stop:     make(chan struct{}),
+        requests:    make(chan request),
+        completions: make(chan int64),
+        stop:        make(chan struct{}),
     }
 
-    go ret.goFunc(maxExecs, maxMem)
+    go ret.monitorChannels(capacity)
 
     return ret
 }
 
-// RequestExecution blocks until another execution can be allowed to run.
-func (r *RateLimit) RequestExecution() Execution {
-    <-r.requests
-    return r.finished
+// Request blocks until another execution of size <size> can be allowed to run.
+func (r *RateLimit) Request(size int64) {
+    request := request{
+        size:     size,
+        serviced: make(chan struct{}, 1),
+    }
+
+    // wait for the request to be received
+    r.requests <- request
+
+    // wait for the request to be accepted
+    <-request.serviced
 }
 
-type Execution chan<- int
-
-// Finish will mark your execution as finished, and allow another request to be
-// approved.
-//
-// bufferSize may be specified to count memory buffer sizes, and must be
-// matched with calls to RateLimit.Release to mark the buffers as released.
-func (e Execution) Finish(bufferSize int) {
-    e <- bufferSize
-}
-
-// Call Release when finished with a buffer recorded with Finish.
-func (r *RateLimit) Release(bufferSize int) {
-    r.released <- bufferSize
+// Finish declares the completion of an execution of size <size>
+func (r *RateLimit) Finish(size int64) {
+    r.completions <- size
 }
 
 // Stop the background goroutine
@@ -87,29 +70,83 @@ func (r *RateLimit) Stop() {
     close(r.stop)
 }
 
-func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
-    var curExecs int
-    var curMemory int64
+// monitorChannels processes incoming requests from channels
+func (r *RateLimit) monitorChannels(capacity int64) {
+    var usedCapacity int64
+    var currentRequest *request
 
     for {
-        var requests chan struct{}
-        if curExecs < maxExecs && curMemory < maxMem {
+        var requests chan request
+        if currentRequest == nil {
+            // If we don't already have a queued request, then we should check for a new request
             requests = r.requests
         }
 
        select {
-        case requests <- struct{}{}:
-            curExecs++
-        case amount := <-r.finished:
-            curExecs--
-            curMemory += int64(amount)
-            if curExecs < 0 {
-                panic("curExecs < 0")
+        case newRequest := <-requests:
+            currentRequest = &newRequest
+        case amountCompleted := <-r.completions:
+            usedCapacity -= amountCompleted
+            if usedCapacity < 0 {
+                panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
             }
-        case amount := <-r.released:
-            curMemory -= int64(amount)
         case <-r.stop:
             return
         }
+
+        if currentRequest != nil {
+            accepted := false
+            if usedCapacity == 0 {
+                accepted = true
+            } else {
+                if capacity >= usedCapacity+currentRequest.size {
+                    accepted = true
+                }
+            }
+            if accepted {
+                usedCapacity += currentRequest.size
+                currentRequest.serviced <- struct{}{}
+                currentRequest = nil
+            }
+        }
     }
 }
+
+// A CPURateLimiter limits the number of active calls based on CPU requirements
+type CPURateLimiter struct {
+    impl *RateLimit
+}
+
+func NewCPURateLimiter(capacity int64) *CPURateLimiter {
+    if capacity <= 0 {
+        capacity = int64(runtime.NumCPU())
+    }
+    impl := NewRateLimit(capacity)
+    return &CPURateLimiter{impl: impl}
+}
+
+func (e CPURateLimiter) Request() {
+    e.impl.Request(1)
+}
+
+func (e CPURateLimiter) Finish() {
+    e.impl.Finish(1)
+}
+
+func (e CPURateLimiter) Stop() {
+    e.impl.Stop()
+}
+
+// A MemoryRateLimiter limits the number of active calls based on Memory requirements
+type MemoryRateLimiter struct {
+    *RateLimit
+}
+
+func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
+    if capacity <= 0 {
+        capacity = 512 * 1024 * 1024 // 512MB
+    }
+    impl := NewRateLimit(capacity)
+    return &MemoryRateLimiter{RateLimit: impl}
+}
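
For reference, a minimal usage sketch of the refactored API (assuming the rate_limit.go above is in the same package and "sync" is imported; compressBlockData is a hypothetical stand-in for real work):

func compressAll(blocks [][]byte) {
    cpu := NewCPURateLimiter(0)    // <= 0 falls back to runtime.NumCPU()
    mem := NewMemoryRateLimiter(0) // <= 0 falls back to 512MB
    defer cpu.Stop()
    defer mem.Stop()

    var wg sync.WaitGroup
    for _, block := range blocks {
        size := int64(len(block))
        cpu.Request()     // blocks until a CPU slot frees up
        mem.Request(size) // blocks until size bytes of budget free up
        wg.Add(1)
        go func(block []byte, size int64) {
            defer wg.Done()
            compressBlockData(block) // hypothetical work
            cpu.Finish()             // releases the CPU slot
            mem.Finish(size)         // releases exactly what was requested
        }(block, size)
    }
    wg.Wait()
}

Note the special case in monitorChannels: when usedCapacity is 0, a request is admitted even if its size exceeds capacity, so a single oversized request cannot deadlock the limiter.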

(second changed file: the zip writer)

@@ -163,7 +163,8 @@ type zipWriter struct {
     errors   chan error
     writeOps chan chan *zipEntry
 
-    rateLimit *RateLimit
+    cpuRateLimiter    *CPURateLimiter
+    memoryRateLimiter *MemoryRateLimiter
 
     compressorPool sync.Pool
     compLevel      int
@@ -174,6 +175,10 @@ type zipEntry struct {
 
     // List of delayed io.Reader
     futureReaders chan chan io.Reader
+
+    // Only used for passing into the MemoryRateLimiter to ensure we
+    // release as much memory as we request
+    allocatedSize int64
 }
 
 func main() {
@@ -295,9 +300,12 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest string
     // The RateLimit object will put the upper bounds on the number of
     // parallel compressions and outstanding buffers.
     z.writeOps = make(chan chan *zipEntry, 1000)
-    z.rateLimit = NewRateLimit(*parallelJobs, 0)
-    defer z.rateLimit.Stop()
+    z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
+    z.memoryRateLimiter = NewMemoryRateLimiter(0)
+    defer func() {
+        z.cpuRateLimiter.Stop()
+        z.memoryRateLimiter.Stop()
+    }()
 
     go func() {
         var err error
         defer close(z.writeOps)
@@ -369,6 +377,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest string
                 currentWriter.Close()
                 currentWriter = nil
             }
+            z.memoryRateLimiter.Finish(op.allocatedSize)
 
         case futureReader, ok := <-readersChan:
             if !ok {
@@ -381,12 +390,10 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest string
             currentReader = futureReader
 
         case reader := <-currentReader:
-            var count int64
-            count, err = io.Copy(currentWriter, reader)
+            _, err = io.Copy(currentWriter, reader)
             if err != nil {
                 return err
             }
-            z.rateLimit.Release(int(count))
 
             currentReader = nil
@@ -456,7 +463,9 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
         return err
     }
 
-    exec := z.rateLimit.RequestExecution()
+    ze.allocatedSize = fileSize
+    z.cpuRateLimiter.Request()
+    z.memoryRateLimiter.Request(ze.allocatedSize)
 
     if method == zip.Deflate && fileSize >= minParallelFileSize {
         wg := new(sync.WaitGroup)
@@ -473,14 +482,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
         // know the result before we can begin writing the compressed
         // data out to the zipfile.
         wg.Add(1)
-        go z.crcFile(r, ze, exec, compressChan, wg)
+        go z.crcFile(r, ze, compressChan, wg)
 
         for start := int64(0); start < fileSize; start += parallelBlockSize {
             sr := io.NewSectionReader(r, start, parallelBlockSize)
             resultChan := make(chan io.Reader, 1)
             ze.futureReaders <- resultChan
 
-            exec := z.rateLimit.RequestExecution()
+            z.cpuRateLimiter.Request()
 
             last := !(start+parallelBlockSize < fileSize)
             var dict []byte
@@ -489,7 +498,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
             }
 
             wg.Add(1)
-            go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
+            go z.compressPartialFile(sr, dict, last, resultChan, wg)
         }
 
         close(ze.futureReaders)
@@ -500,15 +509,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
             f.Close()
         }(wg, r)
     } else {
-        go z.compressWholeFile(ze, r, exec, compressChan)
+        go z.compressWholeFile(ze, r, compressChan)
     }
 
     return nil
 }
 
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
     defer wg.Done()
-    defer exec.Finish(0)
+    defer z.cpuRateLimiter.Finish()
 
     crc := crc32.NewIEEE()
     _, err := io.Copy(crc, r)
@@ -522,7 +531,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
     close(resultChan)
 }
 
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
     defer wg.Done()
 
     result, err := z.compressBlock(r, dict, last)
@@ -531,7 +540,8 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
         return
     }
 
-    exec.Finish(result.Len())
+    z.cpuRateLimiter.Finish()
+
     resultChan <- result
 }
@@ -569,9 +579,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.Buffer, error) {
     return buf, nil
 }
 
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
-    var bufSize int
-
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
     defer r.Close()
 
     crc := crc32.NewIEEE()
@@ -616,7 +624,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
         }
 
         if uint64(compressed.Len()) < ze.fh.UncompressedSize64 {
             futureReader <- compressed
-            bufSize = compressed.Len()
         } else {
             buf, err := readFile(r)
             if err != nil {
@@ -625,7 +632,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
             }
             ze.fh.Method = zip.Store
             futureReader <- bytes.NewReader(buf)
-            bufSize = int(ze.fh.UncompressedSize64)
         }
     } else {
         buf, err := readFile(r)
@@ -635,10 +641,10 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
         }
         ze.fh.Method = zip.Store
         futureReader <- bytes.NewReader(buf)
-        bufSize = int(ze.fh.UncompressedSize64)
     }
 
-    exec.Finish(bufSize)
+    z.cpuRateLimiter.Finish()
+
     close(futureReader)
     compressChan <- ze
@@ -706,11 +712,6 @@ func (z *zipWriter) writeSymlink(rel, file string) error {
     futureReader <- bytes.NewBufferString(dest)
     close(futureReader)
 
-    // We didn't ask permission to execute, since this should be very short
-    // but we still need to increment the outstanding buffer sizes, since
-    // the read will decrement the buffer size.
-    z.rateLimit.Release(-len(dest))
-
     ze <- &zipEntry{
         fh:            fileHeader,
         futureReaders: futureReaders,
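
The invariant behind the new allocatedSize field: every memoryRateLimiter.Request(ze.allocatedSize) in writeFile is balanced by exactly one memoryRateLimiter.Finish(op.allocatedSize) in the write loop, even though the two calls run on different goroutines. A stripped-down sketch of that hand-off (hypothetical pendingWrite type; assumes the limiter above and "io" imported):

type pendingWrite struct {
    data          []byte
    allocatedSize int64 // recorded at Request time so Finish releases the same amount
}

func produce(mem *MemoryRateLimiter, out chan<- pendingWrite, payloads [][]byte) {
    for _, p := range payloads {
        w := pendingWrite{data: p, allocatedSize: int64(len(p))}
        mem.Request(w.allocatedSize) // may block until budget frees up
        out <- w
    }
    close(out)
}

func drain(mem *MemoryRateLimiter, in <-chan pendingWrite, dst io.Writer) error {
    for w := range in {
        if _, err := dst.Write(w.data); err != nil {
            return err
        }
        mem.Finish(w.allocatedSize) // release exactly what the producer requested
    }
    return nil
}

Recording the size on the entry, rather than recomputing it at release time, is what keeps the two sides symmetric; the old code released io.Copy's byte count instead, which had to be kept in sync manually (see the removed Release(-len(dest)) workaround in writeSymlink).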