]> asedeno.scripts.mit.edu Git - linux.git/blob - fs/ceph/mds_client.c
ceph: fix uid/gid on resent mds requests
[linux.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9 #include <linux/smp_lock.h>
10
11 #include "super.h"
12 #include "mds_client.h"
13
14 #include <linux/ceph/messenger.h>
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/pagelist.h>
17 #include <linux/ceph/auth.h>
18 #include <linux/ceph/debugfs.h>
19
/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */
44
/*
 * Small bundle of state; judging by the name it is used while
 * reconnecting to an MDS.
 * NOTE(review): the code that fills in and reads these fields is not in
 * this part of the file, so the field notes below are assumptions.
 */
struct ceph_reconnect_state {
        struct ceph_pagelist *pagelist; /* presumably where the payload is accumulated -- confirm */
        bool flock;                     /* presumably selects whether file-lock info is encoded -- confirm */
};
49
/* Forward declaration; the definition appears later in the file. */
static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);

/* Connection callbacks; register_session() installs them on each session's connection. */
static const struct ceph_connection_operations mds_con_ops;
54
55
56 /*
57  * mds reply parsing
58  */
59
60 /*
61  * parse individual inode info
62  */
63 static int parse_reply_info_in(void **p, void *end,
64                                struct ceph_mds_reply_info_in *info)
65 {
66         int err = -EIO;
67
68         info->in = *p;
69         *p += sizeof(struct ceph_mds_reply_inode) +
70                 sizeof(*info->in->fragtree.splits) *
71                 le32_to_cpu(info->in->fragtree.nsplits);
72
73         ceph_decode_32_safe(p, end, info->symlink_len, bad);
74         ceph_decode_need(p, end, info->symlink_len, bad);
75         info->symlink = *p;
76         *p += info->symlink_len;
77
78         ceph_decode_32_safe(p, end, info->xattr_len, bad);
79         ceph_decode_need(p, end, info->xattr_len, bad);
80         info->xattr_data = *p;
81         *p += info->xattr_len;
82         return 0;
83 bad:
84         return err;
85 }
86
87 /*
88  * parse a normal reply, which may contain a (dir+)dentry and/or a
89  * target inode.
90  */
91 static int parse_reply_info_trace(void **p, void *end,
92                                   struct ceph_mds_reply_info_parsed *info)
93 {
94         int err;
95
96         if (info->head->is_dentry) {
97                 err = parse_reply_info_in(p, end, &info->diri);
98                 if (err < 0)
99                         goto out_bad;
100
101                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
102                         goto bad;
103                 info->dirfrag = *p;
104                 *p += sizeof(*info->dirfrag) +
105                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
106                 if (unlikely(*p > end))
107                         goto bad;
108
109                 ceph_decode_32_safe(p, end, info->dname_len, bad);
110                 ceph_decode_need(p, end, info->dname_len, bad);
111                 info->dname = *p;
112                 *p += info->dname_len;
113                 info->dlease = *p;
114                 *p += sizeof(*info->dlease);
115         }
116
117         if (info->head->is_target) {
118                 err = parse_reply_info_in(p, end, &info->targeti);
119                 if (err < 0)
120                         goto out_bad;
121         }
122
123         if (unlikely(*p != end))
124                 goto bad;
125         return 0;
126
127 bad:
128         err = -EIO;
129 out_bad:
130         pr_err("problem parsing mds trace %d\n", err);
131         return err;
132 }
133
134 /*
135  * parse readdir results
136  */
137 static int parse_reply_info_dir(void **p, void *end,
138                                 struct ceph_mds_reply_info_parsed *info)
139 {
140         u32 num, i = 0;
141         int err;
142
143         info->dir_dir = *p;
144         if (*p + sizeof(*info->dir_dir) > end)
145                 goto bad;
146         *p += sizeof(*info->dir_dir) +
147                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
148         if (*p > end)
149                 goto bad;
150
151         ceph_decode_need(p, end, sizeof(num) + 2, bad);
152         num = ceph_decode_32(p);
153         info->dir_end = ceph_decode_8(p);
154         info->dir_complete = ceph_decode_8(p);
155         if (num == 0)
156                 goto done;
157
158         /* alloc large array */
159         info->dir_nr = num;
160         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
161                                sizeof(*info->dir_dname) +
162                                sizeof(*info->dir_dname_len) +
163                                sizeof(*info->dir_dlease),
164                                GFP_NOFS);
165         if (info->dir_in == NULL) {
166                 err = -ENOMEM;
167                 goto out_bad;
168         }
169         info->dir_dname = (void *)(info->dir_in + num);
170         info->dir_dname_len = (void *)(info->dir_dname + num);
171         info->dir_dlease = (void *)(info->dir_dname_len + num);
172
173         while (num) {
174                 /* dentry */
175                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
176                 info->dir_dname_len[i] = ceph_decode_32(p);
177                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
178                 info->dir_dname[i] = *p;
179                 *p += info->dir_dname_len[i];
180                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
181                      info->dir_dname[i]);
182                 info->dir_dlease[i] = *p;
183                 *p += sizeof(struct ceph_mds_reply_lease);
184
185                 /* inode */
186                 err = parse_reply_info_in(p, end, &info->dir_in[i]);
187                 if (err < 0)
188                         goto out_bad;
189                 i++;
190                 num--;
191         }
192
193 done:
194         if (*p != end)
195                 goto bad;
196         return 0;
197
198 bad:
199         err = -EIO;
200 out_bad:
201         pr_err("problem parsing dir contents %d\n", err);
202         return err;
203 }
204
205 /*
206  * parse entire mds reply
207  */
208 static int parse_reply_info(struct ceph_msg *msg,
209                             struct ceph_mds_reply_info_parsed *info)
210 {
211         void *p, *end;
212         u32 len;
213         int err;
214
215         info->head = msg->front.iov_base;
216         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
217         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
218
219         /* trace */
220         ceph_decode_32_safe(&p, end, len, bad);
221         if (len > 0) {
222                 err = parse_reply_info_trace(&p, p+len, info);
223                 if (err < 0)
224                         goto out_bad;
225         }
226
227         /* dir content */
228         ceph_decode_32_safe(&p, end, len, bad);
229         if (len > 0) {
230                 err = parse_reply_info_dir(&p, p+len, info);
231                 if (err < 0)
232                         goto out_bad;
233         }
234
235         /* snap blob */
236         ceph_decode_32_safe(&p, end, len, bad);
237         info->snapblob_len = len;
238         info->snapblob = p;
239         p += len;
240
241         if (p != end)
242                 goto bad;
243         return 0;
244
245 bad:
246         err = -EIO;
247 out_bad:
248         pr_err("mds parse_reply err %d\n", err);
249         return err;
250 }
251
/*
 * Release memory held by a parsed reply.
 *
 * parse_reply_info_dir() allocates dir_in, dir_dname, dir_dname_len and
 * dir_dlease as one chunk based at dir_in, so a single kfree() covers
 * all of them.  The remaining fields of @info point into the message
 * and are not freed here.
 */
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        kfree(info->dir_in);
}
256
257
258 /*
259  * sessions
260  */
261 static const char *session_state_name(int s)
262 {
263         switch (s) {
264         case CEPH_MDS_SESSION_NEW: return "new";
265         case CEPH_MDS_SESSION_OPENING: return "opening";
266         case CEPH_MDS_SESSION_OPEN: return "open";
267         case CEPH_MDS_SESSION_HUNG: return "hung";
268         case CEPH_MDS_SESSION_CLOSING: return "closing";
269         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
270         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
271         default: return "???";
272         }
273 }
274
275 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
276 {
277         if (atomic_inc_not_zero(&s->s_ref)) {
278                 dout("mdsc get_session %p %d -> %d\n", s,
279                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
280                 return s;
281         } else {
282                 dout("mdsc get_session %p 0 -- FAIL", s);
283                 return NULL;
284         }
285 }
286
287 void ceph_put_mds_session(struct ceph_mds_session *s)
288 {
289         dout("mdsc put_session %p %d -> %d\n", s,
290              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
291         if (atomic_dec_and_test(&s->s_ref)) {
292                 if (s->s_authorizer)
293                      s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
294                              s->s_mdsc->fsc->client->monc.auth,
295                              s->s_authorizer);
296                 kfree(s);
297         }
298 }
299
300 /*
301  * called under mdsc->mutex
302  */
303 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
304                                                    int mds)
305 {
306         struct ceph_mds_session *session;
307
308         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
309                 return NULL;
310         session = mdsc->sessions[mds];
311         dout("lookup_mds_session %p %d\n", session,
312              atomic_read(&session->s_ref));
313         get_session(session);
314         return session;
315 }
316
317 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
318 {
319         if (mds >= mdsc->max_sessions)
320                 return false;
321         return mdsc->sessions[mds];
322 }
323
324 static int __verify_registered_session(struct ceph_mds_client *mdsc,
325                                        struct ceph_mds_session *s)
326 {
327         if (s->s_mds >= mdsc->max_sessions ||
328             mdsc->sessions[s->s_mds] != s)
329                 return -ENOENT;
330         return 0;
331 }
332
333 /*
334  * create+register a new session for given mds.
335  * called under mdsc->mutex.
336  */
337 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
338                                                  int mds)
339 {
340         struct ceph_mds_session *s;
341
342         s = kzalloc(sizeof(*s), GFP_NOFS);
343         if (!s)
344                 return ERR_PTR(-ENOMEM);
345         s->s_mdsc = mdsc;
346         s->s_mds = mds;
347         s->s_state = CEPH_MDS_SESSION_NEW;
348         s->s_ttl = 0;
349         s->s_seq = 0;
350         mutex_init(&s->s_mutex);
351
352         ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
353         s->s_con.private = s;
354         s->s_con.ops = &mds_con_ops;
355         s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
356         s->s_con.peer_name.num = cpu_to_le64(mds);
357
358         spin_lock_init(&s->s_cap_lock);
359         s->s_cap_gen = 0;
360         s->s_cap_ttl = 0;
361         s->s_renew_requested = 0;
362         s->s_renew_seq = 0;
363         INIT_LIST_HEAD(&s->s_caps);
364         s->s_nr_caps = 0;
365         s->s_trim_caps = 0;
366         atomic_set(&s->s_ref, 1);
367         INIT_LIST_HEAD(&s->s_waiting);
368         INIT_LIST_HEAD(&s->s_unsafe);
369         s->s_num_cap_releases = 0;
370         s->s_cap_iterator = NULL;
371         INIT_LIST_HEAD(&s->s_cap_releases);
372         INIT_LIST_HEAD(&s->s_cap_releases_done);
373         INIT_LIST_HEAD(&s->s_cap_flushing);
374         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
375
376         dout("register_session mds%d\n", mds);
377         if (mds >= mdsc->max_sessions) {
378                 int newmax = 1 << get_count_order(mds+1);
379                 struct ceph_mds_session **sa;
380
381                 dout("register_session realloc to %d\n", newmax);
382                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
383                 if (sa == NULL)
384                         goto fail_realloc;
385                 if (mdsc->sessions) {
386                         memcpy(sa, mdsc->sessions,
387                                mdsc->max_sessions * sizeof(void *));
388                         kfree(mdsc->sessions);
389                 }
390                 mdsc->sessions = sa;
391                 mdsc->max_sessions = newmax;
392         }
393         mdsc->sessions[mds] = s;
394         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
395
396         ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
397
398         return s;
399
400 fail_realloc:
401         kfree(s);
402         return ERR_PTR(-ENOMEM);
403 }
404
405 /*
406  * called under mdsc->mutex
407  */
408 static void __unregister_session(struct ceph_mds_client *mdsc,
409                                struct ceph_mds_session *s)
410 {
411         dout("__unregister_session mds%d %p\n", s->s_mds, s);
412         BUG_ON(mdsc->sessions[s->s_mds] != s);
413         mdsc->sessions[s->s_mds] = NULL;
414         ceph_con_close(&s->s_con);
415         ceph_put_mds_session(s);
416 }
417
418 /*
419  * drop session refs in request.
420  *
421  * should be last request ref, or hold mdsc->mutex
422  */
423 static void put_request_session(struct ceph_mds_request *req)
424 {
425         if (req->r_session) {
426                 ceph_put_mds_session(req->r_session);
427                 req->r_session = NULL;
428         }
429 }
430
431 void ceph_mdsc_release_request(struct kref *kref)
432 {
433         struct ceph_mds_request *req = container_of(kref,
434                                                     struct ceph_mds_request,
435                                                     r_kref);
436         if (req->r_request)
437                 ceph_msg_put(req->r_request);
438         if (req->r_reply) {
439                 ceph_msg_put(req->r_reply);
440                 destroy_reply_info(&req->r_reply_info);
441         }
442         if (req->r_inode) {
443                 ceph_put_cap_refs(ceph_inode(req->r_inode),
444                                   CEPH_CAP_PIN);
445                 iput(req->r_inode);
446         }
447         if (req->r_locked_dir)
448                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
449                                   CEPH_CAP_PIN);
450         if (req->r_target_inode)
451                 iput(req->r_target_inode);
452         if (req->r_dentry)
453                 dput(req->r_dentry);
454         if (req->r_old_dentry) {
455                 ceph_put_cap_refs(
456                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
457                         CEPH_CAP_PIN);
458                 dput(req->r_old_dentry);
459         }
460         kfree(req->r_path1);
461         kfree(req->r_path2);
462         put_request_session(req);
463         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
464         kfree(req);
465 }
466
467 /*
468  * lookup session, bump ref if found.
469  *
470  * called under mdsc->mutex.
471  */
472 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
473                                              u64 tid)
474 {
475         struct ceph_mds_request *req;
476         struct rb_node *n = mdsc->request_tree.rb_node;
477
478         while (n) {
479                 req = rb_entry(n, struct ceph_mds_request, r_node);
480                 if (tid < req->r_tid)
481                         n = n->rb_left;
482                 else if (tid > req->r_tid)
483                         n = n->rb_right;
484                 else {
485                         ceph_mdsc_get_request(req);
486                         return req;
487                 }
488         }
489         return NULL;
490 }
491
492 static void __insert_request(struct ceph_mds_client *mdsc,
493                              struct ceph_mds_request *new)
494 {
495         struct rb_node **p = &mdsc->request_tree.rb_node;
496         struct rb_node *parent = NULL;
497         struct ceph_mds_request *req = NULL;
498
499         while (*p) {
500                 parent = *p;
501                 req = rb_entry(parent, struct ceph_mds_request, r_node);
502                 if (new->r_tid < req->r_tid)
503                         p = &(*p)->rb_left;
504                 else if (new->r_tid > req->r_tid)
505                         p = &(*p)->rb_right;
506                 else
507                         BUG();
508         }
509
510         rb_link_node(&new->r_node, parent, p);
511         rb_insert_color(&new->r_node, &mdsc->request_tree);
512 }
513
514 /*
515  * Register an in-flight request, and assign a tid.  Link to directory
516  * are modifying (if any).
517  *
518  * Called under mdsc->mutex.
519  */
520 static void __register_request(struct ceph_mds_client *mdsc,
521                                struct ceph_mds_request *req,
522                                struct inode *dir)
523 {
524         req->r_tid = ++mdsc->last_tid;
525         if (req->r_num_caps)
526                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
527                                   req->r_num_caps);
528         dout("__register_request %p tid %lld\n", req, req->r_tid);
529         ceph_mdsc_get_request(req);
530         __insert_request(mdsc, req);
531
532         req->r_uid = current_fsuid();
533         req->r_gid = current_fsgid();
534
535         if (dir) {
536                 struct ceph_inode_info *ci = ceph_inode(dir);
537
538                 spin_lock(&ci->i_unsafe_lock);
539                 req->r_unsafe_dir = dir;
540                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
541                 spin_unlock(&ci->i_unsafe_lock);
542         }
543 }
544
545 static void __unregister_request(struct ceph_mds_client *mdsc,
546                                  struct ceph_mds_request *req)
547 {
548         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
549         rb_erase(&req->r_node, &mdsc->request_tree);
550         RB_CLEAR_NODE(&req->r_node);
551
552         if (req->r_unsafe_dir) {
553                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
554
555                 spin_lock(&ci->i_unsafe_lock);
556                 list_del_init(&req->r_unsafe_dir_item);
557                 spin_unlock(&ci->i_unsafe_lock);
558         }
559
560         ceph_mdsc_put_request(req);
561 }
562
/*
 * __choose_mds(): choose the mds to send a request to next.  If there
 * is a hint set in the request (e.g., due to a prior forward hint from
 * the mds), use that.  Otherwise, consult frag tree and/or caps to
 * identify the appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 *
 * (The helper get_nonsnap_parent() is defined first, just below.)
 */
