commit 783ac9457889da947eb86519ab6fcd12c4d7be2f
parent e634e721d9c36d833791880f538111ab95f8f6ab
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 29 Dec 2023 09:12:24 +0900
unexport Req, rename it to request
Diffstat:
M | export_test.go | | | 1 | + |
M | req.go | | | 32 | ++++++++++++++++---------------- |
M | server.go | | | 86 | ++++++++++++++++++++++++++++++++++++++++---------------------------------------- |
M | server2_test.go | | | 64 | ++++++++++++++++++++++++++++++++-------------------------------- |
4 files changed, 92 insertions(+), 91 deletions(-)
diff --git a/export_test.go b/export_test.go
@@ -57,6 +57,7 @@ func (s *Server) SetFPool(fp *FidPool) { s.fPool = fp }
func (s *Server) NewMSizeLock() { s.mSizeLock = new(sync.Mutex) }
func (s *Server) SetMSize(size uint32) { s.setMSize(size) }
+type Req = request
func (r *Req) Ifcall() Msg { return r.ifcall }
func (r *Req) SetIfcall(m Msg) { r.ifcall = m }
func (r *Req) Ofcall() Msg { return r.ofcall }
diff --git a/req.go b/req.go
@@ -4,8 +4,8 @@ import (
"sync"
)
-// Req represents each requests.
-type Req struct {
+// request represents each requests.
+type request struct {
tag uint16
// Ifcall is incomming 9P message
ifcall Msg
@@ -14,7 +14,7 @@ type Req struct {
fid *Fid
afid *Fid
// Oldreq is set with Tflush message.
- oldreq *Req
+ oldreq *request
// Pool is the pool this request belongs to.
pool *ReqPool
// Done is used by time consuming goroutines to check whether the request
@@ -30,9 +30,9 @@ type Req struct {
err error
}
-// flush cancels the Req by calling r.cancel.
+// flush cancels the request by calling r.cancel.
// It also delete the request from its pool.
-func (r *Req) flush() {
+func (r *request) flush() {
// TODO: need mutex?
close(r.done)
r.pool.delete(r.tag)
@@ -40,31 +40,31 @@ func (r *Req) flush() {
// ReqPool is the pool of Reqs the server is dealing with.
type ReqPool struct {
- m map[uint16]*Req
+ m map[uint16]*request
lock *sync.Mutex
}
// newReqPool allocats a ReqPool.
func newReqPool() *ReqPool {
return &ReqPool{
- m: make(map[uint16]*Req),
+ m: make(map[uint16]*request),
lock: new(sync.Mutex),
}
}
-// Add allocates a Req with the specified tag in ReqPool rp.
-// It returns (nil, ErrDupTag) if there is already a Req with the specified tag.
-func (rp *ReqPool) add(tag uint16) (*Req, error) {
+// Add allocates a request with the specified tag in ReqPool rp.
+// It returns (nil, ErrDupTag) if there is already a request with the specified tag.
+func (rp *ReqPool) add(tag uint16) (*request, error) {
return reqPoolAdd(rp, tag)
}
-var reqPoolAdd = func(rp *ReqPool, tag uint16) (*Req, error) {
+var reqPoolAdd = func(rp *ReqPool, tag uint16) (*request, error) {
rp.lock.Lock()
defer rp.lock.Unlock()
if _, ok := rp.m[tag]; ok {
return nil, ErrDupTag
}
- req := &Req{
+ req := &request{
pool: rp,
speakErrChan: make(chan error),
}
@@ -72,17 +72,17 @@ var reqPoolAdd = func(rp *ReqPool, tag uint16) (*Req, error) {
return req, nil
}
-// lookup looks for the Req in the pool with tag.
-// If found, it returns the found Req and true, otherwise
+// lookup looks for the request in the pool with tag.
+// If found, it returns the found request and true, otherwise
// it returns nil and false.
-func (rp *ReqPool) lookup(tag uint16) (*Req, bool) {
+func (rp *ReqPool) lookup(tag uint16) (*request, bool) {
rp.lock.Lock()
defer rp.lock.Unlock()
r, ok := rp.m[tag]
return r, ok
}
-// delete delets the Req with tag from the pool.
+// delete delets the request with tag from the pool.
func (rp *ReqPool) delete(tag uint16) {
rp.lock.Lock()
defer rp.lock.Unlock()
diff --git a/server.go b/server.go
@@ -22,7 +22,7 @@ var (
ErrNotFound = fmt.Errorf("not found")
)
-func setError(r *Req, err error) {
+func setError(r *request, err error) {
r.ofcall = &RError{
Ename: err,
}
@@ -50,14 +50,14 @@ type Server struct {
// The channel from which incoming requests are delivered by the
// listener goroutine.
- listenChan <-chan *Req
+ listenChan <-chan *request
// r is the io.Reader which the listener goroutine reads from.
// It should be accessed only by the listener goroutine.
r io.Reader
// The channel to which outgoing replies are sent to the responder
// goroutine.
- respChan chan *Req
+ respChan chan *request
// w is the io.Writer which the responder goroutine writes to.
// It should be accessed only by the responder goroutine.
@@ -65,11 +65,11 @@ type Server struct {
// Auth function is passed an auth request when a TAuth message arrives
// If authentication is desired, Auth should
- // set Req.Afid.File to an *AuthFile and Req.ofcall.Qid, and prepare to
- // authenticate via the Read()/Write() calls to Req.Afid.File.
+ // set request.Afid.File to an *AuthFile and request.ofcall.Qid, and prepare to
+ // authenticate via the Read()/Write() calls to request.Afid.File.
// Auth should clean up everything it creates when ctx is canceled.
// If this is nil, no authentication is performed.
- Auth func(ctx context.Context, r *Req)
+ Auth func(ctx context.Context, r *request)
}
// NewServer creates a Server and runs listener and responder goroutines.
@@ -82,7 +82,7 @@ func NewServer(fsys FS, mSize uint32, r io.Reader, w io.Writer) *Server {
fPool: newFidPool(),
r: r,
w: w,
- respChan: make(chan *Req),
+ respChan: make(chan *request),
}
return s
}
@@ -106,9 +106,9 @@ func (s *Server) setMSize(mSize uint32) {
// runListener runs the listener goroutine.
// Listener goroutine reads 9P messages from s.r by calling getReq
-// and allocats Req for each of them, and sends it to the server's listenChan.
+// and allocats request for each of them, and sends it to the server's listenChan.
func (s *Server) runListener(ctx context.Context, rp *ReqPool) {
- rc := make(chan *Req)
+ rc := make(chan *request)
s.listenChan = rc
go func() {
defer close(rc)
@@ -126,7 +126,7 @@ func (s *Server) runListener(ctx context.Context, rp *ReqPool) {
// Responder goroutine wait for reply Requests from the returned channel,
// and marshalls each of them into 9P messages and writes it to s.w.
func (s *Server) runResponder(ctx context.Context, rp *ReqPool) {
- rc := make(chan *Req)
+ rc := make(chan *request)
s.respChan = rc
go func() {
for {
@@ -154,23 +154,23 @@ func (s *Server) runResponder(ctx context.Context, rp *ReqPool) {
}()
}
-// GetReq reads 9P message from s.r, allocates Req, adds it to s.rPool,
+// GetReq reads 9P message from s.r, allocates request, adds it to s.rPool,
// and returns it.
-// Any error it encountered is embedded into the Req struct.
+// Any error it encountered is embedded into the request struct.
// This function is called only by the server's listener goroutine,
// and does not need to lock s.r.
-func getReq(r io.Reader, rp *ReqPool, chatty bool) *Req {
+func getReq(r io.Reader, rp *ReqPool, chatty bool) *request {
ifcall, err := RecvMsg(r)
if err != nil {
if err == io.EOF {
- return &Req{listenErr: err}
+ return &request{listenErr: err}
}
- return &Req{listenErr: fmt.Errorf("readMsg: %v", err)}
+ return &request{listenErr: fmt.Errorf("readMsg: %v", err)}
}
req, err := rp.add(ifcall.GetTag())
if err != nil {
- // duplicate tag: cons up a fake Req
- req := new(Req)
+ // duplicate tag: cons up a fake request
+ req := new(request)
req.ifcall = ifcall
req.listenErr = ErrDupTag
req.done = make(chan struct{})
@@ -199,7 +199,7 @@ func getReq(r io.Reader, rp *ReqPool, chatty bool) *Req {
// server's mSize to the message's one.
// TODO: abort all outstanding I/O on the same connection before
// serving new Tversion.
-func sVersion(ctx context.Context, s *Server, c <-chan *Req) {
+func sVersion(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -234,7 +234,7 @@ func sVersion(ctx context.Context, s *Server, c <-chan *Req) {
}
// sAuth serves Tauth message.
-func sAuth(ctx context.Context, s *Server, c <-chan *Req) {
+func sAuth(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -279,7 +279,7 @@ func sAuth(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sAttach(ctx context.Context, s *Server, c <-chan *Req) {
+func sAttach(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -349,7 +349,7 @@ func sAttach(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sFlush(ctx context.Context, s *Server, c <-chan *Req) {
+func sFlush(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -371,7 +371,7 @@ func sFlush(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sWalk(ctx context.Context, s *Server, c <-chan *Req) {
+func sWalk(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -464,7 +464,7 @@ func sWalk(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sOpen(ctx context.Context, s *Server, c <-chan *Req) {
+func sOpen(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -592,7 +592,7 @@ func sOpen(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sCreate(ctx context.Context, s *Server, c <-chan *Req) {
+func sCreate(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -675,7 +675,7 @@ func sCreate(ctx context.Context, s *Server, c <-chan *Req) {
// TODO: I think the file should be locked while reading.
// or should the undeterminism left for the client side?
-func sRead(ctx context.Context, s *Server, c <-chan *Req) {
+func sRead(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -789,7 +789,7 @@ func sRead(ctx context.Context, s *Server, c <-chan *Req) {
}
// TODO: I think the file should be locked while reading.
-func sWrite(ctx context.Context, s *Server, c <-chan *Req) {
+func sWrite(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -866,7 +866,7 @@ func sWrite(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sClunk(ctx context.Context, s *Server, c <-chan *Req) {
+func sClunk(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -903,7 +903,7 @@ func sClunk(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sRemove(ctx context.Context, s *Server, c <-chan *Req) {
+func sRemove(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -961,7 +961,7 @@ func sRemove(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sStat(ctx context.Context, s *Server, c <-chan *Req) {
+func sStat(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -1001,7 +1001,7 @@ func sStat(ctx context.Context, s *Server, c <-chan *Req) {
}
}
-func sWStat(ctx context.Context, s *Server, c <-chan *Req) {
+func sWStat(ctx context.Context, s *Server, c <-chan *request) {
for {
select {
case <-ctx.Done():
@@ -1151,19 +1151,19 @@ func (s *Server) Serve(ctx context.Context) {
s.runListener(ctx, rp)
s.runResponder(ctx, rp)
var (
- versionChan = make(chan *Req)
- authChan = make(chan *Req)
- attachChan = make(chan *Req)
- flushChan = make(chan *Req)
- walkChan = make(chan *Req)
- openChan = make(chan *Req)
- createChan = make(chan *Req)
- readChan = make(chan *Req)
- writeChan = make(chan *Req)
- clunkChan = make(chan *Req)
- removeChan = make(chan *Req)
- statChan = make(chan *Req)
- wstatChan = make(chan *Req)
+ versionChan = make(chan *request)
+ authChan = make(chan *request)
+ attachChan = make(chan *request)
+ flushChan = make(chan *request)
+ walkChan = make(chan *request)
+ openChan = make(chan *request)
+ createChan = make(chan *request)
+ readChan = make(chan *request)
+ writeChan = make(chan *request)
+ clunkChan = make(chan *request)
+ removeChan = make(chan *request)
+ statChan = make(chan *request)
+ wstatChan = make(chan *request)
)
defer func() {
close(versionChan)
diff --git a/server2_test.go b/server2_test.go
@@ -31,7 +31,7 @@ func TestRunListener(t *testing.T) {
}
oldReqPoolAdd := reqPoolAdd
defer func() { reqPoolAdd = oldReqPoolAdd }()
- reqPoolAdd = func(*ReqPool, uint16) (*Req, error) { return &Req{}, nil }
+ reqPoolAdd = func(*ReqPool, uint16) (*request, error) { return &request{}, nil }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s.runListener(ctx, newReqPool())
@@ -79,7 +79,7 @@ func TestRunResponder(t *testing.T) {
if err != nil {
t.Fatalf("unmarshal %v", err)
}
- s.respChan <- &Req{
+ s.respChan <- &request{
tag: msg.GetTag(),
ofcall: msg,
}
@@ -137,20 +137,20 @@ func TestGetReq(t *testing.T) {
func TestSVersion(t *testing.T) {
tests := []struct {
- input *Req
- want *Req
+ input *request
+ want *request
}{
- {&Req{ifcall: &TVersion{Msize: 1024, Version: "9P2000"}},
- &Req{ofcall: &RVersion{Msize: 1024, Version: "9P2000"}}},
- {&Req{ifcall: &TVersion{Msize: 564, Version: "9P2000"}},
- &Req{ofcall: &RVersion{Msize: 564, Version: "9P2000"}}},
- {&Req{ifcall: &TVersion{Msize: 8 * 1024, Version: "9P2000"}},
- &Req{ofcall: &RVersion{Msize: 1024, Version: "9P2000"}}},
- {&Req{ifcall: &TVersion{Msize: 1024, Version: "unko"}},
- &Req{ofcall: &RVersion{Msize: 1024, Version: "unknown"}}},
+ {&request{ifcall: &TVersion{Msize: 1024, Version: "9P2000"}},
+ &request{ofcall: &RVersion{Msize: 1024, Version: "9P2000"}}},
+ {&request{ifcall: &TVersion{Msize: 564, Version: "9P2000"}},
+ &request{ofcall: &RVersion{Msize: 564, Version: "9P2000"}}},
+ {&request{ifcall: &TVersion{Msize: 8 * 1024, Version: "9P2000"}},
+ &request{ofcall: &RVersion{Msize: 1024, Version: "9P2000"}}},
+ {&request{ifcall: &TVersion{Msize: 1024, Version: "unko"}},
+ &request{ofcall: &RVersion{Msize: 1024, Version: "unknown"}}},
}
- tc := make(chan *Req)
- rc := make(chan *Req)
+ tc := make(chan *request)
+ rc := make(chan *request)
s := &Server{msize: 1024, mSizeLock: new(sync.Mutex), respChan: rc}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -176,25 +176,25 @@ func TestSVersion(t *testing.T) {
func TestSAuth(t *testing.T) {
tests := []struct {
- input *Req
- want *Req
- authFunc func(context.Context, *Req)
+ input *request
+ want *request
+ authFunc func(context.Context, *request)
}{
- {&Req{ifcall: &TAuth{Afid: NOFID, Uname: "kenji", Aname: ""}},
- &Req{ofcall: &RError{Ename: errors.New("authentication not required")}}, nil},
- {&Req{ifcall: &TAuth{Afid: NOFID, Uname: "kenji", Aname: ""}},
- &Req{ofcall: &RError{Ename: errors.New("NOFID can't be used for afid")}},
- func(ctx context.Context, r *Req) {}},
- {&Req{ifcall: &TAuth{Afid: 0, Uname: "kenji", Aname: ""}},
- &Req{ofcall: &RAuth{Tag: 0, Aqid: Qid{0, 1, 2}}},
- func(ctx context.Context, r *Req) {
+ {&request{ifcall: &TAuth{Afid: NOFID, Uname: "kenji", Aname: ""}},
+ &request{ofcall: &RError{Ename: errors.New("authentication not required")}}, nil},
+ {&request{ifcall: &TAuth{Afid: NOFID, Uname: "kenji", Aname: ""}},
+ &request{ofcall: &RError{Ename: errors.New("NOFID can't be used for afid")}},
+ func(ctx context.Context, r *request) {}},
+ {&request{ifcall: &TAuth{Afid: 0, Uname: "kenji", Aname: ""}},
+ &request{ofcall: &RAuth{Tag: 0, Aqid: Qid{0, 1, 2}}},
+ func(ctx context.Context, r *request) {
r.ofcall = &RAuth{Tag: 0, Aqid: Qid{0, 1, 2}}
}},
}
for _, test := range tests {
func() {
- tc := make(chan *Req)
- rc := make(chan *Req)
+ tc := make(chan *request)
+ rc := make(chan *request)
defer close(tc)
defer close(rc)
s := &Server{respChan: rc, Auth: test.authFunc, fPool: newFidPool()}
@@ -230,13 +230,13 @@ func TestSAuth(t *testing.T) {
func TestSFlush(t *testing.T) {
rp := newReqPool()
tests := []struct {
- input *Req
+ input *request
}{
- {&Req{ifcall: &TFlush{},
- oldreq: &Req{pool: rp, done: make(chan struct{})}}},
+ {&request{ifcall: &TFlush{},
+ oldreq: &request{pool: rp, done: make(chan struct{})}}},
}
- tc := make(chan *Req)
- rc := make(chan *Req)
+ tc := make(chan *request)
+ rc := make(chan *request)
s := &Server{msize: 1024, mSizeLock: new(sync.Mutex), respChan: rc}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()