split soong_zip into a library and a binary
to make it faster/easier to invoke from other Go programs (such as multiproduct_kati) Bug: 67478260 Test: m -j Change-Id: Idd2671a44290550197c88f53dd11a6dd39c85cc5
This commit is contained in:
152
zip/rate_limit.go
Normal file
152
zip/rate_limit.go
Normal file
@@ -0,0 +1,152 @@
|
||||
// Copyright 2016 Google Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package zip
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type RateLimit struct {
|
||||
requests chan request
|
||||
completions chan int64
|
||||
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
type request struct {
|
||||
size int64
|
||||
serviced chan struct{}
|
||||
}
|
||||
|
||||
// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
|
||||
// except when no capacity is in use, in which case the first caller is always permitted
|
||||
func NewRateLimit(capacity int64) *RateLimit {
|
||||
ret := &RateLimit{
|
||||
requests: make(chan request),
|
||||
completions: make(chan int64),
|
||||
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
go ret.monitorChannels(capacity)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// RequestExecution blocks until another execution of size <size> can be allowed to run.
|
||||
func (r *RateLimit) Request(size int64) {
|
||||
request := request{
|
||||
size: size,
|
||||
serviced: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
// wait for the request to be received
|
||||
r.requests <- request
|
||||
|
||||
// wait for the request to be accepted
|
||||
<-request.serviced
|
||||
}
|
||||
|
||||
// Finish declares the completion of an execution of size <size>
|
||||
func (r *RateLimit) Finish(size int64) {
|
||||
r.completions <- size
|
||||
}
|
||||
|
||||
// Stop the background goroutine
|
||||
func (r *RateLimit) Stop() {
|
||||
close(r.stop)
|
||||
}
|
||||
|
||||
// monitorChannels processes incoming requests from channels
|
||||
func (r *RateLimit) monitorChannels(capacity int64) {
|
||||
var usedCapacity int64
|
||||
var currentRequest *request
|
||||
|
||||
for {
|
||||
var requests chan request
|
||||
if currentRequest == nil {
|
||||
// If we don't already have a queued request, then we should check for a new request
|
||||
requests = r.requests
|
||||
}
|
||||
|
||||
select {
|
||||
case newRequest := <-requests:
|
||||
currentRequest = &newRequest
|
||||
case amountCompleted := <-r.completions:
|
||||
usedCapacity -= amountCompleted
|
||||
|
||||
if usedCapacity < 0 {
|
||||
panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
|
||||
}
|
||||
case <-r.stop:
|
||||
return
|
||||
}
|
||||
|
||||
if currentRequest != nil {
|
||||
accepted := false
|
||||
if usedCapacity == 0 {
|
||||
accepted = true
|
||||
} else {
|
||||
if capacity >= usedCapacity+currentRequest.size {
|
||||
accepted = true
|
||||
}
|
||||
}
|
||||
if accepted {
|
||||
usedCapacity += currentRequest.size
|
||||
currentRequest.serviced <- struct{}{}
|
||||
currentRequest = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// A CPURateLimiter limits the number of active calls based on CPU requirements
|
||||
type CPURateLimiter struct {
|
||||
impl *RateLimit
|
||||
}
|
||||
|
||||
func NewCPURateLimiter(capacity int64) *CPURateLimiter {
|
||||
if capacity <= 0 {
|
||||
capacity = int64(runtime.NumCPU())
|
||||
}
|
||||
impl := NewRateLimit(capacity)
|
||||
return &CPURateLimiter{impl: impl}
|
||||
}
|
||||
|
||||
func (e CPURateLimiter) Request() {
|
||||
e.impl.Request(1)
|
||||
}
|
||||
|
||||
func (e CPURateLimiter) Finish() {
|
||||
e.impl.Finish(1)
|
||||
}
|
||||
|
||||
func (e CPURateLimiter) Stop() {
|
||||
e.impl.Stop()
|
||||
}
|
||||
|
||||
// A MemoryRateLimiter limits the number of active calls based on Memory requirements
|
||||
type MemoryRateLimiter struct {
|
||||
*RateLimit
|
||||
}
|
||||
|
||||
func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
|
||||
if capacity <= 0 {
|
||||
capacity = 512 * 1024 * 1024 // 512MB
|
||||
}
|
||||
impl := NewRateLimit(capacity)
|
||||
return &MemoryRateLimiter{RateLimit: impl}
|
||||
}
|
Reference in New Issue
Block a user