Merge "Stop NinjaReader from sending new status messages after Close" into main

This commit is contained in:
Treehugger Robot
2023-09-05 22:00:29 +00:00
committed by Gerrit Code Review

View File

@@ -40,10 +40,11 @@ func NewNinjaReader(ctx logger.Logger, status ToolStatus, fifo string) *NinjaRea
} }
n := &NinjaReader{ n := &NinjaReader{
status: status, status: status,
fifo: fifo, fifo: fifo,
done: make(chan bool), forceClose: make(chan bool),
cancel: make(chan bool), done: make(chan bool),
cancelOpen: make(chan bool),
} }
go n.run() go n.run()
@@ -52,10 +53,11 @@ func NewNinjaReader(ctx logger.Logger, status ToolStatus, fifo string) *NinjaRea
} }
type NinjaReader struct { type NinjaReader struct {
status ToolStatus status ToolStatus
fifo string fifo string
done chan bool forceClose chan bool
cancel chan bool done chan bool
cancelOpen chan bool
} }
const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second
@@ -63,18 +65,34 @@ const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second
// Close waits for NinjaReader to finish reading from the fifo, or 5 seconds. // Close waits for NinjaReader to finish reading from the fifo, or 5 seconds.
func (n *NinjaReader) Close() { func (n *NinjaReader) Close() {
// Signal the goroutine to stop if it is blocking opening the fifo. // Signal the goroutine to stop if it is blocking opening the fifo.
close(n.cancel) close(n.cancelOpen)
// Ninja should already have exited or been killed, wait 5 seconds for the FIFO to be closed and any
// remaining messages to be processed through the NinjaReader.run goroutine.
timeoutCh := time.After(NINJA_READER_CLOSE_TIMEOUT) timeoutCh := time.After(NINJA_READER_CLOSE_TIMEOUT)
select { select {
case <-n.done: case <-n.done:
// Nothing return
case <-timeoutCh: case <-timeoutCh:
n.status.Error(fmt.Sprintf("ninja fifo didn't finish after %s", NINJA_READER_CLOSE_TIMEOUT.String())) // Channel is not closed yet
} }
return n.status.Error(fmt.Sprintf("ninja fifo didn't finish after %s", NINJA_READER_CLOSE_TIMEOUT.String()))
// Force close the reader even if the FIFO didn't close.
close(n.forceClose)
// Wait again for the reader thread to acknowledge the close before giving up and assuming it isn't going
// to send anything else.
timeoutCh = time.After(NINJA_READER_CLOSE_TIMEOUT)
select {
case <-n.done:
return
case <-timeoutCh:
// Channel is not closed yet
}
n.status.Verbose(fmt.Sprintf("ninja fifo didn't finish even after force closing after %s", NINJA_READER_CLOSE_TIMEOUT.String()))
} }
func (n *NinjaReader) run() { func (n *NinjaReader) run() {
@@ -98,7 +116,7 @@ func (n *NinjaReader) run() {
select { select {
case f = <-fileCh: case f = <-fileCh:
// Nothing // Nothing
case <-n.cancel: case <-n.cancelOpen:
return return
} }
@@ -108,33 +126,58 @@ func (n *NinjaReader) run() {
running := map[uint32]*Action{} running := map[uint32]*Action{}
msgChan := make(chan *ninja_frontend.Status)
// Read from the ninja fifo and decode the protobuf in a goroutine so the main NinjaReader.run goroutine
// can listen
go func() {
defer close(msgChan)
for {
size, err := readVarInt(r)
if err != nil {
if err != io.EOF {
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
}
return
}
buf := make([]byte, size)
_, err = io.ReadFull(r, buf)
if err != nil {
if err == io.EOF {
n.status.Print(fmt.Sprintf("Missing message of size %d from ninja\n", size))
} else {
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
}
return
}
msg := &ninja_frontend.Status{}
err = proto.Unmarshal(buf, msg)
if err != nil {
n.status.Print(fmt.Sprintf("Error reading message from ninja: %v", err))
continue
}
msgChan <- msg
}
}()
for { for {
size, err := readVarInt(r) var msg *ninja_frontend.Status
if err != nil { var msgOk bool
if err != io.EOF { select {
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err)) case <-n.forceClose:
} // Close() has been called, but the reader goroutine didn't get EOF after 5 seconds
return break
case msg, msgOk = <-msgChan:
// msg is ready or closed
} }
buf := make([]byte, size) if !msgOk {
_, err = io.ReadFull(r, buf) // msgChan is closed
if err != nil { break
if err == io.EOF {
n.status.Print(fmt.Sprintf("Missing message of size %d from ninja\n", size))
} else {
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
}
return
} }
msg := &ninja_frontend.Status{}
err = proto.Unmarshal(buf, msg)
if err != nil {
n.status.Print(fmt.Sprintf("Error reading message from ninja: %v", err))
continue
}
// Ignore msg.BuildStarted // Ignore msg.BuildStarted
if msg.TotalEdges != nil { if msg.TotalEdges != nil {
n.status.SetTotalActions(int(msg.TotalEdges.GetTotalEdges())) n.status.SetTotalActions(int(msg.TotalEdges.GetTotalEdges()))