lib9p

Go 9P library.
Log | Files | Refs

commit e530785c5da83066d6f699a3ae3b11ba507949c6
parent 74209f71af93744374f15b3dc41acdce54c587d5
Author: Matsuda Kenji <info@mtkn.jp>
Date:   Fri, 29 Sep 2023 08:00:09 +0900

add lister and speaker goroutines

Diffstat:
Mserver.go | 222+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
1 file changed, 127 insertions(+), 95 deletions(-)

diff --git a/server.go b/server.go @@ -33,46 +33,77 @@ func setError(r *Req, err error) { } type Server struct { - fs FS - mSize uint32 - fPool *FidPool - rPool *ReqPool - reader io.Reader - rlock *sync.Mutex - writer io.Writer - wlock *sync.Mutex + fs FS + msize uint32 + mSizeLock *sync.Mutex + fPool *FidPool + rPool *ReqPool + listenChan <-chan *Req + listenErrChan <-chan error + speakChan chan<- *Req + speakErrChan <-chan error auth func(*Req, *Fid) } func NewServer(fsys FS, mSize uint32, r io.Reader, w io.Writer) *Server { - return &Server{ - fs: fsys, - mSize: mSize, - fPool: allocFidPool(), - rPool: allocReqPool(), - reader: r, - rlock: new(sync.Mutex), - writer: w, - wlock: new(sync.Mutex), + s := &Server{ + fs: fsys, + msize: mSize, + mSizeLock: new(sync.Mutex), + fPool: allocFidPool(), + rPool: allocReqPool(), } + s.listenChan, s.listenErrChan = s.runListener(r) + s.speakChan, s.speakErrChan = s.runSpeaker(w) + + return s } -func (s *Server) changeMSize(mSize uint32) { - if s.mSize == mSize { - return - } - s.rlock.Lock() - s.wlock.Lock() - s.mSize = mSize - s.rlock.Unlock() - s.wlock.Unlock() +func (s *Server) mSize() uint32 { + s.mSizeLock.Lock() + defer s.mSizeLock.Unlock() + return s.msize +} + +func (s *Server) setMSize(mSize uint32) { + s.mSizeLock.Lock() + defer s.mSizeLock.Unlock() + s.msize = mSize +} + +func (s *Server) runListener(r io.Reader) (<-chan *Req, <-chan error) { + rc := make(chan *Req) + ec := make(chan error) + go func() { + for { + req, err := getReq(r, s) + if err != nil { + ec <- err + } + rc <- req + } + }() + return rc, ec } -func (s *Server) getReq() (*Req, error) { - s.rlock.Lock() - buf, err := read9PMsg(s.reader) - s.rlock.Unlock() +func (s *Server) runSpeaker(w io.Writer) (chan<- *Req, <-chan error) { + rc := make(chan *Req, 3) // TODO: buffer size? + ec := make(chan error) + go func() { + for { + r := <-rc + _, err := w.Write(r.ofcall.marshal()) + if err != nil { + ec <- err + } + } + }() + return rc, ec +} + +func getReq(r io.Reader, s *Server) (*Req, error) { + buf, err := read9PMsg(r) if err != nil { if err == io.EOF { return nil, err @@ -80,39 +111,39 @@ func (s *Server) getReq() (*Req, error) { return nil, fmt.Errorf("read9PMsg: %v", err) } - r, err := s.rPool.add(bufMsg(buf).Tag()) + req, err := s.rPool.add(bufMsg(buf).Tag()) if err != nil { log.Printf("addReq(%d): %v", bufMsg(buf).Tag(), err) // duplicate tag: cons up a fake Req - r := new(Req) - r.srv = s - r.ifcall, err = unmarshal(buf) + req := new(Req) + req.srv = s + req.ifcall, err = unmarshal(buf) if err != nil { log.Printf("unmarshal: %v", err) - r.ifcall = bufMsg(buf) + req.ifcall = bufMsg(buf) } if chatty9P { - fmt.Fprintf(os.Stderr, "<-- %v\n", r.ifcall) + fmt.Fprintf(os.Stderr, "<-- %v\n", req.ifcall) } - return r, ErrDupTag + return req, ErrDupTag } - r.srv = s - r.tag = bufMsg(buf).Tag() - r.ifcall, err = unmarshal(buf) + req.srv = s + req.tag = bufMsg(buf).Tag() + req.ifcall, err = unmarshal(buf) if err != nil { log.Printf("unmarshal: %v", err) if chatty9P { fmt.Fprintf(os.Stderr, "<-- %v\n", bufMsg(buf)) } - r.ifcall = bufMsg(buf) - return r, err + req.ifcall = bufMsg(buf) + return req, err } if chatty9P { - fmt.Fprintf(os.Stderr, "<-- %v\n", r.ifcall) + fmt.Fprintf(os.Stderr, "<-- %v\n", req.ifcall) } - return r, nil + return req, nil } // TODO: abort all outstanding I/O on the same connection. @@ -126,8 +157,8 @@ func sVersion(s *Server, r *Req) { } msize := ifcall.MSize() - if msize > s.mSize { - msize = s.mSize + if msize > s.mSize() { + msize = s.mSize() } r.ofcall = &RVersion{ @@ -142,7 +173,7 @@ func rVersion(r *Req, err error) { if err != nil { panic(fmt.Errorf("rVersion err: %w", err)) } - r.srv.changeMSize(r.ofcall.(*RVersion).MSize()) + r.srv.setMSize(r.ofcall.(*RVersion).MSize()) } func sAuth(s *Server, r *Req) { @@ -349,7 +380,7 @@ func sOpen(s *Server, r *Req) { r.ofcall = &ROpen{ qid: r.fid.File.Qid(), - iounit: s.mSize - IOHDRSZ, + iounit: s.mSize() - IOHDRSZ, } respond(r, nil) } @@ -416,7 +447,7 @@ func sCreate(s *Server, r *Req) { r.ofcall = &RCreate{ qid: r.fid.File.Qid(), - iounit: s.mSize - IOHDRSZ, + iounit: s.mSize() - IOHDRSZ, } respond(r, nil) } @@ -529,8 +560,8 @@ func sWrite(s *Server, r *Req) { } // TODO: should I use exported function instead of directly // accessing the struct field? - if ifcall.count > s.mSize-IOHDRSZ { - ifcall.count = s.mSize - IOHDRSZ + if ifcall.count > s.mSize()-IOHDRSZ { + ifcall.count = s.mSize() - IOHDRSZ } if !hasPerm(r.fid.File, r.fid.Uid, AWRITE) { respond(r, ErrPerm) @@ -751,50 +782,48 @@ func sWStat(s *Server, r *Req) { func rWStat(r *Req, err error) {} func (s *Server) Serve() { +L: for { - r, err := s.getReq() - if err == io.EOF { - log.Printf("getReq: %v\n", err) - break - } - if r == nil { - log.Printf("getReq returns nil request: %v", err) - break - } else if err != nil { + select { + case err := <-s.listenErrChan: + if err == io.EOF { + log.Printf("getReq: %v\n", err) + break L + } log.Printf("getReq: %v\n", err) - respond(r, err) - continue + continue L + case r := <-s.listenChan: + go func(s *Server, r *Req) { + switch r.ifcall.(type) { + default: + respond(r, fmt.Errorf("unknown message type: %d", r.ifcall.Type())) + case *TVersion: + sVersion(s, r) + case *TAuth: + sAuth(s, r) + case *TAttach: + sAttach(s, r) + case *TWalk: + sWalk(s, r) + case *TOpen: + sOpen(s, r) + case *TCreate: + sCreate(s, r) + case *TRead: + sRead(s, r) + case *TWrite: + sWrite(s, r) + case *TClunk: + sClunk(s, r) + case *TRemove: + sRemove(s, r) + case *TStat: + sStat(s, r) + case *TWStat: + sWStat(s, r) + } + }(s, r) } - go func(s *Server, r *Req) { - switch r.ifcall.(type) { - default: - respond(r, fmt.Errorf("unknown message type: %d", r.ifcall.Type())) - case *TVersion: - sVersion(s, r) - case *TAuth: - sAuth(s, r) - case *TAttach: - sAttach(s, r) - case *TWalk: - sWalk(s, r) - case *TOpen: - sOpen(s, r) - case *TCreate: - sCreate(s, r) - case *TRead: - sRead(s, r) - case *TWrite: - sWrite(s, r) - case *TClunk: - sClunk(s, r) - case *TRemove: - sRemove(s, r) - case *TStat: - sStat(s, r) - case *TWStat: - sWStat(s, r) - } - }(s, r) } } @@ -844,7 +873,10 @@ func respond(r *Req, err error) { r.pool.delete(r.tag) } - r.srv.wlock.Lock() - r.srv.writer.Write(r.ofcall.marshal()) - r.srv.wlock.Unlock() + r.srv.speakChan <- r + // TODO: handle error gently + go func() { + err := <-r.srv.speakErrChan + log.Fatalf("speak: %v", err) + }() }