commit 7cee1a932b36f9b36a9fe7c7de272535d1ac9f74
parent c89eeddf70bff42358191ad1c57690eb25ffdc72
Author: Matsuda Kenji <info@mtkn.jp>
Date: Fri, 22 Dec 2023 12:54:52 +0900
restore Flush
Diffstat:
| M | req.go | | | 1 | + |
| M | req_test.go | | | 75 | +++++++++++++++++++-------------------------------------------------------- |
| M | server.go | | | 58 | +++++++++++++--------------------------------------------- |
| M | testfs/fs.go | | | 8 | ++++---- |
4 files changed, 37 insertions(+), 105 deletions(-)
diff --git a/req.go b/req.go
@@ -16,6 +16,7 @@ type Req struct {
Oldreq *Req
pool *ReqPool
Cancel context.CancelFunc
+ Done <-chan struct{}
// listenErr is any error encountered while waiting for new 9P message.
listenErr error
// speakErrChan is used to report any error encountered while sending
diff --git a/req_test.go b/req_test.go
@@ -1,13 +1,14 @@
-package lib9p
+package lib9p_test
-/*
import (
"context"
"io"
"testing"
"time"
- "git.mtkn.jp/lib9p/client" // TODO: import cycle
+ "git.mtkn.jp/lib9p"
+ "git.mtkn.jp/lib9p/client"
+ "git.mtkn.jp/lib9p/testfs"
)
func TestFlush(t *testing.T) {
@@ -15,84 +16,46 @@ func TestFlush(t *testing.T) {
mSize = 1024
uname = "kenji"
)
- fsys.slow = true
- defer func() { fsys.slow = false }()
+ testfs.FS.Slow = true
+ defer func() { testfs.FS.Slow = false }()
sr, cw := io.Pipe()
defer sr.Close()
defer cw.Close()
cr, sw := io.Pipe()
defer cr.Close()
defer sw.Close()
- s := NewServer(fsys, mSize, sr, sw)
- //s.Chatty()
+ s := lib9p.NewServer(testfs.FS, mSize, sr, sw)
+ s.Chatty()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go s.Serve(ctx)
c := client.NewClient(mSize, uname, cr, cw)
defer c.Stop()
- tPool := newTagPool()
- rmsize, rversion, err := c.Version(ctx, NOTAG, mSize, "9P2000")
+ _, _, err := c.Version(ctx, lib9p.NOTAG, mSize, "9P2000")
if err != nil {
t.Fatalf("version: %v", err)
}
- t.Logf("rversion: %d, %s", rmsize, rversion)
- tag, err := tPool.add()
- if err != nil {
- t.Fatalf("add tag: %v", err)
- }
- _, err = c.Attach(ctx, tag, 0, NOFID, uname, "")
- tPool.delete(tag)
- if err != nil {
- t.Fatalf("attach: %v", err)
- }
- tag, err = tPool.add()
- if err != nil {
- t.Fatalf("add tag: %v", err)
- }
- _, err = c.Walk(ctx, tag, 0, 1, []string{"a"})
- tPool.delete(tag)
- if err != nil {
- t.Fatalf("walk: %v", err)
- }
- tag, err = tPool.add()
- if err != nil {
- t.Fatalf("add tag: %v", err)
- }
- _, _, err = c.Open(ctx, tag, 1, OREAD)
- tPool.delete(tag)
- if err != nil {
- t.Fatalf("open: %v", err)
- }
+ _, err = c.Attach(ctx, 0, 0, lib9p.NOFID, uname, "")
+ _, err = c.Walk(ctx, 0, 0, 1, []string{"a"})
+ _, _, err = c.Open(ctx, 0, 1, lib9p.OREAD)
done := make(chan string)
- tag0, err := tPool.add()
- if err != nil {
- t.Fatalf("add tag: %v", err)
- }
ctx1, cancel1 := context.WithCancel(ctx)
var data []byte
go func() {
- data, err = c.Read(ctx1, tag0, 1, 0, mSize-IOHDRSZ)
- tPool.delete(tag0)
+ data, err = c.Read(ctx1, 0, 1, 0, mSize-lib9p.IOHDRSZ)
close(done)
}()
// I want to send Tflush message after sending Tread and before recieving
// Rread. But I think this method is not relyable.
- time.Sleep(10 * time.Millisecond)
- tag, err = tPool.add()
- if err != nil {
- t.Fatalf("add tag: %v", err)
- }
- t.Log(tPool)
- err = c.Flush(ctx, tag, tag0)
- tPool.delete(tag)
+ time.Sleep(1 * time.Millisecond)
+ err = c.Flush(ctx, 1, 0)
if err != nil {
t.Errorf("flush: %v", err)
}
- t.Log(tPool)
cancel1()
- t.Log(tPool)
- t.Logf("canceled: %v", <-done)
+ <-done
+ t.Logf("canceled")
t.Logf("read data: %v, err: %v", data, err)
- t.Log(tPool)
+ time.Sleep(testfs.SleepTime * 2)
}
-*/
+
diff --git a/server.go b/server.go
@@ -64,8 +64,11 @@ type Server struct {
respChan chan *Req
- // Auth is called when Tauth message arrives.
- // If authentication is desired, the Auth functions should
+ // Auth function is passed an auth request when a TAuth message arrives.
+ // It should run a goroutine listeneing to the channel which the function
+ // returns and send processed *Req through the channel specified by the
+ // second argument.
+ // If authentication is desired, the Auth goroutine should
// set Req.Afid.File to an *AuthFile and Req.Ofcall.Qid, and prepare to
// authenticate via the Read()/Write() calls to Req.Afid.File
// If this is nil, no authentication is performed.
@@ -757,6 +760,7 @@ func rCreate(ctx context.Context, c <-chan *Req) {
}
// TODO: I think the file should be locked while reading.
+// or should the undeterminism left for the client side?
func sRead(ctx context.Context, s *Server, c <-chan *Req) {
rc := make(chan *Req)
defer close(rc)
@@ -851,7 +855,7 @@ func sRead(ctx context.Context, s *Server, c <-chan *Req) {
rc <- r
continue
}
- case <-ctx.Done():
+ case <-r.Done:
return
}
r.Ofcall = &RRead{
@@ -941,7 +945,7 @@ func sWrite(ctx context.Context, s *Server, c <-chan *Req) {
rc <- r
continue
}
- case <-ctx.Done():
+ case <-r.Done:
return
}
r.Ofcall = ofcall
@@ -1355,7 +1359,7 @@ func (s *Server) Serve(ctx context.Context) {
go sRemove(ctx, s, removeChan)
go sStat(ctx, s, statChan)
go sWStat(ctx, s, wstatChan)
- go Respond2(ctx, s)
+ go respond(ctx, s)
L:
for {
select {
@@ -1367,8 +1371,9 @@ L:
}
continue L
}
- //ctx1, cancel := context.WithCancel(ctx)
- //r.Cancel = cancel
+ ctx1, cancel := context.WithCancel(ctx)
+ r.Cancel = cancel
+ r.Done = ctx1.Done()
go func() {
switch r.Ifcall.(type) {
default:
@@ -1410,44 +1415,7 @@ L:
}
}
-/*
-// Respond Responds to the request r with the message r.Ofcall if err is nil,
-// or if err is not nil, with the Rerror with the error message.
-// If r is nil, or both r.Ofcall and err are nil it panics.
-func Respond() {
- switch r.Ifcall.(type) {
- default:
- panic(fmt.Errorf("bug: r.Ifcall: %v", r.Ifcall))
- }
- r.Ofcall.SetTag(r.Tag)
- // free tag.
- if r.pool == nil && err != ErrDupTag {
- panic("ReqPool is nil but err is not EDupTag")
- }
- if r.pool != nil {
- r.pool.delete(r.Tag)
- }
- select {
- case r.Srv.speakChan <- r:
- if r.Srv.chatty9P {
- fmt.Fprintf(os.Stderr, "--> %s\n", r.Ofcall)
- }
- case <-ctx.Done():
- log.Printf("req flush: %v", r.Ifcall)
- }
- select {
- case err := <-r.speakErrChan:
- // TODO: handle errors.
- if err != nil {
- log.Printf("speak: %v", err)
- }
- case <-ctx.Done():
- //log.Printf("respond: %v", ctx.Err())
- }
-}
-*/
-
-func Respond2(ctx context.Context, s *Server) {
+func respond(ctx context.Context, s *Server) {
for {
select {
case <-ctx.Done():
diff --git a/testfs/fs.go b/testfs/fs.go
@@ -10,7 +10,7 @@ import (
"git.mtkn.jp/lib9p"
)
-const sleepTime = 1 * time.Second
+const SleepTime = 10 * time.Millisecond
type TestFile struct {
Fsys *TestFS
@@ -32,14 +32,14 @@ func (f *TestFile) Close() error {
func (f *TestFile) Read(b []byte) (int, error) {
if f.Fsys.Slow {
- time.Sleep(sleepTime)
+ time.Sleep(SleepTime)
}
return f.Reader.Read(b)
}
func (f *TestFile) ReadAt(b []byte, off int64) (n int, err error) {
if f.Fsys.Slow {
- time.Sleep(sleepTime)
+ time.Sleep(SleepTime)
}
return f.Reader.ReadAt(b, off)
}
@@ -55,7 +55,7 @@ func (f *TestFile) ReadDir(n int) ([]fs.DirEntry, error) {
func (f *TestFile) WriteAt(p []byte, off int64) (int, error) {
if f.Fsys.Slow {
- time.Sleep(sleepTime)
+ time.Sleep(SleepTime)
}
if f.Reader == nil {
return 0, fmt.Errorf("not open")