soong_jar: Parallel compression
This compresses multiple files in parallel, and will split up larger files (5MB+) into smaller chunks (1MB) to compress in parallel. There is a small size overhead to recombine the chunks, but it's only a few bytes per chunk, so for a 1MB chunk, it's minimal. Rough numbers, with everything in the page cache, this can compress ~4GB (1000 files) down to 1GB in 6.5 seconds, instead of 120 seconds with the non-parallel soong_jar and 150 seconds with zip. Go's DEFLATE algorithm is still a bit worse than zip's -- about 3.5% larger file sizes, but for most of our "dist" targets that is fine. Change-Id: Ie4886c7d0f954ace46e599156e35fea7e74d6dd7
This commit is contained in:
115
cmd/soong_jar/rate_limit.go
Normal file
115
cmd/soong_jar/rate_limit.go
Normal file
@@ -0,0 +1,115 @@
|
||||
// Copyright 2016 Google Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
)
|
||||
|
||||
type RateLimit struct {
|
||||
requests chan struct{}
|
||||
finished chan int
|
||||
released chan int
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// NewRateLimit starts a new rate limiter with maxExecs number of executions
|
||||
// allowed to happen at a time. If maxExecs is <= 0, it will default to the
|
||||
// number of logical CPUs on the system.
|
||||
//
|
||||
// With Finish and Release, we'll keep track of outstanding buffer sizes to be
|
||||
// written. If that size goes above maxMem, we'll prevent starting new
|
||||
// executions.
|
||||
//
|
||||
// The total memory use may be higher due to current executions. This just
|
||||
// prevents runaway memory use due to slower writes.
|
||||
func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
|
||||
if maxExecs <= 0 {
|
||||
maxExecs = runtime.NumCPU()
|
||||
}
|
||||
if maxMem <= 0 {
|
||||
// Default to 512MB
|
||||
maxMem = 512 * 1024 * 1024
|
||||
}
|
||||
|
||||
ret := &RateLimit{
|
||||
requests: make(chan struct{}),
|
||||
|
||||
// Let all of the pending executions to mark themselves as finished,
|
||||
// even if our goroutine isn't processing input.
|
||||
finished: make(chan int, maxExecs),
|
||||
|
||||
released: make(chan int),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
go ret.goFunc(maxExecs, maxMem)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// RequestExecution blocks until another execution can be allowed to run.
|
||||
func (r *RateLimit) RequestExecution() Execution {
|
||||
<-r.requests
|
||||
return r.finished
|
||||
}
|
||||
|
||||
type Execution chan<- int
|
||||
|
||||
// Finish will mark your execution as finished, and allow another request to be
|
||||
// approved.
|
||||
//
|
||||
// bufferSize may be specified to count memory buffer sizes, and must be
|
||||
// matched with calls to RateLimit.Release to mark the buffers as released.
|
||||
func (e Execution) Finish(bufferSize int) {
|
||||
e <- bufferSize
|
||||
}
|
||||
|
||||
// Call Release when finished with a buffer recorded with Finish.
|
||||
func (r *RateLimit) Release(bufferSize int) {
|
||||
r.released <- bufferSize
|
||||
}
|
||||
|
||||
// Stop the background goroutine
|
||||
func (r *RateLimit) Stop() {
|
||||
close(r.stop)
|
||||
}
|
||||
|
||||
func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
|
||||
var curExecs int
|
||||
var curMemory int64
|
||||
|
||||
for {
|
||||
var requests chan struct{}
|
||||
if curExecs < maxExecs && curMemory < maxMem {
|
||||
requests = r.requests
|
||||
}
|
||||
|
||||
select {
|
||||
case requests <- struct{}{}:
|
||||
curExecs++
|
||||
case amount := <-r.finished:
|
||||
curExecs--
|
||||
curMemory += int64(amount)
|
||||
if curExecs < 0 {
|
||||
panic("curExecs < 0")
|
||||
}
|
||||
case amount := <-r.released:
|
||||
curMemory -= int64(amount)
|
||||
case <-r.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user