Merge "Stop NinjaReader from sending new status messages after Close" into main
This commit is contained in:
@@ -40,10 +40,11 @@ func NewNinjaReader(ctx logger.Logger, status ToolStatus, fifo string) *NinjaRea
|
|||||||
}
|
}
|
||||||
|
|
||||||
n := &NinjaReader{
|
n := &NinjaReader{
|
||||||
status: status,
|
status: status,
|
||||||
fifo: fifo,
|
fifo: fifo,
|
||||||
done: make(chan bool),
|
forceClose: make(chan bool),
|
||||||
cancel: make(chan bool),
|
done: make(chan bool),
|
||||||
|
cancelOpen: make(chan bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
go n.run()
|
go n.run()
|
||||||
@@ -52,10 +53,11 @@ func NewNinjaReader(ctx logger.Logger, status ToolStatus, fifo string) *NinjaRea
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NinjaReader struct {
|
type NinjaReader struct {
|
||||||
status ToolStatus
|
status ToolStatus
|
||||||
fifo string
|
fifo string
|
||||||
done chan bool
|
forceClose chan bool
|
||||||
cancel chan bool
|
done chan bool
|
||||||
|
cancelOpen chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second
|
const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second
|
||||||
@@ -63,18 +65,34 @@ const NINJA_READER_CLOSE_TIMEOUT = 5 * time.Second
|
|||||||
// Close waits for NinjaReader to finish reading from the fifo, or 5 seconds.
|
// Close waits for NinjaReader to finish reading from the fifo, or 5 seconds.
|
||||||
func (n *NinjaReader) Close() {
|
func (n *NinjaReader) Close() {
|
||||||
// Signal the goroutine to stop if it is blocking opening the fifo.
|
// Signal the goroutine to stop if it is blocking opening the fifo.
|
||||||
close(n.cancel)
|
close(n.cancelOpen)
|
||||||
|
|
||||||
|
// Ninja should already have exited or been killed, wait 5 seconds for the FIFO to be closed and any
|
||||||
|
// remaining messages to be processed through the NinjaReader.run goroutine.
|
||||||
timeoutCh := time.After(NINJA_READER_CLOSE_TIMEOUT)
|
timeoutCh := time.After(NINJA_READER_CLOSE_TIMEOUT)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-n.done:
|
case <-n.done:
|
||||||
// Nothing
|
return
|
||||||
case <-timeoutCh:
|
case <-timeoutCh:
|
||||||
n.status.Error(fmt.Sprintf("ninja fifo didn't finish after %s", NINJA_READER_CLOSE_TIMEOUT.String()))
|
// Channel is not closed yet
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
n.status.Error(fmt.Sprintf("ninja fifo didn't finish after %s", NINJA_READER_CLOSE_TIMEOUT.String()))
|
||||||
|
|
||||||
|
// Force close the reader even if the FIFO didn't close.
|
||||||
|
close(n.forceClose)
|
||||||
|
|
||||||
|
// Wait again for the reader thread to acknowledge the close before giving up and assuming it isn't going
|
||||||
|
// to send anything else.
|
||||||
|
timeoutCh = time.After(NINJA_READER_CLOSE_TIMEOUT)
|
||||||
|
select {
|
||||||
|
case <-n.done:
|
||||||
|
return
|
||||||
|
case <-timeoutCh:
|
||||||
|
// Channel is not closed yet
|
||||||
|
}
|
||||||
|
|
||||||
|
n.status.Verbose(fmt.Sprintf("ninja fifo didn't finish even after force closing after %s", NINJA_READER_CLOSE_TIMEOUT.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *NinjaReader) run() {
|
func (n *NinjaReader) run() {
|
||||||
@@ -98,7 +116,7 @@ func (n *NinjaReader) run() {
|
|||||||
select {
|
select {
|
||||||
case f = <-fileCh:
|
case f = <-fileCh:
|
||||||
// Nothing
|
// Nothing
|
||||||
case <-n.cancel:
|
case <-n.cancelOpen:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,33 +126,58 @@ func (n *NinjaReader) run() {
|
|||||||
|
|
||||||
running := map[uint32]*Action{}
|
running := map[uint32]*Action{}
|
||||||
|
|
||||||
|
msgChan := make(chan *ninja_frontend.Status)
|
||||||
|
|
||||||
|
// Read from the ninja fifo and decode the protobuf in a goroutine so the main NinjaReader.run goroutine
|
||||||
|
// can listen
|
||||||
|
go func() {
|
||||||
|
defer close(msgChan)
|
||||||
|
for {
|
||||||
|
size, err := readVarInt(r)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, size)
|
||||||
|
_, err = io.ReadFull(r, buf)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
n.status.Print(fmt.Sprintf("Missing message of size %d from ninja\n", size))
|
||||||
|
} else {
|
||||||
|
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &ninja_frontend.Status{}
|
||||||
|
err = proto.Unmarshal(buf, msg)
|
||||||
|
if err != nil {
|
||||||
|
n.status.Print(fmt.Sprintf("Error reading message from ninja: %v", err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
msgChan <- msg
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
size, err := readVarInt(r)
|
var msg *ninja_frontend.Status
|
||||||
if err != nil {
|
var msgOk bool
|
||||||
if err != io.EOF {
|
select {
|
||||||
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
|
case <-n.forceClose:
|
||||||
}
|
// Close() has been called, but the reader goroutine didn't get EOF after 5 seconds
|
||||||
return
|
break
|
||||||
|
case msg, msgOk = <-msgChan:
|
||||||
|
// msg is ready or closed
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := make([]byte, size)
|
if !msgOk {
|
||||||
_, err = io.ReadFull(r, buf)
|
// msgChan is closed
|
||||||
if err != nil {
|
break
|
||||||
if err == io.EOF {
|
|
||||||
n.status.Print(fmt.Sprintf("Missing message of size %d from ninja\n", size))
|
|
||||||
} else {
|
|
||||||
n.status.Error(fmt.Sprintf("Got error reading from ninja: %s", err))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &ninja_frontend.Status{}
|
|
||||||
err = proto.Unmarshal(buf, msg)
|
|
||||||
if err != nil {
|
|
||||||
n.status.Print(fmt.Sprintf("Error reading message from ninja: %v", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ignore msg.BuildStarted
|
// Ignore msg.BuildStarted
|
||||||
if msg.TotalEdges != nil {
|
if msg.TotalEdges != nil {
|
||||||
n.status.SetTotalActions(int(msg.TotalEdges.GetTotalEdges()))
|
n.status.SetTotalActions(int(msg.TotalEdges.GetTotalEdges()))
|
||||||
|
Reference in New Issue
Block a user