commit c3c5e2ace48d2d47c310c7824e730369d2bd344a
parent 6405e3f0de3aa9a5730b04b3a973727d2768ec41
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 29 Dec 2023 07:55:17 +0900
delete ReqPool from Server
Diffstat:
3 files changed, 13 insertions(+), 21 deletions(-)
diff --git a/export_test.go b/export_test.go
@@ -45,11 +45,11 @@ type BufMsg = bufMsg
func (s *Server) RPool() *ReqPool { return s.rPool }
func (s *Server) FPool() *FidPool { return s.fPool }
-func (s *Server) RunListener(ctx context.Context) {
- s.runListener(ctx)
+func (s *Server) RunListener(ctx context.Context, rp *ReqPool) {
+ s.runListener(ctx, rp)
}
-func (s *Server) RunSpeaker(ctx context.Context) {
- s.runSpeaker(ctx)
+func (s *Server) RunSpeaker(ctx context.Context, rp *ReqPool) {
+ s.runSpeaker(ctx, rp)
}
func (s *Server) SetFS(fs FS) { s.fs = fs }
func (s *Server) SetRespChan(rc chan *Req) { s.respChan = rc }
diff --git a/server.go b/server.go
@@ -81,7 +81,6 @@ func NewServer(fsys FS, mSize uint32, r io.Reader, w io.Writer) *Server {
msize: mSize,
mSizeLock: new(sync.Mutex),
fPool: newFidPool(),
- rPool: newReqPool(),
r: r,
w: w,
respChan: make(chan *Req),
@@ -109,14 +108,14 @@ func (s *Server) setMSize(mSize uint32) {
// runListener runs the listener goroutine.
// Listener goroutine reads 9P messages from s.r by calling getReq
// and allocats Req for each of them, and sends it to the server's listenChan.
-func (s *Server) runListener(ctx context.Context) {
+func (s *Server) runListener(ctx context.Context, rp *ReqPool) {
rc := make(chan *Req)
s.listenChan = rc
go func() {
defer close(rc)
for {
select {
- case rc <- getReq(s.r, s.rPool, s.chatty9P):
+ case rc <- getReq(s.r, rp, s.chatty9P):
case <-ctx.Done():
return
}
@@ -127,7 +126,7 @@ func (s *Server) runListener(ctx context.Context) {
// runSpeaker runs the speaker goroutine.
// Speaker goroutine wait for reply Requests from the returned channel,
// and marshalls each of them into 9P messages and writes it to s.w.
-func (s *Server) runSpeaker(ctx context.Context) {
+func (s *Server) runSpeaker(ctx context.Context, rp *ReqPool) {
rc := make(chan *Req)
s.speakChan = rc
go func() {
@@ -135,13 +134,7 @@ func (s *Server) runSpeaker(ctx context.Context) {
select {
case r := <-rc:
// free tag.
- if r.pool == nil {
- if e, ok := r.Ofcall.(*RError); !ok || e.Ename != ErrDupTag {
- panic("r.pool is nil and error is not EDupTag")
- }
- } else {
- r.pool.delete(r.Tag)
- }
+ rp.delete(r.Tag)
_, err := s.w.Write(r.Ofcall.marshal())
if err != nil {
select {
@@ -1151,8 +1144,9 @@ func sWStat(ctx context.Context, s *Server, c <-chan *Req) {
// Serve serves 9P conversation.
func (s *Server) Serve(ctx context.Context) {
- s.runListener(ctx)
- s.runSpeaker(ctx)
+ rp := newReqPool()
+ s.runListener(ctx, rp)
+ s.runSpeaker(ctx, rp)
var (
versionChan = make(chan *Req)
authChan = make(chan *Req)
diff --git a/server2_test.go b/server2_test.go
@@ -27,7 +27,6 @@ func TestRunListener(t *testing.T) {
}
defer tFile2.Close()
s := &Server{
- rPool: newReqPool(),
r: tFile,
}
oldReqPoolAdd := reqPoolAdd
@@ -35,7 +34,7 @@ func TestRunListener(t *testing.T) {
reqPoolAdd = func(*ReqPool, uint16) (*Req, error) { return &Req{}, nil }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- s.runListener(ctx)
+ s.runListener(ctx, newReqPool())
for {
want, err := RecvMsg(tFile2)
if err == io.EOF {
@@ -68,7 +67,7 @@ func TestRunSpeaker(t *testing.T) {
rp := newReqPool()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- s.runSpeaker(ctx)
+ s.runSpeaker(ctx, rp)
for {
want, err := readMsg(rFile)
if err == io.EOF {
@@ -83,7 +82,6 @@ func TestRunSpeaker(t *testing.T) {
s.speakChan <- &Req{
Ofcall: msg,
speakErrChan: make(chan error),
- pool: rp,
}
got := make([]byte, len(want))
_, err = io.ReadFull(r, got)