RGW PUT flow: an analysis of execute()

PUT flow analysis

Preface

Topic of this article: how RGW handles a file uploaded with s3cmd.


Whole-object (non-multipart) upload:

Related concepts

Smaller than 4M:

The data is stored in the .rgw.buckets.data pool as a single rados object (the default chunk size is 4M).

Larger than 4M:

The file is split into 4M-sized rados objects, with tail pieces named like:

c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_0
{bucket_id}__shadow_{obj_name}.{random_suffix}_{stripe_num}
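
To make the naming concrete, here is a minimal illustrative sketch (not RGW code) that assembles the head and shadow oids for a 10M whole-object upload. It reuses the marker, object name, and random suffix from the example line above, and assumes the usual {bucket_marker}_{obj_name} naming for the head object:

#include <iostream>
#include <string>

// Hedged illustration of the rados object names produced by one whole-object
// upload; the marker/name/suffix values below are copied from the example
// above, not generated the way RGW actually generates them.
int main() {
  const std::string bucket_marker = "c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1";
  const std::string obj_name = "file";
  const std::string rand_suffix = "ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd"; // random suffix

  // head object: first stripe of data plus the metadata/manifest xattrs
  std::cout << bucket_marker << "_" << obj_name << "\n";

  // tail ("shadow") objects: one per additional 4M stripe
  for (int stripe = 0; stripe < 2; ++stripe) {
    std::cout << bucket_marker << "__shadow_" << obj_name << "."
              << rand_suffix << "_" << stripe << "\n";
  }
  return 0;
}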

Detailed flow

  // check whether bucket versioning is enabled
  if (s->bucket_info.versioning_enabled()) {
    if (!version_id.empty()) {
      obj.key.set_instance(version_id);
    } else {
      store->gen_rand_obj_instance_name(&obj);
      version_id = obj.key.instance;
    }
  }
  ldpp_dout(this, 20) << "atomic" << "other" << dendl;

  //pdest_placement = &s->dest_placement; // storage class
  s->dest_placement = store->amend_placement_rule(s->dest_placement);
  pdest_placement = &s->dest_placement;

  s->bucket_info.placement_rule = s->dest_placement;
  // construct the processor for this upload type; every variant is driven
  // through the same `processor` interface
  processor.emplace<AtomicObjectProcessor>(
      &aio, store, s->bucket_info, pdest_placement,
      s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
}
// key step: initialization work before the actual data upload begins
op_ret = processor->prepare();

AtomicObjectProcessor::prepare()

int AtomicObjectProcessor::prepare()
{
  uint64_t max_head_chunk_size;
  uint64_t head_max_size;
  uint64_t chunk_size = 0;
  uint64_t alignment;
  rgw_pool head_pool;

  // look up the data pool where the uploaded object will be placed
  if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
    ldout(store->ctx(), 0) << "fail to get head pool " << head_obj << ",rule=" << bucket_info.placement_rule << dendl;
    return -EIO;
  }
  // determine max_head_chunk_size (the size of the first, i.e. head, object)
  int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, &alignment);
  if (r < 0) {
    return r;
  }

  // ... (head_max_size, chunk_size and stripe_size are derived from
  // max_head_chunk_size here; elided in this excerpt)

  manifest.set_trivial_rule(head_max_size, stripe_size);

  /* change atomic obj oid, add obj-name */
  char buf[33];
  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
  string oid_prefix = ".";
  oid_prefix.append(buf);
  oid_prefix.append("_");

  // the randomly generated suffix ends up in the shadow oids, e.g.:
  // c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_{file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_}0
  manifest.set_prefix(head_obj.key.name + oid_prefix);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                bucket_info.placement_rule,
                                &tail_placement_rule,
                                head_obj.bucket, head_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to create manifest " << head_obj << ",ret=" << r << dendl;
    return r;
  }
  // the manifest generator's current object at this point is the head object
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to set_stripe_obj " << head_obj << ",ret=" << r << dendl;
    return r;
  }

  set_head_chunk_size(head_max_size);
  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, head_max_size);
  return 0;
}
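
One way to read set_trivial_rule(head_max_size, stripe_size) is as a fixed mapping from logical offsets to stripes: the head object covers [0, head_max_size) and each shadow object covers one stripe_size-sized slice after that. A small sketch of that offset arithmetic (plain C++ with the default 4M values; not the Ceph manifest code):

#include <cstdint>
#include <cstdio>

// Sketch of the layout produced by manifest.set_trivial_rule(head_max_size,
// stripe_size): the head covers [0, head_max_size), then fixed-size stripes.
struct StripeLoc {
  uint64_t index;   // 0 = head object, 1.. = shadow objects
  uint64_t offset;  // offset inside that stripe
};

StripeLoc locate(uint64_t logical_ofs, uint64_t head_max, uint64_t stripe) {
  if (logical_ofs < head_max) {
    return {0, logical_ofs};
  }
  uint64_t tail_ofs = logical_ofs - head_max;
  return {1 + tail_ofs / stripe, tail_ofs % stripe};
}

int main() {
  const uint64_t M = 4 * 1024 * 1024;  // default head/stripe size
  for (uint64_t ofs : {(uint64_t)0, M - 1, M, 2 * M, 2 * M + 123}) {
    StripeLoc loc = locate(ofs, M, M);
    std::printf("logical %llu -> stripe %llu, offset %llu\n",
                (unsigned long long)ofs, (unsigned long long)loc.index,
                (unsigned long long)loc.offset);
  }
  return 0;
}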

Reading the data in a loop

do {
  bufferlist data;
  if (fst > lst)
    break;
  // no copy source
  if (copy_source.empty()) {
    // RGWPutObj_ObjStore::get_data(bufferlist& bl)
    // each call pulls at most 4M
    len = get_data(data);
  }
  // copy source present
  else {
    uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
    op_ret = get_data(fst, cur_lst, data);
    if (op_ret < 0)
      return;
    len = data.length();
    s->content_length += len;
    fst += len;
  }
  if (len < 0) {
    op_ret = len;
    ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl;
    return;
  } else if (len == 0) {
    break;
  }

  if (need_calc_md5) {
    hash.Update((const unsigned char *)data.c_str(), data.length());
  }

  /* update torrent */ // feeds the data into the torrent SHA-1 calculation
  torrent.update(data);
  // hand the data to the processing chain: HeadObjectProcessor::process
  op_ret = filter->process(std::move(data), ofs);
  if (op_ret < 0) {
    ldpp_dout(this, 20) << "processor->process() returned ret="
                        << op_ret << dendl;
    return;
  }

  ofs += len;
} while (len > 0);

filter->process(std::move(data), ofs) ---> HeadObjectProcessor::process

When data arrives, this is where it is handled (the key processing function):

int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
{
  // an empty buffer means all the data has been read: time to flush
  const bool flush = (data.length() == 0);
  // capture the first chunk for special handling
  // data_offset tracks how many bytes of this object have been seen so far
  if (data_offset < head_chunk_size || data_offset == 0) {
    if (flush) {
      // flush partial chunk
      // less than one full chunk: handled here, reading the data from head_data
      ldout(store->ctx(), 20) << "process flush offset:" << data_offset << " data len:" << head_data.length() << dendl;
      return process_first_chunk(std::move(head_data), &processor);
    }

    auto remaining = head_chunk_size - data_offset;
    auto count = std::min<uint64_t>(data.length(), remaining);
    data.splice(0, count, &head_data);
    data_offset += count;

    // the head has reached exactly 4M (head_chunk_size)
    if (data_offset == head_chunk_size) {
      // process the first complete chunk
      // only a complete chunk is written here; a partial chunk stays buffered
      // in head_data and is handled by the flush above
      ceph_assert(head_data.length() == head_chunk_size);

      ldout(store->ctx(), 20) << "process first chunk, offset:" << data_offset << " data len:" << head_data.length() << dendl;

      int r = process_first_chunk(std::move(head_data), &processor);
      if (r < 0) {
        return r;
      }
    }
    ldout(store->ctx(), 20) << " !!! " << dendl;

    if (data.length() == 0) { // avoid flushing stripe processor
      return 0;
    }
  }

  ceph_assert(processor); // process_first_chunk() must initialize

  // send everything else through the processor
  auto write_offset = data_offset;
  data_offset += data.length();

  ldout(store->ctx(), 20) << "process data, write offset:" << write_offset << " data len:" << data.length() << dendl;
  // StripeProcessor::process(bufferlist&& data, uint64_t offset)
  return processor->process(std::move(data), write_offset);
}

The main job of HeadObjectProcessor::process is to handle the first 4M (the head object) specially; everything beyond the first 4M is handed to the next processor (StripeProcessor::process).

For data beyond the first 4M, the write path is StripeProcessor::process -> ChunkProcessor::process() -> RadosWriter::process():


int StripeProcessor::process(bufferlist&& data, uint64_t offset)
{
  // bounds.first and bounds.second delimit the current stripe's logical
  // range [first, second)
  ceph_assert(offset >= bounds.first);
  const bool flush = (data.length() == 0);

  if (flush) {
    return Pipe::process({}, offset - bounds.first);
  }

  auto max = bounds.second - offset;

  while (data.length() > max) {
    // max > 0: there is still room in the current stripe, so fill it first
    if (max > 0) {
      bufferlist bl;
      data.splice(0, max, &bl);

      int r = Pipe::process(std::move(bl), offset - bounds.first);
      if (r < 0) {
        return r;
      }
      offset += max;
    }

    // flush the current chunk: ChunkProcessor::process
    int r = Pipe::process({}, offset - bounds.first); // -> ChunkProcessor::process
    if (r < 0) {
      return r;
    }
    // generate the next stripe
    uint64_t stripe_size;
    r = gen->next(offset, &stripe_size);
    if (r < 0) {
      return r;
    }
    ceph_assert(stripe_size > 0);

    bounds.first = offset;
    bounds.second = offset + stripe_size;

    max = stripe_size; // max is now 4M (one full stripe)
  }

  if (data.length() == 0) { // don't flush the chunk here
    return 0;
  }
  // offset - bounds.first = 0
  return Pipe::process(std::move(data), offset - bounds.first);
}
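
All three stages above share one shape: process(data, offset), forwarding to the next stage, with an empty buffer meaning "flush". The following self-contained sketch reproduces that pipe pattern with ChunkProcessor-style buffering; std::string stands in for bufferlist and the class names are illustrative, not the Ceph types:

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>

// Minimal sketch of the process()/flush pipe pattern used above.
struct Processor {
  virtual ~Processor() = default;
  virtual int process(std::string&& data, uint64_t offset) = 0;
};

// Buffers input and forwards only whole chunk_size pieces, like ChunkProcessor.
struct Chunker : Processor {
  Processor* next;
  uint64_t chunk_size;
  std::string pending;
  Chunker(Processor* n, uint64_t size) : next(n), chunk_size(size) {}

  int process(std::string&& data, uint64_t offset) override {
    uint64_t position = offset - pending.size(); // offset of pending[0]
    const bool flush = data.empty();
    pending.append(data);
    while (pending.size() >= chunk_size) { // forward complete chunks only
      int r = next->process(pending.substr(0, chunk_size), position);
      if (r < 0) return r;
      pending.erase(0, chunk_size);
      position += chunk_size;
    }
    if (flush && !pending.empty()) { // trailing partial chunk, written on flush
      std::string tail;
      tail.swap(pending);
      return next->process(std::move(tail), position);
    }
    return 0;
  }
};

// Terminal stage, standing in for RadosWriter.
struct Sink : Processor {
  int process(std::string&& data, uint64_t offset) override {
    std::cout << "write " << data.size() << " bytes @ " << offset << "\n";
    return 0;
  }
};

int main() {
  Sink sink;
  Chunker chunker(&sink, 4);        // tiny 4-byte "chunks" for the demo
  chunker.process("abcdefghij", 0); // writes 4 @ 0 and 4 @ 4
  chunker.process({}, 10);          // flush: writes the trailing 2 @ 8
  return 0;
}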

Next, let's take a 10M upload as an example and walk through the code to see how the data is split and sent down to rados.

A 10M file goes through the whole-object upload path and is split into three pieces (the do...while loop above pulls at most 4M per iteration): 4M, 4M, and 2M.
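
The split itself is nothing more than the loop's arithmetic; a quick sketch:

#include <cstdint>
#include <cstdio>

// Arithmetic of the do/while read loop: each get_data() call returns at most
// rgw_max_chunk_size (4M by default), so 10M arrives as 4M + 4M + 2M.
int main() {
  const uint64_t max_chunk = 4 * 1024 * 1024; // rgw_max_chunk_size
  uint64_t remaining = 10 * 1024 * 1024;      // object size
  uint64_t ofs = 0;
  while (remaining > 0) {
    uint64_t len = remaining < max_chunk ? remaining : max_chunk;
    std::printf("filter->process(%lluM, ofs=%lluM)\n",
                (unsigned long long)(len >> 20),
                (unsigned long long)(ofs >> 20));
    ofs += len;
    remaining -= len;
  }
  // after the loop, an empty buffer flushes anything still buffered
  std::printf("filter->process(flush, ofs=%lluM)\n",
              (unsigned long long)(ofs >> 20));
  return 0;
}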

The first 4M

// data is 4M; ofs starts at 0
filter->process(std::move(data), ofs); // (4M, 0)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (4M, 0)
// the first chunk is complete: exactly 4M
---> process_first_chunk(std::move(head_data), &processor);
// written directly as a single rados object
---> writer.process(std::move(data), 0) // RadosWriter::process(4M, 0)
---> op.write_full(data)
ofs += len; // update the offset; len is the data length, so ofs is now 4M

The second 4M

// still inside the same loop: another 4M is pulled

filter->process(std::move(data), ofs); // (4M, 4M)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (4M, 4M)
// no longer the first chunk: handed straight to StripeProcessor
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(4M, 4M)
---> const bool flush = (data.length() == 0); // flush = false
---> auto max = bounds.second - offset; // max = 4M - 4M = 0; bounds.second is the stripe upper bound, 4M by default (see prepare)
---> while (data.length() > max) {..} // loop entered: data.length() = 4M
---> Pipe::process({}, offset - bounds.first) // ChunkProcessor::process({}, 4M): data is empty, so effectively nothing is written
---> gen->next(offset, &stripe_size); // advances the pre-built manifest (more on this later): ManifestObjectProcessor::next
---> bounds.first = offset; // update the stripe bounds: bounds.first = 4M
---> bounds.second = offset + stripe_size; // bounds.second = 8M
---> max = stripe_size; // max is 4M
---> Pipe::process(std::move(data), offset - bounds.first); // ChunkProcessor::process(4M, 0)
---> ChunkProcessor::process(bufferlist&& data, uint64_t offset)
---> int64_t position = offset - chunk.length(); // 0 - 0 = 0: the offset is stripe-relative and nothing is buffered in chunk yet
---> flush = (data.length() == 0); // flush = false
---> chunk.claim_append(data); // moves the data into chunk, leaving data empty
---> while (chunk.length() >= chunk_size) // chunk.length() = 4M; chunk_size defaults to 4M (see prepare)
---> bufferlist bl;
---> chunk.splice(0, chunk_size, &bl); // the spliced bytes are removed from chunk
---> int r = Pipe::process(std::move(bl), position); // RadosWriter::process(4M, 0): offset 0 within the new shadow object
---> op.write_full(data)
ofs += len; // update the offset; len is the data length, so ofs is now 8M


The remaining 2M

filter->process(std::move(data), ofs); // (2M, 8M)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (2M, 8M)
// not the first chunk: handed straight to StripeProcessor
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(2M, 8M), offset = 8M
---> const bool flush = (data.length() == 0); // flush = false
---> auto max = bounds.second - offset; // max = 8M - 8M = 0
---> while (data.length() > max) {..} // loop entered: data.length() = 2M
---> Pipe::process({}, offset - bounds.first) // ChunkProcessor::process({}, 4M): data is empty, so effectively nothing is written
---> gen->next(offset, &stripe_size); // advances the pre-built manifest: ManifestObjectProcessor::next
---> bounds.first = offset; // update the stripe bounds: bounds.first = 8M
---> bounds.second = offset + stripe_size; // bounds.second = 12M
---> max = stripe_size; // max is 4M
---> Pipe::process(std::move(data), offset - bounds.first); // ChunkProcessor::process(2M, 0)
---> ChunkProcessor::process(bufferlist&& data, uint64_t offset)
---> int64_t position = offset - chunk.length(); // 0 - 0 = 0: nothing buffered in chunk yet
---> flush = (data.length() == 0); // flush = false
---> chunk.claim_append(data); // moves the data into chunk, leaving data empty
---> while (chunk.length() >= chunk_size) // chunk.length() = 2M < chunk_size: loop body skipped; the 2M stays buffered in chunk until the final flush
ofs += len; // update the offset; len is the data length, so ofs is now 10M



// after all the data has been read, flush whatever is still buffered
// flush any data in filters
op_ret = filter->process({}, ofs); // ofs = 10M
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (0, 10M)
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(0, 10M), offset = 10M
---> const bool flush = (data.length() == 0); // flush = true
---> Pipe::process({}, offset - bounds.first); // ChunkProcessor::process({}, 2M): 10M - 8M
---> ChunkProcessor::process({}, 2M)
---> const bool flush = (data.length() == 0); // flush = true
---> if (chunk.length() > 0)
---> Pipe::process(std::move(chunk), position); // RadosWriter::process(2M, 0): position = 2M - 2M = 0
---> op.write_full(data);

What is special about whole-object upload is the first part (the head) and the final piece of data, which is only flushed after the do...while loop has read everything.

Next, we will look at how the object's metadata is stored (in the head object).