commit e80adee77a2eeebee1492ba94d140ffbb8f1fbcb
parent b55aa17fe41867d65e855ba8533527004308a0bd
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 22 Dec 2023 09:31:28 +0900
flush pipeline
Diffstat:
| M | server.go | | | 88 | +++++++++++++++++++++++++++++++++++++++++++++++++------------------------------ |
1 file changed, 55 insertions(+), 33 deletions(-)
diff --git a/server.go b/server.go
@@ -195,9 +195,9 @@ func getReq(r io.Reader, s *Server) *Req {
// TODO: abort all outstanding I/O on the same connection before
// serving new Tversion.
func sVersion(ctx context.Context, s *Server, c <-chan *Req) {
- rVersionChan := make(chan *Req)
- defer close(rVersionChan)
- go rVersion(ctx, rVersionChan)
+ rc := make(chan *Req)
+ defer close(rc)
+ go rVersion(ctx, rc)
for {
select {
case <-ctx.Done():
@@ -221,7 +221,7 @@ func sVersion(ctx context.Context, s *Server, c <-chan *Req) {
Msize: msize,
Version: version,
}
- rVersionChan <- r
+ rc <- r
}
}
}
@@ -248,9 +248,9 @@ func rVersion(ctx context.Context, c <-chan *Req) {
// sAuth serves Tauth message.
func sAuth(ctx context.Context, s *Server, c <-chan *Req) {
- rAuthChan := make(chan *Req)
- defer close(rAuthChan)
- go rAuth(ctx, rAuthChan)
+ rc := make(chan *Req)
+ defer close(rc)
+ go rAuth(ctx, rc)
for {
select {
case <-ctx.Done():
@@ -264,14 +264,14 @@ func sAuth(ctx context.Context, s *Server, c <-chan *Req) {
r.Afid, err = s.fPool.add(ifcall.Afid)
if err != nil {
r.err = ErrDupFid
- rAuthChan <- r
+ rc <- r
continue
}
if s.Auth != nil {
- s.Auth(ctx, r, rAuthChan)
+ s.Auth(ctx, r, rc)
} else {
r.err = fmt.Errorf("authentication not required")
- rAuthChan <- r
+ rc <- r
continue
}
}
@@ -299,9 +299,9 @@ func rAuth(ctx context.Context, c <-chan *Req) {
}
func sAttach(ctx context.Context, s *Server, c <-chan *Req) {
- rAttachChan := make(chan *Req)
- defer close(rAttachChan)
- go rAttach(ctx, rAttachChan)
+ rc := make(chan *Req)
+ defer close(rc)
+ go rAttach(ctx, rc)
for {
select {
case <-ctx.Done():
@@ -314,36 +314,36 @@ func sAttach(ctx context.Context, s *Server, c <-chan *Req) {
fid, err := s.fPool.add(ifcall.Fid)
if err != nil {
r.err = ErrDupFid
- rAttachChan <- r
+ rc <- r
continue
}
switch {
case s.Auth == nil && ifcall.Afid == NOFID:
case s.Auth == nil && ifcall.Afid != NOFID:
r.err = ErrBotch
- rAttachChan <- r
+ rc <- r
continue
case s.Auth != nil && ifcall.Afid == NOFID:
r.err = fmt.Errorf("authentication required")
- rAttachChan <- r
+ rc <- r
continue
case s.Auth != nil && ifcall.Afid != NOFID:
afid, ok := s.fPool.lookup(ifcall.Afid)
if !ok {
r.err = ErrUnknownFid
- rAttachChan <- r
+ rc <- r
continue
}
af, ok := afid.File.(*AuthFile)
if !ok {
log.Printf("afile: %[1]T, %[1]v", afid.File)
r.err = fmt.Errorf("not auth file")
- rAttachChan <- r
+ rc <- r
continue
}
if !af.AuthOK {
r.err = fmt.Errorf("not authenticated")
- rAttachChan <- r
+ rc <- r
continue
}
}
@@ -353,13 +353,13 @@ func sAttach(ctx context.Context, s *Server, c <-chan *Req) {
st, err := fs.Stat(ExportFS{s.fs}, ".")
if err != nil {
r.err = fmt.Errorf("stat root: %v", err)
- rAttachChan <- r
+ rc <- r
continue
}
r.Ofcall = &RAttach{
Qid: st.Sys().(*Stat).Qid,
}
- rAttachChan <- r
+ rc <- r
}
}
}
@@ -382,18 +382,39 @@ func rAttach(ctx context.Context, c <-chan *Req) {
}
}
-func sFlush(ctx context.Context, s *Server, r *Req) {
- Respond(ctx, r, nil)
+func sFlush(ctx context.Context, s *Server, c <-chan *Req) {
+ rc := make(chan *Req)
+ defer close(rc)
+ go rFlush(ctx, rc)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-c:
+ if r == nil {
+ return
+ }
+ rc <- r
+ }
+ }
}
-func rFlush(r *Req, err error) {
- if err != nil {
- panic(fmt.Errorf("err in flush: %v", err))
- }
- if r.Oldreq != nil {
- r.Oldreq.flush()
+func rFlush(ctx context.Context, c <-chan *Req) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-c:
+ if r.err != nil {
+ panic(fmt.Errorf("err in flush: %v", r.err))
+ }
+ if r.Oldreq != nil {
+ r.Oldreq.flush()
+ }
+ r.Ofcall = &RFlush{}
+ r.Srv.respChan <- r
+ }
}
- r.Ofcall = &RFlush{}
}
func sWalk(ctx context.Context, s *Server, r *Req) {
@@ -1022,15 +1043,18 @@ func (s *Server) Serve(ctx context.Context) {
versionChan = make(chan *Req)
authChan = make(chan *Req)
attachChan = make(chan *Req)
+ flushChan = make(chan *Req)
)
defer func() {
close(versionChan)
close(authChan)
close(attachChan)
+ close(flushChan)
}()
go sVersion(ctx, s, versionChan)
go sAuth(ctx, s, authChan)
go sAttach(ctx, s, attachChan)
+ go sFlush(ctx, s, flushChan)
go Respond2(ctx, s)
L:
for {
@@ -1056,7 +1080,7 @@ L:
case *TAttach:
attachChan <- r
case *TFlush:
- sFlush(ctx1, s, r)
+ flushChan <- r
case *TWalk:
sWalk(ctx1, s, r)
case *TOpen:
@@ -1091,8 +1115,6 @@ func Respond(ctx context.Context, r *Req, err error) {
switch r.Ifcall.(type) {
default:
panic(fmt.Errorf("bug: r.Ifcall: %v", r.Ifcall))
- case *TFlush:
- rFlush(r, err)
case *TWalk:
rWalk(r, err)
case *TOpen: