commit 7d78089531ea3268e91a69014d9f72c7e3f616d0
parent 65a9b341699f585edfce4a150914284ea6b61224
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 29 Sep 2023 15:26:54 +0900
add listener and speaker for client
Diffstat:
| M | client.go | | | 61 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++------ |
1 file changed, 55 insertions(+), 6 deletions(-)
diff --git a/client.go b/client.go
@@ -8,29 +8,78 @@ import (
)
type Client struct {
- mSize uint32
+ msize uint32
+ mSizeLock *sync.Mutex
uname string
fPool *FidPool
rPool *ReqPool
reader io.Reader
- rlock *sync.Mutex
writer io.Writer
- wlock *sync.Mutex
}
func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
return &Client{
- mSize: mSize,
+ msize: mSize,
+ mSizeLock: new(sync.Mutex),
uname: uname,
fPool: allocFidPool(),
rPool: allocReqPool(),
reader: r,
- rlock: new(sync.Mutex),
writer: w,
- wlock: new(sync.Mutex),
}
}
+func (c *Client) mSize() uint32 {
+ c.mSizeLock.Lock()
+ defer c.mSizeLock.Unlock()
+ return c.msize
+}
+
+func (c *Client) setMSize(mSize uint32) {
+ c.mSizeLock.Lock()
+ defer c.mSizeLock.Unlock()
+ c.msize = mSize
+}
+
+func (c *Client) runListnener(ctx context.Context, r io.Reader) (<-chan Msg, <-chan error) {
+ // TODO: terminate with ctx.Done()
+ mc := make(chan Msg)
+ ec := make(chan error)
+ go func() {
+ defer close(mc)
+ defer close(ec)
+ for {
+ msg, err := recv(r)
+ if err != nil {
+ ec <- err
+ continue
+ }
+ mc <- msg
+ }
+ }()
+ return mc, ec
+}
+
+func (c *Client) runSpeaker(ctx context.Context, w io.Writer) (chan<- Msg, <-chan error) {
+ mc := make(chan Msg)
+ ec := make(chan error)
+ go func() {
+ defer close(mc)
+ defer close(ec)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case msg := <- mc:
+ if err := send(msg, w); err != nil {
+ ec <- err
+ }
+ }
+ }
+ }()
+ return mc, ec
+}
+
func transact(ctx context.Context, w io.Writer, r io.Reader, tmsg Msg) (<-chan Msg, error) {
if err := send(tmsg, w); err != nil {
return nil, fmt.Errorf("send: %v", err)