]> asedeno.scripts.mit.edu Git - linux.git/commitdiff
rxrpc: Fix error distribution
authorDavid Howells <dhowells@redhat.com>
Thu, 27 Sep 2018 14:13:09 +0000 (15:13 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 28 Sep 2018 09:33:17 +0000 (10:33 +0100)
Fix error distribution by immediately delivering the errors to all the
affected calls rather than deferring them to a worker thread.  The problem
with the latter is that retries and things can happen in the meantime when we
want to stop that sooner.

To this end:

 (1) Stop the error distributor from removing calls from the error_targets
     list so that peer->lock isn't needed to synchronise against other adds
     and removals.

 (2) Require the peer's error_targets list to be accessed with RCU, thereby
     avoiding the need to take peer->lock over distribution.

 (3) Don't attempt to affect a call's state if it is already marked complete.

Signed-off-by: David Howells <dhowells@redhat.com>
include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_object.c
net/rxrpc/conn_client.c
net/rxrpc/conn_object.c
net/rxrpc/peer_event.c
net/rxrpc/peer_object.c

index 196587b8f204de13da0529c3cce46b68df75b4ac..837393fa897bb764264741ec2051f163841f0a4d 100644 (file)
@@ -56,7 +56,6 @@ enum rxrpc_peer_trace {
        rxrpc_peer_new,
        rxrpc_peer_processing,
        rxrpc_peer_put,
-       rxrpc_peer_queued_error,
 };
 
 enum rxrpc_conn_trace {
@@ -257,8 +256,7 @@ enum rxrpc_tx_point {
        EM(rxrpc_peer_got,                      "GOT") \
        EM(rxrpc_peer_new,                      "NEW") \
        EM(rxrpc_peer_processing,               "PRO") \
-       EM(rxrpc_peer_put,                      "PUT") \
-       E_(rxrpc_peer_queued_error,             "QER")
+       E_(rxrpc_peer_put,                      "PUT")
 
 #define rxrpc_conn_traces \
        EM(rxrpc_conn_got,                      "GOT") \
index c72686193d832735ddfd53afa862e8967daaaabc..ef9554131434496ae02ab6c47418c13ebbc3b239 100644 (file)
@@ -288,7 +288,6 @@ struct rxrpc_peer {
        struct hlist_node       hash_link;
        struct rxrpc_local      *local;
        struct hlist_head       error_targets;  /* targets for net error distribution */
-       struct work_struct      error_distributor;
        struct rb_root          service_conns;  /* Service connections */
        struct list_head        keepalive_link; /* Link in net->peer_keepalive[] */
        time64_t                last_tx_at;     /* Last time packet sent here */
@@ -299,8 +298,6 @@ struct rxrpc_peer {
        unsigned int            maxdata;        /* data size (MTU - hdrsize) */
        unsigned short          hdrsize;        /* header size (IP + UDP + RxRPC) */
        int                     debug_id;       /* debug ID for printks */
-       int                     error_report;   /* Net (+0) or local (+1000000) to distribute */
-#define RXRPC_LOCAL_ERROR_OFFSET 1000000
        struct sockaddr_rxrpc   srx;            /* remote address */
 
        /* calculated RTT cache */
@@ -1039,7 +1036,6 @@ void rxrpc_send_keepalive(struct rxrpc_peer *);
  * peer_event.c
  */
 void rxrpc_error_report(struct sock *);
-void rxrpc_peer_error_distributor(struct work_struct *);
 void rxrpc_peer_add_rtt(struct rxrpc_call *, enum rxrpc_rtt_rx_trace,
                        rxrpc_serial_t, rxrpc_serial_t, ktime_t, ktime_t);
 void rxrpc_peer_keepalive_worker(struct work_struct *);
@@ -1057,7 +1053,6 @@ void rxrpc_destroy_all_peers(struct rxrpc_net *);
 struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
 struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
 void rxrpc_put_peer(struct rxrpc_peer *);
-void __rxrpc_queue_peer_error(struct rxrpc_peer *);
 
 /*
  * proc.c
index 9486293fef5c6f98c96397fc90eb14eecf332196..799f75b6900ddc4a7a5aecf87325b355ebbbcecc 100644 (file)
@@ -400,7 +400,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
        rcu_assign_pointer(conn->channels[chan].call, call);
 
        spin_lock(&conn->params.peer->lock);
-       hlist_add_head(&call->error_link, &conn->params.peer->error_targets);
+       hlist_add_head_rcu(&call->error_link, &conn->params.peer->error_targets);
        spin_unlock(&conn->params.peer->lock);
 
        _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
index f8f37188a9322829b8f4277c09b7329d2f4c1da0..8acf74fe24c03646916c1b69cddf8c7be3f79d43 100644 (file)
@@ -710,8 +710,8 @@ int rxrpc_connect_call(struct rxrpc_call *call,
        }
 
        spin_lock_bh(&call->conn->params.peer->lock);
-       hlist_add_head(&call->error_link,
-                      &call->conn->params.peer->error_targets);
+       hlist_add_head_rcu(&call->error_link,
+                          &call->conn->params.peer->error_targets);
        spin_unlock_bh(&call->conn->params.peer->lock);
 
 out:
index b4438f98dc5ce1483c3b81d51da8e44537644fd6..885dae829f4a1a1690334f0a6a3375d34d79bffb 100644 (file)
@@ -216,7 +216,7 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
        call->peer->cong_cwnd = call->cong_cwnd;
 
        spin_lock_bh(&conn->params.peer->lock);
-       hlist_del_init(&call->error_link);
+       hlist_del_rcu(&call->error_link);
        spin_unlock_bh(&conn->params.peer->lock);
 
        if (rxrpc_is_client_call(call))
index 4f9da2f51c694c3f93d3883476057377664b80e7..f3e6fc670da2339998992f0f0904e1f0b767ddd1 100644 (file)
@@ -23,6 +23,8 @@
 #include "ar-internal.h"
 
 static void rxrpc_store_error(struct rxrpc_peer *, struct sock_exterr_skb *);
+static void rxrpc_distribute_error(struct rxrpc_peer *, int,
+                                  enum rxrpc_call_completion);
 
 /*
  * Find the peer associated with an ICMP packet.
@@ -194,8 +196,6 @@ void rxrpc_error_report(struct sock *sk)
        rcu_read_unlock();
        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 
-       /* The ref we obtained is passed off to the work item */
-       __rxrpc_queue_peer_error(peer);
        _leave("");
 }
 
@@ -205,6 +205,7 @@ void rxrpc_error_report(struct sock *sk)
 static void rxrpc_store_error(struct rxrpc_peer *peer,
                              struct sock_exterr_skb *serr)
 {
+       enum rxrpc_call_completion compl = RXRPC_CALL_NETWORK_ERROR;
        struct sock_extended_err *ee;
        int err;
 
@@ -255,7 +256,7 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
        case SO_EE_ORIGIN_NONE:
        case SO_EE_ORIGIN_LOCAL:
                _proto("Rx Received local error { error=%d }", err);
-               err += RXRPC_LOCAL_ERROR_OFFSET;
+               compl = RXRPC_CALL_LOCAL_ERROR;
                break;
 
        case SO_EE_ORIGIN_ICMP6:
@@ -264,48 +265,23 @@ static void rxrpc_store_error(struct rxrpc_peer *peer,
                break;
        }
 
-       peer->error_report = err;
+       rxrpc_distribute_error(peer, err, compl);
 }
 
 /*
- * Distribute an error that occurred on a peer
+ * Distribute an error that occurred on a peer.
  */
-void rxrpc_peer_error_distributor(struct work_struct *work)
+static void rxrpc_distribute_error(struct rxrpc_peer *peer, int error,
+                                  enum rxrpc_call_completion compl)
 {
-       struct rxrpc_peer *peer =
-               container_of(work, struct rxrpc_peer, error_distributor);
        struct rxrpc_call *call;
-       enum rxrpc_call_completion compl;
-       int error;
-
-       _enter("");
-
-       error = READ_ONCE(peer->error_report);
-       if (error < RXRPC_LOCAL_ERROR_OFFSET) {
-               compl = RXRPC_CALL_NETWORK_ERROR;
-       } else {
-               compl = RXRPC_CALL_LOCAL_ERROR;
-               error -= RXRPC_LOCAL_ERROR_OFFSET;
-       }
 
-       _debug("ISSUE ERROR %s %d", rxrpc_call_completions[compl], error);
-
-       spin_lock_bh(&peer->lock);
-
-       while (!hlist_empty(&peer->error_targets)) {
-               call = hlist_entry(peer->error_targets.first,
-                                  struct rxrpc_call, error_link);
-               hlist_del_init(&call->error_link);
+       hlist_for_each_entry_rcu(call, &peer->error_targets, error_link) {
                rxrpc_see_call(call);
-
-               if (rxrpc_set_call_completion(call, compl, 0, -error))
+               if (call->state < RXRPC_CALL_COMPLETE &&
+                   rxrpc_set_call_completion(call, compl, 0, -error))
                        rxrpc_notify_socket(call);
        }
-
-       spin_unlock_bh(&peer->lock);
-
-       rxrpc_put_peer(peer);
-       _leave("");
 }
 
 /*
index 70083e8fb6e5a2a83bad364f33eb5c2c25a71e95..01a9febfa36714da7293c1b9b5a5235d0947f8d0 100644 (file)
@@ -220,8 +220,6 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
                atomic_set(&peer->usage, 1);
                peer->local = local;
                INIT_HLIST_HEAD(&peer->error_targets);
-               INIT_WORK(&peer->error_distributor,
-                         &rxrpc_peer_error_distributor);
                peer->service_conns = RB_ROOT;
                seqlock_init(&peer->service_conn_lock);
                spin_lock_init(&peer->lock);
@@ -402,21 +400,6 @@ struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *peer)
        return peer;
 }
 
-/*
- * Queue a peer record.  This passes the caller's ref to the workqueue.
- */
-void __rxrpc_queue_peer_error(struct rxrpc_peer *peer)
-{
-       const void *here = __builtin_return_address(0);
-       int n;
-
-       n = atomic_read(&peer->usage);
-       if (rxrpc_queue_work(&peer->error_distributor))
-               trace_rxrpc_peer(peer, rxrpc_peer_queued_error, n, here);
-       else
-               rxrpc_put_peer(peer);
-}
-
 /*
  * Discard a peer record.
  */