put 流程分析
前言 本文主题:当我们用 s3cmd 上传文件时,RGW 是如何处理的
整体上传:相关概念 小于 4M:数据存放在 .rgw.buckets.data 池中,只会生成一个 rados 对象(单个 rados 对象默认最大 4M)
大于 4M:文件会被分割成多个 4M 大小的 rados 对象,除第一个(head 对象)外,其余对象的命名形如:
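示例:c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_0 命名格式:{桶id}__shadow_{对象名}.{随机前缀}_{条带序号}(本例中对象名为 file;整体上传时 "." 后面是 prepare() 中用 gen_rand_alphanumeric 生成的随机串,而不是 upload_id,末尾的数字是条带序号,而不是 part_num)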
具体流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 if (s->bucket_info.versioning_enabled ()) { if (!version_id.empty ()) { obj.key.set_instance (version_id); } else { store->gen_rand_obj_instance_name (&obj); version_id = obj.key.instance; } } ldpp_dout (this , 20 ) << "atomic" << "other" << dendl; s->dest_placement = store->amend_placement_rule (s->dest_placement); pdest_placement = &s->dest_placement; s->bucket_info.placement_rule = s->dest_placement; processor.emplace <AtomicObjectProcessor>( &aio, store, s->bucket_info, pdest_placement, s->bucket_owner.get_id (), obj_ctx, obj, olh_epoch, s->req_id); } op_ret = processor->prepare ();
AtomicObjectProcessor::prepare() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 int int AtomicObjectProcessor::prepare () { uint64_t max_head_chunk_size; uint64_t head_max_size; uint64_t chunk_size = 0 ; uint64_t alignment; rgw_pool head_pool; if (!store->get_obj_data_pool (bucket_info.placement_rule, head_obj, &head_pool)) { ldout (store->ctx (), 0 ) << "fail to get head pool " << head_obj << ",rule=" << bucket_info.placement_rule << dendl; return -EIO; } int r = store->get_max_chunk_size (head_pool, &max_head_chunk_size, &alignment); if (r < 0 ) { return r; } manifest.set_trivial_rule (head_max_size, stripe_size); char buf[33 ]; gen_rand_alphanumeric (store->ctx (), buf, sizeof (buf) - 1 ); string oid_prefix = "." ; oid_prefix.append (buf); oid_prefix.append ("_" ); manifest.set_prefix ( head_obj.key.name + oid_prefix); r = manifest_gen.create_begin (store->ctx (), &manifest, bucket_info.placement_rule, &tail_placement_rule, head_obj.bucket, head_obj); if (r < 0 ) { ldout (store->ctx (), 0 ) << "fail to create manifest " << head_obj << ",ret=" << r << dendl; return r; } rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj (store); r = writer.set_stripe_obj (stripe_obj); if (r < 0 ) { ldout (store->ctx (), 0 ) << "fail to set_stripe_obj " << head_obj << ",ret=" << r << dendl; return r; } set_head_chunk_size (head_max_size); chunk = ChunkProcessor (&writer, chunk_size); stripe = StripeProcessor (&chunk, this , head_max_size); return 0 ; }
循环读数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 do { bufferlist data; if (fst > lst) break ; if (copy_source.empty ()) { len = get_data (data); } else { uint64_t cur_lst = min (fst + s->cct->_conf->rgw_max_chunk_size - 1 , lst); op_ret = get_data (fst, cur_lst, data); if (op_ret < 0 ) return ; len = data.length (); s->content_length += len; fst += len; } if (len < 0 ) { op_ret = len; ldpp_dout (this , 20 ) << "get_data() returned ret=" << op_ret << dendl; return ; } else if (len == 0 ) { break ; } if (need_calc_md5) { hash.Update ((const unsigned char *)data.c_str (), data.length ()); } torrent.update (data); op_ret = filter->process (std::move (data), ofs); if (op_ret < 0 ) { ldpp_dout (this , 20 ) << "processor->process() returned ret=" << op_ret << dendl; return ; } ofs += len; } while (len > 0 );
filter->process(std::move(data), ofs); —-> HeadObjectProcessor::process
收到数据后,先在这里处理(关键处理函数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int HeadObjectProcessor::process (bufferlist&& data, uint64_t logical_offset) { const bool flush = (data.length () == 0 ); if (data_offset < head_chunk_size || data_offset == 0 ) { if (flush) { ldout (store->ctx (), 20 ) << "process flush offset:" << data_offset << " data len:" << head_data.length () <<dendl; return process_first_chunk (std::move (head_data), &processor); } auto remaining = head_chunk_size - data_offset; auto count = std::min <uint64_t >(data.length (), remaining); data.splice (0 , count, &head_data); data_offset += count; if (data_offset == head_chunk_size) { ceph_assert (head_data.length () == head_chunk_size); ldout (store->ctx (), 20 ) << "process first chunk, offset:" << data_offset << " data len:" << head_data.length () <<dendl; int r = process_first_chunk (std::move (head_data), &processor); if (r < 0 ) { return r; } } ldout (store->ctx (), 20 ) << " !!! " <<dendl; if (data.length () == 0 ) { return 0 ; } } ceph_assert (processor); auto write_offset = data_offset; data_offset += data.length (); ldout (store->ctx (), 20 ) << "process data, write offset:" << write_offset << " data len:" << data.length () <<dendl; return processor->process (std::move (data), write_offset); }
HeadObjectProcessor::process 的主要作用:把最前面的 4M(head_chunk_size)数据单独处理(process_first_chunk),4M 之后的数据则交给下一级 process(StripeProcessor::process)
4M 以后的数据写入流程是:StripeProcessor::process -> ChunkProcessor::process() -> RadosWriter::process()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 int StripeProcessor::process (bufferlist&& data, uint64_t offset) { ceph_assert (offset >= bounds.first); const bool flush = (data.length () == 0 ); if (flush) { return Pipe::process ({}, offset - bounds.first); } auto max = bounds.second - offset; while (data.length () > max) { if (max > 0 ) { bufferlist bl; data.splice (0 , max, &bl); int r = Pipe::process (std::move (bl), offset - bounds.first); if (r < 0 ) { return r; } offset += max; } int r = Pipe::process ({}, offset - bounds.first); if (r < 0 ) { return r; } uint64_t stripe_size; r = gen->next (offset, &stripe_size); if (r < 0 ) { return r; } ceph_assert (stripe_size > 0 ); bounds.first = offset; bounds.second = offset + stripe_size; max = stripe_size; } if (data.length () == 0 ) { return 0 ; } return Pipe::process (std::move (data), offset - bounds.first); }
接下来以上传一个 10M 大小的文件为例,结合代码说明数据是怎么被切割并下发到 rados 的
10M 的文件属于整体上传,会被切割成 3 份(在上面提到的 do…while 循环中每次读取 4M),分别是 4M、4M、2M
第一个4M 1 2 3 4 5 6 7 8 9 10 filter->process (std::move (data), ofs); --->HeadObjectProcessor::process (bufferlist&& data, uint64_t logical_offset) --->process_first_chunk (std::move (head_data), &processor); --->writer.process (std::move (data), 0 ) ---> op.write_full (data) ofs += len;
第二个 4M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 filter->process (std::move (data), ofs); --->HeadObjectProcessor::process (bufferlist&& data, uint64_t logical_offset) ---> processor->process (std::move (data), write_offset); --->const bool flush = (data.length () == 0 ); --->auto max = bounds.second - offset; ---> while (data.length () > max) {..} ---> Pipe::process ({}, offset - bounds.first) ---> gen->next (offset, &stripe_size); ---> bounds.first = offset; ---> bounds.second = offset + stripe_size; ---> max = stripe_size; ---> Pipe::process (std::move (data), offset - bounds.first); ---> ChunkProcessor::process (bufferlist&& data, uint64_t offset) ---> int64_t position = offset - chunk.length (); ---> flush = (data.length () == 0 ); ---> chunk.claim_append (data); ---> while (chunk.length () >= chunk_size) ---> bufferlist bl; ---> chunk.splice (0 , chunk_size, &bl); ---> int r = Pipe::process (std::move (bl), position); ---> op.write (offset, data) ofs += len;
剩下的数据 2M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 filter->process (std::move (data), ofs); --->HeadObjectProcessor::process (bufferlist&& data, uint64_t logical_offset) ---> processor->process (std::move (data), write_offset); ---> const bool flush = (data.length () == 0 ); ---> auto max = bounds.second - offset; ---> while (data.length () > max) {..} ---> Pipe::process ({}, offset - bounds.first) ---> gen->next (offset, &stripe_size); ---> bounds.first = offset; ---> bounds.second = offset + stripe_size; ---> max = stripe_size; ---> Pipe::process (std::move (data), offset - bounds.first); ---> ChunkProcessor::process (bufferlist&& data, uint64_t offset) ---> int64_t position = offset - chunk.length (); ---> flush = (data.length () == 0 ); ---> chunk.claim_append (data); ---> while (chunk.length () >= chunk_size) ---> bufferlist bl; ---> chunk.splice (0 , chunk_size, &bl); ---> int r = Pipe::process (std::move (bl), position); ---> op.write (offset, data) ofs += len; op_ret = filter->process ({}, ofs); --->HeadObjectProcessor::process (bufferlist&& data, uint64_t logical_offset) ---> processor->process (std::move (data), write_offset); ---> const bool flush = (data.length () == 0 ); ---> Pipe::process ({}, offset - bounds.first); ---> ChunkProcessor::process ({},2 M) ---> const bool flush = (data.length () == 0 ); ---> if (chunk.length () > 0 ) ---> Pipe::process (std::move (chunk), position); ---> op.write_full (data);
整体上传中比较特殊的是第一段数据(前 4M)和最后一段数据(要等 do…while 读取完毕、传入空数据触发 flush 时才会刷新下去)
接下来介绍对象的元数据是如何存放的(head 对象)