Handle aquery build statements in a goroutine

Creation of build statements is largely parallelizable because each
action is independent apart from updates/reads to
depsetHashToArtifactPathsCache. Locally resulted in build statements
taking ~.45 seconds on staging mode to ~.02 seconds

Test: CI
Change-Id: Iab00c8394a9eab17353f71230885ff0870e17f24
This commit is contained in:
Liz Kammer
2023-02-10 17:17:28 -05:00
parent 00629db646
commit a4655a96c0
3 changed files with 88 additions and 49 deletions

View File

@@ -22,6 +22,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
analysis_v2_proto "prebuilts/bazel/common/proto/analysis_v2"
@@ -130,7 +131,7 @@ type aqueryArtifactHandler struct {
// depsetIdToArtifactIdsCache is a memoization of depset flattening, because flattening
// may be an expensive operation.
depsetHashToArtifactPathsCache map[string][]string
depsetHashToArtifactPathsCache sync.Map
// Maps artifact ids to fully expanded paths.
artifactIdToPath map[artifactId]string
}
@@ -192,7 +193,7 @@ func newAqueryHandler(aqueryResult *analysis_v2_proto.ActionGraphContainer) (*aq
aqueryHandler := aqueryArtifactHandler{
depsetIdToAqueryDepset: map[depsetId]AqueryDepset{},
depsetHashToAqueryDepset: map[string]AqueryDepset{},
depsetHashToArtifactPathsCache: map[string][]string{},
depsetHashToArtifactPathsCache: sync.Map{},
emptyDepsetIds: make(map[depsetId]struct{}, 0),
artifactIdToPath: artifactIdToPath,
}
@@ -297,8 +298,8 @@ func (a *aqueryArtifactHandler) getInputPaths(depsetIds []uint32) ([]string, err
}
func (a *aqueryArtifactHandler) artifactPathsFromDepsetHash(depsetHash string) ([]string, error) {
if result, exists := a.depsetHashToArtifactPathsCache[depsetHash]; exists {
return result, nil
if result, exists := a.depsetHashToArtifactPathsCache.Load(depsetHash); exists {
return result.([]string), nil
}
if depset, exists := a.depsetHashToAqueryDepset[depsetHash]; exists {
result := depset.DirectArtifacts
@@ -309,7 +310,7 @@ func (a *aqueryArtifactHandler) artifactPathsFromDepsetHash(depsetHash string) (
}
result = append(result, childArtifactIds...)
}
a.depsetHashToArtifactPathsCache[depsetHash] = result
a.depsetHashToArtifactPathsCache.Store(depsetHash, result)
return result, nil
} else {
return nil, fmt.Errorf("undefined input depset hash %s", depsetHash)
@@ -321,7 +322,7 @@ func (a *aqueryArtifactHandler) artifactPathsFromDepsetHash(depsetHash string) (
// action graph, as described by the given action graph json proto.
// BuildStatements are one-to-one with actions in the given action graph, and AqueryDepsets
// are one-to-one with Bazel's depSetOfFiles objects.
func AqueryBuildStatements(aqueryJsonProto []byte, eventHandler *metrics.EventHandler) ([]BuildStatement, []AqueryDepset, error) {
func AqueryBuildStatements(aqueryJsonProto []byte, eventHandler *metrics.EventHandler) ([]*BuildStatement, []AqueryDepset, error) {
aqueryProto := &analysis_v2_proto.ActionGraphContainer{}
err := proto.Unmarshal(aqueryJsonProto, aqueryProto)
if err != nil {
@@ -338,21 +339,35 @@ func AqueryBuildStatements(aqueryJsonProto []byte, eventHandler *metrics.EventHa
}
}
buildStatements := make([]BuildStatement, 0, len(aqueryProto.Actions))
// allocate both length and capacity so each goroutine can write to an index independently without
// any need for synchronization for slice access.
buildStatements := make([]*BuildStatement, len(aqueryProto.Actions))
{
eventHandler.Begin("build_statements")
defer eventHandler.End("build_statements")
var buildStatement *BuildStatement
for _, actionEntry := range aqueryProto.Actions {
buildStatement, err = aqueryHandler.actionToBuildStatement(actionEntry)
if err != nil {
return nil, nil, err
}
if buildStatement == nil {
continue
}
buildStatements = append(buildStatements, *buildStatement)
wg := sync.WaitGroup{}
var errOnce sync.Once
for i, actionEntry := range aqueryProto.Actions {
wg.Add(1)
go func(i int, actionEntry *analysis_v2_proto.Action) {
buildStatement, aErr := aqueryHandler.actionToBuildStatement(actionEntry)
if aErr != nil {
errOnce.Do(func() {
err = aErr
})
} else {
// set build statement at an index rather than appending such that each goroutine does not
// impact other goroutines
buildStatements[i] = buildStatement
}
wg.Done()
}(i, actionEntry)
}
wg.Wait()
}
if err != nil {
return nil, nil, err
}
depsetsByHash := map[string]AqueryDepset{}
@@ -379,7 +394,13 @@ func AqueryBuildStatements(aqueryJsonProto []byte, eventHandler *metrics.EventHa
// output). Note they are not sorted by their original IDs nor their Bazel ordering,
// as Bazel gives nondeterministic ordering / identifiers in aquery responses.
sort.Slice(buildStatements, func(i, j int) bool {
// For build statements, compare output lists. In Bazel, each output file
// Sort all nil statements to the end of the slice
if buildStatements[i] == nil {
return false
} else if buildStatements[j] == nil {
return true
}
//For build statements, compare output lists. In Bazel, each output file
// may only have one action which generates it, so this will provide
// a deterministic ordering.
outputs_i := buildStatements[i].OutputPaths