commit 1f669ac5ea4f72b502bd7220e26b356a843747aa
parent 4bcdaf78db4a1251b26c1b1b7780c85d381a7de2
Author: Matsuda Kenji <info@mtkn.jp>
Date: Mon, 9 Oct 2023 07:53:37 +0900
pass *clientReq between client workers
Diffstat:
| M | client.go | | | 140 | ++++++++++++++++++++++++++++++++++++++++++++++++------------------------------- |
| M | client_test.go | | | 8 | ++++---- |
| M | req.go | | | 39 | +++++++++++++++++++++++++++++++++++---- |
3 files changed, 125 insertions(+), 62 deletions(-)
diff --git a/client.go b/client.go
@@ -14,8 +14,14 @@ type Client struct {
uname string
fPool *FidPool
rPool *clientReqPool
- tmsgc chan<- Msg
- rerrc <-chan error
+ txc chan<- *clientReq
+ errc chan error
+}
+
+type LibErr string
+func (e LibErr) Error() string { return string(e) }
+func newLibErrMsg(format string, a ...any) *RError {
+ return &RError{tag: NOTAG, ename: fmt.Errorf(format, a...)}
}
func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
@@ -25,9 +31,11 @@ func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
uname: uname,
fPool: allocFidPool(),
rPool: newClientReqPool(),
+ errc: make(chan error),
}
- c.tmsgc = c.runSpeaker(context.TODO(), w)
- c.rerrc = c.runListener(context.TODO(), r)
+ tmsgc := c.runSpeaker(context.TODO(), w)
+ rmsgc := c.runListener(context.TODO(), r)
+ c.txc = c.runMultiplexer(context.TODO(), tmsgc, rmsgc)
return c
}
@@ -43,11 +51,11 @@ func (c *Client) setMSize(mSize uint32) {
c.msize = mSize
}
-func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan error {
+func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg {
// TODO: terminate with ctx.Done()
- ec := make(chan error)
+ rmsgc := make(chan Msg, 3)
go func() {
- defer close(ec)
+ defer close(rmsgc)
for {
select {
case <-ctx.Done():
@@ -56,30 +64,23 @@ func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan error {
default:
msg, err := recv(r)
if err != nil {
- ec <- err
+ c.errc <- fmt.Errorf("recv: %v", err)
continue
}
- req, ok := c.rPool.lookup(msg.Tag())
- if !ok {
- ec <- fmt.Errorf("unknown tag: %d", msg.Tag())
- }
go func() {
- defer close(req.rmsgc)
- defer c.rPool.delete(msg.Tag())
select {
- case req.rmsgc <- msg:
+ case rmsgc <- msg:
case <-ctx.Done():
- case <-req.ctxDone:
}
}()
}
}
}()
- return ec
+ return rmsgc
}
func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
- mc := make(chan Msg)
+ mc := make(chan Msg, 3)
go func() {
defer close(mc)
for {
@@ -88,11 +89,8 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
return
case msg := <-mc:
if err := send(msg, w); err != nil {
- req, ok := c.rPool.lookup(msg.Tag())
- if !ok {
- log.Printf("speaker: req not found. err: %v", err)
- }
- req.errc <- err
+ // TODO: handle error
+ log.Printf("send: %v", err)
}
}
}
@@ -100,48 +98,81 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg {
return mc
}
-// tag of tmsg is managed by the library.
-func (c *Client) transact(ctx context.Context, tmsg Msg) (<-chan Msg, <-chan error) {
+func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-chan Msg) chan<- *clientReq {
+ txc := make(chan *clientReq)
+ reqc := make(chan *clientReq)
+ // Rmsg
+ go func(reqc <-chan *clientReq) {
+ rPool := make(map[uint16]*clientReq)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case req := <-reqc:
+ rPool[req.tag] = req
+ case msg := <-rmsgc:
+ req, ok := rPool[msg.Tag()]
+ if !ok {
+ c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg)
+ continue
+ }
+ delete(rPool, msg.Tag())
+ req.rmsg = msg
+ go func() {
+ defer close(req.rxc)
+ select{
+ case <-req.ctxDone:
+ case req.rxc <- req:
+ }
+ }()
+ }
+ }
+ }(reqc)
+
+ // Tmsg
+ go func(reqc chan<- *clientReq) {
+ defer close(tmsgc)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case req := <- txc:
+ reqc <- req
+ go func() {
+ tmsgc <- req.tmsg
+ }()
+ }
+ }
+ }(reqc)
+
+ return txc
+}
+
+
+func (c *Client) Version(ctx context.Context, mSize uint32, version string) (Msg, error) {
+ tmsg := &TVersion{
+ mSize: mSize,
+ version: version,
+ }
req, err := c.rPool.add(ctx, tmsg)
if err != nil {
- errc := make(chan error, 1)
- defer close(errc)
- errc <- fmt.Errorf("add clientReq: %v", err)
- return nil, errc
+ return nil, fmt.Errorf("add to reqpool: %v", err)
}
go func() {
select {
- case c.tmsgc <- tmsg:
case <-ctx.Done():
+ case c.txc <- req:
}
}()
- return req.rmsgc, req.errc
-}
-
-func (c *Client) Version(ctx context.Context, mSize uint32, version string) (*RVersion, error) {
- tmsg := &TVersion{
- mSize: mSize,
- version: version,
- }
- rmsgc, errc := c.transact(ctx, tmsg)
select {
- case rmsg := <-rmsgc:
- switch rmsg := rmsg.(type) {
- case *RVersion:
- return rmsg, nil
- case *RError:
- return nil, rmsg.ename
- default:
- return nil, fmt.Errorf("invalid R message: %v", rmsg)
- }
- case err := <-errc:
- return nil, err
+ case req = <-req.rxc:
+ return req.rmsg, req.err
case <-ctx.Done():
- return nil, fmt.Errorf("wait transaction: %w", ctx.Err())
+ return nil, ctx.Err()
}
}
-
+/*
func (c *Client) Auth(ctx context.Context, afid uint32, uname, aname string) (*RAuth, error) {
tmsg := &TAuth{afid: afid, uname: uname}
rmsgc, errc := c.transact(ctx, tmsg)
@@ -185,4 +216,5 @@ func (c *Client) Attach(ctx context.Context, fid, afid uint32, uname, aname stri
case <-ctx.Done():
return nil, fmt.Errorf("wait transaction: %w", ctx.Err())
}
-}
-\ No newline at end of file
+}
+*/
+\ No newline at end of file
diff --git a/client_test.go b/client_test.go
@@ -23,8 +23,7 @@ func newClientForTest() (*Client, chan<- Msg, chan<- error) {
uname: uname,
fPool: allocFidPool(),
rPool: newClientReqPool(),
- tmsgc: tmsgc,
- rerrc: rerrc,
+ txc: make(chan *clientReq, 1),
}
return c, tmsgc, rerrc
}
@@ -56,7 +55,7 @@ func dummyTransact(test msgTest) (gotmsg Msg, goterr error) {
for !ok {
req, ok = c.rPool.lookup(NOTAG)
}
- req.rmsgc <- test.reply
+ req.rxc <- test.reply
close(req.errc)
close(req.rmsgc)
@@ -90,7 +89,7 @@ func TestClientVersion(t *testing.T) {
}
}
}
-
+/*
// the following tests are test for both client and server.
// should be moved to propper file.
func TestTransaction(t *testing.T) {
@@ -122,6 +121,7 @@ func TestTransaction(t *testing.T) {
t.Log(rattach)
}
}
+*/
func TestVersion(t *testing.T) {
const (
diff --git a/req.go b/req.go
@@ -62,14 +62,45 @@ func (rp *ReqPool) delete(tag uint16) {
delete(rp.m, tag)
}
+type clientReqKeyType int
+var clientReqKey clientReqKeyType
+type keyType int
+const (
+ msgKey keyType = iota
+ rmsgcKey
+)
+func ctxWithMsgAndRmsgc(ctx context.Context, m Msg) (context.Context, <-chan Msg) {
+ rmsgc := make(chan Msg)
+ return context.WithValue(context.WithValue(ctx, msgKey, m), rmsgcKey, rmsgc), rmsgc
+}
+func msgFromCtx(ctx context.Context) (Msg, bool) {
+ m, ok := ctx.Value(msgKey).(Msg)
+ return m, ok
+}
+func rmsgcFromCtx(ctx context.Context) (chan<- Msg, bool) {
+ c, ok := ctx.Value(rmsgcKey).(chan Msg)
+ return c, ok
+}
+
type clientReq struct {
tag uint16
pool *clientReqPool
- rmsgc chan Msg
- errc chan error
+ tmsg Msg
+ rmsg Msg
+ err error
+ rxc chan *clientReq
ctxDone <-chan struct{}
}
+func ctxWithClientReq(ctx context.Context, r *clientReq) context.Context {
+ return context.WithValue(ctx, clientReqKey, r)
+}
+
+func clientReqFromCtx(ctx context.Context) (*clientReq, bool) {
+ r, ok := ctx.Value(clientReqKey).(*clientReq)
+ return r, ok
+}
+
type clientReqPool struct {
m map[uint16]*clientReq
lock *sync.Mutex
@@ -112,8 +143,8 @@ func (rp *clientReqPool) add(ctx context.Context, msg Msg) (*clientReq, error) {
req := &clientReq{
tag: tag,
pool: rp,
- rmsgc: make(chan Msg),
- errc: make(chan error),
+ tmsg: msg,
+ rxc: make(chan *clientReq),
ctxDone: ctx.Done(),
}
rp.m[tag] = req