commit 568032591eaf175f7d5b6cfc4a20e0e927fba94a
parent 5678bec158a3bed7fc2b3c863933f09caf8f3194
Author: Matsuda Kenji <info@mtkn.jp>
Date: Mon, 9 Oct 2023 08:15:55 +0900
divide function
Diffstat:
| M | client.go | | | 86 | ++++++++++++++++++++++++------------------------------------------------------- |
1 file changed, 26 insertions(+), 60 deletions(-)
diff --git a/client.go b/client.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
- "log"
"sync"
)
@@ -80,22 +79,20 @@ func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
}
func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
- mc := make(chan Msg, 3)
+ tmsgc := make(chan Msg, 3)
go func() {
- defer close(mc)
for {
select {
case <-ctx.Done():
return
- case msg := <-mc:
+ case msg := <-tmsgc:
if err := send(msg, w); err != nil {
- // TODO: handle error
- log.Printf("send: %v", err)
+ c.errc <- fmt.Errorf("send: %v", err)
}
}
}
}()
- return mc
+ return tmsgc
}
func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-chan Msg) chan<- *clientReq {
@@ -128,10 +125,10 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
}
}
}(reqc)
-
// Tmsg
go func(reqc chan<- *clientReq) {
defer close(tmsgc)
+ defer close(reqc)
for {
select {
case <-ctx.Done():
@@ -144,78 +141,47 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-c
}
}
}(reqc)
-
return txc
}
-
-func (c *Client) Version(ctx context.Context, mSize uint32, version string) (Msg, error) {
- tmsg := &TVersion{
- mSize: mSize,
- version: version,
- }
+func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) {
req, err := c.rPool.add(ctx, tmsg)
if err != nil {
return nil, fmt.Errorf("add to reqpool: %v", err)
}
defer c.rPool.delete(req.tag)
- go func() {
- select {
- case <-ctx.Done():
- case c.txc <- req:
- }
- }()
select {
- case req = <-req.rxc:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case c.txc <- req:
+ }
+ select {
+ case req = <-req.rxc: // TODO: this assignment is not required.
return req.rmsg, req.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
-/*
-func (c *Client) Auth(ctx context.Context, afid uint32, uname, aname string) (*RAuth, error) {
- tmsg := &TAuth{afid: afid, uname: uname}
- rmsgc, errc := c.transact(ctx, tmsg)
- select {
- case rmsg := <-rmsgc:
- switch rmsg := rmsg.(type) {
- case *RAuth:
- return rmsg, nil
- case *RError:
- return nil, rmsg.ename
- default:
- return nil, fmt.Errorf("invalid R message: %v", rmsg)
- }
- case err := <-errc:
- return nil, err
- case <-ctx.Done():
- return nil, fmt.Errorf("wait transaction: %w", ctx.Err())
+func (c *Client) Version(ctx context.Context, mSize uint32, version string) (Msg, error) {
+ tmsg := &TVersion{
+ mSize: mSize,
+ version: version,
}
+ return c.transact(ctx, tmsg)
}
-func (c *Client) Attach(ctx context.Context, fid, afid uint32, uname, aname string) (*RAttach, error) {
+func (c *Client) Auth(ctx context.Context, afid uint32, uname, aname string) (Msg, error) {
+ tmsg := &TAuth{afid: afid, uname: uname}
+ return c.transact(ctx, tmsg)
+}
+
+func (c *Client) Attach(ctx context.Context, fid, afid uint32, uname, aname string) (Msg, error) {
tmsg := &TAttach{
fid: fid,
afid: afid,
uname: uname,
aname: aname,
}
- rmsgc, errc := c.transact(ctx, tmsg)
- select {
- case rmsg := <-rmsgc:
- switch rmsg := rmsg.(type) {
- case *RAttach:
- return rmsg, nil
- case *RError:
- return nil, rmsg.ename
- default:
- return nil, fmt.Errorf("invalid R message: %v", rmsg)
- }
- case err := <-errc:
- return nil, err
- case <-ctx.Done():
- return nil, fmt.Errorf("wait transaction: %w", ctx.Err())
- }
-}
-*/
-\ No newline at end of file
+ return c.transact(ctx, tmsg)
+}
+\ No newline at end of file