]> asedeno.scripts.mit.edu Git - linux.git/commitdiff
SUNRPC: Allow AF_LOCAL sockets to use the generic stream receive
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Fri, 14 Sep 2018 18:32:45 +0000 (14:32 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:16 +0000 (15:35 -0400)
Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/xdr.h
net/sunrpc/socklib.c
net/sunrpc/xprtsock.c

index 745587132a87fee94f7d4d8715df137cf6b6ea0f..8815be7cae7247df3c99288d725b5bee431ef3a1 100644 (file)
@@ -185,7 +185,6 @@ struct xdr_skb_reader {
 
 typedef size_t (*xdr_skb_read_actor)(struct xdr_skb_reader *desc, void *to, size_t len);
 
-size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len);
 extern int csum_partial_copy_to_xdr(struct xdr_buf *, struct sk_buff *);
 extern ssize_t xdr_partial_copy_from_skb(struct xdr_buf *, unsigned int,
                struct xdr_skb_reader *, xdr_skb_read_actor);
index 08f00a98151fd4e8517b89459c559f937298b5b8..0e7c0dee7578473c3ecb8d9ca0a068a4752349fa 100644 (file)
@@ -26,7 +26,8 @@
  * Possibly called several times to iterate over an sk_buff and copy
  * data out of it.
  */
-size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
+static size_t
+xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
 {
        if (len > desc->count)
                len = desc->count;
@@ -36,7 +37,6 @@ size_t xdr_skb_read_bits(struct xdr_skb_reader *desc, void *to, size_t len)
        desc->offset += len;
        return len;
 }
-EXPORT_SYMBOL_GPL(xdr_skb_read_bits);
 
 /**
  * xdr_skb_read_and_csum_bits - copy and checksum from skb to buffer
index 55df1fadab27741ba4ff332c318f0bd69b2de0e6..90d4c92177b7fad7d5d08c7809085c57e4a0bae1 100644 (file)
@@ -670,6 +670,17 @@ static void xs_stream_data_receive_workfn(struct work_struct *work)
        xs_stream_data_receive(transport);
 }
 
+static void
+xs_stream_reset_connect(struct sock_xprt *transport)
+{
+       transport->recv.offset = 0;
+       transport->recv.len = 0;
+       transport->recv.copied = 0;
+       transport->xmit.offset = 0;
+       transport->xprt.stat.connect_count++;
+       transport->xprt.stat.connect_start = jiffies;
+}
+
 #define XS_SENDMSG_FLAGS       (MSG_DONTWAIT | MSG_NOSIGNAL)
 
 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@@ -1266,114 +1277,6 @@ static void xs_destroy(struct rpc_xprt *xprt)
        module_put(THIS_MODULE);
 }
 
-static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
-{
-       struct xdr_skb_reader desc = {
-               .skb            = skb,
-               .offset         = sizeof(rpc_fraghdr),
-               .count          = skb->len - sizeof(rpc_fraghdr),
-       };
-
-       if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
-               return -1;
-       if (desc.count)
-               return -1;
-       return 0;
-}
-
-/**
- * xs_local_data_read_skb
- * @xprt: transport
- * @sk: socket
- * @skb: skbuff
- *
- * Currently this assumes we can read the whole reply in a single gulp.
- */
-static void xs_local_data_read_skb(struct rpc_xprt *xprt,
-               struct sock *sk,
-               struct sk_buff *skb)
-{
-       struct rpc_task *task;
-       struct rpc_rqst *rovr;
-       int repsize, copied;
-       u32 _xid;
-       __be32 *xp;
-
-       repsize = skb->len - sizeof(rpc_fraghdr);
-       if (repsize < 4) {
-               dprintk("RPC:       impossible RPC reply size %d\n", repsize);
-               return;
-       }
-
-       /* Copy the XID from the skb... */
-       xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
-       if (xp == NULL)
-               return;
-
-       /* Look up and lock the request corresponding to the given XID */
-       spin_lock(&xprt->queue_lock);
-       rovr = xprt_lookup_rqst(xprt, *xp);
-       if (!rovr)
-               goto out_unlock;
-       xprt_pin_rqst(rovr);
-       spin_unlock(&xprt->queue_lock);
-       task = rovr->rq_task;
-
-       copied = rovr->rq_private_buf.buflen;
-       if (copied > repsize)
-               copied = repsize;
-
-       if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
-               dprintk("RPC:       sk_buff copy failed\n");
-               spin_lock(&xprt->queue_lock);
-               goto out_unpin;
-       }
-
-       spin_lock(&xprt->queue_lock);
-       xprt_complete_rqst(task, copied);
-out_unpin:
-       xprt_unpin_rqst(rovr);
- out_unlock:
-       spin_unlock(&xprt->queue_lock);
-}
-
-static void xs_local_data_receive(struct sock_xprt *transport)
-{
-       struct sk_buff *skb;
-       struct sock *sk;
-       int err;
-
-restart:
-       mutex_lock(&transport->recv_mutex);
-       sk = transport->inet;
-       if (sk == NULL)
-               goto out;
-       for (;;) {
-               skb = skb_recv_datagram(sk, 0, 1, &err);
-               if (skb != NULL) {
-                       xs_local_data_read_skb(&transport->xprt, sk, skb);
-                       skb_free_datagram(sk, skb);
-                       continue;
-               }
-               if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state))
-                       break;
-               if (need_resched()) {
-                       mutex_unlock(&transport->recv_mutex);
-                       cond_resched();
-                       goto restart;
-               }
-       }
-out:
-       mutex_unlock(&transport->recv_mutex);
-}
-
-static void xs_local_data_receive_workfn(struct work_struct *work)
-{
-       struct sock_xprt *transport =
-               container_of(work, struct sock_xprt, recv_worker);
-       xs_local_data_receive(transport);
-}
-
 /**
  * xs_udp_data_read_skb - receive callback for UDP sockets
  * @xprt: transport
@@ -1974,11 +1877,8 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
                write_unlock_bh(&sk->sk_callback_lock);
        }
 
-       transport->xmit.offset = 0;
+       xs_stream_reset_connect(transport);
 
-       /* Tell the socket layer to start connecting... */
-       xprt->stat.connect_count++;
-       xprt->stat.connect_start = jiffies;
        return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
 }
 
@@ -2335,14 +2235,9 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
        xs_set_memalloc(xprt);
 
        /* Reset TCP record info */
-       transport->recv.offset = 0;
-       transport->recv.len = 0;
-       transport->recv.copied = 0;
-       transport->xmit.offset = 0;
+       xs_stream_reset_connect(transport);
 
        /* Tell the socket layer to start connecting... */
-       xprt->stat.connect_count++;
-       xprt->stat.connect_start = jiffies;
        set_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
        ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
        switch (ret) {
@@ -2717,6 +2612,7 @@ static const struct rpc_xprt_ops xs_local_ops = {
        .connect                = xs_local_connect,
        .buf_alloc              = rpc_malloc,
        .buf_free               = rpc_free,
+       .prepare_request        = xs_stream_prepare_request,
        .send_request           = xs_local_send_request,
        .set_retrans_timeout    = xprt_set_retrans_timeout_def,
        .close                  = xs_close,
@@ -2901,9 +2797,8 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
        xprt->ops = &xs_local_ops;
        xprt->timeout = &xs_local_default_timeout;
 
-       INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
-       INIT_DELAYED_WORK(&transport->connect_worker,
-                       xs_dummy_setup_socket);
+       INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+       INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
 
        switch (sun->sun_family) {
        case AF_LOCAL: