commit e7f453a45c687591ceb0b4a6f975966d074cd297
parent e80adee77a2eeebee1492ba94d140ffbb8f1fbcb
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 22 Dec 2023 09:40:34 +0900
walk pipeline
Diffstat:
| M | server.go | | | 167 | ++++++++++++++++++++++++++++++++++++++++++++++--------------------------------- |
1 file changed, 98 insertions(+), 69 deletions(-)
diff --git a/server.go b/server.go
@@ -417,79 +417,107 @@ func rFlush(ctx context.Context, c <-chan *Req) {
}
}
-func sWalk(ctx context.Context, s *Server, r *Req) {
- ifcall := r.Ifcall.(*TWalk)
- oldFid, ok := s.fPool.lookup(ifcall.Fid)
- if !ok {
- Respond(ctx, r, ErrUnknownFid)
- return
- }
- if oldFid.OMode != -1 {
- Respond(ctx, r, fmt.Errorf("cannot clone open fid"))
- return
- }
- oldst, err := fs.Stat(ExportFS{s.fs}, oldFid.path)
- if err != nil {
- Respond(ctx, r, fmt.Errorf("stat: %v", err))
- return
- }
- if len(ifcall.Wnames) > 0 && oldst.Sys().(*Stat).Qid.Type&QTDIR == 0 {
- Respond(ctx, r, fmt.Errorf("walk on non-dir"))
- return
- }
- var newFid *Fid
- if ifcall.Fid == ifcall.Newfid {
- newFid = oldFid
- } else {
- var err error
- newFid, err = s.fPool.add(ifcall.Newfid)
- if err != nil {
- log.Printf("alloc: %v", err)
- Respond(ctx, r, fmt.Errorf("internal error"))
+func sWalk(ctx context.Context, s *Server, c <-chan *Req) {
+ rc := make(chan *Req)
+ defer close(rc)
+ go rWalk(ctx, rc)
+ for {
+ select {
+ case <-ctx.Done():
return
+ case r := <-c:
+ if r == nil {
+ return
+ }
+ ifcall := r.Ifcall.(*TWalk)
+ oldFid, ok := s.fPool.lookup(ifcall.Fid)
+ if !ok {
+ r.err = ErrUnknownFid
+ rc <- r
+ continue
+ }
+ if oldFid.OMode != -1 {
+ r.err = fmt.Errorf("cannot clone open fid")
+ rc <- r
+ continue
+ }
+ oldst, err := fs.Stat(ExportFS{s.fs}, oldFid.path)
+ if err != nil {
+ r.err = fmt.Errorf("stat: %v", err)
+ rc <- r
+ continue
+ }
+ if len(ifcall.Wnames) > 0 && oldst.Sys().(*Stat).Qid.Type&QTDIR == 0 {
+ r.err = fmt.Errorf("walk on non-dir")
+ rc <- r
+ continue
+ }
+ var newFid *Fid
+ if ifcall.Fid == ifcall.Newfid {
+ newFid = oldFid
+ } else {
+ var err error
+ newFid, err = s.fPool.add(ifcall.Newfid)
+ if err != nil {
+ r.err = fmt.Errorf("alloc: $v", err)
+ rc <- r
+ continue
+ }
+ }
+ wqids := make([]Qid, len(ifcall.Wnames))
+ cwdp := oldFid.path
+ var n int
+ // TODO: replace this block with fs.WalkDir.
+ for i, name := range ifcall.Wnames {
+ cwdp = path.Join(cwdp, name)
+ stat, err := fs.Stat(ExportFS{s.fs}, cwdp)
+ if err != nil {
+ break
+ }
+ wqids[i] = stat.Sys().(*Stat).Qid
+ n++
+ }
+ newFid.OMode = -1
+ newFid.path = cwdp
+ newFid.Uid = oldFid.Uid
+ r.Ofcall = &RWalk{
+ Qids: wqids[:n],
+ }
+ rc <- r
}
}
- wqids := make([]Qid, len(ifcall.Wnames))
- cwdp := oldFid.path
- var n int
- // TODO: replace this block with fs.WalkDir.
- for i, name := range ifcall.Wnames {
- cwdp = path.Join(cwdp, name)
- stat, err := fs.Stat(ExportFS{s.fs}, cwdp)
- if err != nil {
- break
- }
- wqids[i] = stat.Sys().(*Stat).Qid
- n++
- }
- newFid.OMode = -1
- newFid.path = cwdp
- newFid.Uid = oldFid.Uid
- r.Ofcall = &RWalk{
- Qids: wqids[:n],
- }
- Respond(ctx, r, nil)
}
-func rWalk(r *Req, err error) {
- ifcall := r.Ifcall.(*TWalk)
- if r.Ofcall == nil {
- if err == nil {
- panic("err and r.Ofcall are both nil")
- }
- setError(r, err)
- return
- }
- ofcall := r.Ofcall.(*RWalk)
- if err != nil || len(ofcall.Qids) < len(ifcall.Wnames) {
- if ifcall.Fid != ifcall.Newfid {
- r.Srv.fPool.delete(ifcall.Newfid)
- }
- if len(ofcall.Qids) == 0 {
- if err == nil && len(ifcall.Wnames) != 0 {
- setError(r, ErrNotFound)
+func rWalk(ctx context.Context, c <-chan *Req) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <- c:
+ if r == nil {
return
}
+ ifcall := r.Ifcall.(*TWalk)
+ if r.Ofcall == nil {
+ if r.err == nil {
+ panic("err and r.Ofcall are both nil")
+ }
+ setError(r, r.err)
+ r.Srv.respChan <- r
+ continue
+ }
+ ofcall := r.Ofcall.(*RWalk)
+ if r.err != nil || len(ofcall.Qids) < len(ifcall.Wnames) {
+ if ifcall.Fid != ifcall.Newfid {
+ r.Srv.fPool.delete(ifcall.Newfid)
+ }
+ if len(ofcall.Qids) == 0 {
+ if r.err == nil && len(ifcall.Wnames) != 0 {
+ setError(r, ErrNotFound)
+ }
+ }
+ }
+ r.Srv.respChan <- r
}
}
}
@@ -1044,17 +1072,20 @@ func (s *Server) Serve(ctx context.Context) {
authChan = make(chan *Req)
attachChan = make(chan *Req)
flushChan = make(chan *Req)
+ walkChan = make(chan *Req)
)
defer func() {
close(versionChan)
close(authChan)
close(attachChan)
close(flushChan)
+ close(walkChan)
}()
go sVersion(ctx, s, versionChan)
go sAuth(ctx, s, authChan)
go sAttach(ctx, s, attachChan)
go sFlush(ctx, s, flushChan)
+ go sWalk(ctx, s, walkChan)
go Respond2(ctx, s)
L:
for {
@@ -1082,7 +1113,7 @@ L:
case *TFlush:
flushChan <- r
case *TWalk:
- sWalk(ctx1, s, r)
+ walkChan <- r
case *TOpen:
sOpen(ctx1, s, r)
case *TCreate:
@@ -1115,8 +1146,6 @@ func Respond(ctx context.Context, r *Req, err error) {
switch r.Ifcall.(type) {
default:
panic(fmt.Errorf("bug: r.Ifcall: %v", r.Ifcall))
- case *TWalk:
- rWalk(r, err)
case *TOpen:
rOpen(r, err)
case *TCreate: