commit e4eab6cf9aacfdc7e5531ee3412ad06b6e0c705b
parent 856ece47bd11a90279f44643f86c2198fa055885
Author: Matsuda Kenji <info@mtkn.jp>
Date: Mon, 23 Oct 2023 08:30:09 +0900
fight against bug
Diffstat:
4 files changed, 80 insertions(+), 9 deletions(-)
diff --git a/client.go b/client.go
@@ -5,6 +5,8 @@ import (
"fmt"
"io"
"sync"
+
+ "log"
)
type Client struct {
@@ -44,6 +46,7 @@ func (c *Client) Stop() {
// TODO: check all goroutines are stopped.
c.wg.Wait()
close(c.errc)
+ log.Println("client stopped")
}
func (c *Client) mSize() uint32 {
@@ -73,14 +76,28 @@ func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
wg.Wait()
close(rmsgc)
c.wg.Done()
+ log.Println("listener stopped")
}()
for {
select {
case <-ctx.Done():
// TODO: should return error via ec??
+ // TODO: should close r?
return
default:
- msg, err := recv(r)
+ done := make(chan struct{})
+ var (
+ msg Msg
+ err error
+ )
+ go func () {
+ defer close(done)
+ msg, err = recv(r)
+ }()
+ select {
+ case <-done:
+ case <-ctx.Done():
+ }
if err != nil {
c.errc <- fmt.Errorf("recv: %v", err)
continue
@@ -108,12 +125,18 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
c.wg.Add(1)
tmsgc := make(chan Msg, 3)
go func() {
+ defer log.Println("speaker stopped")
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-tmsgc:
+ if msg == nil {
+ // tmsgc is closed, which means ctx.Done() is also closed.
+ // but this code breaks semantics?
+ return
+ }
if err := send(msg, w); err != nil {
c.errc <- fmt.Errorf("send: %v", err)
}
@@ -138,19 +161,37 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
reqc := make(chan *clientReq)
// Rmsg
go func(reqc <-chan *clientReq) {
- defer c.wg.Done()
+ wg := new(sync.WaitGroup)
+ defer func() {
+ wg.Wait()
+ c.wg.Done()
+ log.Println("rmsg stopped")
+ }()
rPool := make(map[uint16]*clientReq)
for {
select {
case <-ctx.Done():
return
case req := <-reqc:
+ if req == nil {
+ // ctx is canceled.
+ continue
+ }
if _, ok := rPool[req.tag]; ok {
c.errc <- fmt.Errorf("mux: duplicate tag: %d", req.tag)
continue
}
- rPool[req.tag] = req
+ rPool[req.tag] = req // TODO: wait for req.ctxDone channel.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-req.ctxDone
+ }()
case msg := <-rmsgc:
+ if msg == nil {
+ // ctx is canceled.
+ continue
+ }
req, ok := rPool[msg.Tag()]
if !ok {
c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg)
@@ -170,16 +211,27 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
}(reqc)
// Tmsg
go func(reqc chan<- *clientReq) {
- defer c.wg.Done()
- defer close(tmsgc)
- defer close(reqc)
+ wg := new(sync.WaitGroup)
+ defer func() {
+ wg.Wait()
+ close(reqc)
+ close(tmsgc)
+ c.wg.Done()
+ log.Println("tmsg stopped")
+ }()
for {
select {
case <-ctx.Done():
return
case req := <-txc:
- reqc <- req
+ select {
+ case reqc <- req:
+ case <-ctx.Done():
+ return
+ }
+ wg.Add(1)
go func() {
+ defer wg.Done()
tmsgc <- req.tmsg
}()
}
@@ -191,7 +243,8 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
// Transact send 9P Msg of req to the multiplexer goroutines and recieves
// the reply.
func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) {
- req := newClientReq(ctx, tmsg)
+ ctx1, cancel1 := context.WithCancel(ctx)
+ req := newClientReq(ctx1, tmsg)
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -199,6 +252,7 @@ func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) {
}
select {
case req = <-req.rxc: // TODO: this assignment is not required.
+ cancel1()
return req.rmsg, req.err
case <-ctx.Done():
return nil, ctx.Err()
diff --git a/req.go b/req.go
@@ -21,6 +21,7 @@ type Req struct {
func (r *Req) flush() {
// TODO: need mutex?
+ // BUG: cancel() can be nil.
r.cancel()
r.pool.delete(r.tag)
// wait for tag being deleted.
diff --git a/req_test.go b/req_test.go
@@ -20,6 +20,7 @@ func TestFlush(t *testing.T) {
defer cr.Close()
defer sw.Close()
s := NewServer(fsys, mSize, sr, sw)
+ s.Chatty()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Serve(ctx)
@@ -69,16 +70,23 @@ func TestFlush(t *testing.T) {
c.tPool.delete(tag0)
close(done)
}()
+ // I want to send Tflush message after sending Tread and before recieving
+ // Rread. But I think this method is not relyable.
+ //time.Sleep(10 * time.Millisecond)
tag, err = c.tPool.add()
if err != nil {
t.Fatalf("add tag: %v", err)
}
+ t.Log(c.tPool)
err = c.Flush(ctx, tag, tag0)
c.tPool.delete(tag)
if err != nil {
t.Errorf("flush: %v", err)
}
+ t.Log(c.tPool)
cancel1()
+ t.Log(c.tPool)
t.Logf("canceled: %v", <-done)
t.Logf("read data: %v, err: %v", data, err)
+ t.Log(c.tPool)
}
diff --git a/server.go b/server.go
@@ -254,6 +254,14 @@ func rAttach(r *Req, err error) {
}
func sFlush(ctx context.Context, s *Server, r *Req) {
+ // BUG: if the oldTag does not exists and client sends another message
+ // before the server sends Rflush message, and getReq() adds the
+ // newly arrived message to the rPool, and then sFlush lookup()s the
+ // updated rPool, this lookup() can return the newly arrived req instead
+ // of nil. In this case, the server will flush the newly arrived message
+ // or if rFlush is called before the r.cancel is defined, the server will
+ // panic.
+ r.oldReq can be the newly arrived request.
ifcall := r.ifcall.(*TFlush)
r.oldReq, _ = s.rPool.lookup(ifcall.OldTag())
respond(ctx, r, nil)
@@ -871,7 +879,7 @@ L:
go func() {
switch r.ifcall.(type) {
default:
- respond(ctx, r, fmt.Errorf("unknown message type: %d", r.ifcall.Type()))
+ respond(ctx1, r, fmt.Errorf("unknown message type: %d", r.ifcall.Type()))
case *TVersion:
sVersion(ctx1, s, r)
case *TAuth: