commit 3cb0e48677685d6afce4894dbca6b435c8e4ba91
parent a94586d0442ad25992734e01060383b53d1fb46f
Author: Matsuda Kenji <info@mtkn.jp>
Date: Thu, 21 Dec 2023 17:21:35 +0900
make pipeline and use for version messages
Diffstat:
| M | req.go | | | 2 | ++ |
| M | server.go | | | 108 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- |
2 files changed, 87 insertions(+), 23 deletions(-)
diff --git a/req.go b/req.go
@@ -21,6 +21,8 @@ type Req struct {
// speakErrChan is used to report any error encountered while sending
// the response message.
speakErrChan chan error
+ // err is any error encountered while processing the request.
+ err error
}
// flush cancels the Req by calling r.cancel.
diff --git a/server.go b/server.go
@@ -62,6 +62,8 @@ type Server struct {
// It should be accessed only by the speaker goroutine.
w io.Writer
+ respChan chan *Req
+
// Auth is called when Tauth message arrives.
// If authentication is desired, the Auth functions should
// set Req.Afid.File to an *AuthFile and Req.Ofcall.Qid, and prepare to
@@ -81,6 +83,7 @@ func NewServer(fsys FS, mSize uint32, r io.Reader, w io.Writer) *Server {
rPool: newReqPool(),
r: r,
w: w,
+ respChan: make(chan *Req),
}
return s
}
@@ -191,32 +194,53 @@ func getReq(r io.Reader, s *Server) *Req {
// server's mSize to the message's one.
// TODO: abort all outstanding I/O on the same connection before
// serving new Tversion.
-func sVersion(ctx context.Context, s *Server, r *Req) {
- ifcall := r.Ifcall.(*TVersion)
- version := ifcall.Version
- if ok := strings.HasPrefix(version, "9P2000"); !ok {
- version = "unknown"
- } else {
- version = "9P2000"
- }
- msize := ifcall.Msize
- if msize > s.mSize() {
- msize = s.mSize()
- }
- r.Ofcall = &RVersion{
- Msize: msize,
- Version: version,
+func sVersion(ctx context.Context, s *Server, c <-chan *Req) {
+ rVersionChan := make(chan *Req)
+ defer close(rVersionChan)
+ go rVersion(ctx, rVersionChan)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-c:
+ if r == nil {
+ return
+ }
+ ifcall := r.Ifcall.(*TVersion)
+ version := ifcall.Version
+ if ok := strings.HasPrefix(version, "9P2000"); !ok {
+ version = "unknown"
+ } else {
+ version = "9P2000"
+ }
+ msize := ifcall.Msize
+ if msize > s.mSize() {
+ msize = s.mSize()
+ }
+ r.Ofcall = &RVersion{
+ Msize: msize,
+ Version: version,
+ }
+ rVersionChan <- r
+ }
}
- Respond(ctx, r, nil)
}
// rVersion confirms that err is nil, and sets the server's msize to the
// appropreate one.
-func rVersion(r *Req, err error) {
- if err != nil {
- panic(fmt.Errorf("rVersion err: %w", err))
+func rVersion(ctx context.Context, c <-chan *Req) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-c:
+ if r.err != nil {
+ panic(fmt.Errorf("rVersion err: %w", r.err))
+ }
+ r.Srv.setMSize(r.Ofcall.(*RVersion).Msize)
+ r.Srv.respChan <- r
+ }
}
- r.Srv.setMSize(r.Ofcall.(*RVersion).Msize)
}
// sAuth serves Tauth message.
@@ -933,6 +957,10 @@ func (s *Server) Serve(ctx context.Context) {
defer cancel()
s.runListener(ctx)
s.runSpeaker(ctx)
+ versionChan := make(chan *Req)
+ defer close(versionChan)
+ go sVersion(ctx, s, versionChan)
+ go Respond2(ctx, s)
L:
for {
select {
@@ -951,7 +979,7 @@ L:
default:
Respond(ctx1, r, fmt.Errorf("unknown message type: %d", r.Ifcall.Type()))
case *TVersion:
- sVersion(ctx1, s, r)
+ versionChan <- r
case *TAuth:
sAuth(ctx1, s, r)
case *TAttach:
@@ -992,8 +1020,6 @@ func Respond(ctx context.Context, r *Req, err error) {
switch r.Ifcall.(type) {
default:
panic(fmt.Errorf("bug: r.Ifcall: %v", r.Ifcall))
- case *TVersion:
- rVersion(r, err)
case *TAuth:
rAuth(r, err)
case *TAttach:
@@ -1045,3 +1071,38 @@ func Respond(ctx context.Context, r *Req, err error) {
//log.Printf("respond: %v", ctx.Err())
}
}
+
+func Respond2(ctx context.Context, s *Server) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-s.respChan:
+ r.Ofcall.SetTag(r.Tag)
+ // free tag.
+ if r.pool == nil && r.err != ErrDupTag {
+ panic("ReqPool is nil but err is not EDupTag")
+ }
+ if r.pool != nil {
+ r.pool.delete(r.Tag)
+ }
+ select {
+ case r.Srv.speakChan <- r:
+ if r.Srv.chatty9P {
+ fmt.Fprintf(os.Stderr, "--> %s\n", r.Ofcall)
+ }
+ case <-ctx.Done():
+ return
+ }
+ select {
+ case err := <-r.speakErrChan:
+ // TODO: handle errors.
+ if err != nil {
+ log.Printf("speak: %v", err)
+ }
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+}
+\ No newline at end of file