]> asedeno.scripts.mit.edu Git - linux.git/blob - drivers/staging/lustre/lustre/osc/osc_cache.c
d1a7d6beee60f09187dfab9c021a79b139858c0b
[linux.git] / drivers / staging / lustre / lustre / osc / osc_cache.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  *
28  */
29 /*
30  * This file is part of Lustre, http://www.lustre.org/
31  * Lustre is a trademark of Sun Microsystems, Inc.
32  *
33  * osc cache management.
34  *
35  * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_OSC
39
40 #include "osc_cl_internal.h"
41 #include "osc_internal.h"
42
43 static int extent_debug; /* set it to be true for more debug */
44
45 static void osc_update_pending(struct osc_object *obj, int cmd, int delta);
46 static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
47                            int state);
48 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
49                               struct osc_async_page *oap, int sent, int rc);
50 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
51                           int cmd);
52 static int osc_refresh_count(const struct lu_env *env,
53                              struct osc_async_page *oap, int cmd);
54 static int osc_io_unplug_async(const struct lu_env *env,
55                                struct client_obd *cli, struct osc_object *osc);
56 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
57                            unsigned int lost_grant);
58
59 static void osc_extent_tree_dump0(int level, struct osc_object *obj,
60                                   const char *func, int line);
61 #define osc_extent_tree_dump(lvl, obj) \
62         osc_extent_tree_dump0(lvl, obj, __func__, __LINE__)
63
64 /** \addtogroup osc
65  *  @{
66  */
67
68 /* ------------------ osc extent ------------------ */
69 static inline char *ext_flags(struct osc_extent *ext, char *flags)
70 {
71         char *buf = flags;
72         *buf++ = ext->oe_rw ? 'r' : 'w';
73         if (ext->oe_intree)
74                 *buf++ = 'i';
75         if (ext->oe_sync)
76                 *buf++ = 'S';
77         if (ext->oe_srvlock)
78                 *buf++ = 's';
79         if (ext->oe_hp)
80                 *buf++ = 'h';
81         if (ext->oe_urgent)
82                 *buf++ = 'u';
83         if (ext->oe_memalloc)
84                 *buf++ = 'm';
85         if (ext->oe_trunc_pending)
86                 *buf++ = 't';
87         if (ext->oe_fsync_wait)
88                 *buf++ = 'Y';
89         *buf = 0;
90         return flags;
91 }
92
93 static inline char list_empty_marker(struct list_head *list)
94 {
95         return list_empty(list) ? '-' : '+';
96 }
97
98 #define EXTSTR       "[%lu -> %lu/%lu]"
99 #define EXTPARA(ext) (ext)->oe_start, (ext)->oe_end, (ext)->oe_max_end
100 static const char *oes_strings[] = {
101         "inv", "active", "cache", "locking", "lockdone", "rpc", "trunc", NULL };
102
103 #define OSC_EXTENT_DUMP(lvl, extent, fmt, ...) do {                           \
104         struct osc_extent *__ext = (extent);                                  \
105         char __buf[16];                                                       \
106                                                                               \
107         CDEBUG(lvl,                                                           \
108                 "extent %p@{" EXTSTR ", "                                     \
109                 "[%d|%d|%c|%s|%s|%p], [%d|%d|%c|%c|%p|%u|%p]} " fmt,          \
110                 /* ----- extent part 0 ----- */                               \
111                 __ext, EXTPARA(__ext),                                        \
112                 /* ----- part 1 ----- */                                      \
113                 atomic_read(&__ext->oe_refc),                                 \
114                 atomic_read(&__ext->oe_users),                                \
115                 list_empty_marker(&__ext->oe_link),                           \
116                 oes_strings[__ext->oe_state], ext_flags(__ext, __buf),        \
117                 __ext->oe_obj,                                                \
118                 /* ----- part 2 ----- */                                      \
119                 __ext->oe_grants, __ext->oe_nr_pages,                         \
120                 list_empty_marker(&__ext->oe_pages),                          \
121                 waitqueue_active(&__ext->oe_waitq) ? '+' : '-',               \
122                 __ext->oe_dlmlock, __ext->oe_mppr, __ext->oe_owner,           \
123                 /* ----- part 4 ----- */                                      \
124                 ## __VA_ARGS__);                                              \
125         if (lvl == D_ERROR && __ext->oe_dlmlock)                              \
126                 LDLM_ERROR(__ext->oe_dlmlock, "extent: %p", __ext);           \
127         else                                                                  \
128                 LDLM_DEBUG(__ext->oe_dlmlock, "extent: %p", __ext);           \
129 } while (0)
130
131 #undef EASSERTF
132 #define EASSERTF(expr, ext, fmt, args...) do {                          \
133         if (!(expr)) {                                                  \
134                 OSC_EXTENT_DUMP(D_ERROR, (ext), fmt, ##args);           \
135                 osc_extent_tree_dump(D_ERROR, (ext)->oe_obj);           \
136                 LASSERT(expr);                                          \
137         }                                                               \
138 } while (0)
139
140 #undef EASSERT
141 #define EASSERT(expr, ext) EASSERTF(expr, ext, "\n")
142
143 static inline struct osc_extent *rb_extent(struct rb_node *n)
144 {
145         if (!n)
146                 return NULL;
147
148         return container_of(n, struct osc_extent, oe_node);
149 }
150
151 static inline struct osc_extent *next_extent(struct osc_extent *ext)
152 {
153         if (!ext)
154                 return NULL;
155
156         LASSERT(ext->oe_intree);
157         return rb_extent(rb_next(&ext->oe_node));
158 }
159
160 static inline struct osc_extent *prev_extent(struct osc_extent *ext)
161 {
162         if (!ext)
163                 return NULL;
164
165         LASSERT(ext->oe_intree);
166         return rb_extent(rb_prev(&ext->oe_node));
167 }
168
169 static inline struct osc_extent *first_extent(struct osc_object *obj)
170 {
171         return rb_extent(rb_first(&obj->oo_root));
172 }
173
174 /* object must be locked by caller. */
175 static int osc_extent_sanity_check0(struct osc_extent *ext,
176                                     const char *func, const int line)
177 {
178         struct osc_object *obj = ext->oe_obj;
179         struct osc_async_page *oap;
180         int page_count;
181         int rc = 0;
182
183         if (!osc_object_is_locked(obj)) {
184                 rc = 9;
185                 goto out;
186         }
187
188         if (ext->oe_state >= OES_STATE_MAX) {
189                 rc = 10;
190                 goto out;
191         }
192
193         if (atomic_read(&ext->oe_refc) <= 0) {
194                 rc = 20;
195                 goto out;
196         }
197
198         if (atomic_read(&ext->oe_refc) < atomic_read(&ext->oe_users)) {
199                 rc = 30;
200                 goto out;
201         }
202
203         switch (ext->oe_state) {
204         case OES_INV:
205                 if (ext->oe_nr_pages > 0 || !list_empty(&ext->oe_pages))
206                         rc = 35;
207                 else
208                         rc = 0;
209                 goto out;
210         case OES_ACTIVE:
211                 if (atomic_read(&ext->oe_users) == 0) {
212                         rc = 40;
213                         goto out;
214                 }
215                 if (ext->oe_hp) {
216                         rc = 50;
217                         goto out;
218                 }
219                 if (ext->oe_fsync_wait && !ext->oe_urgent) {
220                         rc = 55;
221                         goto out;
222                 }
223                 break;
224         case OES_CACHE:
225                 if (ext->oe_grants == 0) {
226                         rc = 60;
227                         goto out;
228                 }
229                 if (ext->oe_fsync_wait && !ext->oe_urgent && !ext->oe_hp) {
230                         rc = 65;
231                         goto out;
232                 }
233         default:
234                 if (atomic_read(&ext->oe_users) > 0) {
235                         rc = 70;
236                         goto out;
237                 }
238         }
239
240         if (ext->oe_max_end < ext->oe_end || ext->oe_end < ext->oe_start) {
241                 rc = 80;
242                 goto out;
243         }
244
245         if (ext->oe_sync && ext->oe_grants > 0) {
246                 rc = 90;
247                 goto out;
248         }
249
250         if (ext->oe_dlmlock) {
251                 struct ldlm_extent *extent;
252
253                 extent = &ext->oe_dlmlock->l_policy_data.l_extent;
254                 if (!(extent->start <= cl_offset(osc2cl(obj), ext->oe_start) &&
255                       extent->end >= cl_offset(osc2cl(obj), ext->oe_max_end))) {
256                         rc = 100;
257                         goto out;
258                 }
259
260                 if (!(ext->oe_dlmlock->l_granted_mode & (LCK_PW | LCK_GROUP))) {
261                         rc = 102;
262                         goto out;
263                 }
264         }
265
266         if (ext->oe_nr_pages > ext->oe_mppr) {
267                 rc = 105;
268                 goto out;
269         }
270
271         /* Do not verify page list if extent is in RPC. This is because an
272          * in-RPC extent is supposed to be exclusively accessible w/o lock.
273          */
274         if (ext->oe_state > OES_CACHE) {
275                 rc = 0;
276                 goto out;
277         }
278
279         if (!extent_debug) {
280                 rc = 0;
281                 goto out;
282         }
283
284         page_count = 0;
285         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
286                 pgoff_t index = osc_index(oap2osc(oap));
287                 ++page_count;
288                 if (index > ext->oe_end || index < ext->oe_start) {
289                         rc = 110;
290                         goto out;
291                 }
292         }
293         if (page_count != ext->oe_nr_pages) {
294                 rc = 120;
295                 goto out;
296         }
297
298 out:
299         if (rc != 0)
300                 OSC_EXTENT_DUMP(D_ERROR, ext,
301                                 "%s:%d sanity check %p failed with rc = %d\n",
302                                 func, line, ext, rc);
303         return rc;
304 }
305
306 #define sanity_check_nolock(ext) \
307         osc_extent_sanity_check0(ext, __func__, __LINE__)
308
309 #define sanity_check(ext) ({                                            \
310         int __res;                                                      \
311         osc_object_lock((ext)->oe_obj);                                 \
312         __res = sanity_check_nolock(ext);                               \
313         osc_object_unlock((ext)->oe_obj);                               \
314         __res;                                                          \
315 })
316
317 /**
318  * sanity check - to make sure there is no overlapped extent in the tree.
319  */
320 static int osc_extent_is_overlapped(struct osc_object *obj,
321                                     struct osc_extent *ext)
322 {
323         struct osc_extent *tmp;
324
325         LASSERT(osc_object_is_locked(obj));
326
327         if (!extent_debug)
328                 return 0;
329
330         for (tmp = first_extent(obj); tmp; tmp = next_extent(tmp)) {
331                 if (tmp == ext)
332                         continue;
333                 if (tmp->oe_end >= ext->oe_start &&
334                     tmp->oe_start <= ext->oe_end)
335                         return 1;
336         }
337         return 0;
338 }
339
340 static void osc_extent_state_set(struct osc_extent *ext, int state)
341 {
342         LASSERT(osc_object_is_locked(ext->oe_obj));
343         LASSERT(state >= OES_INV && state < OES_STATE_MAX);
344
345         /* Never try to sanity check a state changing extent :-) */
346         /* LASSERT(sanity_check_nolock(ext) == 0); */
347
348         /* TODO: validate the state machine */
349         ext->oe_state = state;
350         wake_up_all(&ext->oe_waitq);
351 }
352
353 static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
354 {
355         struct osc_extent *ext;
356
357         ext = kmem_cache_zalloc(osc_extent_kmem, GFP_NOFS);
358         if (!ext)
359                 return NULL;
360
361         RB_CLEAR_NODE(&ext->oe_node);
362         ext->oe_obj = obj;
363         atomic_set(&ext->oe_refc, 1);
364         atomic_set(&ext->oe_users, 0);
365         INIT_LIST_HEAD(&ext->oe_link);
366         ext->oe_state = OES_INV;
367         INIT_LIST_HEAD(&ext->oe_pages);
368         init_waitqueue_head(&ext->oe_waitq);
369         ext->oe_dlmlock = NULL;
370
371         return ext;
372 }
373
374 static void osc_extent_free(struct osc_extent *ext)
375 {
376         kmem_cache_free(osc_extent_kmem, ext);
377 }
378
379 static struct osc_extent *osc_extent_get(struct osc_extent *ext)
380 {
381         LASSERT(atomic_read(&ext->oe_refc) >= 0);
382         atomic_inc(&ext->oe_refc);
383         return ext;
384 }
385
386 static void osc_extent_put(const struct lu_env *env, struct osc_extent *ext)
387 {
388         LASSERT(atomic_read(&ext->oe_refc) > 0);
389         if (atomic_dec_and_test(&ext->oe_refc)) {
390                 LASSERT(list_empty(&ext->oe_link));
391                 LASSERT(atomic_read(&ext->oe_users) == 0);
392                 LASSERT(ext->oe_state == OES_INV);
393                 LASSERT(!ext->oe_intree);
394
395                 if (ext->oe_dlmlock) {
396                         lu_ref_add(&ext->oe_dlmlock->l_reference,
397                                    "osc_extent", ext);
398                         LDLM_LOCK_PUT(ext->oe_dlmlock);
399                         ext->oe_dlmlock = NULL;
400                 }
401                 osc_extent_free(ext);
402         }
403 }
404
405 /**
406  * osc_extent_put_trust() is a special version of osc_extent_put() when
407  * it's known that the caller is not the last user. This is to address the
408  * problem of lacking of lu_env ;-).
409  */
410 static void osc_extent_put_trust(struct osc_extent *ext)
411 {
412         LASSERT(atomic_read(&ext->oe_refc) > 1);
413         LASSERT(osc_object_is_locked(ext->oe_obj));
414         atomic_dec(&ext->oe_refc);
415 }
416
417 /**
418  * Return the extent which includes pgoff @index, or return the greatest
419  * previous extent in the tree.
420  */
421 static struct osc_extent *osc_extent_search(struct osc_object *obj,
422                                             pgoff_t index)
423 {
424         struct rb_node *n = obj->oo_root.rb_node;
425         struct osc_extent *tmp, *p = NULL;
426
427         LASSERT(osc_object_is_locked(obj));
428         while (n) {
429                 tmp = rb_extent(n);
430                 if (index < tmp->oe_start) {
431                         n = n->rb_left;
432                 } else if (index > tmp->oe_end) {
433                         p = rb_extent(n);
434                         n = n->rb_right;
435                 } else {
436                         return tmp;
437                 }
438         }
439         return p;
440 }
441
442 /*
443  * Return the extent covering @index, otherwise return NULL.
444  * caller must have held object lock.
445  */
446 static struct osc_extent *osc_extent_lookup(struct osc_object *obj,
447                                             pgoff_t index)
448 {
449         struct osc_extent *ext;
450
451         ext = osc_extent_search(obj, index);
452         if (ext && ext->oe_start <= index && index <= ext->oe_end)
453                 return osc_extent_get(ext);
454         return NULL;
455 }
456
457 /* caller must have held object lock. */
458 static void osc_extent_insert(struct osc_object *obj, struct osc_extent *ext)
459 {
460         struct rb_node **n = &obj->oo_root.rb_node;
461         struct rb_node *parent = NULL;
462         struct osc_extent *tmp;
463
464         LASSERT(ext->oe_intree == 0);
465         LASSERT(ext->oe_obj == obj);
466         LASSERT(osc_object_is_locked(obj));
467         while (*n) {
468                 tmp = rb_extent(*n);
469                 parent = *n;
470
471                 if (ext->oe_end < tmp->oe_start)
472                         n = &(*n)->rb_left;
473                 else if (ext->oe_start > tmp->oe_end)
474                         n = &(*n)->rb_right;
475                 else
476                         EASSERTF(0, tmp, EXTSTR"\n", EXTPARA(ext));
477         }
478         rb_link_node(&ext->oe_node, parent, n);
479         rb_insert_color(&ext->oe_node, &obj->oo_root);
480         osc_extent_get(ext);
481         ext->oe_intree = 1;
482 }
483
484 /* caller must have held object lock. */
485 static void osc_extent_erase(struct osc_extent *ext)
486 {
487         struct osc_object *obj = ext->oe_obj;
488
489         LASSERT(osc_object_is_locked(obj));
490         if (ext->oe_intree) {
491                 rb_erase(&ext->oe_node, &obj->oo_root);
492                 ext->oe_intree = 0;
493                 /* rbtree held a refcount */
494                 osc_extent_put_trust(ext);
495         }
496 }
497
498 static struct osc_extent *osc_extent_hold(struct osc_extent *ext)
499 {
500         struct osc_object *obj = ext->oe_obj;
501
502         LASSERT(osc_object_is_locked(obj));
503         LASSERT(ext->oe_state == OES_ACTIVE || ext->oe_state == OES_CACHE);
504         if (ext->oe_state == OES_CACHE) {
505                 osc_extent_state_set(ext, OES_ACTIVE);
506                 osc_update_pending(obj, OBD_BRW_WRITE, -ext->oe_nr_pages);
507         }
508         atomic_inc(&ext->oe_users);
509         list_del_init(&ext->oe_link);
510         return osc_extent_get(ext);
511 }
512
513 static void __osc_extent_remove(struct osc_extent *ext)
514 {
515         LASSERT(osc_object_is_locked(ext->oe_obj));
516         LASSERT(list_empty(&ext->oe_pages));
517         osc_extent_erase(ext);
518         list_del_init(&ext->oe_link);
519         osc_extent_state_set(ext, OES_INV);
520         OSC_EXTENT_DUMP(D_CACHE, ext, "destroyed.\n");
521 }
522
523 static void osc_extent_remove(struct osc_extent *ext)
524 {
525         struct osc_object *obj = ext->oe_obj;
526
527         osc_object_lock(obj);
528         __osc_extent_remove(ext);
529         osc_object_unlock(obj);
530 }
531
532 /**
533  * This function is used to merge extents to get better performance. It checks
534  * if @cur and @victim are contiguous at chunk level.
535  */
536 static int osc_extent_merge(const struct lu_env *env, struct osc_extent *cur,
537                             struct osc_extent *victim)
538 {
539         struct osc_object *obj = cur->oe_obj;
540         pgoff_t chunk_start;
541         pgoff_t chunk_end;
542         int ppc_bits;
543
544         LASSERT(cur->oe_state == OES_CACHE);
545         LASSERT(osc_object_is_locked(obj));
546         if (!victim)
547                 return -EINVAL;
548
549         if (victim->oe_state != OES_CACHE || victim->oe_fsync_wait)
550                 return -EBUSY;
551
552         if (cur->oe_max_end != victim->oe_max_end)
553                 return -ERANGE;
554
555         LASSERT(cur->oe_dlmlock == victim->oe_dlmlock);
556         ppc_bits = osc_cli(obj)->cl_chunkbits - PAGE_SHIFT;
557         chunk_start = cur->oe_start >> ppc_bits;
558         chunk_end = cur->oe_end >> ppc_bits;
559         if (chunk_start != (victim->oe_end >> ppc_bits) + 1 &&
560             chunk_end + 1 != victim->oe_start >> ppc_bits)
561                 return -ERANGE;
562
563         OSC_EXTENT_DUMP(D_CACHE, victim, "will be merged by %p.\n", cur);
564
565         cur->oe_start = min(cur->oe_start, victim->oe_start);
566         cur->oe_end = max(cur->oe_end, victim->oe_end);
567         cur->oe_grants += victim->oe_grants;
568         cur->oe_nr_pages += victim->oe_nr_pages;
569         /* only the following bits are needed to merge */
570         cur->oe_urgent |= victim->oe_urgent;
571         cur->oe_memalloc |= victim->oe_memalloc;
572         list_splice_init(&victim->oe_pages, &cur->oe_pages);
573         list_del_init(&victim->oe_link);
574         victim->oe_nr_pages = 0;
575
576         osc_extent_get(victim);
577         __osc_extent_remove(victim);
578         osc_extent_put(env, victim);
579
580         OSC_EXTENT_DUMP(D_CACHE, cur, "after merging %p.\n", victim);
581         return 0;
582 }
583
584 /**
585  * Drop user count of osc_extent, and unplug IO asynchronously.
586  */
587 void osc_extent_release(const struct lu_env *env, struct osc_extent *ext)
588 {
589         struct osc_object *obj = ext->oe_obj;
590
591         LASSERT(atomic_read(&ext->oe_users) > 0);
592         LASSERT(sanity_check(ext) == 0);
593         LASSERT(ext->oe_grants > 0);
594
595         if (atomic_dec_and_lock(&ext->oe_users, &obj->oo_lock)) {
596                 LASSERT(ext->oe_state == OES_ACTIVE);
597                 if (ext->oe_trunc_pending) {
598                         /* a truncate process is waiting for this extent.
599                          * This may happen due to a race, check
600                          * osc_cache_truncate_start().
601                          */
602                         osc_extent_state_set(ext, OES_TRUNC);
603                         ext->oe_trunc_pending = 0;
604                 } else {
605                         osc_extent_state_set(ext, OES_CACHE);
606                         osc_update_pending(obj, OBD_BRW_WRITE,
607                                            ext->oe_nr_pages);
608
609                         /* try to merge the previous and next extent. */
610                         osc_extent_merge(env, ext, prev_extent(ext));
611                         osc_extent_merge(env, ext, next_extent(ext));
612
613                         if (ext->oe_urgent)
614                                 list_move_tail(&ext->oe_link,
615                                                &obj->oo_urgent_exts);
616                 }
617                 osc_object_unlock(obj);
618
619                 osc_io_unplug_async(env, osc_cli(obj), obj);
620         }
621         osc_extent_put(env, ext);
622 }
623
624 static inline int overlapped(struct osc_extent *ex1, struct osc_extent *ex2)
625 {
626         return !(ex1->oe_end < ex2->oe_start || ex2->oe_end < ex1->oe_start);
627 }
628
629 /**
630  * Find or create an extent which includes @index, core function to manage
631  * extent tree.
632  */
633 static struct osc_extent *osc_extent_find(const struct lu_env *env,
634                                           struct osc_object *obj, pgoff_t index,
635                                           int *grants)
636 {
637         struct client_obd *cli = osc_cli(obj);
638         struct osc_lock   *olck;
639         struct cl_lock_descr *descr;
640         struct osc_extent *cur;
641         struct osc_extent *ext;
642         struct osc_extent *conflict = NULL;
643         struct osc_extent *found = NULL;
644         pgoff_t chunk;
645         pgoff_t max_end;
646         int max_pages; /* max_pages_per_rpc */
647         int chunksize;
648         int ppc_bits; /* pages per chunk bits */
649         int chunk_mask;
650         int rc;
651
652         cur = osc_extent_alloc(obj);
653         if (!cur)
654                 return ERR_PTR(-ENOMEM);
655
656         olck = osc_env_io(env)->oi_write_osclock;
657         LASSERTF(olck, "page %lu is not covered by lock\n", index);
658         LASSERT(olck->ols_state == OLS_GRANTED);
659
660         descr = &olck->ols_cl.cls_lock->cll_descr;
661         LASSERT(descr->cld_mode >= CLM_WRITE);
662
663         LASSERT(cli->cl_chunkbits >= PAGE_SHIFT);
664         ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
665         chunk_mask = ~((1 << ppc_bits) - 1);
666         chunksize = 1 << cli->cl_chunkbits;
667         chunk = index >> ppc_bits;
668
669         /* align end to rpc edge, rpc size may not be a power 2 integer. */
670         max_pages = cli->cl_max_pages_per_rpc;
671         LASSERT((max_pages & ~chunk_mask) == 0);
672         max_end = index - (index % max_pages) + max_pages - 1;
673         max_end = min_t(pgoff_t, max_end, descr->cld_end);
674
675         /* initialize new extent by parameters so far */
676         cur->oe_max_end = max_end;
677         cur->oe_start = index & chunk_mask;
678         cur->oe_end = ((index + ~chunk_mask + 1) & chunk_mask) - 1;
679         if (cur->oe_start < descr->cld_start)
680                 cur->oe_start = descr->cld_start;
681         if (cur->oe_end > max_end)
682                 cur->oe_end = max_end;
683         cur->oe_grants = 0;
684         cur->oe_mppr = max_pages;
685         if (olck->ols_dlmlock) {
686                 LASSERT(olck->ols_hold);
687                 cur->oe_dlmlock = LDLM_LOCK_GET(olck->ols_dlmlock);
688                 lu_ref_add(&olck->ols_dlmlock->l_reference, "osc_extent", cur);
689         }
690
691         /* grants has been allocated by caller */
692         LASSERTF(*grants >= chunksize + cli->cl_extent_tax,
693                  "%u/%u/%u.\n", *grants, chunksize, cli->cl_extent_tax);
694         LASSERTF((max_end - cur->oe_start) < max_pages, EXTSTR"\n",
695                  EXTPARA(cur));
696
697 restart:
698         osc_object_lock(obj);
699         ext = osc_extent_search(obj, cur->oe_start);
700         if (!ext)
701                 ext = first_extent(obj);
702         while (ext) {
703                 loff_t ext_chk_start = ext->oe_start >> ppc_bits;
704                 loff_t ext_chk_end = ext->oe_end >> ppc_bits;
705
706                 LASSERT(sanity_check_nolock(ext) == 0);
707                 if (chunk > ext_chk_end + 1)
708                         break;
709
710                 /* if covering by different locks, no chance to match */
711                 if (olck->ols_dlmlock != ext->oe_dlmlock) {
712                         EASSERTF(!overlapped(ext, cur), ext,
713                                  EXTSTR"\n", EXTPARA(cur));
714
715                         ext = next_extent(ext);
716                         continue;
717                 }
718
719                 /* discontiguous chunks? */
720                 if (chunk + 1 < ext_chk_start) {
721                         ext = next_extent(ext);
722                         continue;
723                 }
724
725                 /* ok, from now on, ext and cur have these attrs:
726                  * 1. covered by the same lock
727                  * 2. contiguous at chunk level or overlapping.
728                  */
729
730                 if (overlapped(ext, cur)) {
731                         /* cur is the minimum unit, so overlapping means
732                          * full contain.
733                          */
734                         EASSERTF((ext->oe_start <= cur->oe_start &&
735                                   ext->oe_end >= cur->oe_end),
736                                  ext, EXTSTR"\n", EXTPARA(cur));
737
738                         if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
739                                 /* for simplicity, we wait for this extent to
740                                  * finish before going forward.
741                                  */
742                                 conflict = osc_extent_get(ext);
743                                 break;
744                         }
745
746                         found = osc_extent_hold(ext);
747                         break;
748                 }
749
750                 /* non-overlapped extent */
751                 if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait) {
752                         /* we can't do anything for a non OES_CACHE extent, or
753                          * if there is someone waiting for this extent to be
754                          * flushed, try next one.
755                          */
756                         ext = next_extent(ext);
757                         continue;
758                 }
759
760                 /* check if they belong to the same rpc slot before trying to
761                  * merge. the extents are not overlapped and contiguous at
762                  * chunk level to get here.
763                  */
764                 if (ext->oe_max_end != max_end) {
765                         /* if they don't belong to the same RPC slot or
766                          * max_pages_per_rpc has ever changed, do not merge.
767                          */
768                         ext = next_extent(ext);
769                         continue;
770                 }
771
772                 /* it's required that an extent must be contiguous at chunk
773                  * level so that we know the whole extent is covered by grant
774                  * (the pages in the extent are NOT required to be contiguous).
775                  * Otherwise, it will be too much difficult to know which
776                  * chunks have grants allocated.
777                  */
778
779                 /* try to do front merge - extend ext's start */
780                 if (chunk + 1 == ext_chk_start) {
781                         /* ext must be chunk size aligned */
782                         EASSERT((ext->oe_start & ~chunk_mask) == 0, ext);
783
784                         /* pull ext's start back to cover cur */
785                         ext->oe_start = cur->oe_start;
786                         ext->oe_grants += chunksize;
787                         *grants -= chunksize;
788
789                         found = osc_extent_hold(ext);
790                 } else if (chunk == ext_chk_end + 1) {
791                         /* rear merge */
792                         ext->oe_end = cur->oe_end;
793                         ext->oe_grants += chunksize;
794                         *grants -= chunksize;
795
796                         /* try to merge with the next one because we just fill
797                          * in a gap
798                          */
799                         if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
800                                 /* we can save extent tax from next extent */
801                                 *grants += cli->cl_extent_tax;
802
803                         found = osc_extent_hold(ext);
804                 }
805                 if (found)
806                         break;
807
808                 ext = next_extent(ext);
809         }
810
811         osc_extent_tree_dump(D_CACHE, obj);
812         if (found) {
813                 LASSERT(!conflict);
814                 if (!IS_ERR(found)) {
815                         LASSERT(found->oe_dlmlock == cur->oe_dlmlock);
816                         OSC_EXTENT_DUMP(D_CACHE, found,
817                                         "found caching ext for %lu.\n", index);
818                 }
819         } else if (!conflict) {
820                 /* create a new extent */
821                 EASSERT(osc_extent_is_overlapped(obj, cur) == 0, cur);
822                 cur->oe_grants = chunksize + cli->cl_extent_tax;
823                 *grants -= cur->oe_grants;
824                 LASSERT(*grants >= 0);
825
826                 cur->oe_state = OES_CACHE;
827                 found = osc_extent_hold(cur);
828                 osc_extent_insert(obj, cur);
829                 OSC_EXTENT_DUMP(D_CACHE, cur, "add into tree %lu/%lu.\n",
830                                 index, descr->cld_end);
831         }
832         osc_object_unlock(obj);
833
834         if (conflict) {
835                 LASSERT(!found);
836
837                 /* waiting for IO to finish. Please notice that it's impossible
838                  * to be an OES_TRUNC extent.
839                  */
840                 rc = osc_extent_wait(env, conflict, OES_INV);
841                 osc_extent_put(env, conflict);
842                 conflict = NULL;
843                 if (rc < 0) {
844                         found = ERR_PTR(rc);
845                         goto out;
846                 }
847
848                 goto restart;
849         }
850
851 out:
852         osc_extent_put(env, cur);
853         LASSERT(*grants >= 0);
854         return found;
855 }
856
857 /**
858  * Called when IO is finished to an extent.
859  */
860 int osc_extent_finish(const struct lu_env *env, struct osc_extent *ext,
861                       int sent, int rc)
862 {
863         struct client_obd *cli = osc_cli(ext->oe_obj);
864         struct osc_async_page *oap;
865         struct osc_async_page *tmp;
866         int nr_pages = ext->oe_nr_pages;
867         int lost_grant = 0;
868         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
869         __u64 last_off = 0;
870         int last_count = -1;
871
872         OSC_EXTENT_DUMP(D_CACHE, ext, "extent finished.\n");
873
874         ext->oe_rc = rc ?: ext->oe_nr_pages;
875         EASSERT(ergo(rc == 0, ext->oe_state == OES_RPC), ext);
876
877         osc_lru_add_batch(cli, &ext->oe_pages);
878         list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
879                 list_del_init(&oap->oap_rpc_item);
880                 list_del_init(&oap->oap_pending_item);
881                 if (last_off <= oap->oap_obj_off) {
882                         last_off = oap->oap_obj_off;
883                         last_count = oap->oap_count;
884                 }
885
886                 --ext->oe_nr_pages;
887                 osc_ap_completion(env, cli, oap, sent, rc);
888         }
889         EASSERT(ext->oe_nr_pages == 0, ext);
890
891         if (!sent) {
892                 lost_grant = ext->oe_grants;
893         } else if (blocksize < PAGE_SIZE &&
894                    last_count != PAGE_SIZE) {
895                 /* For short writes we shouldn't count parts of pages that
896                  * span a whole chunk on the OST side, or our accounting goes
897                  * wrong.  Should match the code in filter_grant_check.
898                  */
899                 int offset = last_off & ~PAGE_MASK;
900                 int count = last_count + (offset & (blocksize - 1));
901                 int end = (offset + last_count) & (blocksize - 1);
902                 if (end)
903                         count += blocksize - end;
904
905                 lost_grant = PAGE_SIZE - count;
906         }
907         if (ext->oe_grants > 0)
908                 osc_free_grant(cli, nr_pages, lost_grant);
909
910         osc_extent_remove(ext);
911         /* put the refcount for RPC */
912         osc_extent_put(env, ext);
913         return 0;
914 }
915
916 static int extent_wait_cb(struct osc_extent *ext, int state)
917 {
918         int ret;
919
920         osc_object_lock(ext->oe_obj);
921         ret = ext->oe_state == state;
922         osc_object_unlock(ext->oe_obj);
923
924         return ret;
925 }
926
927 /**
928  * Wait for the extent's state to become @state.
929  */
930 static int osc_extent_wait(const struct lu_env *env, struct osc_extent *ext,
931                            int state)
932 {
933         struct osc_object *obj = ext->oe_obj;
934         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
935                                                   LWI_ON_SIGNAL_NOOP, NULL);
936         int rc = 0;
937
938         osc_object_lock(obj);
939         LASSERT(sanity_check_nolock(ext) == 0);
940         /* `Kick' this extent only if the caller is waiting for it to be
941          * written out.
942          */
943         if (state == OES_INV && !ext->oe_urgent && !ext->oe_hp &&
944             !ext->oe_trunc_pending) {
945                 if (ext->oe_state == OES_ACTIVE) {
946                         ext->oe_urgent = 1;
947                 } else if (ext->oe_state == OES_CACHE) {
948                         ext->oe_urgent = 1;
949                         osc_extent_hold(ext);
950                         rc = 1;
951                 }
952         }
953         osc_object_unlock(obj);
954         if (rc == 1)
955                 osc_extent_release(env, ext);
956
957         /* wait for the extent until its state becomes @state */
958         rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state), &lwi);
959         if (rc == -ETIMEDOUT) {
960                 OSC_EXTENT_DUMP(D_ERROR, ext,
961                         "%s: wait ext to %d timedout, recovery in progress?\n",
962                         osc_export(obj)->exp_obd->obd_name, state);
963
964                 lwi = LWI_INTR(NULL, NULL);
965                 rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
966                                   &lwi);
967         }
968         if (rc == 0 && ext->oe_rc < 0)
969                 rc = ext->oe_rc;
970         return rc;
971 }
972
973 /**
974  * Discard pages with index greater than @size. If @ext is overlapped with
975  * @size, then partial truncate happens.
976  */
977 static int osc_extent_truncate(struct osc_extent *ext, pgoff_t trunc_index,
978                                bool partial)
979 {
980         struct cl_env_nest nest;
981         struct lu_env *env;
982         struct cl_io *io;
983         struct osc_object *obj = ext->oe_obj;
984         struct client_obd *cli = osc_cli(obj);
985         struct osc_async_page *oap;
986         struct osc_async_page *tmp;
987         int pages_in_chunk = 0;
988         int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
989         __u64 trunc_chunk = trunc_index >> ppc_bits;
990         int grants = 0;
991         int nr_pages = 0;
992         int rc = 0;
993
994         LASSERT(sanity_check(ext) == 0);
995         EASSERT(ext->oe_state == OES_TRUNC, ext);
996         EASSERT(!ext->oe_urgent, ext);
997
998         /* Request new lu_env.
999          * We can't use that env from osc_cache_truncate_start() because
1000          * it's from lov_io_sub and not fully initialized.
1001          */
1002         env = cl_env_nested_get(&nest);
1003         io  = &osc_env_info(env)->oti_io;
1004         io->ci_obj = cl_object_top(osc2cl(obj));
1005         rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
1006         if (rc < 0)
1007                 goto out;
1008
1009         /* discard all pages with index greater then trunc_index */
1010         list_for_each_entry_safe(oap, tmp, &ext->oe_pages, oap_pending_item) {
1011                 pgoff_t index = osc_index(oap2osc(oap));
1012                 struct cl_page *page = oap2cl_page(oap);
1013
1014                 LASSERT(list_empty(&oap->oap_rpc_item));
1015
1016                 /* only discard the pages with their index greater than
1017                  * trunc_index, and ...
1018                  */
1019                 if (index < trunc_index ||
1020                     (index == trunc_index && partial)) {
1021                         /* accounting how many pages remaining in the chunk
1022                          * so that we can calculate grants correctly. */
1023                         if (index >> ppc_bits == trunc_chunk)
1024                                 ++pages_in_chunk;
1025                         continue;
1026                 }
1027
1028                 list_del_init(&oap->oap_pending_item);
1029
1030                 cl_page_get(page);
1031                 lu_ref_add(&page->cp_reference, "truncate", current);
1032
1033                 if (cl_page_own(env, io, page) == 0) {
1034                         cl_page_discard(env, io, page);
1035                         cl_page_disown(env, io, page);
1036                 } else {
1037                         LASSERT(page->cp_state == CPS_FREEING);
1038                         LASSERT(0);
1039                 }
1040
1041                 lu_ref_del(&page->cp_reference, "truncate", current);
1042                 cl_page_put(env, page);
1043
1044                 --ext->oe_nr_pages;
1045                 ++nr_pages;
1046         }
1047         EASSERTF(ergo(ext->oe_start >= trunc_index + !!partial,
1048                       ext->oe_nr_pages == 0),
1049                 ext, "trunc_index %lu, partial %d\n", trunc_index, partial);
1050
1051         osc_object_lock(obj);
1052         if (ext->oe_nr_pages == 0) {
1053                 LASSERT(pages_in_chunk == 0);
1054                 grants = ext->oe_grants;
1055                 ext->oe_grants = 0;
1056         } else { /* calculate how many grants we can free */
1057                 int chunks = (ext->oe_end >> ppc_bits) - trunc_chunk;
1058                 pgoff_t last_index;
1059
1060                 /* if there is no pages in this chunk, we can also free grants
1061                  * for the last chunk
1062                  */
1063                 if (pages_in_chunk == 0) {
1064                         /* if this is the 1st chunk and no pages in this chunk,
1065                          * ext->oe_nr_pages must be zero, so we should be in
1066                          * the other if-clause.
1067                          */
1068                         LASSERT(trunc_chunk > 0);
1069                         --trunc_chunk;
1070                         ++chunks;
1071                 }
1072
1073                 /* this is what we can free from this extent */
1074                 grants = chunks << cli->cl_chunkbits;
1075                 ext->oe_grants -= grants;
1076                 last_index = ((trunc_chunk + 1) << ppc_bits) - 1;
1077                 ext->oe_end = min(last_index, ext->oe_max_end);
1078                 LASSERT(ext->oe_end >= ext->oe_start);
1079                 LASSERT(ext->oe_grants > 0);
1080         }
1081         osc_object_unlock(obj);
1082
1083         if (grants > 0 || nr_pages > 0)
1084                 osc_free_grant(cli, nr_pages, grants);
1085
1086 out:
1087         cl_io_fini(env, io);
1088         cl_env_nested_put(&nest, env);
1089         return rc;
1090 }
1091
1092 /**
1093  * This function is used to make the extent prepared for transfer.
1094  * A race with flushing page - ll_writepage() has to be handled cautiously.
1095  */
1096 static int osc_extent_make_ready(const struct lu_env *env,
1097                                  struct osc_extent *ext)
1098 {
1099         struct osc_async_page *oap;
1100         struct osc_async_page *last = NULL;
1101         struct osc_object *obj = ext->oe_obj;
1102         int page_count = 0;
1103         int rc;
1104
1105         /* we're going to grab page lock, so object lock must not be taken. */
1106         LASSERT(sanity_check(ext) == 0);
1107         /* in locking state, any process should not touch this extent. */
1108         EASSERT(ext->oe_state == OES_LOCKING, ext);
1109         EASSERT(ext->oe_owner, ext);
1110
1111         OSC_EXTENT_DUMP(D_CACHE, ext, "make ready\n");
1112
1113         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1114                 ++page_count;
1115                 if (!last || last->oap_obj_off < oap->oap_obj_off)
1116                         last = oap;
1117
1118                 /* checking ASYNC_READY is race safe */
1119                 if ((oap->oap_async_flags & ASYNC_READY) != 0)
1120                         continue;
1121
1122                 rc = osc_make_ready(env, oap, OBD_BRW_WRITE);
1123                 switch (rc) {
1124                 case 0:
1125                         spin_lock(&oap->oap_lock);
1126                         oap->oap_async_flags |= ASYNC_READY;
1127                         spin_unlock(&oap->oap_lock);
1128                         break;
1129                 case -EALREADY:
1130                         LASSERT((oap->oap_async_flags & ASYNC_READY) != 0);
1131                         break;
1132                 default:
1133                         LASSERTF(0, "unknown return code: %d\n", rc);
1134                 }
1135         }
1136
1137         LASSERT(page_count == ext->oe_nr_pages);
1138         LASSERT(last);
1139         /* the last page is the only one we need to refresh its count by
1140          * the size of file.
1141          */
1142         if (!(last->oap_async_flags & ASYNC_COUNT_STABLE)) {
1143                 last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
1144                 LASSERT(last->oap_count > 0);
1145                 LASSERT(last->oap_page_off + last->oap_count <= PAGE_SIZE);
1146                 spin_lock(&last->oap_lock);
1147                 last->oap_async_flags |= ASYNC_COUNT_STABLE;
1148                 spin_unlock(&last->oap_lock);
1149         }
1150
1151         /* for the rest of pages, we don't need to call osf_refresh_count()
1152          * because it's known they are not the last page
1153          */
1154         list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1155                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
1156                         oap->oap_count = PAGE_SIZE - oap->oap_page_off;
1157                         spin_lock(&last->oap_lock);
1158                         oap->oap_async_flags |= ASYNC_COUNT_STABLE;
1159                         spin_unlock(&last->oap_lock);
1160                 }
1161         }
1162
1163         osc_object_lock(obj);
1164         osc_extent_state_set(ext, OES_RPC);
1165         osc_object_unlock(obj);
1166         /* get a refcount for RPC. */
1167         osc_extent_get(ext);
1168
1169         return 0;
1170 }
1171
1172 /**
1173  * Quick and simple version of osc_extent_find(). This function is frequently
1174  * called to expand the extent for the same IO. To expand the extent, the
1175  * page index must be in the same or next chunk of ext->oe_end.
1176  */
1177 static int osc_extent_expand(struct osc_extent *ext, pgoff_t index, int *grants)
1178 {
1179         struct osc_object *obj = ext->oe_obj;
1180         struct client_obd *cli = osc_cli(obj);
1181         struct osc_extent *next;
1182         int ppc_bits = cli->cl_chunkbits - PAGE_SHIFT;
1183         pgoff_t chunk = index >> ppc_bits;
1184         pgoff_t end_chunk;
1185         pgoff_t end_index;
1186         int chunksize = 1 << cli->cl_chunkbits;
1187         int rc = 0;
1188
1189         LASSERT(ext->oe_max_end >= index && ext->oe_start <= index);
1190         osc_object_lock(obj);
1191         LASSERT(sanity_check_nolock(ext) == 0);
1192         end_chunk = ext->oe_end >> ppc_bits;
1193         if (chunk > end_chunk + 1) {
1194                 rc = -ERANGE;
1195                 goto out;
1196         }
1197
1198         if (end_chunk >= chunk) {
1199                 rc = 0;
1200                 goto out;
1201         }
1202
1203         LASSERT(end_chunk + 1 == chunk);
1204         /* try to expand this extent to cover @index */
1205         end_index = min(ext->oe_max_end, ((chunk + 1) << ppc_bits) - 1);
1206
1207         next = next_extent(ext);
1208         if (next && next->oe_start <= end_index) {
1209                 /* complex mode - overlapped with the next extent,
1210                  * this case will be handled by osc_extent_find()
1211                  */
1212                 rc = -EAGAIN;
1213                 goto out;
1214         }
1215
1216         ext->oe_end = end_index;
1217         ext->oe_grants += chunksize;
1218         *grants -= chunksize;
1219         LASSERT(*grants >= 0);
1220         EASSERTF(osc_extent_is_overlapped(obj, ext) == 0, ext,
1221                  "overlapped after expanding for %lu.\n", index);
1222
1223 out:
1224         osc_object_unlock(obj);
1225         return rc;
1226 }
1227
1228 static void osc_extent_tree_dump0(int level, struct osc_object *obj,
1229                                   const char *func, int line)
1230 {
1231         struct osc_extent *ext;
1232         int cnt;
1233
1234         CDEBUG(level, "Dump object %p extents at %s:%d, mppr: %u.\n",
1235                obj, func, line, osc_cli(obj)->cl_max_pages_per_rpc);
1236
1237         /* osc_object_lock(obj); */
1238         cnt = 1;
1239         for (ext = first_extent(obj); ext; ext = next_extent(ext))
1240                 OSC_EXTENT_DUMP(level, ext, "in tree %d.\n", cnt++);
1241
1242         cnt = 1;
1243         list_for_each_entry(ext, &obj->oo_hp_exts, oe_link)
1244                 OSC_EXTENT_DUMP(level, ext, "hp %d.\n", cnt++);
1245
1246         cnt = 1;
1247         list_for_each_entry(ext, &obj->oo_urgent_exts, oe_link)
1248                 OSC_EXTENT_DUMP(level, ext, "urgent %d.\n", cnt++);
1249
1250         cnt = 1;
1251         list_for_each_entry(ext, &obj->oo_reading_exts, oe_link)
1252                 OSC_EXTENT_DUMP(level, ext, "reading %d.\n", cnt++);
1253         /* osc_object_unlock(obj); */
1254 }
1255
1256 /* ------------------ osc extent end ------------------ */
1257
1258 static inline int osc_is_ready(struct osc_object *osc)
1259 {
1260         return !list_empty(&osc->oo_ready_item) ||
1261                !list_empty(&osc->oo_hp_ready_item);
1262 }
1263
1264 #define OSC_IO_DEBUG(OSC, STR, args...)                                        \
1265         CDEBUG(D_CACHE, "obj %p ready %d|%c|%c wr %d|%c|%c rd %d|%c " STR,     \
1266                (OSC), osc_is_ready(OSC),                                       \
1267                list_empty_marker(&(OSC)->oo_hp_ready_item),                    \
1268                list_empty_marker(&(OSC)->oo_ready_item),                       \
1269                atomic_read(&(OSC)->oo_nr_writes),                              \
1270                list_empty_marker(&(OSC)->oo_hp_exts),                          \
1271                list_empty_marker(&(OSC)->oo_urgent_exts),                      \
1272                atomic_read(&(OSC)->oo_nr_reads),                               \
1273                list_empty_marker(&(OSC)->oo_reading_exts),                     \
1274                ##args)
1275
1276 static int osc_make_ready(const struct lu_env *env, struct osc_async_page *oap,
1277                           int cmd)
1278 {
1279         struct osc_page *opg = oap2osc_page(oap);
1280         struct cl_page  *page = oap2cl_page(oap);
1281         int result;
1282
1283         LASSERT(cmd == OBD_BRW_WRITE); /* no cached reads */
1284
1285         result = cl_page_make_ready(env, page, CRT_WRITE);
1286         if (result == 0)
1287                 opg->ops_submit_time = cfs_time_current();
1288         return result;
1289 }
1290
1291 static int osc_refresh_count(const struct lu_env *env,
1292                              struct osc_async_page *oap, int cmd)
1293 {
1294         struct osc_page *opg = oap2osc_page(oap);
1295         pgoff_t index = osc_index(oap2osc(oap));
1296         struct cl_object *obj;
1297         struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1298
1299         int result;
1300         loff_t kms;
1301
1302         /* readpage queues with _COUNT_STABLE, shouldn't get here. */
1303         LASSERT(!(cmd & OBD_BRW_READ));
1304         obj = opg->ops_cl.cpl_obj;
1305
1306         cl_object_attr_lock(obj);
1307         result = cl_object_attr_get(env, obj, attr);
1308         cl_object_attr_unlock(obj);
1309         if (result < 0)
1310                 return result;
1311         kms = attr->cat_kms;
1312         if (cl_offset(obj, index) >= kms)
1313                 /* catch race with truncate */
1314                 return 0;
1315         else if (cl_offset(obj, index + 1) > kms)
1316                 /* catch sub-page write at end of file */
1317                 return kms % PAGE_SIZE;
1318         else
1319                 return PAGE_SIZE;
1320 }
1321
1322 static int osc_completion(const struct lu_env *env, struct osc_async_page *oap,
1323                           int cmd, int rc)
1324 {
1325         struct osc_page *opg = oap2osc_page(oap);
1326         struct cl_page    *page = oap2cl_page(oap);
1327         struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
1328         enum cl_req_type crt;
1329         int srvlock;
1330
1331         cmd &= ~OBD_BRW_NOQUOTA;
1332         LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
1333                  "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
1334         LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
1335                  "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
1336         LASSERT(opg->ops_transfer_pinned);
1337
1338         /*
1339          * page->cp_req can be NULL if io submission failed before
1340          * cl_req was allocated.
1341          */
1342         if (page->cp_req)
1343                 cl_req_page_done(env, page);
1344         LASSERT(!page->cp_req);
1345
1346         crt = cmd == OBD_BRW_READ ? CRT_READ : CRT_WRITE;
1347         /* Clear opg->ops_transfer_pinned before VM lock is released. */
1348         opg->ops_transfer_pinned = 0;
1349
1350         spin_lock(&obj->oo_seatbelt);
1351         LASSERT(opg->ops_submitter);
1352         LASSERT(!list_empty(&opg->ops_inflight));
1353         list_del_init(&opg->ops_inflight);
1354         opg->ops_submitter = NULL;
1355         spin_unlock(&obj->oo_seatbelt);
1356
1357         opg->ops_submit_time = 0;
1358         srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
1359
1360         /* statistic */
1361         if (rc == 0 && srvlock) {
1362                 struct lu_device *ld = opg->ops_cl.cpl_obj->co_lu.lo_dev;
1363                 struct osc_stats *stats = &lu2osc_dev(ld)->od_stats;
1364                 int bytes = oap->oap_count;
1365
1366                 if (crt == CRT_READ)
1367                         stats->os_lockless_reads += bytes;
1368                 else
1369                         stats->os_lockless_writes += bytes;
1370         }
1371
1372         /*
1373          * This has to be the last operation with the page, as locks are
1374          * released in cl_page_completion() and nothing except for the
1375          * reference counter protects page from concurrent reclaim.
1376          */
1377         lu_ref_del(&page->cp_reference, "transfer", page);
1378
1379         cl_page_completion(env, page, crt, rc);
1380
1381         return 0;
1382 }
1383
1384 #define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do {                           \
1385         struct client_obd *__tmp = (cli);                                     \
1386         CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d "          \
1387                "unstable_pages: %d/%d dropped: %ld avail: %ld, "              \
1388                "reserved: %ld, flight: %d } lru {in list: %d, "               \
1389                "left: %d, waiters: %d }" fmt,                                 \
1390                __tmp->cl_import->imp_obd->obd_name,                           \
1391                __tmp->cl_dirty, __tmp->cl_dirty_max,                          \
1392                atomic_read(&obd_dirty_pages), obd_max_dirty_pages,            \
1393                atomic_read(&obd_unstable_pages), obd_max_dirty_pages,         \
1394                __tmp->cl_lost_grant, __tmp->cl_avail_grant,                   \
1395                __tmp->cl_reserved_grant, __tmp->cl_w_in_flight,               \
1396                atomic_read(&__tmp->cl_lru_in_list),                           \
1397                atomic_read(&__tmp->cl_lru_busy),                              \
1398                atomic_read(&__tmp->cl_lru_shrinkers), ##args);                \
1399 } while (0)
1400
1401 /* caller must hold loi_list_lock */
1402 static void osc_consume_write_grant(struct client_obd *cli,
1403                                     struct brw_page *pga)
1404 {
1405         assert_spin_locked(&cli->cl_loi_list_lock);
1406         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
1407         atomic_inc(&obd_dirty_pages);
1408         cli->cl_dirty += PAGE_SIZE;
1409         pga->flag |= OBD_BRW_FROM_GRANT;
1410         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
1411                PAGE_SIZE, pga, pga->pg);
1412         osc_update_next_shrink(cli);
1413 }
1414
1415 /* the companion to osc_consume_write_grant, called when a brw has completed.
1416  * must be called with the loi lock held.
1417  */
1418 static void osc_release_write_grant(struct client_obd *cli,
1419                                     struct brw_page *pga)
1420 {
1421         assert_spin_locked(&cli->cl_loi_list_lock);
1422         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
1423                 return;
1424         }
1425
1426         pga->flag &= ~OBD_BRW_FROM_GRANT;
1427         atomic_dec(&obd_dirty_pages);
1428         cli->cl_dirty -= PAGE_SIZE;
1429         if (pga->flag & OBD_BRW_NOCACHE) {
1430                 pga->flag &= ~OBD_BRW_NOCACHE;
1431                 atomic_dec(&obd_dirty_transit_pages);
1432                 cli->cl_dirty_transit -= PAGE_SIZE;
1433         }
1434 }
1435
1436 /**
1437  * To avoid sleeping with object lock held, it's good for us allocate enough
1438  * grants before entering into critical section.
1439  *
1440  * spin_lock held by caller
1441  */
1442 static int osc_reserve_grant(struct client_obd *cli, unsigned int bytes)
1443 {
1444         int rc = -EDQUOT;
1445
1446         if (cli->cl_avail_grant >= bytes) {
1447                 cli->cl_avail_grant -= bytes;
1448                 cli->cl_reserved_grant += bytes;
1449                 rc = 0;
1450         }
1451         return rc;
1452 }
1453
1454 static void __osc_unreserve_grant(struct client_obd *cli,
1455                                   unsigned int reserved, unsigned int unused)
1456 {
1457         /* it's quite normal for us to get more grant than reserved.
1458          * Thinking about a case that two extents merged by adding a new
1459          * chunk, we can save one extent tax. If extent tax is greater than
1460          * one chunk, we can save more grant by adding a new chunk
1461          */
1462         cli->cl_reserved_grant -= reserved;
1463         if (unused > reserved) {
1464                 cli->cl_avail_grant += reserved;
1465                 cli->cl_lost_grant  += unused - reserved;
1466         } else {
1467                 cli->cl_avail_grant += unused;
1468         }
1469 }
1470
1471 static void osc_unreserve_grant(struct client_obd *cli,
1472                                 unsigned int reserved, unsigned int unused)
1473 {
1474         spin_lock(&cli->cl_loi_list_lock);
1475         __osc_unreserve_grant(cli, reserved, unused);
1476         if (unused > 0)
1477                 osc_wake_cache_waiters(cli);
1478         spin_unlock(&cli->cl_loi_list_lock);
1479 }
1480
1481 /**
1482  * Free grant after IO is finished or canceled.
1483  *
1484  * @lost_grant is used to remember how many grants we have allocated but not
1485  * used, we should return these grants to OST. There're two cases where grants
1486  * can be lost:
1487  * 1. truncate;
1488  * 2. blocksize at OST is less than PAGE_SIZE and a partial page was
1489  *    written. In this case OST may use less chunks to serve this partial
1490  *    write. OSTs don't actually know the page size on the client side. so
1491  *    clients have to calculate lost grant by the blocksize on the OST.
1492  *    See filter_grant_check() for details.
1493  */
1494 static void osc_free_grant(struct client_obd *cli, unsigned int nr_pages,
1495                            unsigned int lost_grant)
1496 {
1497         int grant = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
1498
1499         spin_lock(&cli->cl_loi_list_lock);
1500         atomic_sub(nr_pages, &obd_dirty_pages);
1501         cli->cl_dirty -= nr_pages << PAGE_SHIFT;
1502         cli->cl_lost_grant += lost_grant;
1503         if (cli->cl_avail_grant < grant && cli->cl_lost_grant >= grant) {
1504                 /* borrow some grant from truncate to avoid the case that
1505                  * truncate uses up all avail grant
1506                  */
1507                 cli->cl_lost_grant -= grant;
1508                 cli->cl_avail_grant += grant;
1509         }
1510         osc_wake_cache_waiters(cli);
1511         spin_unlock(&cli->cl_loi_list_lock);
1512         CDEBUG(D_CACHE, "lost %u grant: %lu avail: %lu dirty: %lu\n",
1513                lost_grant, cli->cl_lost_grant,
1514                cli->cl_avail_grant, cli->cl_dirty);
1515 }
1516
1517 /**
1518  * The companion to osc_enter_cache(), called when @oap is no longer part of
1519  * the dirty accounting due to error.
1520  */
1521 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
1522 {
1523         spin_lock(&cli->cl_loi_list_lock);
1524         osc_release_write_grant(cli, &oap->oap_brw_page);
1525         spin_unlock(&cli->cl_loi_list_lock);
1526 }
1527
1528 /**
1529  * Non-blocking version of osc_enter_cache() that consumes grant only when it
1530  * is available.
1531  */
1532 static int osc_enter_cache_try(struct client_obd *cli,
1533                                struct osc_async_page *oap,
1534                                int bytes, int transient)
1535 {
1536         int rc;
1537
1538         OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
1539
1540         rc = osc_reserve_grant(cli, bytes);
1541         if (rc < 0)
1542                 return 0;
1543
1544         if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
1545             atomic_read(&obd_unstable_pages) + 1 +
1546             atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
1547                 osc_consume_write_grant(cli, &oap->oap_brw_page);
1548                 if (transient) {
1549                         cli->cl_dirty_transit += PAGE_SIZE;
1550                         atomic_inc(&obd_dirty_transit_pages);
1551                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
1552                 }
1553                 rc = 1;
1554         } else {
1555                 __osc_unreserve_grant(cli, bytes, bytes);
1556                 rc = 0;
1557         }
1558         return rc;
1559 }
1560
1561 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
1562 {
1563         int rc;
1564
1565         spin_lock(&cli->cl_loi_list_lock);
1566         rc = list_empty(&ocw->ocw_entry);
1567         spin_unlock(&cli->cl_loi_list_lock);
1568         return rc;
1569 }
1570
1571 /**
1572  * The main entry to reserve dirty page accounting. Usually the grant reserved
1573  * in this function will be freed in bulk in osc_free_grant() unless it fails
1574  * to add osc cache, in that case, it will be freed in osc_exit_cache().
1575  *
1576  * The process will be put into sleep if it's already run out of grant.
1577  */
1578 static int osc_enter_cache(const struct lu_env *env, struct client_obd *cli,
1579                            struct osc_async_page *oap, int bytes)
1580 {
1581         struct osc_object *osc = oap->oap_obj;
1582         struct lov_oinfo *loi = osc->oo_oinfo;
1583         struct osc_cache_waiter ocw;
1584         struct l_wait_info lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(600), NULL,
1585                                                   LWI_ON_SIGNAL_NOOP, NULL);
1586         int rc = -EDQUOT;
1587
1588         OSC_DUMP_GRANT(D_CACHE, cli, "need:%d.\n", bytes);
1589
1590         spin_lock(&cli->cl_loi_list_lock);
1591
1592         /* force the caller to try sync io.  this can jump the list
1593          * of queued writes and create a discontiguous rpc stream
1594          */
1595         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
1596             cli->cl_dirty_max < PAGE_SIZE     ||
1597             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync) {
1598                 rc = -EDQUOT;
1599                 goto out;
1600         }
1601
1602         /* Hopefully normal case - cache space and write credits available */
1603         if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1604                 rc = 0;
1605                 goto out;
1606         }
1607
1608         /* We can get here for two reasons: too many dirty pages in cache, or
1609          * run out of grants. In both cases we should write dirty pages out.
1610          * Adding a cache waiter will trigger urgent write-out no matter what
1611          * RPC size will be.
1612          * The exiting condition is no avail grants and no dirty pages caching,
1613          * that really means there is no space on the OST.
1614          */
1615         init_waitqueue_head(&ocw.ocw_waitq);
1616         ocw.ocw_oap   = oap;
1617         ocw.ocw_grant = bytes;
1618         while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
1619                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
1620                 ocw.ocw_rc = 0;
1621                 spin_unlock(&cli->cl_loi_list_lock);
1622
1623                 osc_io_unplug_async(env, cli, NULL);
1624
1625                 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
1626                        cli->cl_import->imp_obd->obd_name, &ocw, oap);
1627
1628                 rc = l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
1629
1630                 spin_lock(&cli->cl_loi_list_lock);
1631
1632                 /* l_wait_event is interrupted by signal, or timed out */
1633                 if (rc < 0) {
1634                         if (rc == -ETIMEDOUT) {
1635                                 OSC_DUMP_GRANT(D_ERROR, cli,
1636                                                "try to reserve %d.\n", bytes);
1637                                 osc_extent_tree_dump(D_ERROR, osc);
1638                                 rc = -EDQUOT;
1639                         }
1640
1641                         list_del_init(&ocw.ocw_entry);
1642                         goto out;
1643                 }
1644
1645                 LASSERT(list_empty(&ocw.ocw_entry));
1646                 rc = ocw.ocw_rc;
1647
1648                 if (rc != -EDQUOT)
1649                         goto out;
1650                 if (osc_enter_cache_try(cli, oap, bytes, 0)) {
1651                         rc = 0;
1652                         goto out;
1653                 }
1654         }
1655 out:
1656         spin_unlock(&cli->cl_loi_list_lock);
1657         OSC_DUMP_GRANT(D_CACHE, cli, "returned %d.\n", rc);
1658         return rc;
1659 }
1660
1661 /* caller must hold loi_list_lock */
1662 void osc_wake_cache_waiters(struct client_obd *cli)
1663 {
1664         struct list_head *l, *tmp;
1665         struct osc_cache_waiter *ocw;
1666
1667         list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
1668                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
1669                 list_del_init(&ocw->ocw_entry);
1670
1671                 ocw->ocw_rc = -EDQUOT;
1672                 /* we can't dirty more */
1673                 if ((cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) ||
1674                     (atomic_read(&obd_unstable_pages) + 1 +
1675                      atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
1676                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld osc max %ld, sys max %d\n",
1677                                cli->cl_dirty,
1678                                cli->cl_dirty_max, obd_max_dirty_pages);
1679                         goto wakeup;
1680                 }
1681
1682                 ocw->ocw_rc = 0;
1683                 if (!osc_enter_cache_try(cli, ocw->ocw_oap, ocw->ocw_grant, 0))
1684                         ocw->ocw_rc = -EDQUOT;
1685
1686 wakeup:
1687                 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
1688                        ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
1689
1690                 wake_up(&ocw->ocw_waitq);
1691         }
1692 }
1693
1694 static int osc_max_rpc_in_flight(struct client_obd *cli, struct osc_object *osc)
1695 {
1696         int hprpc = !!list_empty(&osc->oo_hp_exts);
1697
1698         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
1699 }
1700
1701 /* This maintains the lists of pending pages to read/write for a given object
1702  * (lop).  This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
1703  * to quickly find objects that are ready to send an RPC.
1704  */
1705 static int osc_makes_rpc(struct client_obd *cli, struct osc_object *osc,
1706                          int cmd)
1707 {
1708         int invalid_import = 0;
1709
1710         /* if we have an invalid import we want to drain the queued pages
1711          * by forcing them through rpcs that immediately fail and complete
1712          * the pages.  recovery relies on this to empty the queued pages
1713          * before canceling the locks and evicting down the llite pages
1714          */
1715         if (!cli->cl_import || cli->cl_import->imp_invalid)
1716                 invalid_import = 1;
1717
1718         if (cmd & OBD_BRW_WRITE) {
1719                 if (atomic_read(&osc->oo_nr_writes) == 0)
1720                         return 0;
1721                 if (invalid_import) {
1722                         CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1723                         return 1;
1724                 }
1725                 if (!list_empty(&osc->oo_hp_exts)) {
1726                         CDEBUG(D_CACHE, "high prio request forcing RPC\n");
1727                         return 1;
1728                 }
1729                 if (!list_empty(&osc->oo_urgent_exts)) {
1730                         CDEBUG(D_CACHE, "urgent request forcing RPC\n");
1731                         return 1;
1732                 }
1733                 /* trigger a write rpc stream as long as there are dirtiers
1734                  * waiting for space.  as they're waiting, they're not going to
1735                  * create more pages to coalesce with what's waiting..
1736                  */
1737                 if (!list_empty(&cli->cl_cache_waiters)) {
1738                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
1739                         return 1;
1740                 }
1741                 if (atomic_read(&osc->oo_nr_writes) >=
1742                     cli->cl_max_pages_per_rpc)
1743                         return 1;
1744         } else {
1745                 if (atomic_read(&osc->oo_nr_reads) == 0)
1746                         return 0;
1747                 if (invalid_import) {
1748                         CDEBUG(D_CACHE, "invalid import forcing RPC\n");
1749                         return 1;
1750                 }
1751                 /* all read are urgent. */
1752                 if (!list_empty(&osc->oo_reading_exts))
1753                         return 1;
1754         }
1755
1756         return 0;
1757 }
1758
1759 static void osc_update_pending(struct osc_object *obj, int cmd, int delta)
1760 {
1761         struct client_obd *cli = osc_cli(obj);
1762
1763         if (cmd & OBD_BRW_WRITE) {
1764                 atomic_add(delta, &obj->oo_nr_writes);
1765                 atomic_add(delta, &cli->cl_pending_w_pages);
1766                 LASSERT(atomic_read(&obj->oo_nr_writes) >= 0);
1767         } else {
1768                 atomic_add(delta, &obj->oo_nr_reads);
1769                 atomic_add(delta, &cli->cl_pending_r_pages);
1770                 LASSERT(atomic_read(&obj->oo_nr_reads) >= 0);
1771         }
1772         OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
1773 }
1774
1775 static int osc_makes_hprpc(struct osc_object *obj)
1776 {
1777         return !list_empty(&obj->oo_hp_exts);
1778 }
1779
1780 static void on_list(struct list_head *item, struct list_head *list, int should_be_on)
1781 {
1782         if (list_empty(item) && should_be_on)
1783                 list_add_tail(item, list);
1784         else if (!list_empty(item) && !should_be_on)
1785                 list_del_init(item);
1786 }
1787
1788 /* maintain the osc's cli list membership invariants so that osc_send_oap_rpc
1789  * can find pages to build into rpcs quickly
1790  */
1791 static int __osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1792 {
1793         if (osc_makes_hprpc(osc)) {
1794                 /* HP rpc */
1795                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list, 0);
1796                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
1797         } else {
1798                 on_list(&osc->oo_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
1799                 on_list(&osc->oo_ready_item, &cli->cl_loi_ready_list,
1800                         osc_makes_rpc(cli, osc, OBD_BRW_WRITE) ||
1801                         osc_makes_rpc(cli, osc, OBD_BRW_READ));
1802         }
1803
1804         on_list(&osc->oo_write_item, &cli->cl_loi_write_list,
1805                 atomic_read(&osc->oo_nr_writes) > 0);
1806
1807         on_list(&osc->oo_read_item, &cli->cl_loi_read_list,
1808                 atomic_read(&osc->oo_nr_reads) > 0);
1809
1810         return osc_is_ready(osc);
1811 }
1812
1813 static int osc_list_maint(struct client_obd *cli, struct osc_object *osc)
1814 {
1815         int is_ready;
1816
1817         spin_lock(&cli->cl_loi_list_lock);
1818         is_ready = __osc_list_maint(cli, osc);
1819         spin_unlock(&cli->cl_loi_list_lock);
1820
1821         return is_ready;
1822 }
1823
1824 /* this is trying to propagate async writeback errors back up to the
1825  * application.  As an async write fails we record the error code for later if
1826  * the app does an fsync.  As long as errors persist we force future rpcs to be
1827  * sync so that the app can get a sync error and break the cycle of queueing
1828  * pages for which writeback will fail.
1829  */
1830 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
1831                            int rc)
1832 {
1833         if (rc) {
1834                 if (!ar->ar_rc)
1835                         ar->ar_rc = rc;
1836
1837                 ar->ar_force_sync = 1;
1838                 ar->ar_min_xid = ptlrpc_sample_next_xid();
1839                 return;
1840         }
1841
1842         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
1843                 ar->ar_force_sync = 0;
1844 }
1845
1846 /**
1847  * Performs "unstable" page accounting. This function balances the
1848  * increment operations performed in osc_inc_unstable_pages. It is
1849  * registered as the RPC request callback, and is executed when the
1850  * bulk RPC is committed on the server. Thus at this point, the pages
1851  * involved in the bulk transfer are no longer considered unstable.
1852  */
1853 void osc_dec_unstable_pages(struct ptlrpc_request *req)
1854 {
1855         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
1856         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
1857         int page_count = desc->bd_iov_count;
1858         int i;
1859
1860         /* No unstable page tracking */
1861         if (!cli->cl_cache)
1862                 return;
1863
1864         LASSERT(page_count >= 0);
1865
1866         for (i = 0; i < page_count; i++)
1867                 dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
1868
1869         atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
1870         LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
1871
1872         atomic_sub(page_count, &cli->cl_unstable_count);
1873         LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
1874
1875         atomic_sub(page_count, &obd_unstable_pages);
1876         LASSERT(atomic_read(&obd_unstable_pages) >= 0);
1877
1878         spin_lock(&req->rq_lock);
1879         req->rq_committed = 1;
1880         req->rq_unstable  = 0;
1881         spin_unlock(&req->rq_lock);
1882
1883         wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
1884 }
1885
1886 /* "unstable" page accounting. See: osc_dec_unstable_pages. */
1887 void osc_inc_unstable_pages(struct ptlrpc_request *req)
1888 {
1889         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
1890         struct ptlrpc_bulk_desc *desc = req->rq_bulk;
1891         long page_count = desc->bd_iov_count;
1892         int i;
1893
1894         /* No unstable page tracking */
1895         if (!cli->cl_cache)
1896                 return;
1897
1898         LASSERT(page_count >= 0);
1899
1900         for (i = 0; i < page_count; i++)
1901                 inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
1902
1903         LASSERT(atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
1904         atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
1905
1906         LASSERT(atomic_read(&cli->cl_unstable_count) >= 0);
1907         atomic_add(page_count, &cli->cl_unstable_count);
1908
1909         LASSERT(atomic_read(&obd_unstable_pages) >= 0);
1910         atomic_add(page_count, &obd_unstable_pages);
1911
1912         spin_lock(&req->rq_lock);
1913
1914         /*
1915          * If the request has already been committed (i.e. brw_commit
1916          * called via rq_commit_cb), we need to undo the unstable page
1917          * increments we just performed because rq_commit_cb wont be
1918          * called again. Otherwise, just set the commit callback so the
1919          * unstable page accounting is properly updated when the request
1920          * is committed
1921          */
1922         if (req->rq_committed) {
1923                 /* Drop lock before calling osc_dec_unstable_pages */
1924                 spin_unlock(&req->rq_lock);
1925                 osc_dec_unstable_pages(req);
1926                 spin_lock(&req->rq_lock);
1927         } else {
1928                 req->rq_unstable = 1;
1929                 req->rq_commit_cb = osc_dec_unstable_pages;
1930         }
1931
1932         spin_unlock(&req->rq_lock);
1933 }
1934
1935 /* this must be called holding the loi list lock to give coverage to exit_cache,
1936  * async_flag maintenance, and oap_request
1937  */
1938 static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
1939                               struct osc_async_page *oap, int sent, int rc)
1940 {
1941         struct osc_object *osc = oap->oap_obj;
1942         struct lov_oinfo *loi = osc->oo_oinfo;
1943         __u64 xid = 0;
1944
1945         if (oap->oap_request) {
1946                 if (!rc)
1947                         osc_inc_unstable_pages(oap->oap_request);
1948
1949                 xid = ptlrpc_req_xid(oap->oap_request);
1950                 ptlrpc_req_finished(oap->oap_request);
1951                 oap->oap_request = NULL;
1952         }
1953
1954         /* As the transfer for this page is being done, clear the flags */
1955         spin_lock(&oap->oap_lock);
1956         oap->oap_async_flags = 0;
1957         spin_unlock(&oap->oap_lock);
1958         oap->oap_interrupted = 0;
1959
1960         if (oap->oap_cmd & OBD_BRW_WRITE && xid > 0) {
1961                 spin_lock(&cli->cl_loi_list_lock);
1962                 osc_process_ar(&cli->cl_ar, xid, rc);
1963                 osc_process_ar(&loi->loi_ar, xid, rc);
1964                 spin_unlock(&cli->cl_loi_list_lock);
1965         }
1966
1967         rc = osc_completion(env, oap, oap->oap_cmd, rc);
1968         if (rc)
1969                 CERROR("completion on oap %p obj %p returns %d.\n",
1970                        oap, osc, rc);
1971 }
1972
1973 /**
1974  * Try to add extent to one RPC. We need to think about the following things:
1975  * - # of pages must not be over max_pages_per_rpc
1976  * - extent must be compatible with previous ones
1977  */
1978 static int try_to_add_extent_for_io(struct client_obd *cli,
1979                                     struct osc_extent *ext, struct list_head *rpclist,
1980                                     int *pc, unsigned int *max_pages)
1981 {
1982         struct osc_extent *tmp;
1983         struct osc_async_page *oap = list_first_entry(&ext->oe_pages,
1984                                                       struct osc_async_page,
1985                                                       oap_pending_item);
1986
1987         EASSERT((ext->oe_state == OES_CACHE || ext->oe_state == OES_LOCK_DONE),
1988                 ext);
1989
1990         *max_pages = max(ext->oe_mppr, *max_pages);
1991         if (*pc + ext->oe_nr_pages > *max_pages)
1992                 return 0;
1993
1994         list_for_each_entry(tmp, rpclist, oe_link) {
1995                 struct osc_async_page *oap2;
1996
1997                 oap2 = list_first_entry(&tmp->oe_pages, struct osc_async_page,
1998                                         oap_pending_item);
1999                 EASSERT(tmp->oe_owner == current, tmp);
2000                 if (oap2cl_page(oap)->cp_type != oap2cl_page(oap2)->cp_type) {
2001                         CDEBUG(D_CACHE, "Do not permit different type of IO"
2002                                         " for a same RPC\n");
2003                         return 0;
2004                 }
2005
2006                 if (tmp->oe_srvlock != ext->oe_srvlock ||
2007                     !tmp->oe_grants != !ext->oe_grants)
2008                         return 0;
2009
2010                 /* remove break for strict check */
2011                 break;
2012         }
2013
2014         *pc += ext->oe_nr_pages;
2015         list_move_tail(&ext->oe_link, rpclist);
2016         ext->oe_owner = current;
2017         return 1;
2018 }
2019
2020 /**
2021  * In order to prevent multiple ptlrpcd from breaking contiguous extents,
2022  * get_write_extent() takes all appropriate extents in atomic.
2023  *
2024  * The following policy is used to collect extents for IO:
2025  * 1. Add as many HP extents as possible;
2026  * 2. Add the first urgent extent in urgent extent list and take it out of
2027  *    urgent list;
2028  * 3. Add subsequent extents of this urgent extent;
2029  * 4. If urgent list is not empty, goto 2;
2030  * 5. Traverse the extent tree from the 1st extent;
2031  * 6. Above steps exit if there is no space in this RPC.
2032  */
2033 static int get_write_extents(struct osc_object *obj, struct list_head *rpclist)
2034 {
2035         struct client_obd *cli = osc_cli(obj);
2036         struct osc_extent *ext;
2037         struct osc_extent *temp;
2038         int page_count = 0;
2039         unsigned int max_pages = cli->cl_max_pages_per_rpc;
2040
2041         LASSERT(osc_object_is_locked(obj));
2042         list_for_each_entry_safe(ext, temp, &obj->oo_hp_exts, oe_link) {
2043                 LASSERT(ext->oe_state == OES_CACHE);
2044                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
2045                                               &max_pages))
2046                         return page_count;
2047                 EASSERT(ext->oe_nr_pages <= max_pages, ext);
2048         }
2049         if (page_count == max_pages)
2050                 return page_count;
2051
2052         while (!list_empty(&obj->oo_urgent_exts)) {
2053                 ext = list_entry(obj->oo_urgent_exts.next,
2054                                  struct osc_extent, oe_link);
2055                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
2056                                               &max_pages))
2057                         return page_count;
2058
2059                 if (!ext->oe_intree)
2060                         continue;
2061
2062                 while ((ext = next_extent(ext)) != NULL) {
2063                         if ((ext->oe_state != OES_CACHE) ||
2064                             (!list_empty(&ext->oe_link) &&
2065                              ext->oe_owner))
2066                                 continue;
2067
2068                         if (!try_to_add_extent_for_io(cli, ext, rpclist,
2069                                                       &page_count, &max_pages))
2070                                 return page_count;
2071                 }
2072         }
2073         if (page_count == max_pages)
2074                 return page_count;
2075
2076         ext = first_extent(obj);
2077         while (ext) {
2078                 if ((ext->oe_state != OES_CACHE) ||
2079                     /* this extent may be already in current rpclist */
2080                     (!list_empty(&ext->oe_link) && ext->oe_owner)) {
2081                         ext = next_extent(ext);
2082                         continue;
2083                 }
2084
2085                 if (!try_to_add_extent_for_io(cli, ext, rpclist, &page_count,
2086                                               &max_pages))
2087                         return page_count;
2088
2089                 ext = next_extent(ext);
2090         }
2091         return page_count;
2092 }
2093
2094 static int
2095 osc_send_write_rpc(const struct lu_env *env, struct client_obd *cli,
2096                    struct osc_object *osc)
2097         __must_hold(osc)
2098 {
2099         LIST_HEAD(rpclist);
2100         struct osc_extent *ext;
2101         struct osc_extent *tmp;
2102         struct osc_extent *first = NULL;
2103         u32 page_count = 0;
2104         int srvlock = 0;
2105         int rc = 0;
2106
2107         LASSERT(osc_object_is_locked(osc));
2108
2109         page_count = get_write_extents(osc, &rpclist);
2110         LASSERT(equi(page_count == 0, list_empty(&rpclist)));
2111
2112         if (list_empty(&rpclist))
2113                 return 0;
2114
2115         osc_update_pending(osc, OBD_BRW_WRITE, -page_count);
2116
2117         list_for_each_entry(ext, &rpclist, oe_link) {
2118                 LASSERT(ext->oe_state == OES_CACHE ||
2119                         ext->oe_state == OES_LOCK_DONE);
2120                 if (ext->oe_state == OES_CACHE)
2121                         osc_extent_state_set(ext, OES_LOCKING);
2122                 else
2123                         osc_extent_state_set(ext, OES_RPC);
2124         }
2125
2126         /* we're going to grab page lock, so release object lock because
2127          * lock order is page lock -> object lock.
2128          */
2129         osc_object_unlock(osc);
2130
2131         list_for_each_entry_safe(ext, tmp, &rpclist, oe_link) {
2132                 if (ext->oe_state == OES_LOCKING) {
2133                         rc = osc_extent_make_ready(env, ext);
2134                         if (unlikely(rc < 0)) {
2135                                 list_del_init(&ext->oe_link);
2136                                 osc_extent_finish(env, ext, 0, rc);
2137                                 continue;
2138                         }
2139                 }
2140                 if (!first) {
2141                         first = ext;
2142                         srvlock = ext->oe_srvlock;
2143                 } else {
2144                         LASSERT(srvlock == ext->oe_srvlock);
2145                 }
2146         }
2147
2148         if (!list_empty(&rpclist)) {
2149                 LASSERT(page_count > 0);
2150                 rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE);
2151                 LASSERT(list_empty(&rpclist));
2152         }
2153
2154         osc_object_lock(osc);
2155         return rc;
2156 }
2157
2158 /**
2159  * prepare pages for ASYNC io and put pages in send queue.
2160  *
2161  * \param cmd OBD_BRW_* macroses
2162  * \param lop pending pages
2163  *
2164  * \return zero if no page added to send queue.
2165  * \return 1 if pages successfully added to send queue.
2166  * \return negative on errors.
2167  */
2168 static int
2169 osc_send_read_rpc(const struct lu_env *env, struct client_obd *cli,
2170                   struct osc_object *osc)
2171         __must_hold(osc)
2172 {
2173         struct osc_extent *ext;
2174         struct osc_extent *next;
2175         LIST_HEAD(rpclist);
2176         int page_count = 0;
2177         unsigned int max_pages = cli->cl_max_pages_per_rpc;
2178         int rc = 0;
2179
2180         LASSERT(osc_object_is_locked(osc));
2181         list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
2182                 EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
2183                 if (!try_to_add_extent_for_io(cli, ext, &rpclist, &page_count,
2184                                               &max_pages))
2185                         break;
2186                 osc_extent_state_set(ext, OES_RPC);
2187                 EASSERT(ext->oe_nr_pages <= max_pages, ext);
2188         }
2189         LASSERT(page_count <= max_pages);
2190
2191         osc_update_pending(osc, OBD_BRW_READ, -page_count);
2192
2193         if (!list_empty(&rpclist)) {
2194                 osc_object_unlock(osc);
2195
2196                 LASSERT(page_count > 0);
2197                 rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_READ);
2198                 LASSERT(list_empty(&rpclist));
2199
2200                 osc_object_lock(osc);
2201         }
2202         return rc;
2203 }
2204
2205 #define list_to_obj(list, item) ({                                            \
2206         struct list_head *__tmp = (list)->next;                               \
2207         list_del_init(__tmp);                                                 \
2208         list_entry(__tmp, struct osc_object, oo_##item);                      \
2209 })
2210
2211 /* This is called by osc_check_rpcs() to find which objects have pages that
2212  * we could be sending.  These lists are maintained by osc_makes_rpc().
2213  */
2214 static struct osc_object *osc_next_obj(struct client_obd *cli)
2215 {
2216         /* First return objects that have blocked locks so that they
2217          * will be flushed quickly and other clients can get the lock,
2218          * then objects which have pages ready to be stuffed into RPCs
2219          */
2220         if (!list_empty(&cli->cl_loi_hp_ready_list))
2221                 return list_to_obj(&cli->cl_loi_hp_ready_list, hp_ready_item);
2222         if (!list_empty(&cli->cl_loi_ready_list))
2223                 return list_to_obj(&cli->cl_loi_ready_list, ready_item);
2224
2225         /* then if we have cache waiters, return all objects with queued
2226          * writes.  This is especially important when many small files
2227          * have filled up the cache and not been fired into rpcs because
2228          * they don't pass the nr_pending/object threshold
2229          */
2230         if (!list_empty(&cli->cl_cache_waiters) &&
2231             !list_empty(&cli->cl_loi_write_list))
2232                 return list_to_obj(&cli->cl_loi_write_list, write_item);
2233
2234         /* then return all queued objects when we have an invalid import
2235          * so that they get flushed
2236          */
2237         if (!cli->cl_import || cli->cl_import->imp_invalid) {
2238                 if (!list_empty(&cli->cl_loi_write_list))
2239                         return list_to_obj(&cli->cl_loi_write_list, write_item);
2240                 if (!list_empty(&cli->cl_loi_read_list))
2241                         return list_to_obj(&cli->cl_loi_read_list, read_item);
2242         }
2243         return NULL;
2244 }
2245
2246 /* called with the loi list lock held */
2247 static void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2248         __must_hold(&cli->cl_loi_list_lock)
2249 {
2250         struct osc_object *osc;
2251         int rc = 0;
2252
2253         while ((osc = osc_next_obj(cli)) != NULL) {
2254                 struct cl_object *obj = osc2cl(osc);
2255                 struct lu_ref_link link;
2256
2257                 OSC_IO_DEBUG(osc, "%lu in flight\n", rpcs_in_flight(cli));
2258
2259                 if (osc_max_rpc_in_flight(cli, osc)) {
2260                         __osc_list_maint(cli, osc);
2261                         break;
2262                 }
2263
2264                 cl_object_get(obj);
2265                 spin_unlock(&cli->cl_loi_list_lock);
2266                 lu_object_ref_add_at(&obj->co_lu, &link, "check", current);
2267
2268                 /* attempt some read/write balancing by alternating between
2269                  * reads and writes in an object.  The makes_rpc checks here
2270                  * would be redundant if we were getting read/write work items
2271                  * instead of objects.  we don't want send_oap_rpc to drain a
2272                  * partial read pending queue when we're given this object to
2273                  * do io on writes while there are cache waiters
2274                  */
2275                 osc_object_lock(osc);
2276                 if (osc_makes_rpc(cli, osc, OBD_BRW_WRITE)) {
2277                         rc = osc_send_write_rpc(env, cli, osc);
2278                         if (rc < 0) {
2279                                 CERROR("Write request failed with %d\n", rc);
2280
2281                                 /* osc_send_write_rpc failed, mostly because of
2282                                  * memory pressure.
2283                                  *
2284                                  * It can't break here, because if:
2285                                  *  - a page was submitted by osc_io_submit, so
2286                                  *    page locked;
2287                                  *  - no request in flight
2288                                  *  - no subsequent request
2289                                  * The system will be in live-lock state,
2290                                  * because there is no chance to call
2291                                  * osc_io_unplug() and osc_check_rpcs() any
2292                                  * more. pdflush can't help in this case,
2293                                  * because it might be blocked at grabbing
2294                                  * the page lock as we mentioned.
2295                                  *
2296                                  * Anyway, continue to drain pages.
2297                                  */
2298                                 /* break; */
2299                         }
2300                 }
2301                 if (osc_makes_rpc(cli, osc, OBD_BRW_READ)) {
2302                         rc = osc_send_read_rpc(env, cli, osc);
2303                         if (rc < 0)
2304                                 CERROR("Read request failed with %d\n", rc);
2305                 }
2306                 osc_object_unlock(osc);
2307
2308                 osc_list_maint(cli, osc);
2309                 lu_object_ref_del_at(&obj->co_lu, &link, "check", current);
2310                 cl_object_put(env, obj);
2311
2312                 spin_lock(&cli->cl_loi_list_lock);
2313         }
2314 }
2315
2316 static int osc_io_unplug0(const struct lu_env *env, struct client_obd *cli,
2317                           struct osc_object *osc, int async)
2318 {
2319         int rc = 0;
2320
2321         if (osc && osc_list_maint(cli, osc) == 0)
2322                 return 0;
2323
2324         if (!async) {
2325                 /* disable osc_lru_shrink() temporarily to avoid
2326                  * potential stack overrun problem. LU-2859
2327                  */
2328                 atomic_inc(&cli->cl_lru_shrinkers);
2329                 spin_lock(&cli->cl_loi_list_lock);
2330                 osc_check_rpcs(env, cli);
2331                 spin_unlock(&cli->cl_loi_list_lock);
2332                 atomic_dec(&cli->cl_lru_shrinkers);
2333         } else {
2334                 CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
2335                 LASSERT(cli->cl_writeback_work);
2336                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
2337         }
2338         return rc;
2339 }
2340
2341 static int osc_io_unplug_async(const struct lu_env *env,
2342                                struct client_obd *cli, struct osc_object *osc)
2343 {
2344         return osc_io_unplug0(env, cli, osc, 1);
2345 }
2346
2347 void osc_io_unplug(const struct lu_env *env, struct client_obd *cli,
2348                    struct osc_object *osc)
2349 {
2350         (void)osc_io_unplug0(env, cli, osc, 0);
2351 }
2352
2353 int osc_prep_async_page(struct osc_object *osc, struct osc_page *ops,
2354                         struct page *page, loff_t offset)
2355 {
2356         struct obd_export *exp = osc_export(osc);
2357         struct osc_async_page *oap = &ops->ops_oap;
2358
2359         if (!page)
2360                 return cfs_size_round(sizeof(*oap));
2361
2362         oap->oap_magic = OAP_MAGIC;
2363         oap->oap_cli = &exp->exp_obd->u.cli;
2364         oap->oap_obj = osc;
2365
2366         oap->oap_page = page;
2367         oap->oap_obj_off = offset;
2368         LASSERT(!(offset & ~PAGE_MASK));
2369
2370         if (capable(CFS_CAP_SYS_RESOURCE))
2371                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2372
2373         INIT_LIST_HEAD(&oap->oap_pending_item);
2374         INIT_LIST_HEAD(&oap->oap_rpc_item);
2375
2376         spin_lock_init(&oap->oap_lock);
2377         CDEBUG(D_INFO, "oap %p page %p obj off %llu\n",
2378                oap, page, oap->oap_obj_off);
2379         return 0;
2380 }
2381
2382 int osc_queue_async_io(const struct lu_env *env, struct cl_io *io,
2383                        struct osc_page *ops)
2384 {
2385         struct osc_io *oio = osc_env_io(env);
2386         struct osc_extent *ext = NULL;
2387         struct osc_async_page *oap = &ops->ops_oap;
2388         struct client_obd *cli = oap->oap_cli;
2389         struct osc_object *osc = oap->oap_obj;
2390         pgoff_t index;
2391         int grants = 0;
2392         int brw_flags = OBD_BRW_ASYNC;
2393         int cmd = OBD_BRW_WRITE;
2394         int need_release = 0;
2395         int rc = 0;
2396
2397         if (oap->oap_magic != OAP_MAGIC)
2398                 return -EINVAL;
2399
2400         if (!cli->cl_import || cli->cl_import->imp_invalid)
2401                 return -EIO;
2402
2403         if (!list_empty(&oap->oap_pending_item) ||
2404             !list_empty(&oap->oap_rpc_item))
2405                 return -EBUSY;
2406
2407         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
2408         brw_flags |= ops->ops_srvlock ? OBD_BRW_SRVLOCK : 0;
2409         if (capable(CFS_CAP_SYS_RESOURCE)) {
2410                 brw_flags |= OBD_BRW_NOQUOTA;
2411                 cmd |= OBD_BRW_NOQUOTA;
2412         }
2413
2414         /* check if the file's owner/group is over quota */
2415         if (!(cmd & OBD_BRW_NOQUOTA)) {
2416                 struct cl_object *obj;
2417                 struct cl_attr *attr;
2418                 unsigned int qid[MAXQUOTAS];
2419
2420                 obj = cl_object_top(&osc->oo_cl);
2421                 attr = &osc_env_info(env)->oti_attr;
2422
2423                 cl_object_attr_lock(obj);
2424                 rc = cl_object_attr_get(env, obj, attr);
2425                 cl_object_attr_unlock(obj);
2426
2427                 qid[USRQUOTA] = attr->cat_uid;
2428                 qid[GRPQUOTA] = attr->cat_gid;
2429                 if (rc == 0 && osc_quota_chkdq(cli, qid) == NO_QUOTA)
2430                         rc = -EDQUOT;
2431                 if (rc)
2432                         return rc;
2433         }
2434
2435         if (osc_over_unstable_soft_limit(cli))
2436                 brw_flags |= OBD_BRW_SOFT_SYNC;
2437
2438         oap->oap_cmd = cmd;
2439         oap->oap_page_off = ops->ops_from;
2440         oap->oap_count = ops->ops_to - ops->ops_from;
2441         /*
2442          * No need to hold a lock here,
2443          * since this page is not in any list yet.
2444          */
2445         oap->oap_async_flags = 0;
2446         oap->oap_brw_flags = brw_flags;
2447
2448         OSC_IO_DEBUG(osc, "oap %p page %p added for cmd %d\n",
2449                      oap, oap->oap_page, oap->oap_cmd & OBD_BRW_RWMASK);
2450
2451         index = osc_index(oap2osc(oap));
2452
2453         /* Add this page into extent by the following steps:
2454          * 1. if there exists an active extent for this IO, mostly this page
2455          *    can be added to the active extent and sometimes we need to
2456          *    expand extent to accommodate this page;
2457          * 2. otherwise, a new extent will be allocated.
2458          */
2459
2460         ext = oio->oi_active;
2461         if (ext && ext->oe_start <= index && ext->oe_max_end >= index) {
2462                 /* one chunk plus extent overhead must be enough to write this
2463                  * page
2464                  */
2465                 grants = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2466                 if (ext->oe_end >= index)
2467                         grants = 0;
2468
2469                 /* it doesn't need any grant to dirty this page */
2470                 spin_lock(&cli->cl_loi_list_lock);
2471                 rc = osc_enter_cache_try(cli, oap, grants, 0);
2472                 spin_unlock(&cli->cl_loi_list_lock);
2473                 if (rc == 0) { /* try failed */
2474                         grants = 0;
2475                         need_release = 1;
2476                 } else if (ext->oe_end < index) {
2477                         int tmp = grants;
2478                         /* try to expand this extent */
2479                         rc = osc_extent_expand(ext, index, &tmp);
2480                         if (rc < 0) {
2481                                 need_release = 1;
2482                                 /* don't free reserved grant */
2483                         } else {
2484                                 OSC_EXTENT_DUMP(D_CACHE, ext,
2485                                                 "expanded for %lu.\n", index);
2486                                 osc_unreserve_grant(cli, grants, tmp);
2487                                 grants = 0;
2488                         }
2489                 }
2490                 rc = 0;
2491         } else if (ext) {
2492                 /* index is located outside of active extent */
2493                 need_release = 1;
2494         }
2495         if (need_release) {
2496                 osc_extent_release(env, ext);
2497                 oio->oi_active = NULL;
2498                 ext = NULL;
2499         }
2500
2501         if (!ext) {
2502                 int tmp = (1 << cli->cl_chunkbits) + cli->cl_extent_tax;
2503
2504                 /* try to find new extent to cover this page */
2505                 LASSERT(!oio->oi_active);
2506                 /* we may have allocated grant for this page if we failed
2507                  * to expand the previous active extent.
2508                  */
2509                 LASSERT(ergo(grants > 0, grants >= tmp));
2510
2511                 rc = 0;
2512                 if (grants == 0) {
2513                         /* we haven't allocated grant for this page. */
2514                         rc = osc_enter_cache(env, cli, oap, tmp);
2515                         if (rc == 0)
2516                                 grants = tmp;
2517                 }
2518
2519                 tmp = grants;
2520                 if (rc == 0) {
2521                         ext = osc_extent_find(env, osc, index, &tmp);
2522                         if (IS_ERR(ext)) {
2523                                 LASSERT(tmp == grants);
2524                                 osc_exit_cache(cli, oap);
2525                                 rc = PTR_ERR(ext);
2526                                 ext = NULL;
2527                         } else {
2528                                 oio->oi_active = ext;
2529                         }
2530                 }
2531                 if (grants > 0)
2532                         osc_unreserve_grant(cli, grants, tmp);
2533         }
2534
2535         LASSERT(ergo(rc == 0, ext));
2536         if (ext) {
2537                 EASSERTF(ext->oe_end >= index && ext->oe_start <= index,
2538                          ext, "index = %lu.\n", index);
2539                 LASSERT((oap->oap_brw_flags & OBD_BRW_FROM_GRANT) != 0);
2540
2541                 osc_object_lock(osc);
2542                 if (ext->oe_nr_pages == 0)
2543                         ext->oe_srvlock = ops->ops_srvlock;
2544                 else
2545                         LASSERT(ext->oe_srvlock == ops->ops_srvlock);
2546                 ++ext->oe_nr_pages;
2547                 list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
2548                 osc_object_unlock(osc);
2549         }
2550         return rc;
2551 }
2552
2553 int osc_teardown_async_page(const struct lu_env *env,
2554                             struct osc_object *obj, struct osc_page *ops)
2555 {
2556         struct osc_async_page *oap = &ops->ops_oap;
2557         struct osc_extent *ext = NULL;
2558         int rc = 0;
2559
2560         LASSERT(oap->oap_magic == OAP_MAGIC);
2561
2562         CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
2563                oap, ops, osc_index(oap2osc(oap)));
2564
2565         osc_object_lock(obj);
2566         if (!list_empty(&oap->oap_rpc_item)) {
2567                 CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
2568                 rc = -EBUSY;
2569         } else if (!list_empty(&oap->oap_pending_item)) {
2570                 ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
2571                 /* only truncated pages are allowed to be taken out.
2572                  * See osc_extent_truncate() and osc_cache_truncate_start()
2573                  * for details.
2574                  */
2575                 if (ext && ext->oe_state != OES_TRUNC) {
2576                         OSC_EXTENT_DUMP(D_ERROR, ext, "trunc at %lu.\n",
2577                                         osc_index(oap2osc(oap)));
2578                         rc = -EBUSY;
2579                 }
2580         }
2581         osc_object_unlock(obj);
2582         if (ext)
2583                 osc_extent_put(env, ext);
2584         return rc;
2585 }
2586
2587 /**
2588  * This is called when a page is picked up by kernel to write out.
2589  *
2590  * We should find out the corresponding extent and add the whole extent
2591  * into urgent list. The extent may be being truncated or used, handle it
2592  * carefully.
2593  */
2594 int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
2595                          struct osc_page *ops)
2596 {
2597         struct osc_extent *ext = NULL;
2598         struct osc_object *obj = cl2osc(ops->ops_cl.cpl_obj);
2599         struct cl_page *cp = ops->ops_cl.cpl_page;
2600         pgoff_t            index = osc_index(ops);
2601         struct osc_async_page *oap = &ops->ops_oap;
2602         bool unplug = false;
2603         int rc = 0;
2604
2605         osc_object_lock(obj);
2606         ext = osc_extent_lookup(obj, index);
2607         if (!ext) {
2608                 osc_extent_tree_dump(D_ERROR, obj);
2609                 LASSERTF(0, "page index %lu is NOT covered.\n", index);
2610         }
2611
2612         switch (ext->oe_state) {
2613         case OES_RPC:
2614         case OES_LOCK_DONE:
2615                 CL_PAGE_DEBUG(D_ERROR, env, cp, "flush an in-rpc page?\n");
2616                 LASSERT(0);
2617                 break;
2618         case OES_LOCKING:
2619                 /* If we know this extent is being written out, we should abort
2620                  * so that the writer can make this page ready. Otherwise, there
2621                  * exists a deadlock problem because other process can wait for
2622                  * page writeback bit holding page lock; and meanwhile in
2623                  * vvp_page_make_ready(), we need to grab page lock before
2624                  * really sending the RPC.
2625                  */
2626         case OES_TRUNC:
2627                 /* race with truncate, page will be redirtied */
2628         case OES_ACTIVE:
2629                 /* The extent is active so we need to abort and let the caller
2630                  * re-dirty the page. If we continued on here, and we were the
2631                  * one making the extent active, we could deadlock waiting for
2632                  * the page writeback to clear but it won't because the extent
2633                  * is active and won't be written out.
2634                  */
2635                 rc = -EAGAIN;
2636                 goto out;
2637         default:
2638                 break;
2639         }
2640
2641         rc = cl_page_prep(env, io, cp, CRT_WRITE);
2642         if (rc)
2643                 goto out;
2644
2645         spin_lock(&oap->oap_lock);
2646         oap->oap_async_flags |= ASYNC_READY|ASYNC_URGENT;
2647         spin_unlock(&oap->oap_lock);
2648
2649         if (memory_pressure_get())
2650                 ext->oe_memalloc = 1;
2651
2652         ext->oe_urgent = 1;
2653         if (ext->oe_state == OES_CACHE) {
2654                 OSC_EXTENT_DUMP(D_CACHE, ext,
2655                                 "flush page %p make it urgent.\n", oap);
2656                 if (list_empty(&ext->oe_link))
2657                         list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2658                 unplug = true;
2659         }
2660         rc = 0;
2661
2662 out:
2663         osc_object_unlock(obj);
2664         osc_extent_put(env, ext);
2665         if (unplug)
2666                 osc_io_unplug_async(env, osc_cli(obj), obj);
2667         return rc;
2668 }
2669
2670 /**
2671  * this is called when a sync waiter receives an interruption.  Its job is to
2672  * get the caller woken as soon as possible.  If its page hasn't been put in an
2673  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2674  * desiring interruption which will forcefully complete the rpc once the rpc
2675  * has timed out.
2676  */
2677 int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops)
2678 {
2679         struct osc_async_page *oap = &ops->ops_oap;
2680         struct osc_object *obj = oap->oap_obj;
2681         struct client_obd *cli = osc_cli(obj);
2682         struct osc_extent *ext;
2683         struct osc_extent *found = NULL;
2684         struct list_head *plist;
2685         pgoff_t index = osc_index(ops);
2686         int rc = -EBUSY;
2687         int cmd;
2688
2689         LASSERT(!oap->oap_interrupted);
2690         oap->oap_interrupted = 1;
2691
2692         /* Find out the caching extent */
2693         osc_object_lock(obj);
2694         if (oap->oap_cmd & OBD_BRW_WRITE) {
2695                 plist = &obj->oo_urgent_exts;
2696                 cmd = OBD_BRW_WRITE;
2697         } else {
2698                 plist = &obj->oo_reading_exts;
2699                 cmd = OBD_BRW_READ;
2700         }
2701         list_for_each_entry(ext, plist, oe_link) {
2702                 if (ext->oe_start <= index && ext->oe_end >= index) {
2703                         LASSERT(ext->oe_state == OES_LOCK_DONE);
2704                         /* For OES_LOCK_DONE state extent, it has already held
2705                          * a refcount for RPC.
2706                          */
2707                         found = osc_extent_get(ext);
2708                         break;
2709                 }
2710         }
2711         if (found) {
2712                 list_del_init(&found->oe_link);
2713                 osc_update_pending(obj, cmd, -found->oe_nr_pages);
2714                 osc_object_unlock(obj);
2715
2716                 osc_extent_finish(env, found, 0, -EINTR);
2717                 osc_extent_put(env, found);
2718                 rc = 0;
2719         } else {
2720                 osc_object_unlock(obj);
2721                 /* ok, it's been put in an rpc. only one oap gets a request
2722                  * reference
2723                  */
2724                 if (oap->oap_request) {
2725                         ptlrpc_mark_interrupted(oap->oap_request);
2726                         ptlrpcd_wake(oap->oap_request);
2727                         ptlrpc_req_finished(oap->oap_request);
2728                         oap->oap_request = NULL;
2729                 }
2730         }
2731
2732         osc_list_maint(cli, obj);
2733         return rc;
2734 }
2735
2736 int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
2737                          struct list_head *list, int cmd, int brw_flags)
2738 {
2739         struct client_obd *cli = osc_cli(obj);
2740         struct osc_extent *ext;
2741         struct osc_async_page *oap, *tmp;
2742         int page_count = 0;
2743         int mppr = cli->cl_max_pages_per_rpc;
2744         pgoff_t start = CL_PAGE_EOF;
2745         pgoff_t end = 0;
2746
2747         list_for_each_entry(oap, list, oap_pending_item) {
2748                 pgoff_t index = osc_index(oap2osc(oap));
2749
2750                 if (index > end)
2751                         end = index;
2752                 if (index < start)
2753                         start = index;
2754                 ++page_count;
2755                 mppr <<= (page_count > mppr);
2756         }
2757
2758         ext = osc_extent_alloc(obj);
2759         if (!ext) {
2760                 list_for_each_entry_safe(oap, tmp, list, oap_pending_item) {
2761                         list_del_init(&oap->oap_pending_item);
2762                         osc_ap_completion(env, cli, oap, 0, -ENOMEM);
2763                 }
2764                 return -ENOMEM;
2765         }
2766
2767         ext->oe_rw = !!(cmd & OBD_BRW_READ);
2768         ext->oe_sync = 1;
2769         ext->oe_urgent = 1;
2770         ext->oe_start = start;
2771         ext->oe_end = end;
2772         ext->oe_max_end = end;
2773         ext->oe_obj = obj;
2774         ext->oe_srvlock = !!(brw_flags & OBD_BRW_SRVLOCK);
2775         ext->oe_nr_pages = page_count;
2776         ext->oe_mppr = mppr;
2777         list_splice_init(list, &ext->oe_pages);
2778
2779         osc_object_lock(obj);
2780         /* Reuse the initial refcount for RPC, don't drop it */
2781         osc_extent_state_set(ext, OES_LOCK_DONE);
2782         if (cmd & OBD_BRW_WRITE) {
2783                 list_add_tail(&ext->oe_link, &obj->oo_urgent_exts);
2784                 osc_update_pending(obj, OBD_BRW_WRITE, page_count);
2785         } else {
2786                 list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
2787                 osc_update_pending(obj, OBD_BRW_READ, page_count);
2788         }
2789         osc_object_unlock(obj);
2790
2791         osc_io_unplug_async(env, cli, obj);
2792         return 0;
2793 }
2794
2795 /**
2796  * Called by osc_io_setattr_start() to freeze and destroy covering extents.
2797  */
2798 int osc_cache_truncate_start(const struct lu_env *env, struct osc_io *oio,
2799                              struct osc_object *obj, __u64 size)
2800 {
2801         struct client_obd *cli = osc_cli(obj);
2802         struct osc_extent *ext;
2803         struct osc_extent *temp;
2804         struct osc_extent *waiting = NULL;
2805         pgoff_t index;
2806         LIST_HEAD(list);
2807         int result = 0;
2808         bool partial;
2809
2810         /* pages with index greater or equal to index will be truncated. */
2811         index = cl_index(osc2cl(obj), size);
2812         partial = size > cl_offset(osc2cl(obj), index);
2813
2814 again:
2815         osc_object_lock(obj);
2816         ext = osc_extent_search(obj, index);
2817         if (!ext)
2818                 ext = first_extent(obj);
2819         else if (ext->oe_end < index)
2820                 ext = next_extent(ext);
2821         while (ext) {
2822                 EASSERT(ext->oe_state != OES_TRUNC, ext);
2823
2824                 if (ext->oe_state > OES_CACHE || ext->oe_urgent) {
2825                         /* if ext is in urgent state, it means there must exist
2826                          * a page already having been flushed by write_page().
2827                          * We have to wait for this extent because we can't
2828                          * truncate that page.
2829                          */
2830                         LASSERT(!ext->oe_hp);
2831                         OSC_EXTENT_DUMP(D_CACHE, ext,
2832                                         "waiting for busy extent\n");
2833                         waiting = osc_extent_get(ext);
2834                         break;
2835                 }
2836
2837                 OSC_EXTENT_DUMP(D_CACHE, ext, "try to trunc:%llu.\n", size);
2838
2839                 osc_extent_get(ext);
2840                 if (ext->oe_state == OES_ACTIVE) {
2841                         /* though we grab inode mutex for write path, but we
2842                          * release it before releasing extent(in osc_io_end()),
2843                          * so there is a race window that an extent is still
2844                          * in OES_ACTIVE when truncate starts.
2845                          */
2846                         LASSERT(!ext->oe_trunc_pending);
2847                         ext->oe_trunc_pending = 1;
2848                 } else {
2849                         EASSERT(ext->oe_state == OES_CACHE, ext);
2850                         osc_extent_state_set(ext, OES_TRUNC);
2851                         osc_update_pending(obj, OBD_BRW_WRITE,
2852                                            -ext->oe_nr_pages);
2853                 }
2854                 EASSERT(list_empty(&ext->oe_link), ext);
2855                 list_add_tail(&ext->oe_link, &list);
2856
2857                 ext = next_extent(ext);
2858         }
2859         osc_object_unlock(obj);
2860
2861         osc_list_maint(cli, obj);
2862
2863         list_for_each_entry_safe(ext, temp, &list, oe_link) {
2864                 int rc;
2865
2866                 list_del_init(&ext->oe_link);
2867
2868                 /* extent may be in OES_ACTIVE state because inode mutex
2869                  * is released before osc_io_end() in file write case
2870                  */
2871                 if (ext->oe_state != OES_TRUNC)
2872                         osc_extent_wait(env, ext, OES_TRUNC);
2873
2874                 rc = osc_extent_truncate(ext, index, partial);
2875                 if (rc < 0) {
2876                         if (result == 0)
2877                                 result = rc;
2878
2879                         OSC_EXTENT_DUMP(D_ERROR, ext,
2880                                         "truncate error %d\n", rc);
2881                 } else if (ext->oe_nr_pages == 0) {
2882                         osc_extent_remove(ext);
2883                 } else {
2884                         /* this must be an overlapped extent which means only
2885                          * part of pages in this extent have been truncated.
2886                          */
2887                         EASSERTF(ext->oe_start <= index, ext,
2888                                  "trunc index = %lu/%d.\n", index, partial);
2889                         /* fix index to skip this partially truncated extent */
2890                         index = ext->oe_end + 1;
2891                         partial = false;
2892
2893                         /* we need to hold this extent in OES_TRUNC state so
2894                          * that no writeback will happen. This is to avoid
2895                          * BUG 17397.
2896                          */
2897                         LASSERT(!oio->oi_trunc);
2898                         oio->oi_trunc = osc_extent_get(ext);
2899                         OSC_EXTENT_DUMP(D_CACHE, ext,
2900                                         "trunc at %llu\n", size);
2901                 }
2902                 osc_extent_put(env, ext);
2903         }
2904         if (waiting) {
2905                 int rc;
2906
2907                 /* ignore the result of osc_extent_wait the write initiator
2908                  * should take care of it.
2909                  */
2910                 rc = osc_extent_wait(env, waiting, OES_INV);
2911                 if (rc < 0)
2912                         OSC_EXTENT_DUMP(D_CACHE, waiting, "error: %d.\n", rc);
2913
2914                 osc_extent_put(env, waiting);
2915                 waiting = NULL;
2916                 goto again;
2917         }
2918         return result;
2919 }
2920
2921 /**
2922  * Called after osc_io_setattr_end to add oio->oi_trunc back to cache.
2923  */
2924 void osc_cache_truncate_end(const struct lu_env *env, struct osc_io *oio,
2925                             struct osc_object *obj)
2926 {
2927         struct osc_extent *ext = oio->oi_trunc;
2928
2929         oio->oi_trunc = NULL;
2930         if (ext) {
2931                 bool unplug = false;
2932
2933                 EASSERT(ext->oe_nr_pages > 0, ext);
2934                 EASSERT(ext->oe_state == OES_TRUNC, ext);
2935                 EASSERT(!ext->oe_urgent, ext);
2936
2937                 OSC_EXTENT_DUMP(D_CACHE, ext, "trunc -> cache.\n");
2938                 osc_object_lock(obj);
2939                 osc_extent_state_set(ext, OES_CACHE);
2940                 if (ext->oe_fsync_wait && !ext->oe_urgent) {
2941                         ext->oe_urgent = 1;
2942                         list_move_tail(&ext->oe_link, &obj->oo_urgent_exts);
2943                         unplug = true;
2944                 }
2945                 osc_update_pending(obj, OBD_BRW_WRITE, ext->oe_nr_pages);
2946                 osc_object_unlock(obj);
2947                 osc_extent_put(env, ext);
2948
2949                 if (unplug)
2950                         osc_io_unplug_async(env, osc_cli(obj), obj);
2951         }
2952 }
2953
2954 /**
2955  * Wait for extents in a specific range to be written out.
2956  * The caller must have called osc_cache_writeback_range() to issue IO
2957  * otherwise it will take a long time for this function to finish.
2958  *
2959  * Caller must hold inode_mutex , or cancel exclusive dlm lock so that
2960  * nobody else can dirty this range of file while we're waiting for
2961  * extents to be written.
2962  */
2963 int osc_cache_wait_range(const struct lu_env *env, struct osc_object *obj,
2964                          pgoff_t start, pgoff_t end)
2965 {
2966         struct osc_extent *ext;
2967         pgoff_t index = start;
2968         int result = 0;
2969
2970 again:
2971         osc_object_lock(obj);
2972         ext = osc_extent_search(obj, index);
2973         if (!ext)
2974                 ext = first_extent(obj);
2975         else if (ext->oe_end < index)
2976                 ext = next_extent(ext);
2977         while (ext) {
2978                 int rc;
2979
2980                 if (ext->oe_start > end)
2981                         break;
2982
2983                 if (!ext->oe_fsync_wait) {
2984                         ext = next_extent(ext);
2985                         continue;
2986                 }
2987
2988                 EASSERT(ergo(ext->oe_state == OES_CACHE,
2989                              ext->oe_hp || ext->oe_urgent), ext);
2990                 EASSERT(ergo(ext->oe_state == OES_ACTIVE,
2991                              !ext->oe_hp && ext->oe_urgent), ext);
2992
2993                 index = ext->oe_end + 1;
2994                 osc_extent_get(ext);
2995                 osc_object_unlock(obj);
2996
2997                 rc = osc_extent_wait(env, ext, OES_INV);
2998                 if (result == 0)
2999                         result = rc;
3000                 osc_extent_put(env, ext);
3001                 goto again;
3002         }
3003         osc_object_unlock(obj);
3004
3005         OSC_IO_DEBUG(obj, "sync file range.\n");
3006         return result;
3007 }
3008
3009 /**
3010  * Called to write out a range of osc object.
3011  *
3012  * @hp     : should be set this is caused by lock cancel;
3013  * @discard: is set if dirty pages should be dropped - file will be deleted or
3014  *         truncated, this implies there is no partially discarding extents.
3015  *
3016  * Return how many pages will be issued, or error code if error occurred.
3017  */
3018 int osc_cache_writeback_range(const struct lu_env *env, struct osc_object *obj,
3019                               pgoff_t start, pgoff_t end, int hp, int discard)
3020 {
3021         struct osc_extent *ext;
3022         LIST_HEAD(discard_list);
3023         bool unplug = false;
3024         int result = 0;
3025
3026         osc_object_lock(obj);
3027         ext = osc_extent_search(obj, start);
3028         if (!ext)
3029                 ext = first_extent(obj);
3030         else if (ext->oe_end < start)
3031                 ext = next_extent(ext);
3032         while (ext) {
3033                 if (ext->oe_start > end)
3034                         break;
3035
3036                 ext->oe_fsync_wait = 1;
3037                 switch (ext->oe_state) {
3038                 case OES_CACHE:
3039                         result += ext->oe_nr_pages;
3040                         if (!discard) {
3041                                 struct list_head *list = NULL;
3042
3043                                 if (hp) {
3044                                         EASSERT(!ext->oe_hp, ext);
3045                                         ext->oe_hp = 1;
3046                                         list = &obj->oo_hp_exts;
3047                                 } else if (!ext->oe_urgent) {
3048                                         ext->oe_urgent = 1;
3049                                         list = &obj->oo_urgent_exts;
3050                                 }
3051                                 if (list)
3052                                         list_move_tail(&ext->oe_link, list);
3053                                 unplug = true;
3054                         } else {
3055                                 /* the only discarder is lock cancelling, so
3056                                  * [start, end] must contain this extent
3057                                  */
3058                                 EASSERT(ext->oe_start >= start &&
3059                                         ext->oe_max_end <= end, ext);
3060                                 osc_extent_state_set(ext, OES_LOCKING);
3061                                 ext->oe_owner = current;
3062                                 list_move_tail(&ext->oe_link, &discard_list);
3063                                 osc_update_pending(obj, OBD_BRW_WRITE,
3064                                                    -ext->oe_nr_pages);
3065                         }
3066                         break;
3067                 case OES_ACTIVE:
3068                         /* It's pretty bad to wait for ACTIVE extents, because
3069                          * we don't know how long we will wait for it to be
3070                          * flushed since it may be blocked at awaiting more
3071                          * grants. We do this for the correctness of fsync.
3072                          */
3073                         LASSERT(hp == 0 && discard == 0);
3074                         ext->oe_urgent = 1;
3075                         break;
3076                 case OES_TRUNC:
3077                         /* this extent is being truncated, can't do anything
3078                          * for it now. it will be set to urgent after truncate
3079                          * is finished in osc_cache_truncate_end().
3080                          */
3081                 default:
3082                         break;
3083                 }
3084                 ext = next_extent(ext);
3085         }
3086         osc_object_unlock(obj);
3087
3088         LASSERT(ergo(!discard, list_empty(&discard_list)));
3089         if (!list_empty(&discard_list)) {
3090                 struct osc_extent *tmp;
3091                 int rc;
3092
3093                 osc_list_maint(osc_cli(obj), obj);
3094                 list_for_each_entry_safe(ext, tmp, &discard_list, oe_link) {
3095                         list_del_init(&ext->oe_link);
3096                         EASSERT(ext->oe_state == OES_LOCKING, ext);
3097
3098                         /* Discard caching pages. We don't actually write this
3099                          * extent out but we complete it as if we did.
3100                          */
3101                         rc = osc_extent_make_ready(env, ext);
3102                         if (unlikely(rc < 0)) {
3103                                 OSC_EXTENT_DUMP(D_ERROR, ext,
3104                                                 "make_ready returned %d\n", rc);
3105                                 if (result >= 0)
3106                                         result = rc;
3107                         }
3108
3109                         /* finish the extent as if the pages were sent */
3110                         osc_extent_finish(env, ext, 0, 0);
3111                 }
3112         }
3113
3114         if (unplug)
3115                 osc_io_unplug(env, osc_cli(obj), obj);
3116
3117         if (hp || discard) {
3118                 int rc;
3119
3120                 rc = osc_cache_wait_range(env, obj, start, end);
3121                 if (result >= 0 && rc < 0)
3122                         result = rc;
3123         }
3124
3125         OSC_IO_DEBUG(obj, "pageout [%lu, %lu], %d.\n", start, end, result);
3126         return result;
3127 }
3128
3129 /**
3130  * Returns a list of pages by a given [start, end] of \a obj.
3131  *
3132  * \param resched If not NULL, then we give up before hogging CPU for too
3133  * long and set *resched = 1, in that case caller should implement a retry
3134  * logic.
3135  *
3136  * Gang tree lookup (radix_tree_gang_lookup()) optimization is absolutely
3137  * crucial in the face of [offset, EOF] locks.
3138  *
3139  * Return at least one page in @queue unless there is no covered page.
3140  */
3141 int osc_page_gang_lookup(const struct lu_env *env, struct cl_io *io,
3142                          struct osc_object *osc, pgoff_t start, pgoff_t end,
3143                          osc_page_gang_cbt cb, void *cbdata)
3144 {
3145         struct osc_page *ops;
3146         void            **pvec;
3147         pgoff_t         idx;
3148         unsigned int    nr;
3149         unsigned int    i;
3150         unsigned int    j;
3151         int             res = CLP_GANG_OKAY;
3152         bool            tree_lock = true;
3153
3154         idx = start;
3155         pvec = osc_env_info(env)->oti_pvec;
3156         spin_lock(&osc->oo_tree_lock);
3157         while ((nr = radix_tree_gang_lookup(&osc->oo_tree, pvec,
3158                                             idx, OTI_PVEC_SIZE)) > 0) {
3159                 struct cl_page *page;
3160                 bool end_of_region = false;
3161
3162                 for (i = 0, j = 0; i < nr; ++i) {
3163                         ops = pvec[i];
3164                         pvec[i] = NULL;
3165
3166                         idx = osc_index(ops);
3167                         if (idx > end) {
3168                                 end_of_region = true;
3169                                 break;
3170                         }
3171
3172                         page = ops->ops_cl.cpl_page;
3173                         LASSERT(page->cp_type == CPT_CACHEABLE);
3174                         if (page->cp_state == CPS_FREEING)
3175                                 continue;
3176
3177                         cl_page_get(page);
3178                         lu_ref_add_atomic(&page->cp_reference,
3179                                           "gang_lookup", current);
3180                         pvec[j++] = ops;
3181                 }
3182                 ++idx;
3183
3184                 /*
3185                  * Here a delicate locking dance is performed. Current thread
3186                  * holds a reference to a page, but has to own it before it
3187                  * can be placed into queue. Owning implies waiting, so
3188                  * radix-tree lock is to be released. After a wait one has to
3189                  * check that pages weren't truncated (cl_page_own() returns
3190                  * error in the latter case).
3191                  */
3192                 spin_unlock(&osc->oo_tree_lock);
3193                 tree_lock = false;
3194
3195                 for (i = 0; i < j; ++i) {
3196                         ops = pvec[i];
3197                         if (res == CLP_GANG_OKAY)
3198                                 res = (*cb)(env, io, ops, cbdata);
3199
3200                         page = ops->ops_cl.cpl_page;
3201                         lu_ref_del(&page->cp_reference, "gang_lookup", current);
3202                         cl_page_put(env, page);
3203                 }
3204                 if (nr < OTI_PVEC_SIZE || end_of_region)
3205                         break;
3206
3207                 if (res == CLP_GANG_OKAY && need_resched())
3208                         res = CLP_GANG_RESCHED;
3209                 if (res != CLP_GANG_OKAY)
3210                         break;
3211
3212                 spin_lock(&osc->oo_tree_lock);
3213                 tree_lock = true;
3214         }
3215         if (tree_lock)
3216                 spin_unlock(&osc->oo_tree_lock);
3217         return res;
3218 }
3219
3220 /**
3221  * Check if page @page is covered by an extra lock or discard it.
3222  */
3223 static int check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
3224                                 struct osc_page *ops, void *cbdata)
3225 {
3226         struct osc_thread_info *info = osc_env_info(env);
3227         struct osc_object *osc = cbdata;
3228         pgoff_t index;
3229
3230         index = osc_index(ops);
3231         if (index >= info->oti_fn_index) {
3232                 struct ldlm_lock *tmp;
3233                 struct cl_page *page = ops->ops_cl.cpl_page;
3234
3235                 /* refresh non-overlapped index */
3236                 tmp = osc_dlmlock_at_pgoff(env, osc, index, 0, 0);
3237                 if (tmp) {
3238                         __u64 end = tmp->l_policy_data.l_extent.end;
3239                         /* Cache the first-non-overlapped index so as to skip
3240                          * all pages within [index, oti_fn_index). This is safe
3241                          * because if tmp lock is canceled, it will discard
3242                          * these pages.
3243                          */
3244                         info->oti_fn_index = cl_index(osc2cl(osc), end + 1);
3245                         if (end == OBD_OBJECT_EOF)
3246                                 info->oti_fn_index = CL_PAGE_EOF;
3247                         LDLM_LOCK_PUT(tmp);
3248                 } else if (cl_page_own(env, io, page) == 0) {
3249                         /* discard the page */
3250                         cl_page_discard(env, io, page);
3251                         cl_page_disown(env, io, page);
3252                 } else {
3253                         LASSERT(page->cp_state == CPS_FREEING);
3254                 }
3255         }
3256
3257         info->oti_next_index = index + 1;
3258         return CLP_GANG_OKAY;
3259 }
3260
3261 static int discard_cb(const struct lu_env *env, struct cl_io *io,
3262                       struct osc_page *ops, void *cbdata)
3263 {
3264         struct osc_thread_info *info = osc_env_info(env);
3265         struct cl_page *page = ops->ops_cl.cpl_page;
3266
3267         /* page is top page. */
3268         info->oti_next_index = osc_index(ops) + 1;
3269         if (cl_page_own(env, io, page) == 0) {
3270                 KLASSERT(ergo(page->cp_type == CPT_CACHEABLE,
3271                               !PageDirty(cl_page_vmpage(page))));
3272
3273                 /* discard the page */
3274                 cl_page_discard(env, io, page);
3275                 cl_page_disown(env, io, page);
3276         } else {
3277                 LASSERT(page->cp_state == CPS_FREEING);
3278         }
3279
3280         return CLP_GANG_OKAY;
3281 }
3282
3283 /**
3284  * Discard pages protected by the given lock. This function traverses radix
3285  * tree to find all covering pages and discard them. If a page is being covered
3286  * by other locks, it should remain in cache.
3287  *
3288  * If error happens on any step, the process continues anyway (the reasoning
3289  * behind this being that lock cancellation cannot be delayed indefinitely).
3290  */
3291 int osc_lock_discard_pages(const struct lu_env *env, struct osc_object *osc,
3292                            pgoff_t start, pgoff_t end, enum cl_lock_mode mode)
3293 {
3294         struct osc_thread_info *info = osc_env_info(env);
3295         struct cl_io *io = &info->oti_io;
3296         osc_page_gang_cbt cb;
3297         int res;
3298         int result;
3299
3300         io->ci_obj = cl_object_top(osc2cl(osc));
3301         io->ci_ignore_layout = 1;
3302         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
3303         if (result != 0)
3304                 goto out;
3305
3306         cb = mode == CLM_READ ? check_and_discard_cb : discard_cb;
3307         info->oti_fn_index = start;
3308         info->oti_next_index = start;
3309         do {
3310                 res = osc_page_gang_lookup(env, io, osc,
3311                                            info->oti_next_index, end, cb, osc);
3312                 if (info->oti_next_index > end)
3313                         break;
3314
3315                 if (res == CLP_GANG_RESCHED)
3316                         cond_resched();
3317         } while (res != CLP_GANG_OKAY);
3318 out:
3319         cl_io_fini(env, io);
3320         return result;
3321 }
3322
3323 /** @} osc */