571 struct dentry *get_nonsnap_parent(struct dentry *dentry)
572 {
573         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
574                 dentry = dentry->d_parent;
575         return dentry;
576 }
577
578 static int __choose_mds(struct ceph_mds_client *mdsc,
579                         struct ceph_mds_request *req)
580 {
581         struct inode *inode;
582         struct ceph_inode_info *ci;
583         struct ceph_cap *cap;
584         int mode = req->r_direct_mode;
585         int mds = -1;
586         u32 hash = req->r_direct_hash;
587         bool is_hash = req->r_direct_is_hash;
588
589         /*
590          * is there a specific mds we should try?  ignore hint if we have
591          * no session and the mds is not up (active or recovering).
592          */
593         if (req->r_resend_mds >= 0 &&
594             (__have_session(mdsc, req->r_resend_mds) ||
595              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
596                 dout("choose_mds using resend_mds mds%d\n",
597                      req->r_resend_mds);
598                 return req->r_resend_mds;
599         }
600
601         if (mode == USE_RANDOM_MDS)
602                 goto random;
603
604         inode = NULL;
605         if (req->r_inode) {
606                 inode = req->r_inode;
607         } else if (req->r_dentry) {
608                 struct inode *dir = req->r_dentry->d_parent->d_inode;
609
610                 if (dir->i_sb != mdsc->fsc->sb) {
611                         /* not this fs! */
612                         inode = req->r_dentry->d_inode;
613                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
614                         /* direct snapped/virtual snapdir requests
615                          * based on parent dir inode */
616                         struct dentry *dn =
617                                 get_nonsnap_parent(req->r_dentry->d_parent);
618                         inode = dn->d_inode;
619                         dout("__choose_mds using nonsnap parent %p\n", inode);
620                 } else if (req->r_dentry->d_inode) {
621                         /* dentry target */
622                         inode = req->r_dentry->d_inode;
623                 } else {
624                         /* dir + name */
625                         inode = dir;
626                         hash = req->r_dentry->d_name.hash;
627                         is_hash = true;
628                 }
629         }
630
631         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
632              (int)hash, mode);
633         if (!inode)
634                 goto random;
635         ci = ceph_inode(inode);
636
637         if (is_hash && S_ISDIR(inode->i_mode)) {
638                 struct ceph_inode_frag frag;
639                 int found;
640
641                 ceph_choose_frag(ci, hash, &frag, &found);
642                 if (found) {
643                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
644                                 u8 r;
645
646                                 /* choose a random replica */
647                                 get_random_bytes(&r, 1);
648                                 r %= frag.ndist;
649                                 mds = frag.dist[r];
650                                 dout("choose_mds %p %llx.%llx "
651                                      "frag %u mds%d (%d/%d)\n",
652                                      inode, ceph_vinop(inode),
653                                      frag.frag, frag.mds,
654                                      (int)r, frag.ndist);
655                                 return mds;
656                         }
657
658                         /* since this file/dir wasn't known to be
659                          * replicated, then we want to look for the
660                          * authoritative mds. */
661                         mode = USE_AUTH_MDS;
662                         if (frag.mds >= 0) {
663                                 /* choose auth mds */
664                                 mds = frag.mds;
665                                 dout("choose_mds %p %llx.%llx "
666                                      "frag %u mds%d (auth)\n",
667                                      inode, ceph_vinop(inode), frag.frag, mds);
668                                 return mds;
669                         }
670                 }
671         }
672
673         spin_lock(&inode->i_lock);
674         cap = NULL;
675         if (mode == USE_AUTH_MDS)
676                 cap = ci->i_auth_cap;
677         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
678                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
679         if (!cap) {
680                 spin_unlock(&inode->i_lock);
681                 goto random;
682         }
683         mds = cap->session->s_mds;
684         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
685              inode, ceph_vinop(inode), mds,
686              cap == ci->i_auth_cap ? "auth " : "", cap);
687         spin_unlock(&inode->i_lock);
688         return mds;
689
690 random:
691         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
692         dout("choose_mds chose random mds%d\n", mds);
693         return mds;
694 }
695
696
697 /*
698  * session messages
699  */
700 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
701 {
702         struct ceph_msg *msg;
703         struct ceph_mds_session_head *h;
704
705         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
706         if (!msg) {
707                 pr_err("create_session_msg ENOMEM creating msg\n");
708                 return NULL;
709         }
710         h = msg->front.iov_base;
711         h->op = cpu_to_le32(op);
712         h->seq = cpu_to_le64(seq);
713         return msg;
714 }
715
716 /*
717  * send session open request.
718  *
719  * called under mdsc->mutex
720  */
721 static int __open_session(struct ceph_mds_client *mdsc,
722                           struct ceph_mds_session *session)
723 {
724         struct ceph_msg *msg;
725         int mstate;
726         int mds = session->s_mds;
727
728         /* wait for mds to go active? */
729         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
730         dout("open_session to mds%d (%s)\n", mds,
731              ceph_mds_state_name(mstate));
732         session->s_state = CEPH_MDS_SESSION_OPENING;
733         session->s_renew_requested = jiffies;
734
735         /* send connect message */
736         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
737         if (!msg)
738                 return -ENOMEM;
739         ceph_con_send(&session->s_con, msg);
740         return 0;
741 }
742
743 /*
744  * open sessions for any export targets for the given mds
745  *
746  * called under mdsc->mutex
747  */
748 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
749                                           struct ceph_mds_session *session)
750 {
751         struct ceph_mds_info *mi;
752         struct ceph_mds_session *ts;
753         int i, mds = session->s_mds;
754         int target;
755
756         if (mds >= mdsc->mdsmap->m_max_mds)
757                 return;
758         mi = &mdsc->mdsmap->m_info[mds];
759         dout("open_export_target_sessions for mds%d (%d targets)\n",
760              session->s_mds, mi->num_export_targets);
761
762         for (i = 0; i < mi->num_export_targets; i++) {
763                 target = mi->export_targets[i];
764                 ts = __ceph_lookup_mds_session(mdsc, target);
765                 if (!ts) {
766                         ts = register_session(mdsc, target);
767                         if (IS_ERR(ts))
768                                 return;
769                 }
770                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
771                     session->s_state == CEPH_MDS_SESSION_CLOSING)
772                         __open_session(mdsc, session);
773                 else
774                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
775                              i, ts, session_state_name(ts->s_state));
776                 ceph_put_mds_session(ts);
777         }
778 }
779
/*
 * Locked wrapper around __open_export_target_sessions(): takes
 * mdsc->mutex for the duration of the call.
 */
void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}
787
788 /*
789  * session caps
790  */
791
792 /*
793  * Free preallocated cap messages assigned to this session
794  */
795 static void cleanup_cap_releases(struct ceph_mds_session *session)
796 {
797         struct ceph_msg *msg;
798
799         spin_lock(&session->s_cap_lock);
800         while (!list_empty(&session->s_cap_releases)) {
801                 msg = list_first_entry(&session->s_cap_releases,
802                                        struct ceph_msg, list_head);
803                 list_del_init(&msg->list_head);
804                 ceph_msg_put(msg);
805         }
806         while (!list_empty(&session->s_cap_releases_done)) {
807                 msg = list_first_entry(&session->s_cap_releases_done,
808                                        struct ceph_msg, list_head);
809                 list_del_init(&msg->list_head);
810                 ceph_msg_put(msg);
811         }
812         spin_unlock(&session->s_cap_lock);
813 }
814
/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * @cb is called once per cap, with s_cap_lock dropped and a reference
 * held on the cap's inode; @arg is passed through untouched.  Caps whose
 * inode cannot be grabbed are skipped.  A negative return from @cb stops
 * the walk and is returned to the caller; otherwise 0 is returned.
 *
 * Caller must hold session s_mutex.
 */
