Merge "Revert "Refactor rate_limit.go for more clarify""
This commit is contained in:
@@ -15,54 +15,71 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RateLimit struct {
|
type RateLimit struct {
|
||||||
requests chan request
|
requests chan struct{}
|
||||||
completions chan int64
|
finished chan int
|
||||||
|
released chan int
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type request struct {
|
// NewRateLimit starts a new rate limiter with maxExecs number of executions
|
||||||
size int64
|
// allowed to happen at a time. If maxExecs is <= 0, it will default to the
|
||||||
serviced chan struct{}
|
// number of logical CPUs on the system.
|
||||||
}
|
//
|
||||||
|
// With Finish and Release, we'll keep track of outstanding buffer sizes to be
|
||||||
// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
|
// written. If that size goes above maxMem, we'll prevent starting new
|
||||||
// except when no capacity is in use, in which case the first caller is always permitted
|
// executions.
|
||||||
func NewRateLimit(capacity int64) *RateLimit {
|
//
|
||||||
ret := &RateLimit{
|
// The total memory use may be higher due to current executions. This just
|
||||||
requests: make(chan request),
|
// prevents runaway memory use due to slower writes.
|
||||||
completions: make(chan int64),
|
func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
|
||||||
|
if maxExecs <= 0 {
|
||||||
stop: make(chan struct{}),
|
maxExecs = runtime.NumCPU()
|
||||||
|
}
|
||||||
|
if maxMem <= 0 {
|
||||||
|
// Default to 512MB
|
||||||
|
maxMem = 512 * 1024 * 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
go ret.monitorChannels(capacity)
|
ret := &RateLimit{
|
||||||
|
requests: make(chan struct{}),
|
||||||
|
|
||||||
|
// Let all of the pending executions to mark themselves as finished,
|
||||||
|
// even if our goroutine isn't processing input.
|
||||||
|
finished: make(chan int, maxExecs),
|
||||||
|
|
||||||
|
released: make(chan int),
|
||||||
|
stop: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
go ret.goFunc(maxExecs, maxMem)
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestExecution blocks until another execution of size <size> can be allowed to run.
|
// RequestExecution blocks until another execution can be allowed to run.
|
||||||
func (r *RateLimit) Request(size int64) {
|
func (r *RateLimit) RequestExecution() Execution {
|
||||||
request := request{
|
<-r.requests
|
||||||
size: size,
|
return r.finished
|
||||||
serviced: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the request to be received
|
|
||||||
r.requests <- request
|
|
||||||
|
|
||||||
// wait for the request to be accepted
|
|
||||||
<-request.serviced
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish declares the completion of an execution of size <size>
|
type Execution chan<- int
|
||||||
func (r *RateLimit) Finish(size int64) {
|
|
||||||
r.completions <- size
|
// Finish will mark your execution as finished, and allow another request to be
|
||||||
|
// approved.
|
||||||
|
//
|
||||||
|
// bufferSize may be specified to count memory buffer sizes, and must be
|
||||||
|
// matched with calls to RateLimit.Release to mark the buffers as released.
|
||||||
|
func (e Execution) Finish(bufferSize int) {
|
||||||
|
e <- bufferSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call Release when finished with a buffer recorded with Finish.
|
||||||
|
func (r *RateLimit) Release(bufferSize int) {
|
||||||
|
r.released <- bufferSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop the background goroutine
|
// Stop the background goroutine
|
||||||
@@ -70,83 +87,29 @@ func (r *RateLimit) Stop() {
|
|||||||
close(r.stop)
|
close(r.stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
// monitorChannels processes incoming requests from channels
|
func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
|
||||||
func (r *RateLimit) monitorChannels(capacity int64) {
|
var curExecs int
|
||||||
var usedCapacity int64
|
var curMemory int64
|
||||||
var currentRequest *request
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
var requests chan request
|
var requests chan struct{}
|
||||||
if currentRequest == nil {
|
if curExecs < maxExecs && curMemory < maxMem {
|
||||||
// If we don't already have a queued request, then we should check for a new request
|
|
||||||
requests = r.requests
|
requests = r.requests
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case newRequest := <-requests:
|
case requests <- struct{}{}:
|
||||||
currentRequest = &newRequest
|
curExecs++
|
||||||
break
|
case amount := <-r.finished:
|
||||||
case amountCompleted := <-r.completions:
|
curExecs--
|
||||||
usedCapacity -= amountCompleted
|
curMemory += int64(amount)
|
||||||
if usedCapacity < 0 {
|
if curExecs < 0 {
|
||||||
panic(fmt.Sprintf("usedCapacity < 0: %v", usedCapacity))
|
panic("curExecs < 0")
|
||||||
}
|
}
|
||||||
|
case amount := <-r.released:
|
||||||
|
curMemory -= int64(amount)
|
||||||
case <-r.stop:
|
case <-r.stop:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if currentRequest != nil {
|
|
||||||
accepted := false
|
|
||||||
if usedCapacity == 0 {
|
|
||||||
accepted = true
|
|
||||||
} else {
|
|
||||||
if capacity >= usedCapacity+currentRequest.size {
|
|
||||||
accepted = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if accepted {
|
|
||||||
usedCapacity += currentRequest.size
|
|
||||||
currentRequest.serviced <- struct{}{}
|
|
||||||
currentRequest = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A CPURateLimiter limits the number of active calls based on CPU requirements
|
|
||||||
type CPURateLimiter struct {
|
|
||||||
impl *RateLimit
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCPURateLimiter(capacity int64) *CPURateLimiter {
|
|
||||||
if capacity <= 0 {
|
|
||||||
capacity = int64(runtime.NumCPU())
|
|
||||||
}
|
|
||||||
impl := NewRateLimit(capacity)
|
|
||||||
return &CPURateLimiter{impl: impl}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e CPURateLimiter) Request() {
|
|
||||||
e.impl.Request(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e CPURateLimiter) Finish() {
|
|
||||||
e.impl.Finish(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e CPURateLimiter) Stop() {
|
|
||||||
e.impl.Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// A MemoryRateLimiter limits the number of active calls based on Memory requirements
|
|
||||||
type MemoryRateLimiter struct {
|
|
||||||
*RateLimit
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
|
|
||||||
if capacity <= 0 {
|
|
||||||
capacity = 512 * 1024 * 1024 // 512MB
|
|
||||||
}
|
|
||||||
impl := NewRateLimit(capacity)
|
|
||||||
return &MemoryRateLimiter{RateLimit: impl}
|
|
||||||
}
|
|
||||||
|
@@ -163,8 +163,7 @@ type zipWriter struct {
|
|||||||
errors chan error
|
errors chan error
|
||||||
writeOps chan chan *zipEntry
|
writeOps chan chan *zipEntry
|
||||||
|
|
||||||
cpuRateLimiter *CPURateLimiter
|
rateLimit *RateLimit
|
||||||
memoryRateLimiter *MemoryRateLimiter
|
|
||||||
|
|
||||||
compressorPool sync.Pool
|
compressorPool sync.Pool
|
||||||
compLevel int
|
compLevel int
|
||||||
@@ -296,12 +295,8 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
|
|||||||
// The RateLimit object will put the upper bounds on the number of
|
// The RateLimit object will put the upper bounds on the number of
|
||||||
// parallel compressions and outstanding buffers.
|
// parallel compressions and outstanding buffers.
|
||||||
z.writeOps = make(chan chan *zipEntry, 1000)
|
z.writeOps = make(chan chan *zipEntry, 1000)
|
||||||
z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
|
z.rateLimit = NewRateLimit(*parallelJobs, 0)
|
||||||
z.memoryRateLimiter = NewMemoryRateLimiter(0)
|
defer z.rateLimit.Stop()
|
||||||
defer func() {
|
|
||||||
z.cpuRateLimiter.Stop()
|
|
||||||
z.memoryRateLimiter.Stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
var err error
|
var err error
|
||||||
@@ -391,7 +386,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
z.memoryRateLimiter.Finish(count)
|
z.rateLimit.Release(int(count))
|
||||||
|
|
||||||
currentReader = nil
|
currentReader = nil
|
||||||
|
|
||||||
@@ -461,7 +456,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
z.cpuRateLimiter.Request()
|
exec := z.rateLimit.RequestExecution()
|
||||||
|
|
||||||
if method == zip.Deflate && fileSize >= minParallelFileSize {
|
if method == zip.Deflate && fileSize >= minParallelFileSize {
|
||||||
wg := new(sync.WaitGroup)
|
wg := new(sync.WaitGroup)
|
||||||
@@ -478,14 +473,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
|||||||
// know the result before we can begin writing the compressed
|
// know the result before we can begin writing the compressed
|
||||||
// data out to the zipfile.
|
// data out to the zipfile.
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go z.crcFile(r, ze, compressChan, wg)
|
go z.crcFile(r, ze, exec, compressChan, wg)
|
||||||
|
|
||||||
for start := int64(0); start < fileSize; start += parallelBlockSize {
|
for start := int64(0); start < fileSize; start += parallelBlockSize {
|
||||||
sr := io.NewSectionReader(r, start, parallelBlockSize)
|
sr := io.NewSectionReader(r, start, parallelBlockSize)
|
||||||
resultChan := make(chan io.Reader, 1)
|
resultChan := make(chan io.Reader, 1)
|
||||||
ze.futureReaders <- resultChan
|
ze.futureReaders <- resultChan
|
||||||
|
|
||||||
z.cpuRateLimiter.Request()
|
exec := z.rateLimit.RequestExecution()
|
||||||
|
|
||||||
last := !(start+parallelBlockSize < fileSize)
|
last := !(start+parallelBlockSize < fileSize)
|
||||||
var dict []byte
|
var dict []byte
|
||||||
@@ -494,7 +489,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go z.compressPartialFile(sr, dict, last, resultChan, wg)
|
go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
close(ze.futureReaders)
|
close(ze.futureReaders)
|
||||||
@@ -505,15 +500,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
|
|||||||
f.Close()
|
f.Close()
|
||||||
}(wg, r)
|
}(wg, r)
|
||||||
} else {
|
} else {
|
||||||
go z.compressWholeFile(ze, r, compressChan)
|
go z.compressWholeFile(ze, r, exec, compressChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
|
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
defer z.cpuRateLimiter.Finish()
|
defer exec.Finish(0)
|
||||||
|
|
||||||
crc := crc32.NewIEEE()
|
crc := crc32.NewIEEE()
|
||||||
_, err := io.Copy(crc, r)
|
_, err := io.Copy(crc, r)
|
||||||
@@ -527,7 +522,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry
|
|||||||
close(resultChan)
|
close(resultChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
|
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
result, err := z.compressBlock(r, dict, last)
|
result, err := z.compressBlock(r, dict, last)
|
||||||
@@ -536,9 +531,7 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, res
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
z.memoryRateLimiter.Request(int64(result.Len()))
|
exec.Finish(result.Len())
|
||||||
z.cpuRateLimiter.Finish()
|
|
||||||
|
|
||||||
resultChan <- result
|
resultChan <- result
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -576,7 +569,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.B
|
|||||||
return buf, nil
|
return buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
|
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
|
||||||
var bufSize int
|
var bufSize int
|
||||||
|
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
@@ -645,9 +638,7 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan cha
|
|||||||
bufSize = int(ze.fh.UncompressedSize64)
|
bufSize = int(ze.fh.UncompressedSize64)
|
||||||
}
|
}
|
||||||
|
|
||||||
z.memoryRateLimiter.Request(int64(bufSize))
|
exec.Finish(bufSize)
|
||||||
z.cpuRateLimiter.Finish()
|
|
||||||
|
|
||||||
close(futureReader)
|
close(futureReader)
|
||||||
|
|
||||||
compressChan <- ze
|
compressChan <- ze
|
||||||
@@ -718,7 +709,7 @@ func (z *zipWriter) writeSymlink(rel, file string) error {
|
|||||||
// We didn't ask permission to execute, since this should be very short
|
// We didn't ask permission to execute, since this should be very short
|
||||||
// but we still need to increment the outstanding buffer sizes, since
|
// but we still need to increment the outstanding buffer sizes, since
|
||||||
// the read will decrement the buffer size.
|
// the read will decrement the buffer size.
|
||||||
z.memoryRateLimiter.Finish(int64(-len(dest)))
|
z.rateLimit.Release(-len(dest))
|
||||||
|
|
||||||
ze <- &zipEntry{
|
ze <- &zipEntry{
|
||||||
fh: fileHeader,
|
fh: fileHeader,
|
||||||
|
Reference in New Issue
Block a user