commit 45f58e3b361c861a6810c0b693a795f20160bcec
parent a33dc456403c6b2afcfa7383d38071da8c0252b4
Author: Matsuda Kenji <info@mtkn.jp>
Date: Sat, 23 Dec 2023 14:55:42 +0900
update test for flush and add clients error reporter
Diffstat:
3 files changed, 110 insertions(+), 17 deletions(-)
diff --git a/client/client.go b/client/client.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "log"
"sync"
"git.mtkn.jp/lib9p"
@@ -37,6 +38,7 @@ func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client {
tmsgc := c.runSpeaker(ctx, w)
rmsgc := c.runListener(ctx, r)
c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc)
+ c.runErrorReporter(ctx)
return c
}
@@ -61,6 +63,24 @@ func (c *Client) setMSize(mSize uint32) {
c.msize = mSize
}
+// TODO: handle errors properly.
+// By just printing log message, transact function can't return.
+func (c *Client) runErrorReporter(ctx context.Context) {
+ go func() {
+ for {
+ select {
+ case err := <-c.errc:
+ if err == nil {
+ return
+ }
+ log.Println("client err:", err)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
// RunListener runs listener goroutine.
// Listener reads byte array of 9P messages from r and make each of them into
// corresponding struct that implements lib9p.Msg, and sends it to the returned channel.
@@ -131,8 +151,6 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- lib9p.Msg {
return
case msg := <-tmsgc:
if msg == nil {
- // tmsgc is closed, which means ctx.Done() is also closed.
- // but this code breaks semantics?
return
}
if err := lib9p.SendMsg(msg, w); err != nil {
@@ -195,6 +213,12 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rms
continue
}
delete(rPool, msg.GetTag())
+ if tflush, ok := req.tmsg.(*lib9p.TFlush); ok {
+ if _, ok := msg.(*lib9p.RFlush); !ok {
+ c.errc <- fmt.Errorf("mux: response to Tflush is not Rflush")
+ }
+ delete(rPool, tflush.Oldtag)
+ }
req.rmsg = msg
go func() {
defer close(req.rxc)
diff --git a/req_test.go b/req_test.go
@@ -3,12 +3,19 @@ package lib9p_test
import (
"context"
"testing"
- "time"
"git.mtkn.jp/lib9p"
"git.mtkn.jp/lib9p/testfs"
)
+// TestFlush tests if Tflush message can cancel the time-consuming Tread.
+// It sets the testfs.Fsys slow, sends a Tread message which blocks until
+// something is sent over Fsys.Waitc, and sends a Tflush message, waits for
+// the Rflush, then unblock the pending Tread message by sending something
+// through the Fsys.Waitc.
+// And it checks if the response for Tread message is an error.
+// If the error is nil, Tflush is not successful. If not, Tflush is ok.
+// It also tests same thing for a Twrite request.
func TestFlush(t *testing.T) {
const (
mSize = 8 * 1024
@@ -18,32 +25,72 @@ func TestFlush(t *testing.T) {
defer func() { testfs.Fsys.Slow = false }()
conn := testfs.SetupConn()
defer conn.Close()
- ctx := context.Background()
conn.S.Chatty()
+ ctx := context.Background()
_, _, err := conn.C.Version(ctx, lib9p.NOTAG, mSize, "9P2000")
if err != nil {
t.Fatalf("version: %v", err)
}
_, err = conn.C.Attach(ctx, 0, 0, lib9p.NOFID, uname, "")
+ if err != nil {
+ t.Fatalf("attach: %v", err)
+ }
_, err = conn.C.Walk(ctx, 0, 0, 1, []string{"a"})
+ if err != nil {
+ t.Fatalf("walk: %v", err)
+ }
_, _, err = conn.C.Open(ctx, 0, 1, lib9p.OREAD)
- done := make(chan string)
+ if err != nil {
+ t.Fatalf("open: %v", err)
+ }
+ done := make(chan struct{})
ctx1, cancel1 := context.WithCancel(ctx)
- var data []byte
go func() {
- data, err = conn.C.Read(ctx1, 0, 1, 0, mSize-lib9p.IOHDRSZ)
+ _, err = conn.C.Read(ctx1, 0, 1, 0, mSize-lib9p.IOHDRSZ)
close(done)
}()
- // I want to send Tflush message after sending Tread and before recieving
- // Rread. But I think this method is not relyable.
- time.Sleep(1 * time.Millisecond)
+ <-testfs.Fsys.Waitc
err = conn.C.Flush(ctx, 1, 0)
if err != nil {
t.Errorf("flush: %v", err)
}
+ testfs.Fsys.Waitc <- struct{}{}
cancel1()
<-done
- t.Logf("canceled")
- t.Logf("read data: %v, err: %v", data, err)
- time.Sleep(testfs.SleepTime * 2)
+ if err == nil { // this err is from Read.
+ t.Errorf("Rread arrived after Rflush.")
+ }
+ err = conn.C.Clunk(ctx, 0, 1)
+ if err != nil {
+ t.Fatalf("clunk: %v", err)
+ }
+ wname := []string{"dir", "file"}
+ wqid, err := conn.C.Walk(ctx, 0, 0, 1, wname)
+ if err != nil {
+ t.Fatalf("walk: %v", err)
+ } else if len(wqid) != len(wname) {
+ t.Fatalf("walk: not found")
+ }
+ _, _, err = conn.C.Open(ctx, 0, 1, lib9p.OWRITE)
+ if err != nil {
+ t.Fatalf("open: %v", err)
+ }
+ done = make(chan struct{})
+ ctx1, cancel1 = context.WithCancel(ctx)
+ go func() {
+ data := []byte("unko")
+ _, err = conn.C.Write(ctx1, 0, 1, 0, uint32(len(data)), data)
+ close(done)
+ }()
+ <-testfs.Fsys.Waitc
+ err = conn.C.Flush(ctx, 1, 0)
+ if err != nil {
+ t.Errorf("flush: %v", err)
+ }
+ testfs.Fsys.Waitc <- struct{}{}
+ cancel1()
+ <-done
+ if err == nil { // this err is from Read.
+ t.Errorf("Rread arrived after Rflush.")
+ }
}
diff --git a/testfs/fs.go b/testfs/fs.go
@@ -13,18 +13,28 @@ import (
const SleepTime = 10 * time.Millisecond
type File struct {
+ // Fsys is the FS this File belongs to.
Fsys *FS
+ // Parent is the parent directory this File located at.
Parent *File
+ // Children is the child files this File has.
+ // It is nil if this File is not a directory
Children []*File
+ // Content is the content of this File.
+ // It is nil if this File is a directory
Content []byte
+ // Reader is used if this File is not a directory and
+ // when this File is read.
+ // It contains Content.
Reader *bytes.Reader
-
+ // St is the Stat of this File.
St lib9p.Stat
}
func (f *File) Stat() (fs.FileInfo, error) {
return &lib9p.FileInfo{Stat: f.St}, nil
}
+
func (f *File) Close() error {
f.Reader = nil
return nil
@@ -32,14 +42,16 @@ func (f *File) Close() error {
func (f *File) Read(b []byte) (int, error) {
if f.Fsys.Slow {
- time.Sleep(SleepTime)
+ f.Fsys.Waitc <-struct{}{}
+ <-f.Fsys.Waitc
}
return f.Reader.Read(b)
}
func (f *File) ReadAt(b []byte, off int64) (n int, err error) {
if f.Fsys.Slow {
- time.Sleep(SleepTime)
+ f.Fsys.Waitc <- struct{}{}
+ <-f.Fsys.Waitc
}
return f.Reader.ReadAt(b, off)
}
@@ -55,7 +67,7 @@ func (f *File) ReadDir(n int) ([]fs.DirEntry, error) {
func (f *File) WriteAt(p []byte, off int64) (int, error) {
if f.Fsys.Slow {
- time.Sleep(SleepTime)
+ f.Fsys.Waitc <- struct{}{}
}
if f.Reader == nil {
return 0, fmt.Errorf("not open")
@@ -70,12 +82,21 @@ func (f *File) WriteAt(p []byte, off int64) (int, error) {
}
copy(f.Content[off:], p)
f.Reader.Reset(f.Content)
+ if f.Fsys.Slow {
+ <-f.Fsys.Waitc
+ }
return len(p), nil
}
type FS struct {
+ // Root is the root File of this file system.
Root *File
+ // If Slow is true, each function on files of this FS waits for
+ // something arrives on Waitc.
Slow bool
+ // Waitc is used to wait some timing to emulate slow response.
+ Waitc chan struct{}
+ // groups holds the information of groups and it leader and members.
groups map[string]group
}
@@ -189,6 +210,7 @@ func init() {
},
},
},
+ Waitc: make(chan struct{}),
groups: map[string]group{
"bell": group{
leader: "glenda",