commit bf6cc12d2755fafe0f3fb4967fc88f04b1ebff4d
parent 2212645c2192d583d93a5ad9b12ce48d4b278475
Author: Matsuda Kenji <info@mtkn.jp>
Date: Sat, 21 Oct 2023 07:53:26 +0900
change the manager of message tag
Diffstat:
| M | client.go | | | 102 | ++++++++++++++++++++++++++++++++++++++++++++----------------------------------- |
| M | client2.go | | | 2 | +- |
| M | req.go | | | 9 | +++++++++ |
3 files changed, 67 insertions(+), 46 deletions(-)
diff --git a/client.go b/client.go
@@ -118,6 +118,10 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
case <-ctx.Done():
return
case req := <-reqc:
+ if _, ok := rPool[req.tag]; ok {
+ c.errc <- fmt.Errorf("mux: duplicate tag: %d", req.tag)
+ continue
+ }
rPool[req.tag] = req
case msg := <-rmsgc:
req, ok := rPool[msg.Tag()]
@@ -156,12 +160,7 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
return txc
}
-func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) {
- req, err := c.rPool.add(ctx, tmsg)
- if err != nil {
- return nil, fmt.Errorf("add to reqpool: %v", err)
- }
- defer c.rPool.delete(req.tag)
+func (c *Client) transact(ctx context.Context, req *clientReq) (Msg, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -175,9 +174,10 @@ func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) {
}
}
-func (c *Client) Version(ctx context.Context, mSize uint32, version string) (uint32, string, error) {
- tmsg := &TVersion{mSize: mSize, version: version}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Version(ctx context.Context, tag uint16, mSize uint32, version string) (uint32, string, error) {
+ tmsg := &TVersion{tag: tag, mSize: mSize, version: version}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return 0, "", fmt.Errorf("transact: %v", err)
}
@@ -191,9 +191,10 @@ func (c *Client) Version(ctx context.Context, mSize uint32, version string) (uin
}
}
-func (c *Client) Auth(ctx context.Context, afid uint32, uname, aname string) (Qid, error) {
- tmsg := &TAuth{afid: afid, uname: uname}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Auth(ctx context.Context, tag uint16, afid uint32, uname, aname string) (Qid, error) {
+ tmsg := &TAuth{tag: tag, afid: afid, uname: uname}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return Qid{}, fmt.Errorf("transact: %v", err)
}
@@ -207,9 +208,10 @@ func (c *Client) Auth(ctx context.Context, afid uint32, uname, aname string) (Qi
}
}
-func (c *Client) Attach(ctx context.Context, fid, afid uint32, uname, aname string) (Qid, error) {
- tmsg := &TAttach{fid: fid, afid: afid, uname: uname, aname: aname}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Attach(ctx context.Context, tag uint16, fid, afid uint32, uname, aname string) (Qid, error) {
+ tmsg := &TAttach{tag: tag, fid: fid, afid: afid, uname: uname, aname: aname}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return Qid{}, fmt.Errorf("transact: %v", err)
}
@@ -223,9 +225,10 @@ func (c *Client) Attach(ctx context.Context, fid, afid uint32, uname, aname stri
}
}
-func (c *Client) Flush(ctx context.Context, oldtag uint16) error {
- tmsg := &TFlush{oldtag: oldtag}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Flush(ctx context.Context, tag, oldtag uint16) error {
+ tmsg := &TFlush{tag: tag, oldtag: oldtag}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return fmt.Errorf("transact: %v", err)
}
@@ -238,9 +241,10 @@ func (c *Client) Flush(ctx context.Context, oldtag uint16) error {
return fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Walk(ctx context.Context, fid, newFid uint32, wname []string) (wqid []Qid, err error) {
- tmsg := &TWalk{fid: fid, newFid: newFid, wname: wname}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Walk(ctx context.Context, tag uint16, fid, newFid uint32, wname []string) (wqid []Qid, err error) {
+ tmsg := &TWalk{tag: tag, fid: fid, newFid: newFid, wname: wname}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return nil, fmt.Errorf("transact: %v", err)
}
@@ -253,9 +257,10 @@ func (c *Client) Walk(ctx context.Context, fid, newFid uint32, wname []string) (
return nil, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Open(ctx context.Context, fid uint32, mode OpenMode) (qid Qid, iounit uint32, err error) {
- tmsg := &TOpen{fid: fid, mode: mode}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Open(ctx context.Context, tag uint16, fid uint32, mode OpenMode) (qid Qid, iounit uint32, err error) {
+ tmsg := &TOpen{tag: tag, fid: fid, mode: mode}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return Qid{}, 0, fmt.Errorf("transact: %v", err)
}
@@ -268,9 +273,10 @@ func (c *Client) Open(ctx context.Context, fid uint32, mode OpenMode) (qid Qid,
return Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Create(ctx context.Context, fid uint32, name string, perm FileMode, mode OpenMode) (Qid, uint32, error) {
- tmsg := &TCreate{fid: fid, name: name, perm: perm, mode: mode}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Create(ctx context.Context, tag uint16, fid uint32, name string, perm FileMode, mode OpenMode) (Qid, uint32, error) {
+ tmsg := &TCreate{tag: tag, fid: fid, name: name, perm: perm, mode: mode}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return Qid{}, 0, fmt.Errorf("transact: %v", err)
}
@@ -283,9 +289,10 @@ func (c *Client) Create(ctx context.Context, fid uint32, name string, perm FileM
return Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Read(ctx context.Context, fid uint32, offset uint64, count uint32) (data []byte, err error) {
- tmsg := &TRead{fid: fid, offset: offset, count: count}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Read(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32) (data []byte, err error) {
+ tmsg := &TRead{tag: tag, fid: fid, offset: offset, count: count}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return nil, fmt.Errorf("transact: %v", err)
}
@@ -298,9 +305,10 @@ func (c *Client) Read(ctx context.Context, fid uint32, offset uint64, count uint
return nil, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Write(ctx context.Context, fid uint32, offset uint64, count uint32, data []byte) (uint32, error) {
- tmsg := &TWrite{fid: fid, offset: offset, count: count, data: data}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Write(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32, data []byte) (uint32, error) {
+ tmsg := &TWrite{tag: tag, fid: fid, offset: offset, count: count, data: data}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return 0, fmt.Errorf("transact: %v", err)
}
@@ -313,9 +321,10 @@ func (c *Client) Write(ctx context.Context, fid uint32, offset uint64, count uin
return 0, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Clunk(ctx context.Context, fid uint32) error {
- tmsg := &TClunk{fid: fid}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Clunk(ctx context.Context, tag uint16, fid uint32) error {
+ tmsg := &TClunk{tag: tag, fid: fid}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return fmt.Errorf("transact: %v", err)
}
@@ -328,9 +337,10 @@ func (c *Client) Clunk(ctx context.Context, fid uint32) error {
return fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Remove(ctx context.Context, fid uint32) error {
- tmsg := &TRemove{fid: fid}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Remove(ctx context.Context, tag uint16, fid uint32) error {
+ tmsg := &TRemove{tag: tag, fid: fid}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return fmt.Errorf("transact: %v", err)
}
@@ -343,9 +353,10 @@ func (c *Client) Remove(ctx context.Context, fid uint32) error {
return fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Stat(ctx context.Context, fid uint32) (*Stat, error) {
- tmsg := &TStat{fid: fid}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Stat(ctx context.Context, tag uint16, fid uint32) (*Stat, error) {
+ tmsg := &TStat{tag: tag, fid: fid}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return nil, fmt.Errorf("transact: %v", err)
}
@@ -358,9 +369,10 @@ func (c *Client) Stat(ctx context.Context, fid uint32) (*Stat, error) {
return nil, fmt.Errorf("invalid reply: %v", rmsg)
}
}
-func (c *Client) Wstat(ctx context.Context, fid uint32, stat *Stat) error {
- tmsg := &TWStat{fid: fid, stat: stat}
- rmsg, err := c.transact(ctx, tmsg)
+func (c *Client) Wstat(ctx context.Context, tag uint16, fid uint32, stat *Stat) error {
+ tmsg := &TWStat{tag: tag, fid: fid, stat: stat}
+ req := newClientReq(ctx, tmsg)
+ rmsg, err := c.transact(ctx, req)
if err != nil {
return fmt.Errorf("transact: %v", err)
}
diff --git a/client2.go b/client2.go
@@ -69,7 +69,7 @@ func Mount(r io.Reader, w io.Writer, uname, aname string) (fs *Client, err error
c.Stop()
}
}()
- rmSize, rver, err := c.Version(ctx, mSize, version)
+ rmSize, rver, err := c.Version(ctx, 0, mSize, version)
if err != nil {
return nil, fmt.Errorf("version: %v", err)
}
diff --git a/req.go b/req.go
@@ -81,6 +81,15 @@ type clientReq struct {
ctxDone <-chan struct{}
}
+func newClientReq(ctx context.Context, msg Msg) *clientReq {
+ return &clientReq{
+ tag: msg.Tag(),
+ tmsg: msg,
+ rxc: make(chan *clientReq),
+ ctxDone: ctx.Done(),
+ }
+}
+
type clientReqPool struct {
m map[uint16]*clientReq
lock *sync.Mutex