]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - net/ceph/messenger.c
libceph: validate con->state at the top of try_write()
[linux.git] / net / ceph / messenger.c
index 8a4d3758030b73d3b9ca24b09f91b1e65be0844e..3b3d33ea9ed830373c08889f3e38d7bc695f313b 100644 (file)
@@ -277,7 +277,7 @@ static void _ceph_msgr_exit(void)
        ceph_msgr_slab_exit();
 }
 
-int ceph_msgr_init(void)
+int __init ceph_msgr_init(void)
 {
        if (ceph_msgr_slab_init())
                return -ENOMEM;
@@ -299,7 +299,6 @@ int ceph_msgr_init(void)
 
        return -ENOMEM;
 }
-EXPORT_SYMBOL(ceph_msgr_init);
 
 void ceph_msgr_exit(void)
 {
@@ -307,7 +306,6 @@ void ceph_msgr_exit(void)
 
        _ceph_msgr_exit();
 }
-EXPORT_SYMBOL(ceph_msgr_exit);
 
 void ceph_msgr_flush(void)
 {
@@ -839,93 +837,112 @@ static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
                                        size_t length)
 {
        struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+       cursor->resid = min_t(size_t, length, data->bio_length);
+       *it = data->bio_pos;
+       if (cursor->resid < it->iter.bi_size)
+               it->iter.bi_size = cursor->resid;
 
-       bio = data->bio;
-       BUG_ON(!bio);
-
-       cursor->resid = min(length, data->bio_length);
-       cursor->bio = bio;
-       cursor->bvec_iter = bio->bi_iter;
-       cursor->last_piece =
-               cursor->resid <= bio_iter_len(bio, cursor->bvec_iter);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
 }
 
 static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
                                                size_t *page_offset,
                                                size_t *length)
 {
-       struct ceph_msg_data *data = cursor->data;
-       struct bio *bio;
-       struct bio_vec bio_vec;
-
-       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
-
-       bio = cursor->bio;
-       BUG_ON(!bio);
-
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
-
-       *page_offset = (size_t) bio_vec.bv_offset;
-       BUG_ON(*page_offset >= PAGE_SIZE);
-       if (cursor->last_piece) /* pagelist offset is always 0 */
-               *length = cursor->resid;
-       else
-               *length = (size_t) bio_vec.bv_len;
-       BUG_ON(*length > cursor->resid);
-       BUG_ON(*page_offset + *length > PAGE_SIZE);
+       struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
+                                          cursor->bio_iter.iter);
 
-       return bio_vec.bv_page;
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
 }
 
 static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
                                        size_t bytes)
 {
-       struct bio *bio;
-       struct bio_vec bio_vec;
+       struct ceph_bio_iter *it = &cursor->bio_iter;
 
-       BUG_ON(cursor->data->type != CEPH_MSG_DATA_BIO);
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
+       cursor->resid -= bytes;
+       bio_advance_iter(it->bio, &it->iter, bytes);
 
-       bio = cursor->bio;
-       BUG_ON(!bio);
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
+       }
 
-       bio_vec = bio_iter_iovec(bio, cursor->bvec_iter);
+       if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done))
+               return false;   /* more bytes to process in this segment */
 
-       /* Advance the cursor offset */
+       if (!it->iter.bi_size) {
+               it->bio = it->bio->bi_next;
+               it->iter = it->bio->bi_iter;
+               if (cursor->resid < it->iter.bi_size)
+                       it->iter.bi_size = cursor->resid;
+       }
 
-       BUG_ON(cursor->resid < bytes);
-       cursor->resid -= bytes;
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
+       cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
+       return true;
+}
+#endif /* CONFIG_BLOCK */
 
-       bio_advance_iter(bio, &cursor->bvec_iter, bytes);
+static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct bio_vec *bvecs = data->bvec_pos.bvecs;
 
-       if (bytes < bio_vec.bv_len)
-               return false;   /* more bytes to process in this segment */
+       cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
+       cursor->bvec_iter = data->bvec_pos.iter;
+       cursor->bvec_iter.bi_size = cursor->resid;
 
-       /* Move on to the next segment, and possibly the next bio */
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+}
 