static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
{
        struct list_head *p;
        struct ceph_cap *cap;
        struct inode *inode, *last_inode = NULL;
        struct ceph_cap *old_cap = NULL;
        int ret;

        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
        p = session->s_caps.next;
        while (p != &session->s_caps) {
                cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
                if (!inode) {
                        /* could not get an inode ref; skip this cap */
                        p = p->next;
                        continue;
                }
                /*
                 * Publish the cap being visited before dropping the lock.
                 * NOTE(review): presumably __ceph_remove_cap() checks
                 * s_cap_iterator and leaves this cap on the list for us
                 * to finish below -- confirm against that function.
                 */
                session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);

                /* release leftovers from the previous round, lock not held */
                if (last_inode) {
                        iput(last_inode);
                        last_inode = NULL;
                }
                if (old_cap) {
                        ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }

                ret = cb(inode, cap, arg);
                /* keep the inode ref until the next round or exit */
                last_inode = inode;

                spin_lock(&session->s_cap_lock);
                /* advance before the current cap is possibly unlinked */
                p = p->next;
                if (cap->ci == NULL) {
                        /* cap was detached from its inode while unlocked */
                        dout("iterate_session_caps  finishing cap %p removal\n",
                             cap);
                        BUG_ON(cap->session != session);
                        list_del_init(&cap->session_caps);
                        session->s_nr_caps--;
                        cap->session = NULL;
                        old_cap = cap;  /* put_cap it w/o locks held */
                }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
out:
        session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);

        /* final cleanup of whatever the last round left behind */
        if (last_inode)
                iput(last_inode);
        if (old_cap)
                ceph_put_cap(session->s_mdsc, old_cap);

        return ret;
}
882
883 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
884                                   void *arg)
885 {
886         struct ceph_inode_info *ci = ceph_inode(inode);
887         int drop = 0;
888
889         dout("removing cap %p, ci is %p, inode is %p\n",
890              cap, ci, &ci->vfs_inode);
891         spin_lock(&inode->i_lock);
892         __ceph_remove_cap(cap);
893         if (!__ceph_is_any_real_caps(ci)) {
894                 struct ceph_mds_client *mdsc =
895                         ceph_sb_to_client(inode->i_sb)->mdsc;
896
897                 spin_lock(&mdsc->cap_dirty_lock);
898                 if (!list_empty(&ci->i_dirty_item)) {
899                         pr_info(" dropping dirty %s state for %p %lld\n",
900                                 ceph_cap_string(ci->i_dirty_caps),
901                                 inode, ceph_ino(inode));
902                         ci->i_dirty_caps = 0;
903                         list_del_init(&ci->i_dirty_item);
904                         drop = 1;
905                 }
906                 if (!list_empty(&ci->i_flushing_item)) {
907                         pr_info(" dropping dirty+flushing %s state for %p %lld\n",
908                                 ceph_cap_string(ci->i_flushing_caps),
909                                 inode, ceph_ino(inode));
910                         ci->i_flushing_caps = 0;
911                         list_del_init(&ci->i_flushing_item);
912                         mdsc->num_cap_flushing--;
913                         drop = 1;
914                 }
915                 if (drop && ci->i_wrbuffer_ref) {
916                         pr_info(" dropping dirty data for %p %lld\n",
917                                 inode, ceph_ino(inode));
918                         ci->i_wrbuffer_ref = 0;
919                         ci->i_wrbuffer_ref_head = 0;
920                         drop++;
921                 }
922                 spin_unlock(&mdsc->cap_dirty_lock);
923         }
924         spin_unlock(&inode->i_lock);
925         while (drop--)
926                 iput(inode);
927         return 0;
928 }
929
930 /*
931  * caller must hold session s_mutex
932  */
933 static void remove_session_caps(struct ceph_mds_session *session)
934 {
935         dout("remove_session_caps on %p\n", session);
936         iterate_session_caps(session, remove_session_caps_cb, NULL);
937         BUG_ON(session->s_nr_caps > 0);
938         BUG_ON(!list_empty(&session->s_cap_flushing));
939         cleanup_cap_releases(session);
940 }
941
942 /*
943  * wake up any threads waiting on this session's caps.  if the cap is
944  * old (didn't get renewed on the client reconnect), remove it now.
945  *
946  * caller must hold s_mutex.
947  */
948 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
949                               void *arg)
950 {
951         struct ceph_inode_info *ci = ceph_inode(inode);
952
953         wake_up_all(&ci->i_cap_wq);
954         if (arg) {
955                 spin_lock(&inode->i_lock);
956                 ci->i_wanted_max_size = 0;
957                 ci->i_requested_max_size = 0;
958                 spin_unlock(&inode->i_lock);
959         }
960         return 0;
961 }
962
963 static void wake_up_session_caps(struct ceph_mds_session *session,
964                                  int reconnect)
965 {
966         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
967         iterate_session_caps(session, wake_up_session_cb,
968                              (void *)(unsigned long)reconnect);
969 }
970
971 /*
972  * Send periodic message to MDS renewing all currently held caps.  The
973  * ack will reset the expiration for all caps from this session.
974  *
975  * caller holds s_mutex
976  */
977 static int send_renew_caps(struct ceph_mds_client *mdsc,
978                            struct ceph_mds_session *session)
979 {
980         struct ceph_msg *msg;
981         int state;
982
983         if (time_after_eq(jiffies, session->s_cap_ttl) &&
984             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
985                 pr_info("mds%d caps stale\n", session->s_mds);
986         session->s_renew_requested = jiffies;
987
988         /* do not try to renew caps until a recovering mds has reconnected
989          * with its clients. */
990         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
991         if (state < CEPH_MDS_STATE_RECONNECT) {
992                 dout("send_renew_caps ignoring mds%d (%s)\n",
993                      session->s_mds, ceph_mds_state_name(state));
994                 return 0;
995         }
996
997         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
998                 ceph_mds_state_name(state));
999         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1000                                  ++session->s_renew_seq);
1001         if (!msg)
1002                 return -ENOMEM;
1003         ceph_con_send(&session->s_con, msg);
1004         return 0;
1005 }
1006
1007 /*
1008  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1009  *
1010  * Called under session->s_mutex
1011  */
1012 static void renewed_caps(struct ceph_mds_client *mdsc,
1013                          struct ceph_mds_session *session, int is_renew)
1014 {
1015         int was_stale;
1016         int wake = 0;
1017
1018         spin_lock(&session->s_cap_lock);
1019         was_stale = is_renew && (session->s_cap_ttl == 0 ||
1020                                  time_after_eq(jiffies, session->s_cap_ttl));
1021
1022         session->s_cap_ttl = session->s_renew_requested +
1023                 mdsc->mdsmap->m_session_timeout*HZ;
1024
1025         if (was_stale) {
1026                 if (time_before(jiffies, session->s_cap_ttl)) {
1027                         pr_info("mds%d caps renewed\n", session->s_mds);
1028                         wake = 1;
1029                 } else {
1030                         pr_info("mds%d caps still stale\n", session->s_mds);
1031                 }
1032         }
1033         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1034              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1035              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1036         spin_unlock(&session->s_cap_lock);
1037
1038         if (wake)
1039                 wake_up_session_caps(session, 0);
1040 }
1041
1042 /*
1043  * send a session close request
1044  */
1045 static int request_close_session(struct ceph_mds_client *mdsc,
1046                                  struct ceph_mds_session *session)
1047 {
1048         struct ceph_msg *msg;
1049
1050         dout("request_close_session mds%d state %s seq %lld\n",
1051              session->s_mds, session_state_name(session->s_state),
1052              session->s_seq);
1053         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1054         if (!msg)
1055                 return -ENOMEM;
1056         ceph_con_send(&session->s_con, msg);
1057         return 0;
1058 }
1059
1060 /*
1061  * Called with s_mutex held.
1062  */
1063 static int __close_session(struct ceph_mds_client *mdsc,
1064                          struct ceph_mds_session *session)
1065 {
1066         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1067                 return 0;
1068         session->s_state = CEPH_MDS_SESSION_CLOSING;
1069         return request_close_session(mdsc, session);
1070 }
1071
1072 /*
1073  * Trim old(er) caps.
1074  *
1075  * Because we can't cache an inode without one or more caps, we do
1076  * this indirectly: if a cap is unused, we prune its aliases, at which
1077  * point the inode will hopefully get dropped to.
1078  *
1079  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1080  * memory pressure from the MDS, though, so it needn't be perfect.
1081  */
1082 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1083 {
1084         struct ceph_mds_session *session = arg;
1085         struct ceph_inode_info *ci = ceph_inode(inode);
1086         int used, oissued, mine;
1087
1088         if (session->s_trim_caps <= 0)
1089                 return -1;
1090
1091         spin_lock(&inode->i_lock);
1092         mine = cap->issued | cap->implemented;
1093         used = __ceph_caps_used(ci);
1094         oissued = __ceph_caps_issued_other(ci, cap);
1095
1096         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1097              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1098              ceph_cap_string(used));
1099         if (ci->i_dirty_caps)
1100                 goto out;   /* dirty caps */
1101         if ((used & ~oissued) & mine)
1102                 goto out;   /* we need these caps */
1103
1104         session->s_trim_caps--;
1105         if (oissued) {
1106                 /* we aren't the only cap.. just remove us */
1107                 __ceph_remove_cap(cap);
1108         } else {
1109                 /* try to drop referring dentries */
1110                 spin_unlock(&inode->i_lock);
1111                 d_prune_aliases(inode);
1112                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1113                      inode, cap, atomic_read(&inode->i_count));
1114                 return 0;
1115         }
1116
1117 out:
1118         spin_unlock(&inode->i_lock);
1119         return 0;
1120 }
1121
1122 /*
1123  * Trim session cap count down to some max number.
1124  */
1125 static int trim_caps(struct ceph_mds_client *mdsc,
1126                      struct ceph_mds_session *session,
1127                      int max_caps)
1128 {
1129         int trim_caps = session->s_nr_caps - max_caps;
1130
1131         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1132              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1133         if (trim_caps > 0) {
1134                 session->s_trim_caps = trim_caps;
1135                 iterate_session_caps(session, trim_caps_cb, session);
1136                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1137                      session->s_mds, session->s_nr_caps, max_caps,
1138                         trim_caps - session->s_trim_caps);
1139                 session->s_trim_caps = 0;
1140         }
1141         return 0;
1142 }
1143
1144 /*
1145  * Allocate cap_release messages.  If there is a partially full message
1146  * in the queue, try to allocate enough to cover it's remainder, so that
1147  * we can send it immediately.
1148  *
1149  * Called under s_mutex.
1150  */
1151 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1152                           struct ceph_mds_session *session)
1153 {
1154         struct ceph_msg *msg, *partial = NULL;
1155         struct ceph_mds_cap_release *head;
1156         int err = -ENOMEM;
1157         int extra = mdsc->fsc->mount_options->cap_release_safety;
1158         int num;
1159
1160         dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1161              extra);
1162
1163         spin_lock(&session->s_cap_lock);
1164
1165         if (!list_empty(&session->s_cap_releases)) {
1166                 msg = list_first_entry(&session->s_cap_releases,
1167                                        struct ceph_msg,
1168                                  list_head);
1169                 head = msg->front.iov_base;
1170                 num = le32_to_cpu(head->num);
1171                 if (num) {
1172                         dout(" partial %p with (%d/%d)\n", msg, num,
1173                              (int)CEPH_CAPS_PER_RELEASE);
1174                         extra += CEPH_CAPS_PER_RELEASE - num;
1175                         partial = msg;
1176                 }
1177         }
1178         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1179                 spin_unlock(&session->s_cap_lock);
1180                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1181                                    GFP_NOFS);
1182                 if (!msg)
1183                         goto out_unlocked;
1184                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
1185                      (int)msg->front.iov_len);
1186                 head = msg->front.iov_base;
1187                 head->num = cpu_to_le32(0);
1188                 msg->front.iov_len = sizeof(*head);
1189                 spin_lock(&session->s_cap_lock);
1190                 list_add(&msg->list_head, &session->s_cap_releases);
1191                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1192         }
1193
1194         if (partial) {
1195                 head = partial->front.iov_base;
1196                 num = le32_to_cpu(head->num);
1197                 dout(" queueing partial %p with %d/%d\n", partial, num,
1198                      (int)CEPH_CAPS_PER_RELEASE);
1199                 list_move_tail(&partial->list_head,
1200                                &session->s_cap_releases_done);
1201                 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1202         }
1203         err = 0;
1204         spin_unlock(&session->s_cap_lock);
1205 out_unlocked:
1206         return err;
1207 }
1208
1209 /*
1210  * flush all dirty inode data to disk.
1211  *
1212  * returns true if we've flushed through want_flush_seq
1213  */
1214 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1215 {
1216         int mds, ret = 1;
1217
1218         dout("check_cap_flush want %lld\n", want_flush_seq);
1219         mutex_lock(&mdsc->mutex);
1220         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1221                 struct ceph_mds_session *session = mdsc->sessions[mds];
1222
1223                 if (!session)
1224                         continue;
1225                 get_session(session);
1226                 mutex_unlock(&mdsc->mutex);
1227
1228                 mutex_lock(&session->s_mutex);
1229                 if (!list_empty(&session->s_cap_flushing)) {
1230                         struct ceph_inode_info *ci =
1231                                 list_entry(session->s_cap_flushing.next,
1232                                            struct ceph_inode_info,
1233                                            i_flushing_item);
1234                         struct inode *inode = &ci->vfs_inode;
1235
1236                         spin_lock(&inode->i_lock);
1237                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1238                                 dout("check_cap_flush still flushing %p "
1239                                      "seq %lld <= %lld to mds%d\n", inode,
1240                                      ci->i_cap_flush_seq, want_flush_seq,
1241                                      session->s_mds);
1242                                 ret = 0;
1243                         }
1244                         spin_unlock(&inode->i_lock);
1245                 }
1246                 mutex_unlock(&session->s_mutex);
1247                 ceph_put_mds_session(session);
1248
1249                 if (!ret)
1250                         return ret;
1251                 mutex_lock(&mdsc->mutex);
1252         }
1253
1254         mutex_unlock(&mdsc->mutex);
1255         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1256         return ret;
1257 }
1258
1259 /*
1260  * called under s_mutex
1261  */
1262 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1263                             struct ceph_mds_session *session)
1264 {
1265         struct ceph_msg *msg;
1266
1267         dout("send_cap_releases mds%d\n", session->s_mds);
1268         spin_lock(&session->s_cap_lock);
1269         while (!list_empty(&session->s_cap_releases_done)) {
1270                 msg = list_first_entry(&session->s_cap_releases_done,
1271                                  struct ceph_msg, list_head);
1272                 list_del_init(&msg->list_head);
1273                 spin_unlock(&session->s_cap_lock);
1274                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1275                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1276                 ceph_con_send(&session->s_con, msg);
1277                 spin_lock(&session->s_cap_lock);
1278         }
1279         spin_unlock(&session->s_cap_lock);
1280 }
1281
1282 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1283                                  struct ceph_mds_session *session)
1284 {
1285         struct ceph_msg *msg;
1286         struct ceph_mds_cap_release *head;
1287         unsigned num;
1288
1289         dout("discard_cap_releases mds%d\n", session->s_mds);
1290         spin_lock(&session->s_cap_lock);
1291
1292         /* zero out the in-progress message */
1293         msg = list_first_entry(&session->s_cap_releases,
1294                                struct ceph_msg, list_head);
1295         head = msg->front.iov_base;
1296         num = le32_to_cpu(head->num);
1297         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1298         head->num = cpu_to_le32(0);
1299         session->s_num_cap_releases += num;
1300
1301         /* requeue completed messages */
1302         while (!list_empty(&session->s_cap_releases_done)) {
1303                 msg = list_first_entry(&session->s_cap_releases_done,
1304                                  struct ceph_msg, list_head);
1305                 list_del_init(&msg->list_head);
1306
1307                 head = msg->front.iov_base;
1308                 num = le32_to_cpu(head->num);
1309                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1310                      num);
1311                 session->s_num_cap_releases += num;
1312                 head->num = cpu_to_le32(0);
1313                 msg->front.iov_len = sizeof(*head);
1314                 list_add(&msg->list_head, &session->s_cap_releases);
1315         }
1316
1317         spin_unlock(&session->s_cap_lock);
1318 }
1319
1320 /*
1321  * requests
1322  */
1323
1324 /*
1325  * Create an mds request.
1326  */
1327 struct ceph_mds_request *
1328 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1329 {
1330         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1331
1332         if (!req)
1333                 return ERR_PTR(-ENOMEM);
1334
1335         mutex_init(&req->r_fill_mutex);
1336         req->r_mdsc = mdsc;
1337         req->r_started = jiffies;
1338         req->r_resend_mds = -1;
1339         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1340         req->r_fmode = -1;
1341         kref_init(&req->r_kref);
1342         INIT_LIST_HEAD(&req->r_wait);
1343         init_completion(&req->r_completion);
1344         init_completion(&req->r_safe_completion);
1345         INIT_LIST_HEAD(&req->r_unsafe_item);
1346
1347         req->r_op = op;
1348         req->r_direct_mode = mode;
1349         return req;
1350 }
1351
1352 /*
1353  * return oldest (lowest) request, tid in request tree, 0 if none.
1354  *
1355  * called under mdsc->mutex.
1356  */
1357 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1358 {
1359         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1360                 return NULL;
1361         return rb_entry(rb_first(&mdsc->request_tree),
1362                         struct ceph_mds_request, r_node);
1363 }
1364
1365 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1366 {
1367         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1368
1369         if (req)
1370                 return req->r_tid;
1371         return 0;
1372 }
1373
1374 /*
1375  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1376  * on build_path_from_dentry in fs/cifs/dir.c.
1377  *
1378  * If @stop_on_nosnap, generate path relative to the first non-snapped
1379  * inode.
1380  *
1381  * Encode hidden .snap dirs as a double /, i.e.
1382  *   foo/.snap/bar -> foo//bar
1383  */
1384 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1385                            int stop_on_nosnap)
1386 {
1387         struct dentry *temp;
1388         char *path;
1389         int len, pos;
1390
1391         if (dentry == NULL)
1392                 return ERR_PTR(-EINVAL);
1393
1394 retry:
1395         len = 0;
1396         for (temp = dentry; !IS_ROOT(temp);) {
1397                 struct inode *inode = temp->d_inode;
1398                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1399                         len++;  /* slash only */
1400                 else if (stop_on_nosnap && inode &&
1401                          ceph_snap(inode) == CEPH_NOSNAP)
1402                         break;
1403                 else
1404                         len += 1 + temp->d_name.len;
1405                 temp = temp->d_parent;
1406                 if (temp == NULL) {
1407                         pr_err("build_path corrupt dentry %p\n", dentry);
1408                         return ERR_PTR(-EINVAL);
1409                 }
1410         }
1411         if (len)
1412                 len--;  /* no leading '/' */
1413
1414         path = kmalloc(len+1, GFP_NOFS);
1415         if (path == NULL)
1416                 return ERR_PTR(-ENOMEM);
1417         pos = len;
1418         path[pos] = 0;  /* trailing null */
1419         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1420                 struct inode *inode = temp->d_inode;
1421
1422                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1423                         dout("build_path path+%d: %p SNAPDIR\n",
1424                              pos, temp);
1425                 } else if (stop_on_nosnap && inode &&
1426                            ceph_snap(inode) == CEPH_NOSNAP) {
1427                         break;
1428                 } else {
1429                         pos -= temp->d_name.len;
1430                         if (pos < 0)
1431                                 break;
1432                         strncpy(path + pos, temp->d_name.name,
1433                                 temp->d_name.len);
1434                 }
1435                 if (pos)
1436                         path[--pos] = '/';
1437                 temp = temp->d_parent;
1438                 if (temp == NULL) {
1439                         pr_err("build_path corrupt dentry\n");
1440                         kfree(path);
1441                         return ERR_PTR(-EINVAL);
1442                 }
1443         }
1444         if (pos != 0) {
1445                 pr_err("build_path did not end path lookup where "
1446                        "expected, namelen is %d, pos is %d\n", len, pos);
1447                 /* presumably this is only possible if racing with a
1448                    rename of one of the parent directories (we can not
1449                    lock the dentries above us to prevent this, but
1450                    retrying should be harmless) */
1451                 kfree(path);
1452                 goto retry;
1453         }
1454
1455         *base = ceph_ino(temp->d_inode);
1456         *plen = len;
1457         dout("build_path on %p %d built %llx '%.*s'\n",
1458              dentry, atomic_read(&dentry->d_count), *base, len, path);
1459         return path;
1460 }
1461
1462 static int build_dentry_path(struct dentry *dentry,
1463                              const char **ppath, int *ppathlen, u64 *pino,
1464                              int *pfreepath)
1465 {
1466         char *path;
1467
1468         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1469                 *pino = ceph_ino(dentry->d_parent->d_inode);
1470                 *ppath = dentry->d_name.name;
1471                 *ppathlen = dentry->d_name.len;
1472                 return 0;
1473         }
1474         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1475         if (IS_ERR(path))
1476                 return PTR_ERR(path);
1477         *ppath = path;
1478         *pfreepath = 1;
1479         return 0;
1480 }
1481
1482 static int build_inode_path(struct inode *inode,
1483                             const char **ppath, int *ppathlen, u64 *pino,
1484                             int *pfreepath)
1485 {
1486         struct dentry *dentry;
1487         char *path;
1488
1489         if (ceph_snap(inode) == CEPH_NOSNAP) {
1490                 *pino = ceph_ino(inode);
1491                 *ppathlen = 0;
1492                 return 0;
1493         }
1494         dentry = d_find_alias(inode);
1495         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1496         dput(dentry);
1497         if (IS_ERR(path))
1498                 return PTR_ERR(path);
1499         *ppath = path;
1500         *pfreepath = 1;
1501         return 0;
1502 }
1503
1504 /*
1505  * request arguments may be specified via an inode *, a dentry *, or
1506  * an explicit ino+path.
1507  */
1508 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1509                                   const char *rpath, u64 rino,
1510                                   const char **ppath, int *pathlen,
1511                                   u64 *ino, int *freepath)
1512 {
1513         int r = 0;
1514
1515         if (rinode) {
1516                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1517                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1518                      ceph_snap(rinode));
1519         } else if (rdentry) {
1520                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1521                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1522                      *ppath);
1523         } else if (rpath) {
1524                 *ino = rino;
1525                 *ppath = rpath;
1526                 *pathlen = strlen(rpath);
1527                 dout(" path %.*s\n", *pathlen, rpath);
1528         }
1529
1530         return r;
1531 }
1532
1533 /*
1534  * called under mdsc->mutex
1535  */
1536 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1537                                                struct ceph_mds_request *req,
1538                                                int mds)
1539 {
1540         struct ceph_msg *msg;
1541         struct ceph_mds_request_head *head;
1542         const char *path1 = NULL;
1543         const char *path2 = NULL;
1544         u64 ino1 = 0, ino2 = 0;
1545         int pathlen1 = 0, pathlen2 = 0;
1546         int freepath1 = 0, freepath2 = 0;
1547         int len;
1548         u16 releases;
1549         void *p, *end;
1550         int ret;
1551
1552         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1553                               req->r_path1, req->r_ino1.ino,
1554                               &path1, &pathlen1, &ino1, &freepath1);
1555         if (ret < 0) {
1556                 msg = ERR_PTR(ret);
1557                 goto out;
1558         }
1559
1560         ret = set_request_path_attr(NULL, req->r_old_dentry,
1561                               req->r_path2, req->r_ino2.ino,
1562                               &path2, &pathlen2, &ino2, &freepath2);
1563         if (ret < 0) {
1564                 msg = ERR_PTR(ret);
1565                 goto out_free1;
1566         }
1567
1568         len = sizeof(*head) +
1569                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1570
1571         /* calculate (max) length for cap releases */
1572         len += sizeof(struct ceph_mds_request_release) *
1573                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1574                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1575         if (req->r_dentry_drop)
1576                 len += req->r_dentry->d_name.len;
1577         if (req->r_old_dentry_drop)
1578                 len += req->r_old_dentry->d_name.len;
1579
1580         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1581         if (!msg) {
1582                 msg = ERR_PTR(-ENOMEM);
1583                 goto out_free2;
1584         }
1585
1586         msg->hdr.tid = cpu_to_le64(req->r_tid);
1587
1588         head = msg->front.iov_base;
1589         p = msg->front.iov_base + sizeof(*head);
1590         end = msg->front.iov_base + msg->front.iov_len;
1591
1592         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1593         head->op = cpu_to_le32(req->r_op);
1594         head->caller_uid = cpu_to_le32(req->r_uid);
1595         head->caller_gid = cpu_to_le32(req->r_gid);
1596         head->args = req->r_args;
1597
1598         ceph_encode_filepath(&p, end, ino1, path1);
1599         ceph_encode_filepath(&p, end, ino2, path2);
1600
1601         /* make note of release offset, in case we need to replay */
1602         req->r_request_release_offset = p - msg->front.iov_base;
1603
1604         /* cap releases */
1605         releases = 0;
1606         if (req->r_inode_drop)
1607                 releases += ceph_encode_inode_release(&p,
1608                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1609                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1610         if (req->r_dentry_drop)
1611                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1612                        mds, req->r_dentry_drop, req->r_dentry_unless);
1613         if (req->r_old_dentry_drop)
1614                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1615                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1616         if (req->r_old_inode_drop)
1617                 releases += ceph_encode_inode_release(&p,
1618                       req->r_old_dentry->d_inode,
1619                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1620         head->num_releases = cpu_to_le16(releases);
1621
1622         BUG_ON(p > end);
1623         msg->front.iov_len = p - msg->front.iov_base;
1624         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1625
1626         msg->pages = req->r_pages;
1627         msg->nr_pages = req->r_num_pages;
1628         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1629         msg->hdr.data_off = cpu_to_le16(0);
1630
1631 out_free2:
1632         if (freepath2)
1633                 kfree((char *)path2);
1634 out_free1:
1635         if (freepath1)
1636                 kfree((char *)path1);
1637 out:
1638         return msg;
1639 }
1640
1641 /*
1642  * called under mdsc->mutex if error, under no mutex if
1643  * success.
1644  */
1645 static void complete_request(struct ceph_mds_client *mdsc,
1646                              struct ceph_mds_request *req)
1647 {
1648         if (req->r_callback)
1649                 req->r_callback(mdsc, req);
1650         else
1651                 complete_all(&req->r_completion);
1652 }
1653
1654 /*
1655  * called under mdsc->mutex
1656  */
1657 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1658                                   struct ceph_mds_request *req,
1659                                   int mds)
1660 {
1661         struct ceph_mds_request_head *rhead;
1662         struct ceph_msg *msg;
1663         int flags = 0;
1664
1665         req->r_mds = mds;
1666         req->r_attempts++;
1667         if (req->r_inode) {
1668                 struct ceph_cap *cap =
1669                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1670
1671                 if (cap)
1672                         req->r_sent_on_mseq = cap->mseq;
1673                 else
1674                         req->r_sent_on_mseq = -1;
1675         }
1676         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1677              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1678
1679         if (req->r_got_unsafe) {
1680                 /*
1681                  * Replay.  Do not regenerate message (and rebuild
1682                  * paths, etc.); just use the original message.
1683                  * Rebuilding paths will break for renames because
1684                  * d_move mangles the src name.
1685                  */
1686                 msg = req->r_request;
1687                 rhead = msg->front.iov_base;
1688
1689                 flags = le32_to_cpu(rhead->flags);
1690                 flags |= CEPH_MDS_FLAG_REPLAY;
1691                 rhead->flags = cpu_to_le32(flags);
1692
1693                 if (req->r_target_inode)
1694                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1695
1696                 rhead->num_retry = req->r_attempts - 1;
1697
1698                 /* remove cap/dentry releases from message */
1699                 rhead->num_releases = 0;
1700                 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1701                 msg->front.iov_len = req->r_request_release_offset;
1702                 return 0;
1703         }
1704
1705         if (req->r_request) {
1706                 ceph_msg_put(req->r_request);
1707                 req->r_request = NULL;
1708         }
1709         msg = create_request_message(mdsc, req, mds);
1710         if (IS_ERR(msg)) {
1711                 req->r_err = PTR_ERR(msg);
1712                 complete_request(mdsc, req);
1713                 return PTR_ERR(msg);
1714         }
1715         req->r_request = msg;
1716
1717         rhead = msg->front.iov_base;
1718         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1719         if (req->r_got_unsafe)
1720                 flags |= CEPH_MDS_FLAG_REPLAY;
1721         if (req->r_locked_dir)
1722                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1723         rhead->flags = cpu_to_le32(flags);
1724         rhead->num_fwd = req->r_num_fwd;
1725         rhead->num_retry = req->r_attempts - 1;
1726         rhead->ino = 0;
1727
1728         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1729         return 0;
1730 }
1731
1732 /*
1733  * send request, or put it on the appropriate wait list.
1734  */
1735 static int __do_request(struct ceph_mds_client *mdsc,
1736                         struct ceph_mds_request *req)
1737 {
1738         struct ceph_mds_session *session = NULL;
1739         int mds = -1;
1740         int err = -EAGAIN;
1741
1742         if (req->r_err || req->r_got_result)
1743                 goto out;
1744
1745         if (req->r_timeout &&
1746             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1747                 dout("do_request timed out\n");
1748                 err = -EIO;
1749                 goto finish;
1750         }
1751
1752         mds = __choose_mds(mdsc, req);
1753         if (mds < 0 ||
1754             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1755                 dout("do_request no mds or not active, waiting for map\n");
1756                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1757                 goto out;
1758         }
1759
1760         /* get, open session */
1761         session = __ceph_lookup_mds_session(mdsc, mds);
1762         if (!session) {
1763                 session = register_session(mdsc, mds);
1764                 if (IS_ERR(session)) {
1765                         err = PTR_ERR(session);
1766                         goto finish;
1767                 }
1768         }
1769         dout("do_request mds%d session %p state %s\n", mds, session,
1770              session_state_name(session->s_state));
1771         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1772             session->s_state != CEPH_MDS_SESSION_HUNG) {
1773                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1774                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1775                         __open_session(mdsc, session);
1776                 list_add(&req->r_wait, &session->s_waiting);
1777                 goto out_session;
1778         }
1779
1780         /* send request */
1781         req->r_session = get_session(session);
1782         req->r_resend_mds = -1;   /* forget any previous mds hint */
1783
1784         if (req->r_request_started == 0)   /* note request start time */
1785                 req->r_request_started = jiffies;
1786
1787         err = __prepare_send_request(mdsc, req, mds);
1788         if (!err) {
1789                 ceph_msg_get(req->r_request);
1790                 ceph_con_send(&session->s_con, req->r_request);
1791         }
1792
1793 out_session:
1794         ceph_put_mds_session(session);
1795 out:
1796         return err;
1797
1798 finish:
1799         req->r_err = err;
1800         complete_request(mdsc, req);
1801         goto out;
1802 }
1803
1804 /*
1805  * called under mdsc->mutex
1806  */
1807 static void __wake_requests(struct ceph_mds_client *mdsc,
1808                             struct list_head *head)
1809 {
1810         struct ceph_mds_request *req, *nreq;
1811
1812         list_for_each_entry_safe(req, nreq, head, r_wait) {
1813                 list_del_init(&req->r_wait);
1814                 __do_request(mdsc, req);
1815         }
1816 }
1817
1818 /*
1819  * Wake up threads with requests pending for @mds, so that they can
1820  * resubmit their requests to a possibly different mds.
1821  */
1822 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1823 {
1824         struct ceph_mds_request *req;
1825         struct rb_node *p;
1826
1827         dout("kick_requests mds%d\n", mds);
1828         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1829                 req = rb_entry(p, struct ceph_mds_request, r_node);
1830                 if (req->r_got_unsafe)
1831                         continue;
1832                 if (req->r_session &&
1833                     req->r_session->s_mds == mds) {
1834                         dout(" kicking tid %llu\n", req->r_tid);
1835                         put_request_session(req);
1836                         __do_request(mdsc, req);
1837                 }
1838         }
1839 }
1840
1841 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1842                               struct ceph_mds_request *req)
1843 {
1844         dout("submit_request on %p\n", req);
1845         mutex_lock(&mdsc->mutex);
1846         __register_request(mdsc, req, NULL);
1847         __do_request(mdsc, req);
1848         mutex_unlock(&mdsc->mutex);
1849 }
1850
1851 /*
1852  * Synchrously perform an mds request.  Take care of all of the
1853  * session setup, forwarding, retry details.
1854  */
1855 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1856                          struct inode *dir,
1857                          struct ceph_mds_request *req)
1858 {
1859         int err;
1860
1861         dout("do_request on %p\n", req);
1862
1863         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1864         if (req->r_inode)
1865                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1866         if (req->r_locked_dir)
1867                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1868         if (req->r_old_dentry)
1869                 ceph_get_cap_refs(
1870                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
1871                         CEPH_CAP_PIN);
1872
1873         /* issue */
1874         mutex_lock(&mdsc->mutex);
1875         __register_request(mdsc, req, dir);
1876         __do_request(mdsc, req);
1877
1878         if (req->r_err) {
1879                 err = req->r_err;
1880                 __unregister_request(mdsc, req);
1881                 dout("do_request early error %d\n", err);
1882                 goto out;
1883         }
1884
1885         /* wait */
1886         mutex_unlock(&mdsc->mutex);
1887         dout("do_request waiting\n");
1888         if (req->r_timeout) {
1889                 err = (long)wait_for_completion_killable_timeout(
1890                         &req->r_completion, req->r_timeout);
1891                 if (err == 0)
1892                         err = -EIO;
1893         } else {
1894                 err = wait_for_completion_killable(&req->r_completion);
1895         }
1896         dout("do_request waited, got %d\n", err);
1897         mutex_lock(&mdsc->mutex);
1898
1899         /* only abort if we didn't race with a real reply */
1900         if (req->r_got_result) {
1901                 err = le32_to_cpu(req->r_reply_info.head->result);
1902         } else if (err < 0) {
1903                 dout("aborted request %lld with %d\n", req->r_tid, err);
1904
1905                 /*
1906                  * ensure we aren't running concurrently with
1907                  * ceph_fill_trace or ceph_readdir_prepopulate, which
1908                  * rely on locks (dir mutex) held by our caller.
1909                  */
1910                 mutex_lock(&req->r_fill_mutex);
1911                 req->r_err = err;
1912                 req->r_aborted = true;
1913                 mutex_unlock(&req->r_fill_mutex);
1914
1915                 if (req->r_locked_dir &&
1916                     (req->r_op & CEPH_MDS_OP_WRITE))
1917                         ceph_invalidate_dir_request(req);
1918         } else {
1919                 err = req->r_err;
1920         }
1921
1922 out:
1923         mutex_unlock(&mdsc->mutex);
1924         dout("do_request %p done, result %d\n", req, err);
1925         return err;
1926 }
1927
1928 /*
1929  * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1930  * namespace request.
1931  */
1932 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1933 {
1934         struct inode *inode = req->r_locked_dir;
1935         struct ceph_inode_info *ci = ceph_inode(inode);
1936
1937         dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1938         spin_lock(&inode->i_lock);
1939         ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1940         ci->i_release_count++;
1941         spin_unlock(&inode->i_lock);
1942
1943         if (req->r_dentry)
1944                 ceph_invalidate_dentry_lease(req->r_dentry);
1945         if (req->r_old_dentry)
1946                 ceph_invalidate_dentry_lease(req->r_old_dentry);
1947 }
1948
1949 /*
1950  * Handle mds reply.
1951  *
1952  * We take the session mutex and parse and process the reply immediately.
1953  * This preserves the logical ordering of replies, capabilities, etc., sent
1954  * by the MDS as they are applied to our local cache.
1955  */
1956 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
1957 {
1958         struct ceph_mds_client *mdsc = session->s_mdsc;
1959         struct ceph_mds_request *req;
1960         struct ceph_mds_reply_head *head = msg->front.iov_base;
1961         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
1962         u64 tid;
1963         int err, result;
1964         int mds = session->s_mds;
1965
1966         if (msg->front.iov_len < sizeof(*head)) {
1967                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
1968                 ceph_msg_dump(msg);
1969                 return;
1970         }
1971
1972         /* get request, session */
1973         tid = le64_to_cpu(msg->hdr.tid);
1974         mutex_lock(&mdsc->mutex);
1975         req = __lookup_request(mdsc, tid);
1976         if (!req) {
1977                 dout("handle_reply on unknown tid %llu\n", tid);
1978                 mutex_unlock(&mdsc->mutex);
1979                 return;
1980         }
1981         dout("handle_reply %p\n", req);
1982
1983         /* correct session? */
1984         if (req->r_session != session) {
1985                 pr_err("mdsc_handle_reply got %llu on session mds%d"
1986                        " not mds%d\n", tid, session->s_mds,
1987                        req->r_session ? req->r_session->s_mds : -1);
1988                 mutex_unlock(&mdsc->mutex);
1989                 goto out;
1990         }
1991
1992         /* dup? */
1993         if ((req->r_got_unsafe && !head->safe) ||
1994             (req->r_got_safe && head->safe)) {
1995                 pr_warning("got a dup %s reply on %llu from mds%d\n",
1996                            head->safe ? "safe" : "unsafe", tid, mds);
1997                 mutex_unlock(&mdsc->mutex);
1998                 goto out;
1999         }
2000         if (req->r_got_safe && !head->safe) {
2001                 pr_warning("got unsafe after safe on %llu from mds%d\n",
2002                            tid, mds);
2003                 mutex_unlock(&mdsc->mutex);
2004                 goto out;
2005         }
2006
2007         result = le32_to_cpu(head->result);
2008
2009         /*
2010          * Handle an ESTALE
2011          * if we're not talking to the authority, send to them
2012          * if the authority has changed while we weren't looking,
2013          * send to new authority
2014          * Otherwise we just have to return an ESTALE
2015          */
2016         if (result == -ESTALE) {
2017                 dout("got ESTALE on request %llu", req->r_tid);
2018                 if (!req->r_inode) {
2019                         /* do nothing; not an authority problem */
2020                 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2021                         dout("not using auth, setting for that now");
2022                         req->r_direct_mode = USE_AUTH_MDS;
2023                         __do_request(mdsc, req);
2024                         mutex_unlock(&mdsc->mutex);
2025                         goto out;
2026                 } else  {
2027                         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2028                         struct ceph_cap *cap =
2029                                 ceph_get_cap_for_mds(ci, req->r_mds);;
2030
2031                         dout("already using auth");
2032                         if ((!cap || cap != ci->i_auth_cap) ||
2033                             (cap->mseq != req->r_sent_on_mseq)) {
2034                                 dout("but cap changed, so resending");
2035                                 __do_request(mdsc, req);
2036                                 mutex_unlock(&mdsc->mutex);
2037                                 goto out;
2038                         }
2039                 }
2040                 dout("have to return ESTALE on request %llu", req->r_tid);
2041         }
2042
2043
2044         if (head->safe) {
2045                 req->r_got_safe = true;
2046                 __unregister_request(mdsc, req);
2047                 complete_all(&req->r_safe_completion);
2048
2049                 if (req->r_got_unsafe) {
2050                         /*
2051                          * We already handled the unsafe response, now do the
2052                          * cleanup.  No need to examine the response; the MDS
2053                          * doesn't include any result info in the safe
2054                          * response.  And even if it did, there is nothing
2055                          * useful we could do with a revised return value.
2056                          */
2057                         dout("got safe reply %llu, mds%d\n", tid, mds);
2058                         list_del_init(&req->r_unsafe_item);
2059
2060                         /* last unsafe request during umount? */
2061                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2062                                 complete_all(&mdsc->safe_umount_waiters);
2063                         mutex_unlock(&mdsc->mutex);
2064                         goto out;
2065                 }
2066         } else {
2067                 req->r_got_unsafe = true;
2068                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2069         }
2070
2071         dout("handle_reply tid %lld result %d\n", tid, result);
2072         rinfo = &req->r_reply_info;
2073         err = parse_reply_info(msg, rinfo);
2074         mutex_unlock(&mdsc->mutex);
2075
2076         mutex_lock(&session->s_mutex);
2077         if (err < 0) {
2078                 pr_err("mdsc_handle_reply got corrupt reply mds%d\n", mds);
2079                 ceph_msg_dump(msg);
2080                 goto out_err;
2081         }
2082
2083         /* snap trace */
2084         if (rinfo->snapblob_len) {
2085                 down_write(&mdsc->snap_rwsem);
2086                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2087                                rinfo->snapblob + rinfo->snapblob_len,
2088                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2089                 downgrade_write(&mdsc->snap_rwsem);
2090         } else {
2091                 down_read(&mdsc->snap_rwsem);
2092         }
2093
2094         /* insert trace into our cache */
2095         mutex_lock(&req->r_fill_mutex);
2096         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2097         if (err == 0) {
2098                 if (result == 0 && rinfo->dir_nr)
2099                         ceph_readdir_prepopulate(req, req->r_session);
2100                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2101         }
2102         mutex_unlock(&req->r_fill_mutex);
2103
2104         up_read(&mdsc->snap_rwsem);
2105 out_err:
2106         mutex_lock(&mdsc->mutex);
2107         if (!req->r_aborted) {
2108                 if (err) {
2109                         req->r_err = err;
2110                 } else {
2111                         req->r_reply = msg;
2112                         ceph_msg_get(msg);
2113                         req->r_got_result = true;
2114                 }
2115         } else {
2116                 dout("reply arrived after request %lld was aborted\n", tid);
2117         }
2118         mutex_unlock(&mdsc->mutex);
2119
2120         ceph_add_cap_releases(mdsc, req->r_session);
2121         mutex_unlock(&session->s_mutex);
2122
2123         /* kick calling process */
2124         complete_request(mdsc, req);
2125 out:
2126         ceph_mdsc_put_request(req);
2127         return;
2128 }
2129
2130
2131
2132 /*
2133  * handle mds notification that our request has been forwarded.
2134  */
2135 static void handle_forward(struct ceph_mds_client *mdsc,
2136                            struct ceph_mds_session *session,
2137                            struct ceph_msg *msg)
2138 {
2139         struct ceph_mds_request *req;
2140         u64 tid = le64_to_cpu(msg->hdr.tid);
2141         u32 next_mds;
2142         u32 fwd_seq;
2143         int err = -EINVAL;
2144         void *p = msg->front.iov_base;
2145         void *end = p + msg->front.iov_len;
2146
2147         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2148         next_mds = ceph_decode_32(&p);
2149         fwd_seq = ceph_decode_32(&p);
2150
2151         mutex_lock(&mdsc->mutex);
2152         req = __lookup_request(mdsc, tid);
2153         if (!req) {
2154                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2155                 goto out;  /* dup reply? */
2156         }
2157
2158         if (req->r_aborted) {
2159                 dout("forward tid %llu aborted, unregistering\n", tid);
2160                 __unregister_request(mdsc, req);
2161         } else if (fwd_seq <= req->r_num_fwd) {
2162                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2163                      tid, next_mds, req->r_num_fwd, fwd_seq);
2164         } else {
2165                 /* resend. forward race not possible; mds would drop */
2166                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2167                 BUG_ON(req->r_err);
2168                 BUG_ON(req->r_got_result);
2169                 req->r_num_fwd = fwd_seq;
2170                 req->r_resend_mds = next_mds;
2171                 put_request_session(req);
2172                 __do_request(mdsc, req);
2173         }
2174         ceph_mdsc_put_request(req);
2175 out:
2176         mutex_unlock(&mdsc->mutex);
2177         return;
2178
2179 bad:
2180         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2181 }
2182
2183 /*
2184  * handle a mds session control message
2185  */
2186 static void handle_session(struct ceph_mds_session *session,
2187                            struct ceph_msg *msg)
2188 {
2189         struct ceph_mds_client *mdsc = session->s_mdsc;
2190         u32 op;
2191         u64 seq;
2192         int mds = session->s_mds;
2193         struct ceph_mds_session_head *h = msg->front.iov_base;
2194         int wake = 0;
2195
2196         /* decode */
2197         if (msg->front.iov_len != sizeof(*h))
2198                 goto bad;
2199         op = le32_to_cpu(h->op);
2200         seq = le64_to_cpu(h->seq);
2201
2202         mutex_lock(&mdsc->mutex);
2203         if (op == CEPH_SESSION_CLOSE)
2204                 __unregister_session(mdsc, session);
2205         /* FIXME: this ttl calculation is generous */
2206         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2207         mutex_unlock(&mdsc->mutex);
2208
2209         mutex_lock(&session->s_mutex);
2210
2211         dout("handle_session mds%d %s %p state %s seq %llu\n",
2212              mds, ceph_session_op_name(op), session,
2213              session_state_name(session->s_state), seq);
2214
2215         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2216                 session->s_state = CEPH_MDS_SESSION_OPEN;
2217                 pr_info("mds%d came back\n", session->s_mds);
2218         }
2219
2220         switch (op) {
2221         case CEPH_SESSION_OPEN:
2222                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2223                         pr_info("mds%d reconnect success\n", session->s_mds);
2224                 session->s_state = CEPH_MDS_SESSION_OPEN;
2225                 renewed_caps(mdsc, session, 0);
2226                 wake = 1;
2227                 if (mdsc->stopping)
2228                         __close_session(mdsc, session);
2229                 break;
2230
2231         case CEPH_SESSION_RENEWCAPS:
2232                 if (session->s_renew_seq == seq)
2233                         renewed_caps(mdsc, session, 1);
2234                 break;
2235
2236         case CEPH_SESSION_CLOSE:
2237                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2238                         pr_info("mds%d reconnect denied\n", session->s_mds);
2239                 remove_session_caps(session);
2240                 wake = 1; /* for good measure */
2241                 wake_up_all(&mdsc->session_close_wq);
2242                 kick_requests(mdsc, mds);
2243                 break;
2244
2245         case CEPH_SESSION_STALE:
2246                 pr_info("mds%d caps went stale, renewing\n",
2247                         session->s_mds);
2248                 spin_lock(&session->s_cap_lock);
2249                 session->s_cap_gen++;
2250                 session->s_cap_ttl = 0;
2251                 spin_unlock(&session->s_cap_lock);
2252                 send_renew_caps(mdsc, session);
2253                 break;
2254
2255         case CEPH_SESSION_RECALL_STATE:
2256                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2257                 break;
2258
2259         default:
2260                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2261                 WARN_ON(1);
2262         }
2263
2264         mutex_unlock(&session->s_mutex);
2265         if (wake) {
2266                 mutex_lock(&mdsc->mutex);
2267                 __wake_requests(mdsc, &session->s_waiting);
2268                 mutex_unlock(&mdsc->mutex);
2269         }
2270         return;
2271
2272 bad:
2273         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2274                (int)msg->front.iov_len);
2275         ceph_msg_dump(msg);
2276         return;
2277 }
2278
2279
2280 /*
2281  * called under session->mutex.
2282  */
2283 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2284                                    struct ceph_mds_session *session)
2285 {
2286         struct ceph_mds_request *req, *nreq;
2287         int err;
2288
2289         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2290
2291         mutex_lock(&mdsc->mutex);
2292         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2293                 err = __prepare_send_request(mdsc, req, session->s_mds);
2294                 if (!err) {
2295                         ceph_msg_get(req->r_request);
2296                         ceph_con_send(&session->s_con, req->r_request);
2297                 }
2298         }
2299         mutex_unlock(&mdsc->mutex);
2300 }
2301
2302 /*
2303  * Encode information about a cap for a reconnect with the MDS.
2304  */
2305 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2306                           void *arg)
2307 {
2308         union {
2309                 struct ceph_mds_cap_reconnect v2;
2310                 struct ceph_mds_cap_reconnect_v1 v1;
2311         } rec;
2312         size_t reclen;
2313         struct ceph_inode_info *ci;
2314         struct ceph_reconnect_state *recon_state = arg;
2315         struct ceph_pagelist *pagelist = recon_state->pagelist;
2316         char *path;
2317         int pathlen, err;
2318         u64 pathbase;
2319         struct dentry *dentry;
2320
2321         ci = cap->ci;
2322
2323         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2324              inode, ceph_vinop(inode), cap, cap->cap_id,
2325              ceph_cap_string(cap->issued));
2326         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2327         if (err)
2328                 return err;
2329
2330         dentry = d_find_alias(inode);
2331         if (dentry) {
2332                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2333                 if (IS_ERR(path)) {
2334                         err = PTR_ERR(path);
2335                         goto out_dput;
2336                 }
2337         } else {
2338                 path = NULL;
2339                 pathlen = 0;
2340         }
2341         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2342         if (err)
2343                 goto out_free;
2344
2345         spin_lock(&inode->i_lock);
2346         cap->seq = 0;        /* reset cap seq */
2347         cap->issue_seq = 0;  /* and issue_seq */
2348
2349         if (recon_state->flock) {
2350                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2351                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2352                 rec.v2.issued = cpu_to_le32(cap->issued);
2353                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2354                 rec.v2.pathbase = cpu_to_le64(pathbase);
2355                 rec.v2.flock_len = 0;
2356                 reclen = sizeof(rec.v2);
2357         } else {
2358                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2359                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2360                 rec.v1.issued = cpu_to_le32(cap->issued);
2361                 rec.v1.size = cpu_to_le64(inode->i_size);
2362                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2363                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2364                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2365                 rec.v1.pathbase = cpu_to_le64(pathbase);
2366                 reclen = sizeof(rec.v1);
2367         }
2368         spin_unlock(&inode->i_lock);
2369
2370         if (recon_state->flock) {
2371                 int num_fcntl_locks, num_flock_locks;
2372                 struct ceph_pagelist_cursor trunc_point;
2373
2374                 ceph_pagelist_set_cursor(pagelist, &trunc_point);
2375                 do {
2376                         lock_flocks();
2377                         ceph_count_locks(inode, &num_fcntl_locks,
2378                                          &num_flock_locks);
2379                         rec.v2.flock_len = (2*sizeof(u32) +
2380                                             (num_fcntl_locks+num_flock_locks) *
2381                                             sizeof(struct ceph_filelock));
2382                         unlock_flocks();
2383
2384                         /* pre-alloc pagelist */
2385                         ceph_pagelist_truncate(pagelist, &trunc_point);
2386                         err = ceph_pagelist_append(pagelist, &rec, reclen);
2387                         if (!err)
2388                                 err = ceph_pagelist_reserve(pagelist,
2389                                                             rec.v2.flock_len);
2390
2391                         /* encode locks */
2392                         if (!err) {
2393                                 lock_flocks();
2394                                 err = ceph_encode_locks(inode,
2395                                                         pagelist,
2396                                                         num_fcntl_locks,
2397                                                         num_flock_locks);
2398                                 unlock_flocks();
2399                         }
2400                 } while (err == -ENOSPC);
2401         } else {
2402                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2403         }
2404
2405 out_free:
2406         kfree(path);
2407 out_dput:
2408         dput(dentry);
2409         return err;
2410 }
2411
2412
2413 /*
2414  * If an MDS fails and recovers, clients need to reconnect in order to
2415  * reestablish shared state.  This includes all caps issued through
2416  * this session _and_ the snap_realm hierarchy.  Because it's not
2417  * clear which snap realms the mds cares about, we send everything we
2418  * know about.. that ensures we'll then get any new info the
2419  * recovering MDS might have.
2420  *
2421  * This is a relatively heavyweight operation, but it's rare.
2422  *
2423  * called with mdsc->mutex held.
2424  */
2425 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2426                                struct ceph_mds_session *session)
2427 {
2428         struct ceph_msg *reply;
2429         struct rb_node *p;
2430         int mds = session->s_mds;
2431         int err = -ENOMEM;
2432         struct ceph_pagelist *pagelist;
2433         struct ceph_reconnect_state recon_state;
2434
2435         pr_info("mds%d reconnect start\n", mds);
2436
2437         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2438         if (!pagelist)
2439                 goto fail_nopagelist;
2440         ceph_pagelist_init(pagelist);
2441
2442         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2443         if (!reply)
2444                 goto fail_nomsg;
2445
2446         mutex_lock(&session->s_mutex);
2447         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2448         session->s_seq = 0;
2449
2450         ceph_con_open(&session->s_con,
2451                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2452
2453         /* replay unsafe requests */
2454         replay_unsafe_requests(mdsc, session);
2455
2456         down_read(&mdsc->snap_rwsem);
2457
2458         dout("session %p state %s\n", session,
2459              session_state_name(session->s_state));
2460
2461         /* drop old cap expires; we're about to reestablish that state */
2462         discard_cap_releases(mdsc, session);
2463
2464         /* traverse this session's caps */
2465         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2466         if (err)
2467                 goto fail;
2468
2469         recon_state.pagelist = pagelist;
2470         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2471         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2472         if (err < 0)
2473                 goto fail;
2474
2475         /*
2476          * snaprealms.  we provide mds with the ino, seq (version), and
2477          * parent for all of our realms.  If the mds has any newer info,
2478          * it will tell us.
2479          */
2480         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2481                 struct ceph_snap_realm *realm =
2482                         rb_entry(p, struct ceph_snap_realm, node);
2483                 struct ceph_mds_snaprealm_reconnect sr_rec;
2484
2485                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2486                      realm->ino, realm->seq, realm->parent_ino);
2487                 sr_rec.ino = cpu_to_le64(realm->ino);
2488                 sr_rec.seq = cpu_to_le64(realm->seq);
2489                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2490                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2491                 if (err)
2492                         goto fail;
2493         }
2494
2495         reply->pagelist = pagelist;
2496         if (recon_state.flock)
2497                 reply->hdr.version = cpu_to_le16(2);
2498         reply->hdr.data_len = cpu_to_le32(pagelist->length);
2499         reply->nr_pages = calc_pages_for(0, pagelist->length);
2500         ceph_con_send(&session->s_con, reply);
2501
2502         mutex_unlock(&session->s_mutex);
2503
2504         mutex_lock(&mdsc->mutex);
2505         __wake_requests(mdsc, &session->s_waiting);
2506         mutex_unlock(&mdsc->mutex);
2507
2508         up_read(&mdsc->snap_rwsem);
2509         return;
2510
2511 fail:
2512         ceph_msg_put(reply);
2513         up_read(&mdsc->snap_rwsem);
2514         mutex_unlock(&session->s_mutex);
2515 fail_nomsg:
2516         ceph_pagelist_release(pagelist);
2517         kfree(pagelist);
2518 fail_nopagelist:
2519         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2520         return;
2521 }
2522
2523
2524 /*
2525  * compare old and new mdsmaps, kicking requests
2526  * and closing out old connections as necessary
2527  *
2528  * called under mdsc->mutex.
2529  */
2530 static void check_new_map(struct ceph_mds_client *mdsc,
2531                           struct ceph_mdsmap *newmap,
2532                           struct ceph_mdsmap *oldmap)
2533 {
2534         int i;
2535         int oldstate, newstate;
2536         struct ceph_mds_session *s;
2537
2538         dout("check_new_map new %u old %u\n",
2539              newmap->m_epoch, oldmap->m_epoch);
2540
2541         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2542                 if (mdsc->sessions[i] == NULL)
2543                         continue;
2544                 s = mdsc->sessions[i];
2545                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2546                 newstate = ceph_mdsmap_get_state(newmap, i);
2547
2548                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2549                      i, ceph_mds_state_name(oldstate),
2550                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2551                      ceph_mds_state_name(newstate),
2552                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2553                      session_state_name(s->s_state));
2554
2555                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2556                            ceph_mdsmap_get_addr(newmap, i),
2557                            sizeof(struct ceph_entity_addr))) {
2558                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2559                                 /* the session never opened, just close it
2560                                  * out now */
2561                                 __wake_requests(mdsc, &s->s_waiting);
2562                                 __unregister_session(mdsc, s);
2563                         } else {
2564                                 /* just close it */
2565                                 mutex_unlock(&mdsc->mutex);
2566                                 mutex_lock(&s->s_mutex);
2567                                 mutex_lock(&mdsc->mutex);
2568                                 ceph_con_close(&s->s_con);
2569                                 mutex_unlock(&s->s_mutex);
2570                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2571                         }
2572
2573                         /* kick any requests waiting on the recovering mds */
2574                         kick_requests(mdsc, i);
2575                 } else if (oldstate == newstate) {
2576                         continue;  /* nothing new with this mds */
2577                 }
2578
2579                 /*
2580                  * send reconnect?
2581                  */
2582                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2583                     newstate >= CEPH_MDS_STATE_RECONNECT) {
2584                         mutex_unlock(&mdsc->mutex);
2585                         send_mds_reconnect(mdsc, s);
2586                         mutex_lock(&mdsc->mutex);
2587                 }
2588
2589                 /*
2590                  * kick request on any mds that has gone active.
2591                  */
2592                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2593                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2594                         if (oldstate != CEPH_MDS_STATE_CREATING &&
2595                             oldstate != CEPH_MDS_STATE_STARTING)
2596                                 pr_info("mds%d recovery completed\n", s->s_mds);
2597                         kick_requests(mdsc, i);
2598                         ceph_kick_flushing_caps(mdsc, s);
2599                         wake_up_session_caps(s, 1);
2600                 }
2601         }
2602
2603         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2604                 s = mdsc->sessions[i];
2605                 if (!s)
2606                         continue;
2607                 if (!ceph_mdsmap_is_laggy(newmap, i))
2608                         continue;
2609                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2610                     s->s_state == CEPH_MDS_SESSION_HUNG ||
2611                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
2612                         dout(" connecting to export targets of laggy mds%d\n",
2613                              i);
2614                         __open_export_target_sessions(mdsc, s);
2615                 }
2616         }
2617 }
2618
2619
2620
2621 /*
2622  * leases
2623  */
2624
2625 /*
2626  * caller must hold session s_mutex, dentry->d_lock
2627  */
2628 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2629 {
2630         struct ceph_dentry_info *di = ceph_dentry(dentry);
2631
2632         ceph_put_mds_session(di->lease_session);
2633         di->lease_session = NULL;
2634 }
2635
2636 static void handle_lease(struct ceph_mds_client *mdsc,
2637                          struct ceph_mds_session *session,
2638                          struct ceph_msg *msg)
2639 {
2640         struct super_block *sb = mdsc->fsc->sb;
2641         struct inode *inode;
2642         struct ceph_inode_info *ci;
2643         struct dentry *parent, *dentry;
2644         struct ceph_dentry_info *di;
2645         int mds = session->s_mds;
2646         struct ceph_mds_lease *h = msg->front.iov_base;
2647         u32 seq;
2648         struct ceph_vino vino;
2649         int mask;
2650         struct qstr dname;
2651         int release = 0;
2652
2653         dout("handle_lease from mds%d\n", mds);
2654
2655         /* decode */
2656         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2657                 goto bad;
2658         vino.ino = le64_to_cpu(h->ino);
2659         vino.snap = CEPH_NOSNAP;
2660         mask = le16_to_cpu(h->mask);
2661         seq = le32_to_cpu(h->seq);
2662         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2663         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2664         if (dname.len != get_unaligned_le32(h+1))
2665                 goto bad;
2666
2667         mutex_lock(&session->s_mutex);
2668         session->s_seq++;
2669
2670         /* lookup inode */
2671         inode = ceph_find_inode(sb, vino);
2672         dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
2673              ceph_lease_op_name(h->action), mask, vino.ino, inode,
2674              dname.len, dname.name);
2675         if (inode == NULL) {
2676                 dout("handle_lease no inode %llx\n", vino.ino);
2677                 goto release;
2678         }
2679         ci = ceph_inode(inode);
2680
2681         /* dentry */
2682         parent = d_find_alias(inode);
2683         if (!parent) {
2684                 dout("no parent dentry on inode %p\n", inode);
2685                 WARN_ON(1);
2686                 goto release;  /* hrm... */
2687         }
2688         dname.hash = full_name_hash(dname.name, dname.len);
2689         dentry = d_lookup(parent, &dname);
2690         dput(parent);
2691         if (!dentry)
2692                 goto release;
2693
2694         spin_lock(&dentry->d_lock);
2695         di = ceph_dentry(dentry);
2696         switch (h->action) {
2697         case CEPH_MDS_LEASE_REVOKE:
2698                 if (di && di->lease_session == session) {
2699                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2700                                 h->seq = cpu_to_le32(di->lease_seq);
2701                         __ceph_mdsc_drop_dentry_lease(dentry);
2702                 }
2703                 release = 1;
2704                 break;
2705
2706         case CEPH_MDS_LEASE_RENEW:
2707                 if (di && di->lease_session == session &&
2708                     di->lease_gen == session->s_cap_gen &&
2709                     di->lease_renew_from &&
2710                     di->lease_renew_after == 0) {
2711                         unsigned long duration =
2712                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2713
2714                         di->lease_seq = seq;
2715                         dentry->d_time = di->lease_renew_from + duration;
2716                         di->lease_renew_after = di->lease_renew_from +
2717                                 (duration >> 1);
2718                         di->lease_renew_from = 0;
2719                 }
2720                 break;
2721         }
2722         spin_unlock(&dentry->d_lock);
2723         dput(dentry);
2724
2725         if (!release)
2726                 goto out;
2727
2728 release:
2729         /* let's just reuse the same message */
2730         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2731         ceph_msg_get(msg);
2732         ceph_con_send(&session->s_con, msg);
2733
2734 out:
2735         iput(inode);
2736         mutex_unlock(&session->s_mutex);
2737         return;
2738
2739 bad:
2740         pr_err("corrupt lease message\n");
2741         ceph_msg_dump(msg);
2742 }
2743
2744 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2745                               struct inode *inode,
2746                               struct dentry *dentry, char action,
2747                               u32 seq)
2748 {
2749         struct ceph_msg *msg;
2750         struct ceph_mds_lease *lease;
2751         int len = sizeof(*lease) + sizeof(u32);
2752         int dnamelen = 0;
2753
2754         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2755              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2756         dnamelen = dentry->d_name.len;
2757         len += dnamelen;
2758
2759         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2760         if (!msg)
2761                 return;
2762         lease = msg->front.iov_base;
2763         lease->action = action;
2764         lease->mask = cpu_to_le16(1);
2765         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2766         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2767         lease->seq = cpu_to_le32(seq);
2768         put_unaligned_le32(dnamelen, lease + 1);
2769         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2770
2771         /*
2772          * if this is a preemptive lease RELEASE, no need to
2773          * flush request stream, since the actual request will
2774          * soon follow.
2775          */
2776         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2777
2778         ceph_con_send(&session->s_con, msg);
2779 }
2780
2781 /*
2782  * Preemptively release a lease we expect to invalidate anyway.
2783  * Pass @inode always, @dentry is optional.
2784  */
2785 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2786                              struct dentry *dentry, int mask)
2787 {
2788         struct ceph_dentry_info *di;
2789         struct ceph_mds_session *session;
2790         u32 seq;
2791
2792         BUG_ON(inode == NULL);
2793         BUG_ON(dentry == NULL);
2794         BUG_ON(mask == 0);
2795
2796         /* is dentry lease valid? */
2797         spin_lock(&dentry->d_lock);
2798         di = ceph_dentry(dentry);
2799         if (!di || !di->lease_session ||
2800             di->lease_session->s_mds < 0 ||
2801             di->lease_gen != di->lease_session->s_cap_gen ||
2802             !time_before(jiffies, dentry->d_time)) {
2803                 dout("lease_release inode %p dentry %p -- "
2804                      "no lease on %d\n",
2805                      inode, dentry, mask);
2806                 spin_unlock(&dentry->d_lock);
2807                 return;
2808         }
2809
2810         /* we do have a lease on this dentry; note mds and seq */
2811         session = ceph_get_mds_session(di->lease_session);
2812         seq = di->lease_seq;
2813         __ceph_mdsc_drop_dentry_lease(dentry);
2814         spin_unlock(&dentry->d_lock);
2815
2816         dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2817              inode, dentry, mask, session->s_mds);
2818         ceph_mdsc_lease_send_msg(session, inode, dentry,
2819                                  CEPH_MDS_LEASE_RELEASE, seq);
2820         ceph_put_mds_session(session);
2821 }
2822
2823 /*
2824  * drop all leases (and dentry refs) in preparation for umount
2825  */
2826 static void drop_leases(struct ceph_mds_client *mdsc)
2827 {
2828         int i;
2829
2830         dout("drop_leases\n");
2831         mutex_lock(&mdsc->mutex);
2832         for (i = 0; i < mdsc->max_sessions; i++) {
2833                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2834                 if (!s)
2835                         continue;
2836                 mutex_unlock(&mdsc->mutex);
2837                 mutex_lock(&s->s_mutex);
2838                 mutex_unlock(&s->s_mutex);
2839                 ceph_put_mds_session(s);
2840                 mutex_lock(&mdsc->mutex);
2841         }
2842         mutex_unlock(&mdsc->mutex);
2843 }
2844
2845
2846
2847 /*
2848  * delayed work -- periodically trim expired leases, renew caps with mds
2849  */
2850 static void schedule_delayed(struct ceph_mds_client *mdsc)
2851 {
2852         int delay = 5;
2853         unsigned hz = round_jiffies_relative(HZ * delay);
2854         schedule_delayed_work(&mdsc->delayed_work, hz);
2855 }
2856
2857 static void delayed_work(struct work_struct *work)
2858 {
2859         int i;
2860         struct ceph_mds_client *mdsc =
2861                 container_of(work, struct ceph_mds_client, delayed_work.work);
2862         int renew_interval;
2863         int renew_caps;
2864
2865         dout("mdsc delayed_work\n");
2866         ceph_check_delayed_caps(mdsc);
2867
2868         mutex_lock(&mdsc->mutex);
2869         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2870         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2871                                    mdsc->last_renew_caps);
2872         if (renew_caps)
2873                 mdsc->last_renew_caps = jiffies;
2874
2875         for (i = 0; i < mdsc->max_sessions; i++) {
2876                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2877                 if (s == NULL)
2878                         continue;
2879                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2880                         dout("resending session close request for mds%d\n",
2881                              s->s_mds);
2882                         request_close_session(mdsc, s);
2883                         ceph_put_mds_session(s);
2884                         continue;
2885                 }
2886                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2887                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2888                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2889                                 pr_info("mds%d hung\n", s->s_mds);
2890                         }
2891                 }
2892                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2893                         /* this mds is failed or recovering, just wait */
2894                         ceph_put_mds_session(s);
2895                         continue;
2896                 }
2897                 mutex_unlock(&mdsc->mutex);
2898
2899                 mutex_lock(&s->s_mutex);
2900                 if (renew_caps)
2901                         send_renew_caps(mdsc, s);
2902                 else
2903                         ceph_con_keepalive(&s->s_con);
2904                 ceph_add_cap_releases(mdsc, s);
2905                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2906                     s->s_state == CEPH_MDS_SESSION_HUNG)
2907                         ceph_send_cap_releases(mdsc, s);
2908                 mutex_unlock(&s->s_mutex);
2909                 ceph_put_mds_session(s);
2910
2911                 mutex_lock(&mdsc->mutex);
2912         }
2913         mutex_unlock(&mdsc->mutex);
2914
2915         schedule_delayed(mdsc);
2916 }
2917
2918 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2919
2920 {
2921         struct ceph_mds_client *mdsc;
2922
2923         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2924         if (!mdsc)
2925                 return -ENOMEM;
2926         mdsc->fsc = fsc;
2927         fsc->mdsc = mdsc;
2928         mutex_init(&mdsc->mutex);
2929         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2930         if (mdsc->mdsmap == NULL)
2931                 return -ENOMEM;
2932
2933         init_completion(&mdsc->safe_umount_waiters);
2934         init_waitqueue_head(&mdsc->session_close_wq);
2935         INIT_LIST_HEAD(&mdsc->waiting_for_map);
2936         mdsc->sessions = NULL;
2937         mdsc->max_sessions = 0;
2938         mdsc->stopping = 0;
2939         init_rwsem(&mdsc->snap_rwsem);
2940         mdsc->snap_realms = RB_ROOT;
2941         INIT_LIST_HEAD(&mdsc->snap_empty);
2942         spin_lock_init(&mdsc->snap_empty_lock);
2943         mdsc->last_tid = 0;
2944         mdsc->request_tree = RB_ROOT;
2945         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2946         mdsc->last_renew_caps = jiffies;
2947         INIT_LIST_HEAD(&mdsc->cap_delay_list);
2948         spin_lock_init(&mdsc->cap_delay_lock);
2949         INIT_LIST_HEAD(&mdsc->snap_flush_list);
2950         spin_lock_init(&mdsc->snap_flush_lock);
2951         mdsc->cap_flush_seq = 0;
2952         INIT_LIST_HEAD(&mdsc->cap_dirty);
2953         mdsc->num_cap_flushing = 0;
2954         spin_lock_init(&mdsc->cap_dirty_lock);
2955         init_waitqueue_head(&mdsc->cap_flushing_wq);
2956         spin_lock_init(&mdsc->dentry_lru_lock);
2957         INIT_LIST_HEAD(&mdsc->dentry_lru);
2958
2959         ceph_caps_init(mdsc);
2960         ceph_adjust_min_caps(mdsc, fsc->min_caps);
2961
2962         return 0;
2963 }
2964
2965 /*
2966  * Wait for safe replies on open mds requests.  If we time out, drop
2967  * all requests from the tree to avoid dangling dentry refs.
2968  */
2969 static void wait_requests(struct ceph_mds_client *mdsc)
2970 {
2971         struct ceph_mds_request *req;
2972         struct ceph_fs_client *fsc = mdsc->fsc;
2973
2974         mutex_lock(&mdsc->mutex);
2975         if (__get_oldest_req(mdsc)) {
2976                 mutex_unlock(&mdsc->mutex);
2977
2978                 dout("wait_requests waiting for requests\n");
2979                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
2980                                     fsc->client->options->mount_timeout * HZ);
2981
2982                 /* tear down remaining requests */
2983                 mutex_lock(&mdsc->mutex);
2984                 while ((req = __get_oldest_req(mdsc))) {
2985                         dout("wait_requests timed out on tid %llu\n",
2986                              req->r_tid);
2987                         __unregister_request(mdsc, req);
2988                 }
2989         }
2990         mutex_unlock(&mdsc->mutex);
2991         dout("wait_requests done\n");
2992 }
2993
2994 /*
2995  * called before mount is ro, and before dentries are torn down.
2996  * (hmm, does this still race with new lookups?)
2997  */
2998 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
2999 {
3000         dout("pre_umount\n");
3001         mdsc->stopping = 1;
3002
3003         drop_leases(mdsc);
3004         ceph_flush_dirty_caps(mdsc);
3005         wait_requests(mdsc);
3006
3007         /*
3008          * wait for reply handlers to drop their request refs and
3009          * their inode/dcache refs
3010          */
3011         ceph_msgr_flush();
3012 }
3013
3014 /*
3015  * wait for all write mds requests to flush.
3016  */
3017 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3018 {
3019         struct ceph_mds_request *req = NULL, *nextreq;
3020         struct rb_node *n;
3021
3022         mutex_lock(&mdsc->mutex);
3023         dout("wait_unsafe_requests want %lld\n", want_tid);
3024 restart:
3025         req = __get_oldest_req(mdsc);
3026         while (req && req->r_tid <= want_tid) {
3027                 /* find next request */
3028                 n = rb_next(&req->r_node);
3029                 if (n)
3030                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3031                 else
3032                         nextreq = NULL;
3033                 if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3034                         /* write op */
3035                         ceph_mdsc_get_request(req);
3036                         if (nextreq)
3037                                 ceph_mdsc_get_request(nextreq);
3038                         mutex_unlock(&mdsc->mutex);
3039                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3040                              req->r_tid, want_tid);
3041                         wait_for_completion(&req->r_safe_completion);
3042                         mutex_lock(&mdsc->mutex);
3043                         ceph_mdsc_put_request(req);
3044                         if (!nextreq)
3045                                 break;  /* next dne before, so we're done! */
3046                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3047                                 /* next request was removed from tree */
3048                                 ceph_mdsc_put_request(nextreq);
3049                                 goto restart;
3050                         }
3051                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3052                 }
3053                 req = nextreq;
3054         }
3055         mutex_unlock(&mdsc->mutex);
3056         dout("wait_unsafe_requests done\n");
3057 }
3058
3059 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3060 {
3061         u64 want_tid, want_flush;
3062
3063         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3064                 return;
3065
3066         dout("sync\n");
3067         mutex_lock(&mdsc->mutex);
3068         want_tid = mdsc->last_tid;
3069         want_flush = mdsc->cap_flush_seq;
3070         mutex_unlock(&mdsc->mutex);
3071         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3072
3073         ceph_flush_dirty_caps(mdsc);
3074
3075         wait_unsafe_requests(mdsc, want_tid);
3076         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3077 }
3078
3079 /*
3080  * true if all sessions are closed, or we force unmount
3081  */
3082 bool done_closing_sessions(struct ceph_mds_client *mdsc)
3083 {
3084         int i, n = 0;
3085
3086         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3087                 return true;
3088
3089         mutex_lock(&mdsc->mutex);
3090         for (i = 0; i < mdsc->max_sessions; i++)
3091                 if (mdsc->sessions[i])
3092                         n++;
3093         mutex_unlock(&mdsc->mutex);
3094         return n == 0;
3095 }
3096
3097 /*
3098  * called after sb is ro.
3099  */
3100 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3101 {
3102         struct ceph_mds_session *session;
3103         int i;
3104         struct ceph_fs_client *fsc = mdsc->fsc;
3105         unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3106
3107         dout("close_sessions\n");
3108
3109         /* close sessions */
3110         mutex_lock(&mdsc->mutex);
3111         for (i = 0; i < mdsc->max_sessions; i++) {
3112                 session = __ceph_lookup_mds_session(mdsc, i);
3113                 if (!session)
3114                         continue;
3115                 mutex_unlock(&mdsc->mutex);
3116                 mutex_lock(&session->s_mutex);
3117                 __close_session(mdsc, session);
3118                 mutex_unlock(&session->s_mutex);
3119                 ceph_put_mds_session(session);
3120                 mutex_lock(&mdsc->mutex);
3121         }
3122         mutex_unlock(&mdsc->mutex);
3123
3124         dout("waiting for sessions to close\n");
3125         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3126                            timeout);
3127
3128         /* tear down remaining sessions */
3129         mutex_lock(&mdsc->mutex);
3130         for (i = 0; i < mdsc->max_sessions; i++) {
3131                 if (mdsc->sessions[i]) {
3132                         session = get_session(mdsc->sessions[i]);
3133                         __unregister_session(mdsc, session);
3134                         mutex_unlock(&mdsc->mutex);
3135                         mutex_lock(&session->s_mutex);
3136                         remove_session_caps(session);
3137                         mutex_unlock(&session->s_mutex);
3138                         ceph_put_mds_session(session);
3139                         mutex_lock(&mdsc->mutex);
3140                 }
3141         }
3142         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3143         mutex_unlock(&mdsc->mutex);
3144
3145         ceph_cleanup_empty_realms(mdsc);
3146
3147         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3148
3149         dout("stopped\n");
3150 }
3151
3152 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3153 {
3154         dout("stop\n");
3155         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3156         if (mdsc->mdsmap)
3157                 ceph_mdsmap_destroy(mdsc->mdsmap);
3158         kfree(mdsc->sessions);
3159         ceph_caps_finalize(mdsc);
3160 }
3161
3162 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3163 {
3164         struct ceph_mds_client *mdsc = fsc->mdsc;
3165
3166         ceph_mdsc_stop(mdsc);
3167         fsc->mdsc = NULL;
3168         kfree(mdsc);
3169 }
3170
3171
3172 /*
3173  * handle mds map update.
3174  */
3175 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3176 {
3177         u32 epoch;
3178         u32 maplen;
3179         void *p = msg->front.iov_base;
3180         void *end = p + msg->front.iov_len;
3181         struct ceph_mdsmap *newmap, *oldmap;
3182         struct ceph_fsid fsid;
3183         int err = -EINVAL;
3184
3185         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3186         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3187         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3188                 return;
3189         epoch = ceph_decode_32(&p);
3190         maplen = ceph_decode_32(&p);
3191         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3192
3193         /* do we need it? */
3194         ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3195         mutex_lock(&mdsc->mutex);
3196         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3197                 dout("handle_map epoch %u <= our %u\n",
3198                      epoch, mdsc->mdsmap->m_epoch);
3199                 mutex_unlock(&mdsc->mutex);
3200                 return;
3201         }
3202
3203         newmap = ceph_mdsmap_decode(&p, end);
3204         if (IS_ERR(newmap)) {
3205                 err = PTR_ERR(newmap);
3206                 goto bad_unlock;
3207         }
3208
3209         /* swap into place */
3210         if (mdsc->mdsmap) {
3211                 oldmap = mdsc->mdsmap;
3212                 mdsc->mdsmap = newmap;
3213                 check_new_map(mdsc, newmap, oldmap);
3214                 ceph_mdsmap_destroy(oldmap);
3215         } else {
3216                 mdsc->mdsmap = newmap;  /* first mds map */
3217         }
3218         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3219
3220         __wake_requests(mdsc, &mdsc->waiting_for_map);
3221
3222         mutex_unlock(&mdsc->mutex);
3223         schedule_delayed(mdsc);
3224         return;
3225
3226 bad_unlock:
3227         mutex_unlock(&mdsc->mutex);
3228 bad:
3229         pr_err("error decoding mdsmap %d\n", err);
3230         return;
3231 }
3232
3233 static struct ceph_connection *con_get(struct ceph_connection *con)
3234 {
3235         struct ceph_mds_session *s = con->private;
3236
3237         if (get_session(s)) {
3238                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3239                 return con;
3240         }
3241         dout("mdsc con_get %p FAIL\n", s);
3242         return NULL;
3243 }
3244
3245 static void con_put(struct ceph_connection *con)
3246 {
3247         struct ceph_mds_session *s = con->private;
3248
3249         ceph_put_mds_session(s);
3250         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
3251 }
3252
3253 /*
3254  * if the client is unresponsive for long enough, the mds will kill
3255  * the session entirely.
3256  */
3257 static void peer_reset(struct ceph_connection *con)
3258 {
3259         struct ceph_mds_session *s = con->private;
3260         struct ceph_mds_client *mdsc = s->s_mdsc;
3261
3262         pr_warning("mds%d closed our session\n", s->s_mds);
3263         send_mds_reconnect(mdsc, s);
3264 }
3265
3266 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3267 {
3268         struct ceph_mds_session *s = con->private;
3269         struct ceph_mds_client *mdsc = s->s_mdsc;
3270         int type = le16_to_cpu(msg->hdr.type);
3271
3272         mutex_lock(&mdsc->mutex);
3273         if (__verify_registered_session(mdsc, s) < 0) {
3274                 mutex_unlock(&mdsc->mutex);
3275                 goto out;
3276         }
3277         mutex_unlock(&mdsc->mutex);
3278
3279         switch (type) {
3280         case CEPH_MSG_MDS_MAP:
3281                 ceph_mdsc_handle_map(mdsc, msg);
3282                 break;
3283         case CEPH_MSG_CLIENT_SESSION:
3284                 handle_session(s, msg);
3285                 break;
3286         case CEPH_MSG_CLIENT_REPLY:
3287                 handle_reply(s, msg);
3288                 break;
3289         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3290                 handle_forward(mdsc, s, msg);
3291                 break;
3292         case CEPH_MSG_CLIENT_CAPS:
3293                 ceph_handle_caps(s, msg);
3294                 break;
3295         case CEPH_MSG_CLIENT_SNAP:
3296                 ceph_handle_snap(mdsc, s, msg);
3297                 break;
3298         case CEPH_MSG_CLIENT_LEASE:
3299                 handle_lease(mdsc, s, msg);
3300                 break;
3301
3302         default:
3303                 pr_err("received unknown message type %d %s\n", type,
3304                        ceph_msg_type_name(type));
3305         }
3306 out:
3307         ceph_msg_put(msg);
3308 }
3309
3310 /*
3311  * authentication
3312  */
3313 static int get_authorizer(struct ceph_connection *con,
3314                           void **buf, int *len, int *proto,
3315                           void **reply_buf, int *reply_len, int force_new)
3316 {
3317         struct ceph_mds_session *s = con->private;
3318         struct ceph_mds_client *mdsc = s->s_mdsc;
3319         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3320         int ret = 0;
3321
3322         if (force_new && s->s_authorizer) {
3323                 ac->ops->destroy_authorizer(ac, s->s_authorizer);
3324                 s->s_authorizer = NULL;
3325         }
3326         if (s->s_authorizer == NULL) {
3327                 if (ac->ops->create_authorizer) {
3328                         ret = ac->ops->create_authorizer(
3329                                 ac, CEPH_ENTITY_TYPE_MDS,
3330                                 &s->s_authorizer,
3331                                 &s->s_authorizer_buf,
3332                                 &s->s_authorizer_buf_len,
3333                                 &s->s_authorizer_reply_buf,
3334                                 &s->s_authorizer_reply_buf_len);
3335                         if (ret)
3336                                 return ret;
3337                 }
3338         }
3339
3340         *proto = ac->protocol;
3341         *buf = s->s_authorizer_buf;
3342         *len = s->s_authorizer_buf_len;
3343         *reply_buf = s->s_authorizer_reply_buf;
3344         *reply_len = s->s_authorizer_reply_buf_len;
3345         return 0;
3346 }
3347
3348
3349 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3350 {
3351         struct ceph_mds_session *s = con->private;
3352         struct ceph_mds_client *mdsc = s->s_mdsc;
3353         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3354
3355         return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3356 }
3357
3358 static int invalidate_authorizer(struct ceph_connection *con)
3359 {
3360         struct ceph_mds_session *s = con->private;
3361         struct ceph_mds_client *mdsc = s->s_mdsc;
3362         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3363
3364         if (ac->ops->invalidate_authorizer)
3365                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3366
3367         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3368 }
3369
3370 static const struct ceph_connection_operations mds_con_ops = {
3371         .get = con_get,
3372         .put = con_put,
3373         .dispatch = dispatch,
3374         .get_authorizer = get_authorizer,
3375         .verify_authorizer_reply = verify_authorizer_reply,
3376         .invalidate_authorizer = invalidate_authorizer,
3377         .peer_reset = peer_reset,
3378 };
3379
3380 /* eof */