commit 37d55bf6f90220be33d34b56d6ed12c25dcd11f9
parent f4f8f78589d78039179f3bf537a5ca0377de87f9
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 22 Dec 2023 09:06:07 +0900
auth pipeline
Diffstat:
| M | server.go | | | 70 | ++++++++++++++++++++++++++++++++++++++++++++++++---------------------- |
1 file changed, 48 insertions(+), 22 deletions(-)
diff --git a/server.go b/server.go
@@ -69,7 +69,7 @@ type Server struct {
// set Req.Afid.File to an *AuthFile and Req.Ofcall.Qid, and prepare to
// authenticate via the Read()/Write() calls to Req.Afid.File
// If this is nil, no authentication is performed.
- Auth func(context.Context, *Req)
+ Auth func(context.Context, *Req, chan<- *Req)
}
// NewServer creates a Server and runs listener and speaker goroutines.
@@ -244,27 +244,51 @@ func rVersion(ctx context.Context, c <-chan *Req) {
}
// sAuth serves Tauth message.
-func sAuth(ctx context.Context, s *Server, r *Req) {
- ifcall := r.Ifcall.(*TAuth)
- var err error
- r.Afid, err = s.fPool.add(ifcall.Afid)
- if err != nil {
- Respond(ctx, r, ErrDupFid)
- }
- if s.Auth != nil {
- s.Auth(ctx, r)
- } else {
- Respond(ctx, r, fmt.Errorf("authentication not required"))
- return
+func sAuth(ctx context.Context, s *Server, c <-chan *Req) {
+ rAuthChan := make(chan *Req)
+ defer close(rAuthChan)
+ go rAuth(ctx, rAuthChan)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <- c:
+ if r == nil {
+ return
+ }
+ ifcall := r.Ifcall.(*TAuth)
+ var err error
+ r.Afid, err = s.fPool.add(ifcall.Afid)
+ if err != nil {
+ r.err = ErrDupFid
+ rAuthChan <- r
+ continue
+ }
+ if s.Auth != nil {
+ s.Auth(ctx, r, rAuthChan)
+ } else {
+ r.err = fmt.Errorf("authentication not required")
+ rAuthChan <- r
+ continue
+ }
+ }
}
}
// rAuth checks if err is nil, and if not, it deletes the
// allocated fid from fPool.
-func rAuth(r *Req, err error) {
- if err != nil {
- r.Srv.fPool.delete(r.Ifcall.(*TAuth).Afid)
- setError(r, err)
+func rAuth(ctx context.Context, c <-chan *Req) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case r := <-c:
+ if r.err != nil {
+ r.Srv.fPool.delete(r.Ifcall.(*TAuth).Afid)
+ setError(r, r.err)
+ }
+ r.Srv.respChan <- r
+ }
}
}
@@ -958,10 +982,14 @@ func (s *Server) Serve(ctx context.Context) {
s.runListener(ctx)
s.runSpeaker(ctx)
versionChan := make(chan *Req)
- defer close(versionChan)
+ authChan := make(chan *Req)
+ defer func() {
+ close(versionChan)
+ close(authChan)
+ }()
go sVersion(ctx, s, versionChan)
+ go sAuth(ctx, s, authChan)
go Respond2(ctx, s)
-
L:
for {
select {
@@ -982,7 +1010,7 @@ L:
case *TVersion:
versionChan <- r
case *TAuth:
- sAuth(ctx1, s, r)
+ authChan <- r
case *TAttach:
sAttach(ctx1, s, r)
case *TFlush:
@@ -1021,8 +1049,6 @@ func Respond(ctx context.Context, r *Req, err error) {
switch r.Ifcall.(type) {
default:
panic(fmt.Errorf("bug: r.Ifcall: %v", r.Ifcall))
- case *TAuth:
- rAuth(r, err)
case *TAttach:
rAttach(r, err)
case *TFlush: