lib9p

Go 9P library.
Log | Files | Refs | LICENSE

commit f62e5a3699de784ec9d8cf593496188a7aca38ae
parent 590904aef9dc59d0c0e13487691917464ff513b8
Author: Matsuda Kenji <info@mtkn.jp>
Date:   Thu, 26 Oct 2023 16:20:28 +0900

move codes for client to subpackage client

Diffstat:
Dclient.go | 453-------------------------------------------------------------------------------
Aclient/client.go | 455+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aclient/client_test.go | 168+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aclient/fid.go | 96+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aclient/file.go | 88+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aclient/fs.go | 151++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aclient/req.go | 88+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dclient2.go | 149-------------------------------------------------------------------------------
Dclient_test.go | 168-------------------------------------------------------------------------------
Mfid.go | 88-------------------------------------------------------------------------------
Mfile.go | 81-------------------------------------------------------------------------------
Mreq.go | 80-------------------------------------------------------------------------------
12 files changed, 1046 insertions(+), 1019 deletions(-)

diff --git a/client.go b/client.go @@ -1,453 +0,0 @@ -package lib9p - -import ( - "context" - "fmt" - "io" - "sync" -) - -// Client is a client side of the 9P conversation. -type Client struct { - msize uint32 - mSizeLock *sync.Mutex - uname string - fPool *clientFidPool - txc chan<- *clientReq - errc chan error - cancel context.CancelFunc - rootFid *clientFid - wg *sync.WaitGroup -} - -// newClient creates a Client. -func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client { - ctx, cancel := context.WithCancel(context.Background()) - c := &Client{ - msize: mSize, - mSizeLock: new(sync.Mutex), - uname: uname, - fPool: allocClientFidPool(), - errc: make(chan error), - cancel: cancel, - wg: new(sync.WaitGroup), - } - tmsgc := c.runSpeaker(ctx, w) - rmsgc := c.runListener(ctx, r) - c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc) - return c -} - -// Stop stops the Client. -func (c *Client) Stop() { - c.cancel() - c.wg.Wait() - close(c.errc) -} - -// mSize returns the maximum message size of the Client. -func (c *Client) mSize() uint32 { - c.mSizeLock.Lock() - defer c.mSizeLock.Unlock() - return c.msize -} - -// setMSize changes the maximum message size of the Client. -func (c *Client) setMSize(mSize uint32) { - c.mSizeLock.Lock() - defer c.mSizeLock.Unlock() - c.msize = mSize -} - -// RunListener runs listener goroutine. -// Listener reads byte array of 9P messages from r and make each of them into -// corresponding struct that implements Msg, and sends it to the returned channel. -// Listener goroutine returns when ctx is canceled. -// Listener goroutine reports errors to the client's errc channel. -func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan Msg { - c.wg.Add(1) - // TODO: terminate with ctx.Done() - rmsgc := make(chan Msg, 3) - go func() { - wg := new(sync.WaitGroup) - defer func() { - wg.Wait() - close(rmsgc) - c.wg.Done() - }() - for { - select { - case <-ctx.Done(): - // TODO: should return error via ec?? - // TODO: should close r? - return - default: - done := make(chan struct{}) - var ( - msg Msg - err error - ) - go func() { - defer close(done) - msg, err = recv(r) - }() - select { - case <-done: - case <-ctx.Done(): - } - if err != nil { - c.errc <- fmt.Errorf("recv: %v", err) - continue - } - wg.Add(1) - go func() { - defer wg.Done() - select { - case rmsgc <- msg: - case <-ctx.Done(): - } - }() - } - } - }() - return rmsgc -} - -// RunSpeaker runs speaker goroutine. -// Speaker goroutine recieves 9P Msgs from the returned channel, marshal them -// into byte arrays and sends them to w. -// It reports any errors to the clients errc channel. -// It returnes when ctx is canceled. -func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- Msg { - c.wg.Add(1) - tmsgc := make(chan Msg, 3) - go func() { - defer c.wg.Done() - for { - select { - case <-ctx.Done(): - return - case msg := <-tmsgc: - if msg == nil { - // tmsgc is closed, which means ctx.Done() is also closed. - // but this code breaks semantics? - return - } - if err := send(msg, w); err != nil { - c.errc <- fmt.Errorf("send: %v", err) - } - } - } - }() - return tmsgc -} - -// RunMultiplexer runs multiplexer goroutine. -// Multiplexer goroutines, one for recieving Rmsg and another for sending Tmsg. -// The goroutine for Tmsg recieves *clientReq from the returned channel, -// and send the 9P Msg to the speaker goroutine via tmsgc. -// The goroutine for Rmsg recieves *clientReq from the Tmsg goroutine and waits for -// the reply to the corresponding message from the listener goroutine via rmsgc. -// After recieving the reply, it sets the *clientReq.rmsg and sends it t the -// *clientReq.rxc. -// It reports any errors to the client's errc channel. -func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- Msg, rmsgc <-chan Msg) chan<- *clientReq { - c.wg.Add(2) - txc := make(chan *clientReq) - reqc := make(chan *clientReq) - // Rmsg - go func(reqc <-chan *clientReq) { - wg := new(sync.WaitGroup) - defer func() { - wg.Wait() - c.wg.Done() - }() - rPool := make(map[uint16]*clientReq) - for { - select { - case <-ctx.Done(): - return - case req := <-reqc: - if req == nil { - // ctx is canceled. - continue - } - if _, ok := rPool[req.tag]; ok { - c.errc <- fmt.Errorf("mux: duplicate tag: %d", req.tag) - continue - } - rPool[req.tag] = req // TODO: wait for req.ctxDone channel. - wg.Add(1) - go func() { - defer wg.Done() - <-req.ctxDone - }() - case msg := <-rmsgc: - if msg == nil { - // ctx is canceled. - continue - } - req, ok := rPool[msg.Tag()] - if !ok { - c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg) - continue - } - delete(rPool, msg.Tag()) - req.rmsg = msg - go func() { - defer close(req.rxc) - select { - case <-req.ctxDone: - case req.rxc <- req: - } - }() - } - } - }(reqc) - // Tmsg - go func(reqc chan<- *clientReq) { - wg := new(sync.WaitGroup) - defer func() { - wg.Wait() - close(reqc) - close(tmsgc) - c.wg.Done() - }() - for { - select { - case <-ctx.Done(): - return - case req := <-txc: - select { - case reqc <- req: - case <-ctx.Done(): - return - } - wg.Add(1) - go func() { - defer wg.Done() - tmsgc <- req.tmsg - }() - } - } - }(reqc) - return txc -} - -// Transact send 9P Msg of req to the multiplexer goroutines and recieves -// the reply. -func (c *Client) transact(ctx context.Context, tmsg Msg) (Msg, error) { - ctx1, cancel1 := context.WithCancel(ctx) - req := newClientReq(ctx1, tmsg) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case c.txc <- req: - } - select { - case req = <-req.rxc: // TODO: this assignment is not required. - cancel1() - return req.rmsg, req.err - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -func (c *Client) Version(ctx context.Context, tag uint16, mSize uint32, version string) (uint32, string, error) { - tmsg := &TVersion{tag: tag, mSize: mSize, version: version} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return 0, "", fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RVersion: - return rmsg.mSize, rmsg.version, nil - case *RError: - return 0, "", rmsg.ename - default: - return 0, "", fmt.Errorf("invalid reply: %v", rmsg) - } -} - -func (c *Client) Auth(ctx context.Context, tag uint16, afid uint32, uname, aname string) (Qid, error) { - tmsg := &TAuth{tag: tag, afid: afid, uname: uname} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return Qid{}, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RAuth: - return rmsg.aqid, nil - case *RError: - return Qid{}, rmsg.ename - default: - return Qid{}, fmt.Errorf("invalid reply: %v", rmsg) - } -} - -func (c *Client) Attach(ctx context.Context, tag uint16, fid, afid uint32, uname, aname string) (Qid, error) { - tmsg := &TAttach{tag: tag, fid: fid, afid: afid, uname: uname, aname: aname} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return Qid{}, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RAttach: - return rmsg.qid, nil - case *RError: - return Qid{}, rmsg.ename - default: - return Qid{}, fmt.Errorf("invalid reply: %v", rmsg) - } -} - -func (c *Client) Flush(ctx context.Context, tag, oldtag uint16) error { - tmsg := &TFlush{tag: tag, oldtag: oldtag} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RFlush: - return nil - case *RError: - return rmsg.ename - default: - return fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Walk(ctx context.Context, tag uint16, fid, newFid uint32, wname []string) (wqid []Qid, err error) { - tmsg := &TWalk{tag: tag, fid: fid, newFid: newFid, wname: wname} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return nil, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RWalk: - return rmsg.qid, nil - case *RError: - return nil, rmsg.ename - default: - return nil, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Open(ctx context.Context, tag uint16, fid uint32, mode OpenMode) (qid Qid, iounit uint32, err error) { - tmsg := &TOpen{tag: tag, fid: fid, mode: mode} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return Qid{}, 0, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *ROpen: - return rmsg.qid, rmsg.iounit, nil - case *RError: - return Qid{}, 0, rmsg.ename - default: - return Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Create(ctx context.Context, tag uint16, fid uint32, name string, perm FileMode, mode OpenMode) (Qid, uint32, error) { - tmsg := &TCreate{tag: tag, fid: fid, name: name, perm: perm, mode: mode} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return Qid{}, 0, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RCreate: - return rmsg.qid, rmsg.iounit, nil - case *RError: - return Qid{}, 0, rmsg.ename - default: - return Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Read(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32) (data []byte, err error) { - tmsg := &TRead{tag: tag, fid: fid, offset: offset, count: count} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return nil, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RRead: - return rmsg.data, nil - case *RError: - return nil, rmsg.ename - default: - return nil, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Write(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32, data []byte) (uint32, error) { - tmsg := &TWrite{tag: tag, fid: fid, offset: offset, count: count, data: data} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return 0, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RWrite: - return rmsg.count, nil - case *RError: - return 0, rmsg.ename - default: - return 0, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Clunk(ctx context.Context, tag uint16, fid uint32) error { - tmsg := &TClunk{tag: tag, fid: fid} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RClunk: - return nil - case *RError: - return rmsg.ename - default: - return fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Remove(ctx context.Context, tag uint16, fid uint32) error { - tmsg := &TRemove{tag: tag, fid: fid} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RRemove: - return nil - case *RError: - return rmsg.ename - default: - return fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Stat(ctx context.Context, tag uint16, fid uint32) (*Stat, error) { - tmsg := &TStat{tag: tag, fid: fid} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return nil, fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RStat: - return rmsg.stat, nil - case *RError: - return nil, rmsg.ename - default: - return nil, fmt.Errorf("invalid reply: %v", rmsg) - } -} -func (c *Client) Wstat(ctx context.Context, tag uint16, fid uint32, stat *Stat) error { - tmsg := &TWStat{tag: tag, fid: fid, stat: stat} - rmsg, err := c.transact(ctx, tmsg) - if err != nil { - return fmt.Errorf("transact: %v", err) - } - switch rmsg := rmsg.(type) { - case *RWStat: - return nil - case *RError: - return rmsg.ename - default: - return fmt.Errorf("invalid reply: %v", rmsg) - } -} diff --git a/client/client.go b/client/client.go @@ -0,0 +1,455 @@ +package client + +import ( + "context" + "fmt" + "io" + "sync" + + "git.mtkn.jp/lib9p" +) + +// Client is a client side of the 9P conversation. +type Client struct { + msize uint32 + mSizeLock *sync.Mutex + uname string + fPool *clientFidPool + txc chan<- *clientReq + errc chan error + cancel context.CancelFunc + rootFid *clientFid + wg *sync.WaitGroup +} + +// newClient creates a Client. +func NewClient(mSize uint32, uname string, r io.Reader, w io.Writer) *Client { + ctx, cancel := context.WithCancel(context.Background()) + c := &Client{ + msize: mSize, + mSizeLock: new(sync.Mutex), + uname: uname, + fPool: allocClientFidPool(), + errc: make(chan error), + cancel: cancel, + wg: new(sync.WaitGroup), + } + tmsgc := c.runSpeaker(ctx, w) + rmsgc := c.runListener(ctx, r) + c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc) + return c +} + +// Stop stops the Client. +func (c *Client) Stop() { + c.cancel() + c.wg.Wait() + close(c.errc) +} + +// mSize returns the maximum message size of the Client. +func (c *Client) mSize() uint32 { + c.mSizeLock.Lock() + defer c.mSizeLock.Unlock() + return c.msize +} + +// setMSize changes the maximum message size of the Client. +func (c *Client) setMSize(mSize uint32) { + c.mSizeLock.Lock() + defer c.mSizeLock.Unlock() + c.msize = mSize +} + +// RunListener runs listener goroutine. +// Listener reads byte array of 9P messages from r and make each of them into +// corresponding struct that implements lib9p.Msg, and sends it to the returned channel. +// Listener goroutine returns when ctx is canceled. +// Listener goroutine reports errors to the client's errc channel. +func (c *Client) runListener(ctx context.Context, r io.Reader) <-chan lib9p.Msg { + c.wg.Add(1) + // TODO: terminate with ctx.Done() + rmsgc := make(chan lib9p.Msg, 3) + go func() { + wg := new(sync.WaitGroup) + defer func() { + wg.Wait() + close(rmsgc) + c.wg.Done() + }() + for { + select { + case <-ctx.Done(): + // TODO: should return error via ec?? + // TODO: should close r? + return + default: + done := make(chan struct{}) + var ( + msg lib9p.Msg + err error + ) + go func() { + defer close(done) + msg, err = lib9p.recv(r) + }() + select { + case <-done: + case <-ctx.Done(): + } + if err != nil { + c.errc <- fmt.Errorf("recv: %v", err) + continue + } + wg.Add(1) + go func() { + defer wg.Done() + select { + case rmsgc <- msg: + case <-ctx.Done(): + } + }() + } + } + }() + return rmsgc +} + +// RunSpeaker runs speaker goroutine. +// Speaker goroutine recieves 9P lib9p.Msgs from the returned channel, marshal them +// into byte arrays and sends them to w. +// It reports any errors to the clients errc channel. +// It returnes when ctx is canceled. +func (c *Client) runSpeaker(ctx context.Context, w io.Writer) chan<- lib9p.Msg { + c.wg.Add(1) + tmsgc := make(chan lib9p.Msg, 3) + go func() { + defer c.wg.Done() + for { + select { + case <-ctx.Done(): + return + case msg := <-tmsgc: + if msg == nil { + // tmsgc is closed, which means ctx.Done() is also closed. + // but this code breaks semantics? + return + } + if err := lib9p.send(msg, w); err != nil { + c.errc <- fmt.Errorf("send: %v", err) + } + } + } + }() + return tmsgc +} + +// RunMultiplexer runs multiplexer goroutine. +// Multiplexer goroutines, one for recieving Rmsg and another for sending Tmsg. +// The goroutine for Tmsg recieves *clientReq from the returned channel, +// and send the 9P lib9p.Msg to the speaker goroutine via tmsgc. +// The goroutine for Rmsg recieves *clientReq from the Tmsg goroutine and waits for +// the reply to the corresponding message from the listener goroutine via rmsgc. +// After recieving the reply, it sets the *clientReq.rmsg and sends it t the +// *clientReq.rxc. +// It reports any errors to the client's errc channel. +func (c *Client) runMultiplexer(ctx context.Context, tmsgc chan<- lib9p.Msg, rmsgc <-chan lib9p.Msg) chan<- *clientReq { + c.wg.Add(2) + txc := make(chan *clientReq) + reqc := make(chan *clientReq) + // Rmsg + go func(reqc <-chan *clientReq) { + wg := new(sync.WaitGroup) + defer func() { + wg.Wait() + c.wg.Done() + }() + rPool := make(map[uint16]*clientReq) + for { + select { + case <-ctx.Done(): + return + case req := <-reqc: + if req == nil { + // ctx is canceled. + continue + } + if _, ok := rPool[req.tag]; ok { + c.errc <- fmt.Errorf("mux: duplicate tag: %d", req.tag) + continue + } + rPool[req.tag] = req // TODO: wait for req.ctxDone channel. + wg.Add(1) + go func() { + defer wg.Done() + <-req.ctxDone + }() + case msg := <-rmsgc: + if msg == nil { + // ctx is canceled. + continue + } + req, ok := rPool[msg.Tag()] + if !ok { + c.errc <- fmt.Errorf("mux: unknown tag for msg: %v", msg) + continue + } + delete(rPool, msg.Tag()) + req.rmsg = msg + go func() { + defer close(req.rxc) + select { + case <-req.ctxDone: + case req.rxc <- req: + } + }() + } + } + }(reqc) + // Tmsg + go func(reqc chan<- *clientReq) { + wg := new(sync.WaitGroup) + defer func() { + wg.Wait() + close(reqc) + close(tmsgc) + c.wg.Done() + }() + for { + select { + case <-ctx.Done(): + return + case req := <-txc: + select { + case reqc <- req: + case <-ctx.Done(): + return + } + wg.Add(1) + go func() { + defer wg.Done() + tmsgc <- req.tmsg + }() + } + } + }(reqc) + return txc +} + +// Transact send 9P lib9p.Msg of req to the multiplexer goroutines and recieves +// the reply. +func (c *Client) transact(ctx context.Context, tmsg lib9p.Msg) (lib9p.Msg, error) { + ctx1, cancel1 := context.WithCancel(ctx) + req := newClientReq(ctx1, tmsg) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.txc <- req: + } + select { + case req = <-req.rxc: // TODO: this assignment is not required. + cancel1() + return req.rmsg, req.err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (c *Client) Version(ctx context.Context, tag uint16, mSize uint32, version string) (uint32, string, error) { + tmsg := &lib9p.TVersion{tag: tag, mSize: mSize, version: version} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return 0, "", fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RVersion: + return rmsg.mSize, rmsg.version, nil + case *lib9p.RError: + return 0, "", rmsg.ename + default: + return 0, "", fmt.Errorf("invalid reply: %v", rmsg) + } +} + +func (c *Client) Auth(ctx context.Context, tag uint16, afid uint32, uname, aname string) (lib9p.Qid, error) { + tmsg := &lib9p.TAuth{tag: tag, afid: afid, uname: uname} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return lib9p.Qid{}, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RAuth: + return rmsg.aqid, nil + case *lib9p.RError: + return lib9p.Qid{}, rmsg.ename + default: + return lib9p.Qid{}, fmt.Errorf("invalid reply: %v", rmsg) + } +} + +func (c *Client) Attach(ctx context.Context, tag uint16, fid, afid uint32, uname, aname string) (lib9p.Qid, error) { + tmsg := &lib9p.TAttach{tag: tag, fid: fid, afid: afid, uname: uname, aname: aname} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return lib9p.Qid{}, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RAttach: + return rmsg.qid, nil + case *lib9p.RError: + return lib9p.Qid{}, rmsg.ename + default: + return lib9p.Qid{}, fmt.Errorf("invalid reply: %v", rmsg) + } +} + +func (c *Client) Flush(ctx context.Context, tag, oldtag uint16) error { + tmsg := &lib9p.TFlush{tag: tag, oldtag: oldtag} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RFlush: + return nil + case *lib9p.RError: + return rmsg.ename + default: + return fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Walk(ctx context.Context, tag uint16, fid, newFid uint32, wname []string) (wqid []lib9p.Qid, err error) { + tmsg := &lib9p.TWalk{tag: tag, fid: fid, newFid: newFid, wname: wname} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return nil, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RWalk: + return rmsg.qid, nil + case *lib9p.RError: + return nil, rmsg.ename + default: + return nil, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Open(ctx context.Context, tag uint16, fid uint32, mode lib9p.OpenMode) (qid lib9p.Qid, iounit uint32, err error) { + tmsg := &lib9p.TOpen{tag: tag, fid: fid, mode: mode} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return lib9p.Qid{}, 0, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.ROpen: + return rmsg.qid, rmsg.iounit, nil + case *lib9p.RError: + return lib9p.Qid{}, 0, rmsg.ename + default: + return lib9p.Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Create(ctx context.Context, tag uint16, fid uint32, name string, perm lib9p.FileMode, mode lib9p.OpenMode) (lib9p.Qid, uint32, error) { + tmsg := &lib9p.TCreate{tag: tag, fid: fid, name: name, perm: perm, mode: mode} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return lib9p.Qid{}, 0, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RCreate: + return rmsg.qid, rmsg.iounit, nil + case *lib9p.RError: + return lib9p.Qid{}, 0, rmsg.ename + default: + return lib9p.Qid{}, 0, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Read(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32) (data []byte, err error) { + tmsg := &lib9p.TRead{tag: tag, fid: fid, offset: offset, count: count} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return nil, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RRead: + return rmsg.data, nil + case *lib9p.RError: + return nil, rmsg.ename + default: + return nil, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Write(ctx context.Context, tag uint16, fid uint32, offset uint64, count uint32, data []byte) (uint32, error) { + tmsg := &lib9p.TWrite{tag: tag, fid: fid, offset: offset, count: count, data: data} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return 0, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RWrite: + return rmsg.count, nil + case *lib9p.RError: + return 0, rmsg.ename + default: + return 0, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Clunk(ctx context.Context, tag uint16, fid uint32) error { + tmsg := &lib9p.TClunk{tag: tag, fid: fid} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RClunk: + return nil + case *lib9p.RError: + return rmsg.ename + default: + return fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Remove(ctx context.Context, tag uint16, fid uint32) error { + tmsg := &lib9p.TRemove{tag: tag, fid: fid} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RRemove: + return nil + case *lib9p.RError: + return rmsg.ename + default: + return fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Stat(ctx context.Context, tag uint16, fid uint32) (*lib9p.Stat, error) { + tmsg := &lib9p.Tlib9p.Stat{tag: tag, fid: fid} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return nil, fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.Rlib9p.Stat: + return rmsg.stat, nil + case *lib9p.RError: + return nil, rmsg.ename + default: + return nil, fmt.Errorf("invalid reply: %v", rmsg) + } +} +func (c *Client) Wstat(ctx context.Context, tag uint16, fid uint32, stat *lib9p.Stat) error { + tmsg := &lib9p.TWlib9p.Stat{tag: tag, fid: fid, stat: stat} + rmsg, err := c.transact(ctx, tmsg) + if err != nil { + return fmt.Errorf("transact: %v", err) + } + switch rmsg := rmsg.(type) { + case *lib9p.RWlib9p.Stat: + return nil + case *lib9p.RError: + return rmsg.ename + default: + return fmt.Errorf("invalid reply: %v", rmsg) + } +} diff --git a/client/client_test.go b/client/client_test.go @@ -0,0 +1,168 @@ +package client + +import ( + "context" + "io" + "sync" + "testing" +) + +const ( + mSize = 8192 + uname = "kenji" +) + +func newClientForTest(msize uint32, uname string, tmsgc chan<- Msg, rmsgc <-chan Msg) *Client { + ctx, cancel := context.WithCancel(context.Background()) + c := &Client{ + msize: mSize, + mSizeLock: new(sync.Mutex), + uname: uname, + fPool: allocClientFidPool(), + errc: make(chan error), + cancel: cancel, + wg: new(sync.WaitGroup), + } + c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc) + return c +} + +func TestClientVersion(t *testing.T) { + tests := []struct { + name string + mSize uint32 + version string + rmsg Msg + wantmsize uint32 + wantversion string + }{ + {"0", mSize, "9P2000", + &RVersion{tag: NOTAG, mSize: mSize, version: "9P2000"}, + mSize, "9P2000"}, + {"1", mSize, "unko", + &RVersion{tag: NOTAG, mSize: mSize, version: "unknown"}, + mSize, "unknown"}, + } + for _, test := range tests { + func() { + tmsgc := make(chan Msg) + rmsgc := make(chan Msg) + c := newClientForTest(mSize, uname, tmsgc, rmsgc) + tPool := newTagPool() + defer c.Stop() + bg := context.Background() + var ( + gotmsize uint32 + gotversion string + goterr error + wg sync.WaitGroup + ) + wg.Add(1) + go func() { + tag, err := tPool.add() + if err != nil { + t.Fatalf("add tag: %v", err) + } + gotmsize, gotversion, goterr = c.Version(bg, tag, test.mSize, test.version) + tPool.delete(tag) + wg.Done() + }() + gottmsg := <-tmsgc + tag := gottmsg.Tag() + test.rmsg.SetTag(tag) + rmsgc <- test.rmsg + done := make(chan struct{}) + go func() { wg.Wait(); done <- struct{}{} }() + select { + case err := <-c.errc: + t.Errorf("client error: %v", err) + return + case <-done: + } + if goterr != nil { + t.Errorf("%s: goterr: %v", test.name, goterr) + return + } + if test.wantmsize != gotmsize || test.wantversion != gotversion { + t.Errorf("%s: (mSize, verion) want: %d, %s, got: %d, %s", + test.name, test.wantmsize, test.wantversion, + gotmsize, gotversion) + } + }() + } +} + +// the following tests are test for both client and server. +// should be moved to propper file. +func TestTransaction(t *testing.T) { + cr, sw := io.Pipe() + sr, cw := io.Pipe() + + server := NewServer(fsys, mSize, sr, sw) + //server.Chatty() + client := NewClient(mSize, uname, cr, cw) + tPool := newTagPool() + bg := context.Background() + go server.Serve(bg) + tag, err := tPool.add() + if err != nil { + t.Fatalf("add tag: %v", err) + } + rmsize, rversion, err := client.Version(bg, tag, mSize, "9P2000") + tPool.delete(tag) + if err != nil { + t.Log(err) + } else { + t.Log(&RVersion{mSize: rmsize, version: rversion}) + } + tag, err = tPool.add() + if err != nil { + t.Fatalf("add tag: %v", err) + } + rauth, err := client.Auth(bg, tag, 0, "kenji", "") + tPool.delete(tag) + if err != nil { + t.Log(err) + } else { + t.Log(rauth) + } + tag, err = tPool.add() + if err != nil { + t.Fatalf("add tag: %v", err) + } + rattach, err := client.Attach(bg, tag, 0, NOFID, "kenji", "") + tPool.delete(tag) + if err != nil { + t.Log(err) + } else { + t.Log(rattach) + } +} + +func TestClient2(t *testing.T) { + cr, sw := io.Pipe() + sr, cw := io.Pipe() + + server := NewServer(fsys, mSize, sr, sw) + //server.Chatty() + go server.Serve(context.Background()) + + fs, err := Mount(cr, cw, "kenji", "") + if err != nil { + t.Errorf("mount: %v", err) + } + + a0, err := fs.OpenFile("a", OREAD, 0) + if err != nil { + t.Errorf("open: %v", err) + } + t.Log(a0) + a := a0.(*ClientFile) + + b := make([]byte, a.iounit) + n, err := a.Read(b) + if err != nil { + t.Errorf("read: %v", err) + } + t.Log(string(b[:n])) +} diff --git a/client/fid.go b/client/fid.go @@ -0,0 +1,96 @@ +package client + +import ( + "fmt" + "sync" + + "git.mtkn.jp/lib9p" +) + +// clientFid represents the Fid in the client side. +type clientFid struct { + fid uint32 + omode lib9p.OpenMode // -1 for not open + offset uint64 + file *ClientFile +} + +func newClientFid(fid uint32) *clientFid { + return &clientFid{ + fid: fid, + omode: -1, + offset: 0, + } +} + +type clientFidPool struct { + m map[uint32]*clientFid + lock *sync.Mutex +} + +func allocClientFidPool() *clientFidPool { + return &clientFidPool{ + m: make(map[uint32]*clientFid), + lock: new(sync.Mutex), + } +} + +func (pool *clientFidPool) lookup(fid uint32) (*clientFid, bool) { + pool.lock.Lock() + defer pool.lock.Unlock() + + f, ok := pool.m[fid] + return f, ok +} + +func (pool *clientFidPool) nextFid() (uint32, error) { + pool.lock.Lock() + defer pool.lock.Unlock() + for i := uint32(0); i < i+1; i++ { + if _, ok := pool.m[i]; !ok { + return i, nil + } + } + return 0, fmt.Errorf("run out of fid") +} + +func (pool *clientFidPool) add() (*clientFid, error) { + fid, err := pool.nextFid() + if err != nil { + return nil, err + } + pool.lock.Lock() + defer pool.lock.Unlock() + if _, ok := pool.m[fid]; ok { + return nil, fmt.Errorf("fid already in use.") + } + f := newClientFid(fid) + pool.m[fid] = f + return f, nil +} + +func (pool *clientFidPool) delete(fid uint32) { + pool.lock.Lock() + defer pool.lock.Unlock() + + delete(pool.m, fid) +} + +func (pool *clientFidPool) String() string { + pool.lock.Lock() // TODO: need? + defer pool.lock.Unlock() + s := "{" + for fnum, fstruct := range pool.m { + if fstruct.file == nil { + s += fmt.Sprintf(" [%d]<nil>", fnum) + continue + } + st, err := fstruct.file.Stat() + if err != nil { + panic(err) + } + s += fmt.Sprintf(" [%d]%v", fnum, st.Name()) + } + s += "}" + return s +} diff --git a/client/file.go b/client/file.go @@ -0,0 +1,88 @@ +package client + +import ( + "context" + "fmt" + "io" + + "git.mtkn.jp/lib9p" +) + +// ClientFile is a File for Client. +type ClientFile struct { + name string + path string // must not contain trailing slash. + fid *clientFid + qid lib9p.Qid + iounit uint32 + fs *ClientFS +} + +func (cf *ClientFile) Stat() (*lib9p.FileInfo, error) { + tag, err := cf.fs.tPool.add() + if err != nil { + return nil, err + } + st, err := cf.fs.c.Stat(context.TODO(), tag, cf.fid.fid) + cf.fs.tPool.delete(tag) + if err != nil { + return nil, err + } + return &FileInfo{*st}, nil +} + +// Don't use closed file. +func (cf *ClientFile) Close() error { + tag, err := cf.fs.tPool.add() + if err != nil { + return err + } + err = cf.fs.c.Clunk(context.TODO(), tag, cf.fid.fid) + cf.fs.tPool.delete(tag) + cf.fs.c.fPool.delete(cf.fid.fid) + cf.fid = nil + return err +} + +func (cf *ClientFile) Read(b []byte) (int, error) { + if len(b) == 0 { + return 0, nil + } + if cf.fid.omode == -1 { + return 0, fmt.Errorf("not open") + } + if cf.fid.omode&3 != OREAD && cf.fid.omode&3 != ORDWR { + return 0, ErrPerm + } + count := uint32(len(b)) + cur := 0 + for count > 0 { + var c uint32 + if count > cf.iounit { + c = cf.iounit + } else { + c = count + } + tag, err := cf.fs.tPool.add() + if err != nil { + return 0, err + } + buf, err := cf.fs.c.Read(context.TODO(), tag, cf.fid.fid, cf.fid.offset, c) + cf.fs.tPool.delete(tag) + var i int + for i = 0; i < len(buf); i++ { + b[cur+i] = buf[i] + } + cf.fid.offset += uint64(i) + cur += i + if err != nil { + return cur, err + } + count -= c + } + if cur == 0 { + return 0, io.EOF + } else { + return cur, nil + } +} diff --git a/client/fs.go b/client/fs.go @@ -0,0 +1,151 @@ +package client + +import ( + "context" + "fmt" + "io" + "io/fs" + "path" + "strings" + + "git.mtkn.jp/lib9p" +) + +// ClientFS represents the file system the client imports. +type ClientFS struct { + c *Client + tPool *tagPool +} + +// Open opens the file named name in fsys. +func (fsys *ClientFS) Open(name string) (lib9p.File, error) { + return fsys.OpenFile(name, OREAD, 0) +} + +// OpenFile opens the file named name in fsys with omode. +// If the file does not exist, it create it with perm. +func (fsys *ClientFS) OpenFile(name string, omode lib9p.OpenMode, perm fs.FileMode) (lib9p.File, error) { + var ( + qid Qid + iounit uint32 + ) + f, err := fsys.walkFile(name) + if err != nil { + // File not found. Create. + f, err = fsys.walkFile(path.Dir(name)) + if err != nil { + return nil, fmt.Errorf("walk to %s: %v", name, err) + } + tag, err := fsys.tPool.add() + if err != nil { + return nil, err + } + qid, iounit, err = fsys.c.Create(context.TODO(), tag, f.fid.fid, path.Base(name), perm, omode) + fsys.tPool.delete(tag) + if err != nil { + f.Close() + return nil, fmt.Errorf("create: %v", err) + } + } else { + // File exists. Open it. + tag, err := fsys.tPool.add() + if err != nil { + return nil, err + } + qid, iounit, err = fsys.c.Open(context.TODO(), tag, f.fid.fid, omode) + fsys.tPool.delete(tag) + if err != nil { + f.Close() + return nil, fmt.Errorf("open: %v", err) + } + } + f.fid.omode = omode + f.qid = qid + f.iounit = iounit + return f, nil +} + +// walkFile walks the file system to the named file name and +// returns the corresponding file. +// returned file is not open. +func (fsys *ClientFS) walkFile(name string) (*ClientFile, error) { + fid, err := fsys.c.fPool.add() + if err != nil { + return nil, fmt.Errorf("add fid: %v", err) + } + var wname []string + if name != "." { + wname = strings.Split(path.Clean(name), "/") + } + tag, err := fsys.tPool.add() + if err != nil { + return nil, err + } + wqid, err := fsys.c.Walk(context.TODO(), tag, fsys.c.rootFid.fid, fid.fid, wname) + fsys.tPool.delete(tag) + if err != nil { + return nil, fmt.Errorf("walk: %v", err) + } + var qid Qid + if name == "." { + qid = Qid{} + } else if len(wqid) > 0 { + qid = wqid[len(wqid)-1] + } else { + return nil, fmt.Errorf("invalid wqid: %v", wqid) + } + f := &ClientFile{ + name: path.Base(name), + path: name, + fid: fid, + qid: qid, + fs: fsys, + } + fid.file = f + return f, nil +} + +// Mount initiates a 9P session and returns the resulting file system. +// The 9P session is established by writing to w and reading from r. +func Mount(r io.Reader, w io.Writer, uname, aname string) (fs *ClientFS, err error) { + var ( + mSize uint32 = 8192 + version = "9P2000" + ctx = context.TODO() + ) + cfs := &ClientFS{ + c: NewClient(mSize, uname, r, w), + tPool: newTagPool(), + } + defer func() { + if err != nil { + cfs.c.Stop() + } + }() + rmSize, rver, err := cfs.c.Version(ctx, NOTAG, mSize, version) + if err != nil { + return nil, fmt.Errorf("version: %v", err) + } + if rver != version { + return nil, fmt.Errorf("incompatible version %s", rver) + } + if rmSize < mSize { + cfs.c.setMSize(rmSize) + } + // TODO: auth + fid, err := cfs.c.fPool.add() + if err != nil { + return nil, fmt.Errorf("add fid: %v", err) + } + tag, err := cfs.tPool.add() + if err != nil { + return nil, err + } + _, err = cfs.c.Attach(ctx, tag, fid.fid, NOFID, uname, aname) + cfs.tPool.delete(tag) + if err != nil { + return nil, fmt.Errorf("attach: %v", err) + } + cfs.c.rootFid = fid + return cfs, nil +} diff --git a/client/req.go b/client/req.go @@ -0,0 +1,88 @@ +package client + +import ( + "context" + "fmt" + "sync" + + "git.mtkn.jp/lib9p" +) + +// clientReq represents each requests of the client. +type clientReq struct { + tag uint16 + tmsg lib9p.Msg + rmsg lib9p.Msg + err error + rxc chan *clientReq + ctxDone <-chan struct{} +} + +// newClientReq allocates a clientReq with msg. +// It also sets the ctxDone channel to ctx.Done(). +// TODO: passing ctx is confusing? +// it only needs the done channel. +func newClientReq(ctx context.Context, msg lib9p.Msg) *clientReq { + return &clientReq{ + tag: msg.Tag(), + tmsg: msg, + rxc: make(chan *clientReq), + ctxDone: ctx.Done(), + } +} + +// tagPool is a pool of tags being used by a client. +type tagPool struct { + m map[uint16]bool + lock *sync.Mutex +} + +func newTagPool() *tagPool { + return &tagPool{ + m: make(map[uint16]bool), + lock: new(sync.Mutex), + } +} + +func (tp *tagPool) add() (uint16, error) { + tp.lock.Lock() + defer tp.lock.Unlock() + tag := NOTAG + for i := uint16(0); i < i+1; i++ { + if _, ok := tp.m[i]; !ok { + tag = i + break + } + } + if tag == NOTAG { + return NOTAG, fmt.Errorf("run out of tag") + } + tp.m[tag] = true + return tag, nil +} + +func (tp *tagPool) lookup(tag uint16) bool { + tp.lock.Lock() + defer tp.lock.Unlock() + _, ok := tp.m[tag] + return ok +} + +func (tp *tagPool) delete(tag uint16) { + tp.lock.Lock() + defer tp.lock.Unlock() + delete(tp.m, tag) +} + +func (tp *tagPool) String() string { + s := "[" + for tag := range tp.m { + s += fmt.Sprintf("%d ", tag) + } + if len(s) > 1 { + s = s[:len(s)-1] + "]" + } else { + s = "[]" + } + return s +} diff --git a/client2.go b/client2.go @@ -1,149 +0,0 @@ -package lib9p - -import ( - "context" - "fmt" - "io" - "io/fs" - "path" - "strings" -) - -// ClientFS represents the file system the client imports. -type ClientFS struct { - c *Client - tPool *tagPool -} - -// Open opens the file named name in fsys. -func (fsys *ClientFS) Open(name string) (File, error) { - return fsys.OpenFile(name, OREAD, 0) -} - -// OpenFile opens the file named name in fsys with omode. -// If the file does not exist, it create it with perm. -func (fsys *ClientFS) OpenFile(name string, omode OpenMode, perm fs.FileMode) (File, error) { - var ( - qid Qid - iounit uint32 - ) - f, err := fsys.walkFile(name) - if err != nil { - // File not found. Create. - f, err = fsys.walkFile(path.Dir(name)) - if err != nil { - return nil, fmt.Errorf("walk to %s: %v", name, err) - } - tag, err := fsys.tPool.add() - if err != nil { - return nil, err - } - qid, iounit, err = fsys.c.Create(context.TODO(), tag, f.fid.fid, path.Base(name), perm, omode) - fsys.tPool.delete(tag) - if err != nil { - f.Close() - return nil, fmt.Errorf("create: %v", err) - } - } else { - // File exists. Open it. - tag, err := fsys.tPool.add() - if err != nil { - return nil, err - } - qid, iounit, err = fsys.c.Open(context.TODO(), tag, f.fid.fid, omode) - fsys.tPool.delete(tag) - if err != nil { - f.Close() - return nil, fmt.Errorf("open: %v", err) - } - } - f.fid.omode = omode - f.qid = qid - f.iounit = iounit - return f, nil -} - -// walkFile walks the file system to the named file name and -// returns the corresponding file. -// returned file is not open. -func (fsys *ClientFS) walkFile(name string) (*ClientFile, error) { - fid, err := fsys.c.fPool.add() - if err != nil { - return nil, fmt.Errorf("add fid: %v", err) - } - var wname []string - if name != "." { - wname = strings.Split(path.Clean(name), "/") - } - tag, err := fsys.tPool.add() - if err != nil { - return nil, err - } - wqid, err := fsys.c.Walk(context.TODO(), tag, fsys.c.rootFid.fid, fid.fid, wname) - fsys.tPool.delete(tag) - if err != nil { - return nil, fmt.Errorf("walk: %v", err) - } - var qid Qid - if name == "." { - qid = Qid{} - } else if len(wqid) > 0 { - qid = wqid[len(wqid)-1] - } else { - return nil, fmt.Errorf("invalid wqid: %v", wqid) - } - f := &ClientFile{ - name: path.Base(name), - path: name, - fid: fid, - qid: qid, - fs: fsys, - } - fid.file = f - return f, nil -} - -// Mount initiates a 9P session and returns the resulting file system. -// The 9P session is established by writing to w and reading from r. -func Mount(r io.Reader, w io.Writer, uname, aname string) (fs *ClientFS, err error) { - var ( - mSize uint32 = 8192 - version = "9P2000" - ctx = context.TODO() - ) - cfs := &ClientFS{ - c: NewClient(mSize, uname, r, w), - tPool: newTagPool(), - } - defer func() { - if err != nil { - cfs.c.Stop() - } - }() - rmSize, rver, err := cfs.c.Version(ctx, NOTAG, mSize, version) - if err != nil { - return nil, fmt.Errorf("version: %v", err) - } - if rver != version { - return nil, fmt.Errorf("incompatible version %s", rver) - } - if rmSize < mSize { - cfs.c.setMSize(rmSize) - } - // TODO: auth - fid, err := cfs.c.fPool.add() - if err != nil { - return nil, fmt.Errorf("add fid: %v", err) - } - tag, err := cfs.tPool.add() - if err != nil { - return nil, err - } - _, err = cfs.c.Attach(ctx, tag, fid.fid, NOFID, uname, aname) - cfs.tPool.delete(tag) - if err != nil { - return nil, fmt.Errorf("attach: %v", err) - } - cfs.c.rootFid = fid - return cfs, nil -} diff --git a/client_test.go b/client_test.go @@ -1,168 +0,0 @@ -package lib9p - -import ( - "context" - "io" - "sync" - "testing" -) - -const ( - mSize = 8192 - uname = "kenji" -) - -func newClientForTest(msize uint32, uname string, tmsgc chan<- Msg, rmsgc <-chan Msg) *Client { - ctx, cancel := context.WithCancel(context.Background()) - c := &Client{ - msize: mSize, - mSizeLock: new(sync.Mutex), - uname: uname, - fPool: allocClientFidPool(), - errc: make(chan error), - cancel: cancel, - wg: new(sync.WaitGroup), - } - c.txc = c.runMultiplexer(ctx, tmsgc, rmsgc) - return c -} - -func TestClientVersion(t *testing.T) { - tests := []struct { - name string - mSize uint32 - version string - rmsg Msg - wantmsize uint32 - wantversion string - }{ - {"0", mSize, "9P2000", - &RVersion{tag: NOTAG, mSize: mSize, version: "9P2000"}, - mSize, "9P2000"}, - {"1", mSize, "unko", - &RVersion{tag: NOTAG, mSize: mSize, version: "unknown"}, - mSize, "unknown"}, - } - for _, test := range tests { - func() { - tmsgc := make(chan Msg) - rmsgc := make(chan Msg) - c := newClientForTest(mSize, uname, tmsgc, rmsgc) - tPool := newTagPool() - defer c.Stop() - bg := context.Background() - var ( - gotmsize uint32 - gotversion string - goterr error - wg sync.WaitGroup - ) - wg.Add(1) - go func() { - tag, err := tPool.add() - if err != nil { - t.Fatalf("add tag: %v", err) - } - gotmsize, gotversion, goterr = c.Version(bg, tag, test.mSize, test.version) - tPool.delete(tag) - wg.Done() - }() - gottmsg := <-tmsgc - tag := gottmsg.Tag() - test.rmsg.SetTag(tag) - rmsgc <- test.rmsg - done := make(chan struct{}) - go func() { wg.Wait(); done <- struct{}{} }() - select { - case err := <-c.errc: - t.Errorf("client error: %v", err) - return - case <-done: - } - if goterr != nil { - t.Errorf("%s: goterr: %v", test.name, goterr) - return - } - if test.wantmsize != gotmsize || test.wantversion != gotversion { - t.Errorf("%s: (mSize, verion) want: %d, %s, got: %d, %s", - test.name, test.wantmsize, test.wantversion, - gotmsize, gotversion) - } - }() - } -} - -// the following tests are test for both client and server. -// should be moved to propper file. -func TestTransaction(t *testing.T) { - cr, sw := io.Pipe() - sr, cw := io.Pipe() - - server := NewServer(fsys, mSize, sr, sw) - //server.Chatty() - client := NewClient(mSize, uname, cr, cw) - tPool := newTagPool() - bg := context.Background() - go server.Serve(bg) - tag, err := tPool.add() - if err != nil { - t.Fatalf("add tag: %v", err) - } - rmsize, rversion, err := client.Version(bg, tag, mSize, "9P2000") - tPool.delete(tag) - if err != nil { - t.Log(err) - } else { - t.Log(&RVersion{mSize: rmsize, version: rversion}) - } - tag, err = tPool.add() - if err != nil { - t.Fatalf("add tag: %v", err) - } - rauth, err := client.Auth(bg, tag, 0, "kenji", "") - tPool.delete(tag) - if err != nil { - t.Log(err) - } else { - t.Log(rauth) - } - tag, err = tPool.add() - if err != nil { - t.Fatalf("add tag: %v", err) - } - rattach, err := client.Attach(bg, tag, 0, NOFID, "kenji", "") - tPool.delete(tag) - if err != nil { - t.Log(err) - } else { - t.Log(rattach) - } -} - -func TestClient2(t *testing.T) { - cr, sw := io.Pipe() - sr, cw := io.Pipe() - - server := NewServer(fsys, mSize, sr, sw) - //server.Chatty() - go server.Serve(context.Background()) - - fs, err := Mount(cr, cw, "kenji", "") - if err != nil { - t.Errorf("mount: %v", err) - } - - a0, err := fs.OpenFile("a", OREAD, 0) - if err != nil { - t.Errorf("open: %v", err) - } - t.Log(a0) - a := a0.(*ClientFile) - - b := make([]byte, a.iounit) - n, err := a.Read(b) - if err != nil { - t.Errorf("read: %v", err) - } - t.Log(string(b[:n])) -} diff --git a/fid.go b/fid.go @@ -106,91 +106,3 @@ func (pool *FidPool) String() string { s += "}" return s } - -// clientFid represents the Fid in the client side. -type clientFid struct { - fid uint32 - omode OpenMode // -1 for not open - offset uint64 - file *ClientFile -} - -func newClientFid(fid uint32) *clientFid { - return &clientFid{ - fid: fid, - omode: -1, - offset: 0, - } -} - -type clientFidPool struct { - m map[uint32]*clientFid - lock *sync.Mutex -} - -func allocClientFidPool() *clientFidPool { - return &clientFidPool{ - m: make(map[uint32]*clientFid), - lock: new(sync.Mutex), - } -} - -func (pool *clientFidPool) lookup(fid uint32) (*clientFid, bool) { - pool.lock.Lock() - defer pool.lock.Unlock() - - f, ok := pool.m[fid] - return f, ok -} - -func (pool *clientFidPool) nextFid() (uint32, error) { - pool.lock.Lock() - defer pool.lock.Unlock() - for i := uint32(0); i < i+1; i++ { - if _, ok := pool.m[i]; !ok { - return i, nil - } - } - return 0, fmt.Errorf("run out of fid") -} - -func (pool *clientFidPool) add() (*clientFid, error) { - fid, err := pool.nextFid() - if err != nil { - return nil, err - } - pool.lock.Lock() - defer pool.lock.Unlock() - if _, ok := pool.m[fid]; ok { - return nil, fmt.Errorf("fid already in use.") - } - f := newClientFid(fid) - pool.m[fid] = f - return f, nil -} - -func (pool *clientFidPool) delete(fid uint32) { - pool.lock.Lock() - defer pool.lock.Unlock() - - delete(pool.m, fid) -} - -func (pool *clientFidPool) String() string { - pool.lock.Lock() // TODO: need? - defer pool.lock.Unlock() - s := "{" - for fnum, fstruct := range pool.m { - if fstruct.file == nil { - s += fmt.Sprintf(" [%d]<nil>", fnum) - continue - } - st, err := fstruct.file.Stat() - if err != nil { - panic(err) - } - s += fmt.Sprintf(" [%d]%v", fnum, st.Name()) - } - s += "}" - return s -} diff --git a/file.go b/file.go @@ -1,8 +1,6 @@ package lib9p import ( - "context" - "fmt" "io" ) @@ -49,82 +47,3 @@ type ReadDirFile interface { File ReadDir(n int) ([]*DirEntry, error) } - -// ClientFile is a File for Client. -type ClientFile struct { - name string - path string // must not contain trailing slash. - fid *clientFid - qid Qid - iounit uint32 - fs *ClientFS -} - -func (cf *ClientFile) Stat() (*FileInfo, error) { - tag, err := cf.fs.tPool.add() - if err != nil { - return nil, err - } - st, err := cf.fs.c.Stat(context.TODO(), tag, cf.fid.fid) - cf.fs.tPool.delete(tag) - if err != nil { - return nil, err - } - return &FileInfo{*st}, nil -} - -// Don't use closed file. -func (cf *ClientFile) Close() error { - tag, err := cf.fs.tPool.add() - if err != nil { - return err - } - err = cf.fs.c.Clunk(context.TODO(), tag, cf.fid.fid) - cf.fs.tPool.delete(tag) - cf.fs.c.fPool.delete(cf.fid.fid) - cf.fid = nil - return err -} - -func (cf *ClientFile) Read(b []byte) (int, error) { - if len(b) == 0 { - return 0, nil - } - if cf.fid.omode == -1 { - return 0, fmt.Errorf("not open") - } - if cf.fid.omode&3 != OREAD && cf.fid.omode&3 != ORDWR { - return 0, ErrPerm - } - count := uint32(len(b)) - cur := 0 - for count > 0 { - var c uint32 - if count > cf.iounit { - c = cf.iounit - } else { - c = count - } - tag, err := cf.fs.tPool.add() - if err != nil { - return 0, err - } - buf, err := cf.fs.c.Read(context.TODO(), tag, cf.fid.fid, cf.fid.offset, c) - cf.fs.tPool.delete(tag) - var i int - for i = 0; i < len(buf); i++ { - b[cur+i] = buf[i] - } - cf.fid.offset += uint64(i) - cur += i - if err != nil { - return cur, err - } - count -= c - } - if cur == 0 { - return 0, io.EOF - } else { - return cur, nil - } -} diff --git a/req.go b/req.go @@ -2,7 +2,6 @@ package lib9p import ( "context" - "fmt" "sync" ) @@ -72,82 +71,3 @@ func (rp *ReqPool) delete(tag uint16) { defer rp.lock.Unlock() delete(rp.m, tag) } - -// clientReq represents each requests of the client. -type clientReq struct { - tag uint16 - tmsg Msg - rmsg Msg - err error - rxc chan *clientReq - ctxDone <-chan struct{} -} - -// newClientReq allocates a clientReq with msg. -// It also sets the ctxDone channel to ctx.Done(). -// TODO: passing ctx is confusing? -// it only needs the done channel. -func newClientReq(ctx context.Context, msg Msg) *clientReq { - return &clientReq{ - tag: msg.Tag(), - tmsg: msg, - rxc: make(chan *clientReq), - ctxDone: ctx.Done(), - } -} - -// tagPool is a pool of tags being used by a client. -type tagPool struct { - m map[uint16]bool - lock *sync.Mutex -} - -func newTagPool() *tagPool { - return &tagPool{ - m: make(map[uint16]bool), - lock: new(sync.Mutex), - } -} - -func (tp *tagPool) add() (uint16, error) { - tp.lock.Lock() - defer tp.lock.Unlock() - tag := NOTAG - for i := uint16(0); i < i+1; i++ { - if _, ok := tp.m[i]; !ok { - tag = i - break - } - } - if tag == NOTAG { - return NOTAG, fmt.Errorf("run out of tag") - } - tp.m[tag] = true - return tag, nil -} - -func (tp *tagPool) lookup(tag uint16) bool { - tp.lock.Lock() - defer tp.lock.Unlock() - _, ok := tp.m[tag] - return ok -} - -func (tp *tagPool) delete(tag uint16) { - tp.lock.Lock() - defer tp.lock.Unlock() - delete(tp.m, tag) -} - -func (tp *tagPool) String() string { - s := "[" - for tag := range tp.m { - s += fmt.Sprintf("%d ", tag) - } - if len(s) > 1 { - s = s[:len(s)-1] + "]" - } else { - s = "[]" - } - return s -}