commit 6302288d883eea3f956acdcc3ff65d7244767bce
parent 95998a18c96f7235502e685c39d9f2bee90f94ae
Author: Matsuda Kenji <info@mtkn.jp>
Date: Mon, 30 Oct 2023 08:08:02 +0900
add speakErrChan to Req
Diffstat:
| M | req.go | | | 2 | ++ |
| M | server.go | | | 64 | ++++++++++++++++++++++++++++++++++++++++++---------------------- |
2 files changed, 44 insertions(+), 22 deletions(-)
diff --git a/req.go b/req.go
@@ -17,6 +17,7 @@ type Req struct {
pool *ReqPool
Cancel context.CancelFunc
err error
+ speakErrChan chan error
}
// flush cancels the Req by calling r.cancel.
@@ -51,6 +52,7 @@ func (rp *ReqPool) add(tag uint16) (*Req, error) {
}
req := &Req{
pool: rp,
+ speakErrChan: make(chan error),
}
rp.m[tag] = req
return req, nil
diff --git a/server.go b/server.go
@@ -52,14 +52,14 @@ type Server struct {
// The channel from which incoming requests are delivered by the
// listener goroutine.
listenChan <-chan *Req
- // The error channel the listener goroutine reports any error.
- //listenErrChan <-chan error
+ // r is the io.Reader which the listener goroutine reads from.
+ r io.Reader
// The channel to which outgoing replies are sent to the speaker
// goroutine.
speakChan chan<- *Req
- // The error channel the speaker goroutine reports any error.
- speakErrChan <-chan error
+ // w is the io.Writer which the speaker goroutine writes to.
+ w io.Writer
// Auth is called when Tauth message arrives.
// If authentication is desired, the Auth functions should
@@ -78,9 +78,9 @@ func NewServer(fsys FS, mSize uint32, r io.Reader, w io.Writer) *Server {
mSizeLock: new(sync.Mutex),
fPool: newFidPool(),
rPool: newReqPool(),
+ r: r,
+ w: w,
}
- s.listenChan = s.runListener(r)
- s.speakChan, s.speakErrChan = s.runSpeaker(w)
return s
}
@@ -107,12 +107,16 @@ func (s *Server) setMSize(mSize uint32) {
// It reports any error to the returned chan of error.
// TODO: pass context.Context and stop the goroutine with the Context being
// canceled.
-func (s *Server) runListener(r io.Reader) <-chan *Req {
+func (s *Server) runListener(ctx context.Context, r io.Reader) <-chan *Req {
rc := make(chan *Req)
go func() {
defer close(rc)
for {
- rc <- getReq(r, s)
+ select {
+ case rc <- getReq(r, s):
+ case <-ctx.Done():
+ return
+ }
}
}()
return rc
@@ -123,21 +127,27 @@ func (s *Server) runListener(r io.Reader) <-chan *Req {
// and marshalls each of them into 9P messages and writes it to w.
// TODO: pass context.Context and stop the goroutine with the Context being
// canceled.
-func (s *Server) runSpeaker(w io.Writer) (chan<- *Req, <-chan error) {
- rc := make(chan *Req, 3) // TODO: buffer size?
- ec := make(chan error)
+func (s *Server) runSpeaker(ctx context.Context, w io.Writer) chan<- *Req {
+ rc := make(chan *Req)
go func() {
- // TODO: close rc anywhere
- defer close(ec)
for {
- r := <-rc
- _, err := w.Write(r.Ofcall.marshal())
- if err != nil {
- ec <- err
+ select {
+ case r := <-rc:
+ _, err := w.Write(r.Ofcall.marshal())
+ if err != nil {
+ select {
+ case r.speakErrChan <- err:
+ case <-ctx.Done():
+ return
+ }
+ }
+ close(r.speakErrChan)
+ case <-ctx.Done():
+ return
}
}
}()
- return rc, ec
+ return rc
}
// GetReq reads 9P message from r, allocates Req and returns it.
@@ -893,13 +903,13 @@ func rWStat(r *Req, err error) {
// Serve serves 9P conversation.
func (s *Server) Serve(ctx context.Context) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ s.listenChan = s.runListener(ctx, s.r)
+ s.speakChan = s.runSpeaker(ctx, s.w)
L:
for {
select {
- case err := <-s.speakErrChan:
- // TODO: handle error
- log.Printf("speak: %v", err)
- continue L
case r := <-s.listenChan:
if r.err != nil {
log.Printf("listen: %v", r.err)
@@ -999,4 +1009,14 @@ func Respond(ctx context.Context, r *Req, err error) {
case <-ctx.Done():
log.Printf("req flush: %v", r.Ifcall)
}
+
+ select {
+ case err := <- r.speakErrChan:
+ // TODO: handle errors.
+ if err != nil {
+ log.Printf("speak: %v", err)
+ }
+ case <-ctx.Done():
+ log.Printf("respond: %v", ctx.Err())
+ }
}