commit 3366a197eca7d51e38ee0e830c51f63c635fc2e3
parent 6549bb4eefff5ee14c051fe3931d0167d4bf0ea8
Author: Matsuda Kenji <info@mtkn.jp>
Date: Mon, 30 Oct 2023 08:29:11 +0900
change arguments and return values of *Server.runListener and
.runSpeaker
Diffstat:
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/server.go b/server.go
@@ -107,19 +107,19 @@ func (s *Server) setMSize(mSize uint32) {
// It reports any error to the returned chan of error.
// TODO: pass context.Context and stop the goroutine with the Context being
// canceled.
-func (s *Server) runListener(ctx context.Context, r io.Reader) <-chan *Req {
+func (s *Server) runListener(ctx context.Context) {
rc := make(chan *Req)
+ s.listenChan = rc
go func() {
defer close(rc)
for {
select {
- case rc <- getReq(r, s):
+ case rc <- getReq(s.r, s):
case <-ctx.Done():
return
}
}
}()
- return rc
}
// runSpeaker runs the speaker goroutine.
@@ -127,13 +127,14 @@ func (s *Server) runListener(ctx context.Context, r io.Reader) <-chan *Req {
// and marshalls each of them into 9P messages and writes it to w.
// TODO: pass context.Context and stop the goroutine with the Context being
// canceled.
-func (s *Server) runSpeaker(ctx context.Context, w io.Writer) chan<- *Req {
+func (s *Server) runSpeaker(ctx context.Context) {
rc := make(chan *Req)
+ s.speakChan = rc
go func() {
for {
select {
case r := <-rc:
- _, err := w.Write(r.Ofcall.marshal())
+ _, err := s.w.Write(r.Ofcall.marshal())
if err != nil {
select {
case r.speakErrChan <- err:
@@ -147,7 +148,6 @@ func (s *Server) runSpeaker(ctx context.Context, w io.Writer) chan<- *Req {
}
}
}()
- return rc
}
// GetReq reads 9P message from r, allocates Req and returns it.
@@ -905,8 +905,8 @@ func rWStat(r *Req, err error) {
func (s *Server) Serve(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- s.listenChan = s.runListener(ctx, s.r)
- s.speakChan = s.runSpeaker(ctx, s.w)
+ s.runListener(ctx)
+ s.runSpeaker(ctx)
L:
for {
select {
@@ -1009,7 +1009,6 @@ func Respond(ctx context.Context, r *Req, err error) {
case <-ctx.Done():
log.Printf("req flush: %v", r.Ifcall)
}
-
select {
case err := <- r.speakErrChan:
// TODO: handle errors.
diff --git a/server_test.go b/server_test.go
@@ -152,7 +152,11 @@ func TestServer(t *testing.T) {
Count: 1024,
},
}
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
s := lib9p.NewServer(testfs.FS, 1024, sr, sw)
+ s.listenChan = s.runListener(ctx, s.r)
+ s.speakChan = s.runSpeaker(ctx, s.w)
for _, m := range msg {
r, err := newReq(s, m)
if err != nil {
@@ -160,7 +164,7 @@ func TestServer(t *testing.T) {
return
}
t.Logf("<-- %v\n", r.Ifcall)
- go handleReq(context.Background(), s, r)
+ go handleReq(ctx, s, r)
buf := make([]byte, 1024)
_, err = cr.Read(buf)