commit f728a1471e9389fe2ba31192f295e4224b1f2e3b
parent a66ad50c693dd549728431f8492f27d5c128b9f3
Author: Matsuda Kenji <info@mtkn.jp>
Date: Sun, 7 Jan 2024 12:01:30 +0900
add comment on context
Diffstat:
2 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/client/client.go b/client/client.go
@@ -24,9 +24,6 @@ type Client struct {
// FPool is the fidPool which hold the list of opend fids.
fPool *fidPool
- // RPool is the list of outstanding requests.
- rPool *reqPool
-
// Txc is used to send a reqest to the multiplexer goroutine
txc chan<- *req
@@ -50,8 +47,8 @@ type Client struct {
done <-chan struct{}
}
-// NewClient creates a Client.
-// It also runs several goroutines to handle requests.
+// NewClient creates a Client and prepare to transact with a server via r and w.
+// It runs several goroutines to handle requests.
// And the returned client should be stopped by cancelling ctx afterwards.
func NewClient(ctx context.Context, mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
c := &Client{
@@ -59,7 +56,6 @@ func NewClient(ctx context.Context, mSize uint32, uname string, r io.Reader, w i
mSizeLock: new(sync.Mutex),
uname: uname,
fPool: allocClientFidPool(),
- rPool: newReqPool(),
errc: make(chan error),
wg: new(sync.WaitGroup),
done: ctx.Done(),
@@ -196,6 +192,7 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- lib9p.Msg {
func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rmsgc <-chan lib9p.Msg) chan<- *req {
c.wg.Add(2)
txc := make(chan *req)
+ rPool := newReqPool()
// Rmsg
go func() {
defer func() {
@@ -209,19 +206,19 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rms
if !ok {
return
}
- r, ok := c.rPool.lookup(msg.GetTag())
+ r, ok := rPool.lookup(msg.GetTag())
if !ok {
c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg)
continue
}
- c.rPool.delete(msg.GetTag())
+ rPool.delete(msg.GetTag())
if tflush, ok := r.tmsg.(*lib9p.TFlush); ok {
if _, ok := msg.(*lib9p.RFlush); !ok {
r.errc <- fmt.Errorf("mux: response to Tflush is not Rflush")
}
- if oldreq, ok := c.rPool.lookup(tflush.Oldtag); ok {
+ if oldreq, ok := rPool.lookup(tflush.Oldtag); ok {
oldreq.errc <- errors.New("request flushed")
- c.rPool.delete(tflush.Oldtag)
+ rPool.delete(tflush.Oldtag)
}
}
r.rmsg = msg
@@ -243,11 +240,11 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rms
case <-ctx.Done():
return
case r := <-txc:
- if _, ok := c.rPool.lookup(r.tag); ok {
+ if _, ok := rPool.lookup(r.tag); ok {
r.errc <- fmt.Errorf("mux: duplicate tag: %d", r.tag)
continue
}
- c.rPool.add(r)
+ rPool.add(r)
select {
case tmsgc <- r.tmsg:
case <-ctx.Done():
@@ -278,6 +275,15 @@ func (c *Client) transact(tmsg lib9p.Msg) (lib9p.Msg, error) {
}
}
+// Version sends Tversion message to the server and returns the resulting
+// data of Rversion or non nil error if any.
+// This function and other Tmessage functions don't have a context.Contex
+// as their argument.
+// The caller can call *Client.Flush to cancel a pending request if the
+// connection to the server is helthy.
+// And even if the connection has some problem sending/recieving, there
+// is no way to cancel blocking reads/writes. In this case, the caller
+// can close the connection.
func (c *Client) Version(tag uint16, msize uint32, version string) (uint32, string, error) {
tmsg := &lib9p.TVersion{Tag: tag, Msize: msize, Version: version}
rmsg, err := c.transact(tmsg)
diff --git a/client/client_test.go b/client/client_test.go
@@ -88,7 +88,6 @@ func newClientForTest(ctx context.Context, msize uint32, uname string) (*Client,
mSizeLock: new(sync.Mutex),
uname: uname,
fPool: allocClientFidPool(),
- rPool: newReqPool(),
errc: make(chan error),
wg: new(sync.WaitGroup),
}