lib9p

Go 9P library.
Log | Files | Refs | LICENSE

commit 6a70e03c7738491d3bed4ce2bfbcb4a7cd15988f
parent 336e8e1f3f6e5e4110474f882860c714625711cf
Author: Matsuda Kenji <info@mtkn.jp>
Date:   Mon, 25 Dec 2023 09:11:12 +0900

rename client structs and add file test

Diffstat:
Mclient/client.go | 105+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Aclient/file_test.go | 96+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mclient/req.go | 16++++++++--------
3 files changed, 166 insertions(+), 51 deletions(-)

diff --git a/client/client.go b/client/client.go @@ -12,18 +12,38 @@ import ( // Client is a client side of the 9P conversation. type Client struct { - msize uint32 + // msize is the maximum message size in length + msize uint32 + // mSizeLock is the mutex used when msize is to be changed. mSizeLock *sync.Mutex - uname string - fPool *fidPool - txc chan<- *clientReq - errc chan error - cancel context.CancelFunc - rootFid *fid - wg *sync.WaitGroup + + // uname is used to communicate with a server. + uname string + + // fPool is the fidPool which hold the list of opend fids. + fPool *fidPool + + // txc is used to send a reqest to the multiplexer goroutine + txc chan<- *req + + // errc is used to report any error which is not relevant to + // a specific request + errc chan error + + // cancel is the CancelFunc to stop the goroutines evoked by this client. + cancel context.CancelFunc + + // rootFid is the fid of the root of the file system. + rootFid *fid + + // wg is the WaitGroup of all goroutines evoked by this client and its + // descendants. + wg *sync.WaitGroup } -// newClient creates a Client. +// NewClient creates a Client. +// It also runs several goroutines to handle requests. +// And the returned client should be stopped by calling *Client.Stop afterwards. func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client { ctx, cancel := context.WithCancel(context.Background()) c := &Client{ @@ -163,73 +183,73 @@ func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- lib9p.Msg { // RunMultiplexer runs multiplexer goroutine. // Multiplexer goroutines, one for recieving Rmsg and another for sending Tmsg. -// The goroutine for Tmsg recieves *clientReq from the returned channel, +// The goroutine for Tmsg recieves *req from the returned channel, // and send the 9P lib9p.Msg to the speaker goroutine via tmsgc. -// The goroutine for Rmsg recieves *clientReq from the Tmsg goroutine and waits for +// The goroutine for Rmsg recieves *req from the Tmsg goroutine and waits for // the reply to the corresponding message from the listener goroutine via rmsgc. -// After recieving the reply, it sets the *clientReq.rmsg and sends it t the -// *clientReq.rxc. +// After recieving the reply, it sets the *req.rmsg and sends it t the +// *req.rxc. // It reports any errors to the client's errc channel. -func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rmsgc <-chan lib9p.Msg) chan<- *clientReq { +func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rmsgc <-chan lib9p.Msg) chan<- *req { c.wg.Add(2) - txc := make(chan *clientReq) - reqc := make(chan *clientReq) + txc := make(chan *req) + reqc := make(chan *req) // Rmsg - go func(reqc <-chan *clientReq) { + go func(reqc <-chan *req) { wg := new(sync.WaitGroup) defer func() { wg.Wait() c.wg.Done() }() - rPool := make(map[uint16]*clientReq) + rPool := make(map[uint16]*req) for { select { case <-ctx.Done(): return - case req := <-reqc: - if req == nil { + case r := <-reqc: + if r == nil { // ctx is canceled. continue } - if _, ok := rPool[req.tag]; ok { - req.errc <- fmt.Errorf("mux: duplicate tag: %d", req.tag) + if _, ok := rPool[r.tag]; ok { + r.errc <- fmt.Errorf("mux: duplicate tag: %d", r.tag) continue } - rPool[req.tag] = req // TODO: wait for req.ctxDone channel. + rPool[r.tag] = r // TODO: wait for r.ctxDone channel. wg.Add(1) go func() { defer wg.Done() - <-req.ctxDone + <-r.ctxDone }() case msg := <-rmsgc: if msg == nil { // ctx is canceled. continue } - req, ok := rPool[msg.GetTag()] + r, ok := rPool[msg.GetTag()] if !ok { c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg) continue } delete(rPool, msg.GetTag()) - if tflush, ok := req.tmsg.(*lib9p.TFlush); ok { + if tflush, ok := r.tmsg.(*lib9p.TFlush); ok { if _, ok := msg.(*lib9p.RFlush); !ok { - req.errc <- fmt.Errorf("mux: response to Tflush is not Rflush") + r.errc <- fmt.Errorf("mux: response to Tflush is not Rflush") } delete(rPool, tflush.Oldtag) } - req.rmsg = msg + r.rmsg = msg go func() { select { - case <-req.ctxDone: - case req.rxc <- req: + case <-r.ctxDone: + case r.rxc <- r: } }() } } }(reqc) // Tmsg - go func(reqc chan<- *clientReq) { + go func(reqc chan<- *req) { wg := new(sync.WaitGroup) defer func() { wg.Wait() @@ -241,16 +261,16 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rms select { case <-ctx.Done(): return - case req := <-txc: + case r := <-txc: select { - case reqc <- req: + case reqc <- r: case <-ctx.Done(): return } wg.Add(1) go func() { defer wg.Done() - tmsgc <- req.tmsg + tmsgc <- r.tmsg }() } } @@ -258,23 +278,22 @@ func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rms return txc } -// Transact send 9P lib9p.Msg of req to the multiplexer goroutines and recieves +// Transact send 9P lib9p.Msg of r to the multiplexer goroutines and recieves // the reply. func (c *Client) transact(ctx context.Context, tmsg lib9p.Msg) (lib9p.Msg, error) { ctx1, cancel1 := context.WithCancel(ctx) - req := newClientReq(ctx1, tmsg) - defer req.Close() + defer cancel1() + r := newReq(ctx1, tmsg) + defer r.Close() select { case <-ctx.Done(): return nil, ctx.Err() - case c.txc <- req: + case c.txc <- r: } select { - case req = <-req.rxc: // This assignment is not required. - cancel1() - return req.rmsg, req.err - case err := <-req.errc: // Client side error. - cancel1() + case r = <-r.rxc: // This assignment is not required. + return r.rmsg, r.err + case err := <-r.errc: // Client side error. return nil, err case <-ctx.Done(): return nil, ctx.Err() diff --git a/client/file_test.go b/client/file_test.go @@ -0,0 +1,95 @@ +package client_test + +import ( + "context" + "io" + "path" + "testing" + + "git.mtkn.jp/lib9p" + "git.mtkn.jp/lib9p/client" + "git.mtkn.jp/lib9p/testfs" +) + +type FS struct { + *client.FS + cr, sr *io.PipeReader + cw, sw *io.PipeWriter + cancel context.CancelFunc +} + +func mountTestFS(t *testing.T) *FS { + cr, sw := io.Pipe() + sr, cw := io.Pipe() + s := lib9p.NewServer(testfs.Fsys, 8 * 1024, sr, sw) + ctx, cancel := context.WithCancel(context.Background()) + go s.Serve(ctx) + fsys, err := client.Mount(cr, cw, "ken", "") + if err != nil { + t.Fatal(err) + } + return &FS{fsys, cr, sr, cw, sw, cancel} +} + +func (fsys *FS) unmount() { + fsys.Unmount() + fsys.cancel() + fsys.cr.Close() + fsys.cw.Close() + fsys.sr.Close() + fsys.sw.Close() +} + +// TestStat tests whether Stat returns the same lib9p.Stat as testfs.Fsys defines. +// TODO: work in progress. +func TestStat(t *testing.T) { + fsys := mountTestFS(t) + defer fsys.unmount() + +} + +// TestReadDir tests whether ReadDir returns the same dir entries as testfs.Fsys +// has. +func TestReadDir(t *testing.T) { + fsys := mountTestFS(t) + defer fsys.unmount() + testReadDir(t, fsys, testfs.Fsys, ".") +} + +func testReadDir(t *testing.T, cfs *FS, tfs *testfs.FS, cwd string) { + t.Log(cwd) + cf, err := cfs.Open(cwd) + if err != nil { + t.Fatalf("open: %v", err) + } + defer cf.Close() + st, err := cf.Stat() + if err != nil { + t.Fatalf("stat: %v", err) + } + if !st.IsDir() { + return + } + de, err := cf.(*client.File).ReadDir(-1) + if err != nil && err != io.EOF { + t.Fatalf("readdir: %v", err) + } + tf, err := tfs.OpenFile(cwd, lib9p.OREAD) + if err != nil { + t.Fatalf("open: %v", err) + } + if len(de) != len(tf.(*testfs.File).Children) { + t.Fatal("number of directory entries does not match.") + } +L: + for _, f := range de { + for _, c := range tf.(*testfs.File).Children { + if c.St.Name == f.Name() { + childPath := path.Join(cwd, f.Name()) + testReadDir(t, cfs, tfs, childPath) + continue L + } + } + t.Errorf("file name not in the testfs: %s", f.Name()) + } +} +\ No newline at end of file diff --git a/client/req.go b/client/req.go @@ -8,32 +8,32 @@ import ( "git.mtkn.jp/lib9p" ) -// clientReq represents each requests of the client. -type clientReq struct { +// req represents each requests of the client. +type req struct { tag uint16 tmsg lib9p.Msg rmsg lib9p.Msg err error errc chan error // To report any client side error to transact(). - rxc chan *clientReq + rxc chan *req ctxDone <-chan struct{} } -// newClientReq allocates a clientReq with msg. +// newReq allocates a req with msg. // It also sets the ctxDone channel to ctx.Done(). // TODO: passing ctx is confusing? // it only needs the done channel. -func newClientReq(ctx context.Context, msg lib9p.Msg) *clientReq { - return &clientReq{ +func newReq(ctx context.Context, msg lib9p.Msg) *req { + return &req{ tag: msg.GetTag(), tmsg: msg, - rxc: make(chan *clientReq), + rxc: make(chan *req), ctxDone: ctx.Done(), errc: make(chan error), } } -func (r *clientReq) Close() { +func (r *req) Close() { close(r.rxc) close(r.errc) }