commit 856ece47bd11a90279f44643f86c2198fa055885
parent 071b093176846afbb759f0a70b051c1dfdeaa731
Author: Matsuda Kenji <info@mtkn.jp>
Date: Sun, 22 Oct 2023 10:23:12 +0900
add sync.WaitGroup to listener goroutine
Diffstat:
4 files changed, 22 insertions(+), 3 deletions(-)
diff --git a/client.go b/client.go
@@ -17,6 +17,7 @@ type Client struct {
errc chan error
cancel context.CancelFunc
rootFid *clientFid
+ wg *sync.WaitGroup
}
func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
@@ -29,6 +30,7 @@ func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
tPool: newTagPool(),
errc: make(chan error),
cancel: cancel,
+ wg: new(sync.WaitGroup),
}
tmsgc := c.runSpeaker(ctx, w)
rmsgc := c.runListener(ctx, r)
@@ -40,7 +42,8 @@ func (c *Client) Stop() {
// TODO: fPool, rPool
c.cancel()
// TODO: check all goroutines are stopped.
- // close(c.errc)
+ c.wg.Wait()
+ close(c.errc)
}
func (c *Client) mSize() uint32 {
@@ -61,10 +64,16 @@ func (c *Client) setMSize(mSize uint32) {
// Listener goroutine returns when ctx is canceled.
// Listener goroutine reports errors to the client's errc channel.
func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
+ c.wg.Add(1)
// TODO: terminate with ctx.Done()
rmsgc := make(chan Msg, 3)
go func() {
- defer close(rmsgc)
+ wg := new(sync.WaitGroup)
+ defer func(){
+ wg.Wait()
+ close(rmsgc)
+ c.wg.Done()
+ }()
for {
select {
case <-ctx.Done():
@@ -76,7 +85,9 @@ func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
c.errc <- fmt.Errorf("recv: %v", err)
continue
}
+ wg.Add(1)
go func() {
+ defer wg.Done()
select {
case rmsgc <- msg:
case <-ctx.Done():
@@ -94,8 +105,10 @@ func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
// It reports any errors to the clients errc channel.
// It returnes when ctx is canceled.
func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
+ c.wg.Add(1)
tmsgc := make(chan Msg, 3)
go func() {
+ defer c.wg.Done()
for {
select {
case <-ctx.Done():
@@ -120,10 +133,12 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
// *clientReq.rxc.
// It reports any errors to the client's errc channel.
func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-chan Msg) chan<- *clientReq {
+ c.wg.Add(2)
txc := make(chan *clientReq)
reqc := make(chan *clientReq)
// Rmsg
go func(reqc <-chan *clientReq) {
+ defer c.wg.Done()
rPool := make(map[uint16]*clientReq)
for {
select {
@@ -155,6 +170,7 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
}(reqc)
// Tmsg
go func(reqc chan<- *clientReq) {
+ defer c.wg.Done()
defer close(tmsgc)
defer close(reqc)
for {
diff --git a/client_test.go b/client_test.go
@@ -22,6 +22,7 @@ func newClientForTest(msize uint32, uname string, tmsgc chan<- Msg, rmsgc <-chan
tPool: newTagPool(),
errc: make(chan error),
cancel: cancel,
+ wg: new(sync.WaitGroup),
}
c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc)
return c
diff --git a/req.go b/req.go
@@ -81,6 +81,8 @@ type clientReq struct {
ctxDone <-chan struct{}
}
+// TODO: passing ctx is confusing?
+// it only needs the done channel.
func newClientReq(ctx context.Context, msg Msg) *clientReq {
return &clientReq{
tag: msg.Tag(),
diff --git a/test_fs.go b/test_fs.go
@@ -9,7 +9,7 @@ import (
"time"
)
-const sleepTime = 100 * time.Second
+const sleepTime = 1 * time.Second
type testFile struct {
fsys *testFS