-       if (!cursor->bvec_iter.bi_size) {
-               bio = bio->bi_next;
-               cursor->bio = bio;
-               if (bio)
-                       cursor->bvec_iter = bio->bi_iter;
-               else
-                       memset(&cursor->bvec_iter, 0,
-                              sizeof(cursor->bvec_iter));
-       }
+static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
+                                               size_t *page_offset,
+                                               size_t *length)
+{
+       struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
+                                          cursor->bvec_iter);
+
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
+}
 
-       if (!cursor->last_piece) {
-               BUG_ON(!cursor->resid);
-               BUG_ON(!bio);
-               /* A short read is OK, so use <= rather than == */
-               if (cursor->resid <= bio_iter_len(bio, cursor->bvec_iter))
-                       cursor->last_piece = true;
+static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
+                                       size_t bytes)
+{
+       struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
+
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->resid -= bytes;
+       bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
+
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
        }
 
+       if (!bytes || cursor->bvec_iter.bi_bvec_done)
+               return false;   /* more bytes to process in this segment */
+
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
        return true;
 }
-#endif /* CONFIG_BLOCK */
 
 /*
  * For a page array, a piece comes from the first page in the array
@@ -1110,6 +1127,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
                ceph_msg_data_bio_cursor_init(cursor, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               ceph_msg_data_bvecs_cursor_init(cursor, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                /* BUG(); */
@@ -1158,14 +1178,19 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
                page = ceph_msg_data_bio_next(cursor, page_offset, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                page = NULL;
                break;
        }
+
        BUG_ON(!page);
        BUG_ON(*page_offset + *length > PAGE_SIZE);
        BUG_ON(!*length);
+       BUG_ON(*length > cursor->resid);
        if (last_piece)
                *last_piece = cursor->last_piece;
 
@@ -1194,6 +1219,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
                new_piece = ceph_msg_data_bio_advance(cursor, bytes);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                BUG();
@@ -1575,13 +1603,18 @@ static int write_partial_message_data(struct ceph_connection *con)
         * been revoked, so use the zero page.
         */
        crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
-       while (cursor->resid) {
+       while (cursor->total_resid) {
                struct page *page;
                size_t page_offset;
                size_t length;
                bool last_piece;
                int ret;
 
+               if (!cursor->resid) {
+                       ceph_msg_data_advance(cursor, 0);
+                       continue;
+               }
+
                page = ceph_msg_data_next(cursor, &page_offset, &length,
                                          &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
@@ -2297,7 +2330,12 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
        if (do_datacrc)
                crc = con->in_data_crc;
-       while (cursor->resid) {
+       while (cursor->total_resid) {
+               if (!cursor->resid) {
+                       ceph_msg_data_advance(cursor, 0);
+                       continue;
+               }
+
                page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
@@ -2531,6 +2569,11 @@ static int try_write(struct ceph_connection *con)
        int ret = 1;
 
        dout("try_write start %p state %lu\n", con, con->state);
+       if (con->state != CON_STATE_PREOPEN &&
+           con->state != CON_STATE_CONNECTING &&
+           con->state != CON_STATE_NEGOTIATING &&
+           con->state != CON_STATE_OPEN)
+               return 0;
 
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
@@ -2556,6 +2599,8 @@ static int try_write(struct ceph_connection *con)
        }
 
 more_kvec:
+       BUG_ON(!con->sock);
+
        /* kvec data queued? */
        if (con->out_kvec_left) {
                ret = write_partial_kvec(con);
@@ -3262,16 +3307,14 @@ void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 EXPORT_SYMBOL(ceph_msg_data_add_pagelist);
 
 #ifdef CONFIG_BLOCK
-void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
-               size_t length)
+void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
+                          u32 length)
 {
        struct ceph_msg_data *data;
 
-       BUG_ON(!bio);
-
        data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
        BUG_ON(!data);
-       data->bio = bio;
+       data->bio_pos = *bio_pos;
        data->bio_length = length;
 
        list_add_tail(&data->links, &msg->data);
@@ -3280,6 +3323,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct bio *bio,
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif /* CONFIG_BLOCK */
 
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+                            struct ceph_bvec_iter *bvec_pos)
+{
+       struct ceph_msg_data *data;
+
+       data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
+       BUG_ON(!data);
+       data->bvec_pos = *bvec_pos;
+
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += bvec_pos->iter.bi_size;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.