一、fastdfs网络IO模型的结构
fdfs文件服务器主要有3种线程,accept线程、work线程(网络io处理)、dio线程(处理文件)
accept新连接,有个专门的accept线程去处理。每个线程池处理自己的事,比如在业务中,还要设计一个视频解码的功能,要另开个线程池,处理专门的任务。而不是把所有逻辑都放在一个线程池里面。
nio是net io的意思 (网络io)
dio是data io的意思 (文件io)
1.accept线程在接受新的连接后,封装成一个任务对象,选择worker线程,写入管道pipe,pipe是能够触发work线程中的epoll。(在接受新连接前,在work线程中就已经把pipe加入了监听)
2.文件io线程,通过从队列中取任务,然后写入到磁盘中。
那么如何通知文件IO线程呢,直接往文件处理队列(阻塞队列)中添加即可,在文件io线程中就能取到(锁和条件变量,取消阻塞)。没有直接在文件io线程中检测网络io事件,主要是为了将功能解耦合,让work线程(处理网络io)去做这件事,每个线程只做自己的事,使得逻辑清楚。
3.dio线程不会直接给nio线程设置各种读写事件,而是通过
FDFS_STORAGE_STAGE_NIO_INIT、FDFS_STORAGE_STAGE_NIO_RECV、
FDFS_STORAGE_STAGE_NIO_SEND、FDFS_STORAGE_STAGE_NIO_CLOSE、
FDFS_STORAGE_STAGE_DIO_THREAD等状态 + 通过pipe通知nio线程响应storage_recv_notify_read
进行io事件的处理。
文件io线程每处理完一个任务,都不会直接去操作epoll,而是设置好任务的状态,再通过pipe通知work线程(触发storage_recv_notify_read):如果文件还没有处理完,work线程重新设置读事件,继续接收后续数据;如果已经处理完毕,work线程则为该fd注册可写事件,由网络io把响应发送给客户端。
不同线程之间通过以下方式进行通信
1.队列 (阻塞队列,锁+条件变量)
2.管道(通过pipe创建)
二、服务端的一些逻辑
新连接:
首先通过accept线程去接受新的连接incomesock,然后去获取它的信息,并从对象池取出一个pTask,将新的连接fd以及它的信息封装成pTask。然后通过管道把pTask的地址发送出去:work线程的epoll早已监听了该管道的读端fd,因此能检测到accept线程写入pipe的事件,并读出pTask。最后将该客户端fd的读事件加入到epoll中。
上传:
work线程的epoll检测到客户端发送的信息,通过解析协议,获取它的数据,如果其中的CMD是上传的标志,那么就会执行client_sock_read–>storage_deal_task
–>storage_upload_File–>storage_write_to_file
然后会将协议解析的数据和上传回调函数(保存到服务器磁盘)dio_write_file,上传完成的回调函数storage_upload_file_done_callback等信息都封装到pTask,然后将它加入队列中。
然后dio线程中队列就能检测到新的任务,就会执行deal_func也就是dio_write_file,进行写入磁盘。
相关视频推荐
学习地址:C/C++Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
三、源码阅读
1、fastdfs/storage/fdfs_storage.c
这一部分是进行启动storage服务端
- 一些初始化工作(如创建socket对象(进行listen),等)
- 创建work线程(读写网络io)
- 创建dio线程(dataIO线程,也就是处理文件的,比如将上传的内容写入到磁盘)
- 创建accept线程(用于接受新的连接)
当然还有写同步的功能,但不在本文介绍。
/*
 * Storage server entry point (excerpt: "..." marks code the article omits).
 *
 * Visible startup sequence:
 *   1. socketServer(): create the listening socket (socket + bind + listen);
 *   2. storage_service_init(): set up the work (nio) threads for network I/O;
 *   3. storage_dio_init(): set up the dio threads for file I/O;
 *   4. storage_accept_loop(): accept connections; this call only returns
 *      after the accept loop running on this thread exits.
 * Each visible failing step logs and returns its error code.
 */
int main(int argc, char *argv[])
{
...
sock = socketServer(g_bind_addr, g_server_port, &result);// socket + bind + listen
if (sock < 0)
{
logCrit("exit abnormally!\n");
delete_pid_file(pidFilename);
log_destroy();
return result;
}
...
if ((result=storage_service_init()) != 0)// network side: per-thread data and the work (nio) threads
{
logCrit("file: "__FILE__", line: %d, " \
"storage_service_init fail, program exit!", __LINE__);
g_continue_flag = false;
// NOTE(review): unlike the socketServer failure path above, this path does not call delete_pid_file()/log_destroy() in the visible code -- confirm that is intended
return result;
}
...
if ((result=storage_dio_init()) != 0)// start the dio (data I/O, i.e. file handling) threads
{
logCrit("exit abnormally!\n");
log_destroy();
return result;
}
log_set_cache(true);
bTerminateFlag = false;
accept_stage = ACCEPT_STAGE_DOING;
storage_accept_loop(sock);// blocks: the calling thread also runs an accept loop
accept_stage = ACCEPT_STAGE_DONE;
...
}
2、storage_accept_loop
创建accept线程(数量由g_accept_threads决定)
线程的执行函数为accept_thread_entrance
/*
 * Run the storage accept loop on server_sock.
 *
 * When g_accept_threads > 1, (g_accept_threads - 1) extra threads are started,
 * each running accept_thread_entrance on the same listening socket. The
 * calling thread then becomes the last accept worker itself, so this function
 * returns only after its own accept loop exits.
 *
 * Problems while spawning threads are logged but never fatal: the calling
 * thread always ends up accepting connections.
 */
void storage_accept_loop(int server_sock)
{
	/* the socket travels to the thread entry as a pointer-sized integer */
	void *sock_arg = (void *)(long)server_sock;

	if (g_accept_threads > 1)
	{
		pthread_attr_t attr;
		int ret;

		ret = init_pthread_attr(&attr, g_thread_stack_size);
		if (ret == 0)
		{
			pthread_t thread_id;
			int started;

			/* slot 0 is the caller itself, so spawning starts at 1 */
			for (started = 1; started < g_accept_threads; ++started)
			{
				ret = pthread_create(&thread_id, &attr,
						accept_thread_entrance, sock_arg);
				if (ret != 0)
				{
					logError("file: "__FILE__", line: %d, "
						"create thread failed, startup threads: %d, "
						"errno: %d, error info: %s",
						__LINE__, started, ret, STRERROR(ret));
					break;
				}
			}
			pthread_attr_destroy(&attr);
		}
		else
		{
			logWarning("file: "__FILE__", line: %d, "
				"init_pthread_attr fail!", __LINE__);
		}
	}

	accept_thread_entrance(sock_arg);
}
1)accept_thread_entrance
流程:
其中获取任务对象是从对象池中获取,然后 将新连接的信息封装到任务对象。
通过 fd 对 work 线程数取模(incomesock % g_work_threads)的方式选定work线程(并不是严格意义上的轮询),然后将任务对象的地址写入(write)该work线程的管道
源码:
/*
 * Accept-thread body. arg carries the listening socket (an int passed through
 * a long). Loops while g_continue_flag is set and always returns NULL.
 *
 * For every accepted connection it:
 *   1. rejects the peer when an allow list is active (g_allow_ip_count >= 0)
 *      and the peer address is not found in g_allow_ip_addrs (searched with
 *      bsearch, so the list must be sorted);
 *   2. switches the socket to non-blocking mode;
 *   3. takes a task object from the free-task pool and stores the fd, the
 *      client IP and stage FDFS_STORAGE_STAGE_NIO_INIT in it;
 *   4. picks a work (nio) thread by fd % g_work_threads (a hash of the fd,
 *      not strict round-robin);
 *   5. writes the task's address into that thread's notify pipe
 *      (pipe_fds[1]); the work thread polls the read end and picks the task
 *      up in its notify callback.
 * Any per-connection failure closes the socket (returning the task to the
 * pool if one was taken), and the loop moves on to the next connection.
 */
static void *accept_thread_entrance(void* arg)
{
	int server_sock;
	int incomesock;
	struct sockaddr_in inaddr;
	socklen_t sockaddr_len;
	in_addr_t client_addr;
	char szClientIp[IP_ADDRESS_SIZE];
	long task_addr;
	struct fast_task_info *pTask;
	StorageClientInfo *pClientInfo;
	struct storage_nio_thread_data *pThreadData;

	server_sock = (long)arg;
	while (g_continue_flag)
	{
		sockaddr_len = sizeof(inaddr);
		incomesock = accept(server_sock, (struct sockaddr*)&inaddr,
				&sockaddr_len);
		if (incomesock < 0) //error
		{
			/* EINTR / EAGAIN are transient: retry without logging */
			if (!(errno == EINTR || errno == EAGAIN))
			{
				logError("file: "__FILE__", line: %d, "
					"accept failed, "
					"errno: %d, error info: %s",
					__LINE__, errno, STRERROR(errno));
			}
			continue;
		}

		client_addr = getPeerIpaddr(incomesock,
				szClientIp, IP_ADDRESS_SIZE);
		if (g_allow_ip_count >= 0)
		{
			if (bsearch(&client_addr, g_allow_ip_addrs,
					g_allow_ip_count, sizeof(in_addr_t),
					cmp_by_ip_addr_t) == NULL)
			{
				logError("file: "__FILE__", line: %d, "
					"ip addr %s is not allowed to access",
					__LINE__, szClientIp);
				close(incomesock);
				continue;
			}
		}

		if (tcpsetnonblockopt(incomesock) != 0)
		{
			close(incomesock);
			continue;
		}

		pTask = free_queue_pop();  /* task object from the pool */
		if (pTask == NULL)
		{
			logError("file: "__FILE__", line: %d, "
				"malloc task buff fail, you should "
				"increase the parameter \"max_connections\" "
				"in storage.conf, or check your applications "
				"for connection leaks", __LINE__);
			close(incomesock);
			continue;
		}

		pClientInfo = (StorageClientInfo *)pTask->arg;
		pTask->event.fd = incomesock;
		pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT;
		/* choose the work thread by fd modulo the thread count */
		pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;
		pThreadData = g_nio_thread_data + pClientInfo->nio_thread_index;
		strcpy(pTask->client_ip, szClientIp);

		/* the task pointer itself is what travels through the pipe */
		task_addr = (long)pTask;
		if (write(pThreadData->thread_data.pipe_fds[1], &task_addr,
				sizeof(task_addr)) != sizeof(task_addr))
		{
			/* save errno now: close() and the pool push may overwrite it */
			int write_errno = errno;

			close(incomesock);
			free_queue_push(pTask);  /* give the task back to the pool */
			logError("file: "__FILE__", line: %d, "
				"call write failed, "
				"errno: %d, error info: %s",
				__LINE__, write_errno, STRERROR(write_errno));
		}
		else
		{
			int current_connections;

			current_connections = __sync_add_and_fetch(
					&g_storage_stat.connection.current_count, 1);
			/* NOTE(review): this max update is not atomic; with several
			 * accept threads a concurrent update can be lost */
			if (current_connections > g_storage_stat.connection.max_count) {
				g_storage_stat.connection.max_count = current_connections;
			}
			++g_stat_change_count;
		}
	}
	return NULL;
}
注意pipe_fds[1]是管道的写端,pipe_fds[0]是读端。因此如果pipe_fds[0]加入epoll的话,往pipe_fds[1]中写入数据,那么epoll就能监听到。
3、storage_service_init
这部分的主要内容是创建work线程,work线程的执行函数为work_thread_entrance
int storage_service_init()
{
...
bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;//work线程默认个数是4,也可以从配置文件中读出来
g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes);
if (g_nio_thread_data == NULL)
{
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, error info: %s", \
__LINE__, bytes, errno, STRERROR(errno));
return errno != 0 ? errno : ENOMEM;
}
memset(g_nio_thread_data, 0, bytes);
g_storage_thread_count = 0;
pDataEnd = g_nio_thread_data + g_work_threads;
for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData++)
{
...
if (pipe(pThreadData->thread_data.pipe_fds) != 0)//创建管道
{
result = errno != 0 ? errno : EPERM;
logError("file: "__FILE__", line: %d, " \
"call pipe fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
break;
}
...
if ((result=pthread_create(&tid, &thread_attr, \
work_thread_entrance, pThreadData)) != 0)//创建work线程
{
logError("file: "__FILE__", line: %d, " \
"create thread failed, startup threads: %d, " \
"errno: %d, error info: %s", \
__LINE__, g_storage_thread_count, \
result, STRERROR(result));
break;
}
else
{
if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
g_storage_thread_count++;//创建线程成功,因此+1
if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0)
{
logError("file: "__FILE__", line: %d, " \
"call pthread_mutex_lock fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
}
}
}
...
}
1)work_thread_entrance
ioevent_loop是 事件循环所在
/*
 * Work (nio) thread body ("..." marks code the article omits).
 * arg is this thread's struct storage_nio_thread_data.
 * Runs ioevent_loop on the thread's data with storage_recv_notify_read as
 * the notify-pipe callback and task_finish_clean_up as the cleanup callback
 * until g_continue_flag is cleared, then destroys the thread's event poller.
 * Always returns NULL.
 */
static void *work_thread_entrance(void* arg)
{
int result;
struct storage_nio_thread_data *pThreadData;
pThreadData = (struct storage_nio_thread_data *)arg;
...
// the event loop: returns when g_continue_flag is cleared or on a fatal error
ioevent_loop(&pThreadData->thread_data, storage_recv_notify_read,
task_finish_clean_up, &g_continue_flag);
ioevent_destroy(&pThreadData->thread_data.ev_puller);
...
return NULL;
}
2)ioevent_loop
事件循环所在
比如将pipe_fds[0]管道的读端fd加入epoll管理
进行epoll_wait
如果事件触发,就执行相应的回调函数
/*
 * Event loop of one network (work/nio) thread ("..." marks code the article
 * omits).
 *
 * Registers the read end of the thread's notify pipe (pipe_fds[0]) with the
 * thread's poller for IOEVENT_READ, with recv_notify_callback as its handler
 * (the storage server passes storage_recv_notify_read). Then, while
 * *continue_flag is true, polls for events and dispatches the ready ones
 * through deal_ioevents().
 *
 * Returns 0 once *continue_flag becomes false. Returns an errno-style code if
 * registering the pipe fails, or if polling fails with anything but EINTR.
 * NOTE(review): clean_up_callback, head, task and count are not used in the
 * visible part.
 */
int ioevent_loop(struct nio_thread_data *pThreadData,
IOEventCallback recv_notify_callback, TaskCleanUpCallback
clean_up_callback, volatile bool *continue_flag)
{
int result;
struct ioevent_notify_entry ev_notify;
FastTimerEntry head;
struct fast_task_info *task;
time_t last_check_time;
int count;
memset(&ev_notify, 0, sizeof(ev_notify));
ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData); // notify pipe read end, not a client socket (presumably the same fd as pipe_fds[0] attached below -- confirm)
ev_notify.event.callback = recv_notify_callback; // storage passes storage_recv_notify_read
ev_notify.thread_data = pThreadData; // owning thread
if (ioevent_attach(&pThreadData->ev_puller,
pThreadData->pipe_fds[0], IOEVENT_READ,
&ev_notify) != 0) // watch the pipe for readability
{
result = errno != 0 ? errno : ENOMEM;
logCrit("file: "__FILE__", line: %d, " \
"ioevent_attach fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
pThreadData->deleted_list = NULL;
last_check_time = g_current_time;
while (*continue_flag)
{
pThreadData->ev_puller.iterator.count = ioevent_poll( // wait for ready events (the article describes this as epoll_wait)
&pThreadData->ev_puller);
if (pThreadData->ev_puller.iterator.count > 0)
{
deal_ioevents(&pThreadData->ev_puller); // run the callbacks of the ready events
}
else if (pThreadData->ev_puller.iterator.count < 0)
{
result = errno != 0 ? errno : EINVAL;
if (result != EINTR) // interrupted polls are simply retried
{
logError("file: "__FILE__", line: %d, " \
"ioevent_poll fail, " \
"errno: %d, error info: %s", \
__LINE__, result, STRERROR(result));
return result;
}
}
...
}
return 0;
}
3)storage_recv_notify_read
// 这里的socket实际是pipevoid storage_recv_notify_read(int sock, short event, void *arg)// 数据服务器socket事件回调,比如说在上传文件时,接收了一部分之后,调用storage_nio_notify(pTask){struct fast_task_info *pTask;StorageClientInfo *pClientInfo;//注意这个参数是不同的,一个是跟踪服务器参数,一个是数据服务器参数long task_addr; // 读取task的地址int64_t remain_bytes;int bytes;int result;while (1) // 循环读取task任务{if ((bytes=read(sock, &task_addr, sizeof(task_addr))) < 0) // 读取task任务{if (!(errno == EAGAIN || errno == EWOULDBLOCK)){logError("file: "__FILE__", line: %d, " \"call read failed, " \"errno: %d, error info: %s", \__LINE__, errno, STRERROR(errno));}break; // 没有task可读}else if (bytes == 0){logError("file: "__FILE__", line: %d, " \"call read failed, end of file", __LINE__);break;}pTask = (struct fast_task_info *)task_addr; // 还原任务pClientInfo = (StorageClientInfo *)pTask->arg;if (pTask->event.fd < 0) //quit flag, 这个是对应的 socket fd{return;}/* //logInfo("=====thread index: %d, pTask->event.fd=%d", \pClientInfo->nio_thread_index, pTask->event.fd);*/if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD){pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD;}switch (pClientInfo->stage){case FDFS_STORAGE_STAGE_NIO_INIT: //数据服务器服务端socket接收过来的任务的pClientInfo->stage=FDFS_STORAGE_STAGE_NIO_INITresult = storage_nio_init(pTask); //因此在这里在重新绑定读写事件 //每连接一个客户端,在这里都会触发这个动作break;case FDFS_STORAGE_STAGE_NIO_RECV:pTask->offset = 0; //在次接受包体时pTask->offset偏移量被重置remain_bytes = pClientInfo->total_length - \pClientInfo->total_offset;//任务的长度=包的总长度-包的总偏移量if (remain_bytes > pTask->size) //总是试图将余下的自己一次接收收完{pTask->length = pTask->size; // pTask->size 是每次最大的数据接收长度}else{pTask->length = remain_bytes;}if (set_recv_event(pTask) == 0){client_sock_read(pTask->event.fd, // 通过socket fd读取数据IOEVENT_READ, pTask); // 读取数据}result = 0;break;case FDFS_STORAGE_STAGE_NIO_SEND:result = storage_send_add_event(pTask); // 数据发送break;case FDFS_STORAGE_STAGE_NIO_CLOSE:result = EIO; //close this socketbreak;default:logError("file: "__FILE__", line: %d, " \"invalid stage: %d", __LINE__, 
\pClientInfo->stage);result = EINVAL;break;}if (result != 0){ioevent_add_to_deleted_list(pTask); // 如果出错再将对应的task加入到删除队列进行处理}}}
(1)client_sock_read
从socket读取数据(也可以是pipe)
大部分内容都是读取数据的操作,最后一步才是关键的
recv读取完数据后,分为两种,
1、storage_deal_task(pTask)
这里面可以解析协议中的CMD,后续调用到storage_upload_file进行文件上传的初始化工作
2、data数据处理
读取完的数据交由dio(dataIO线程)进行处理storage_dio_queue_push(pTask),放入队列中,那么dio线程就能读取到,并执行dio_write_file写入磁盘中
/*
 * Read callback for a client socket; it is also called directly from
 * storage_recv_notify_read. arg is the struct fast_task_info and sock is its
 * fd. ("..." marks code the article omits.)
 *
 * Receives one chunk into pTask->data. It first reads the TrackerHeader and
 * takes the package length from it, then reads body bytes up to
 * pTask->length (at most pTask->size per chunk). When the chunk is complete:
 *   - on the first chunk of a request (total_offset == 0),
 *     storage_deal_task() parses the header and the command;
 *   - on later chunks, storage_dio_queue_push() hands the buffer to a dio
 *     thread (e.g. to keep writing an uploaded file).
 * If the whole request has arrived, the stage switches to
 * FDFS_STORAGE_STAGE_NIO_SEND.
 * On EAGAIN/EWOULDBLOCK it just returns; on EINTR it retries. Other recv
 * errors, peer close, and a negative package length end in
 * task_finish_clean_up().
 */
static void client_sock_read(int sock, short event, void *arg)
{
...
while (1)
{
if (pClientInfo->total_length == 0) // header not parsed yet (total_length is set once it is): read the rest of the TrackerHeader
{
recv_bytes = sizeof(TrackerHeader) - pTask->offset;
}
else // header already parsed: read the rest of the current chunk
{
recv_bytes = pTask->length - pTask->offset; // pTask->length is set below, or by storage_recv_notify_read before further body chunks
}
/*
logInfo("total_length=%"PRId64", recv_bytes=%d, "
"pTask->length=%d, pTask->offset=%d",
pClientInfo->total_length, recv_bytes,
pTask->length, pTask->offset);
*/
bytes = recv(sock, pTask->data + pTask->offset, recv_bytes, 0); // append at the current buffer offset
if (bytes < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// nothing to read right now: just return
}
else if (errno == EINTR)
{
continue;
}
else
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, recv failed, " \
"errno: %d, error info: %s", \
__LINE__, pTask->client_ip, \
errno, STRERROR(errno));
task_finish_clean_up(pTask);
}
return;
}
else if (bytes == 0)
{
logDebug("file: "__FILE__", line: %d, " \
"client ip: %s, recv failed, " \
"connection disconnected.", \
__LINE__, pTask->client_ip);
task_finish_clean_up(pTask);
return;
}
if (pClientInfo->total_length == 0) // header not parsed yet
{ // try to parse the header
if (pTask->offset + bytes < sizeof(TrackerHeader)) // header still incomplete: wait for more data
{
pTask->offset += bytes;
return;
}
// pkg_len in the header is the body length; the header size is added below
pClientInfo->total_length=buff2long(((TrackerHeader *) \
pTask->data)->pkg_len);
if (pClientInfo->total_length < 0)
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, pkg length: " \
"%"PRId64" < 0", \
__LINE__, pTask->client_ip, \
pClientInfo->total_length);
task_finish_clean_up(pTask);
return;
}
// total package length = header + body; a large body arrives as several chunks of at most pTask->size bytes
pClientInfo->total_length += sizeof(TrackerHeader); // now includes the header
if (pClientInfo->total_length > pTask->size)
{
pTask->length = pTask->size; // package larger than the buffer: this chunk fills the buffer, the rest comes in later chunks
}
else
{
pTask->length = pClientInfo->total_length; // whole package fits in one chunk
}
}
pTask->offset += bytes; // advance past the bytes just received
if (pTask->offset >= pTask->length) //recv current pkg done: current chunk complete
{
// bytes already processed + this chunk reach the package end: whole request received
if (pClientInfo->total_offset + pTask->length >= \
pClientInfo->total_length)
{
/* current req recv done */
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND;
pTask->req_count++;
}
if (pClientInfo->total_offset == 0)
{ // first chunk of this request: nothing processed yet
pClientInfo->total_offset = pTask->length;
storage_deal_task(pTask); // parse the header and the command-specific fields
}
else
{
pClientInfo->total_offset += pTask->length; // later chunk: count it as processed
/* continue write to file */
storage_dio_queue_push(pTask); // hand the chunk to a dio thread (e.g. continue an upload)
}
return;
}
}
return;
}
(2)client_sock_write
服务端发送数据write给客户端,对于客户端来说,就是下载文件
一个下载任务的数据要分成好几段,复用同一个pTask的缓冲区,交给网络io线程(work线程)依次发送。
如果pTask当前这一段数据发送完了(pTask->offset>=pTask->length),那么看总任务是不是都发送完了。如果总任务都发送完成了,那么就切换为接收(RECV)状态,等待客户端的下一个请求。
如果总任务还没发送完,那么就加入队列中storage_dio_queue_push(pTask),让dio线程继续从文件中读取下一段数据。
static void client_sock_write(int sock, short event, void *arg)
{
int bytes;
struct fast_task_info *pTask;
StorageClientInfo *pClientInfo;
pTask = (struct fast_task_info *)arg;
pClientInfo = (StorageClientInfo *)pTask->arg;
if (pTask->canceled)
{
return;
}
if (event & IOEVENT_TIMEOUT)
{
logError("file: "__FILE__", line: %d, "
"client ip: %s, send timeout, offset: %d, "
"remain bytes: %d", __LINE__, pTask->client_ip,
pTask->offset, pTask->length - pTask->offset);
task_finish_clean_up(pTask);
return;
}
if (event & IOEVENT_ERROR)
{
logDebug("file: "__FILE__", line: %d, "
"client ip: %s, recv error event: %d, "
"close connection", __LINE__, pTask->client_ip, event);
task_finish_clean_up(pTask);
return;
}
while (1)
{
fast_timer_modify(&pTask->thread_data->timer,
&pTask->event.timer, g_current_time +
g_fdfs_network_timeout);
bytes = send(sock, pTask->data + pTask->offset, \
pTask->length - pTask->offset, 0);
//printf("%08X sended %d bytes\n", (int)pTask, bytes);
if (bytes < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
set_send_event(pTask);
}
else if (errno == EINTR)
{
continue;
}
else
{
logError("file: "__FILE__", line: %d, " \
"client ip: %s, recv failed, " \
"errno: %d, error info: %s", \
__LINE__, pTask->client_ip, \
errno, STRERROR(errno));
task_finish_clean_up(pTask);
}
return;
}
else if (bytes == 0)
{
logWarning("file: "__FILE__", line: %d, " \
"send failed, connection disconnected.", \
__LINE__);
task_finish_clean_up(pTask);
return;
}
pTask->offset += bytes;
if (pTask->offset >= pTask->length)
{
if (set_recv_event(pTask) != 0)
{
return;
}
pClientInfo->total_offset += pTask->length;
if (pClientInfo->total_offset>=pClientInfo->total_length)
{
if (pClientInfo->total_length == sizeof(TrackerHeader)
&& ((TrackerHeader *)pTask->data)->status == EINVAL)
{
logDebug("file: "__FILE__", line: %d, "\
"close conn: #%d, client ip: %s", \
__LINE__, pTask->event.fd,
pTask->client_ip);
task_finish_clean_up(pTask);
return;
}
/* response done, try to recv again */
pClientInfo->total_length = 0;
pClientInfo->total_offset = 0;
pTask->offset = 0;
pTask->length = 0;
pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV;
}
else //continue to send file content
{
pTask->length = 0;
/* continue read from file */
storage_dio_queue_push(pTask); // 继续发送数据
}
return;
}
}
}
4)storage_upload_file
如果client_sock_read读取的数据中,协议的CMD为上传指令,那么在work线程(nio)中
执行该函数storage_upload_file,初始化要保存的文件信息,并且storage_upload_file末尾,通过storage_write_to_file将写入磁盘任务,放入队列中storage_dio_queue_push,让dio线程去执行
上传文件是在dio线程中执行dio_write_file,它是作为回调函数执行的。
/*
 * Handle an upload request (regular or appender file) on a work (nio)
 * thread.
 *
 * Parses the request body that follows the TrackerHeader in pTask->data:
 *   1 byte                              store_path_index (-1: server picks)
 *   FDFS_PROTO_PKG_LEN_SIZE bytes       file size (buff2long)
 *   FDFS_FILE_EXT_NAME_MAX_LEN bytes    extension name (not NUL-terminated)
 *   file content ...
 * It validates these fields and fills pClientInfo->file_context (target file
 * name, open flags, callbacks). Finally it calls storage_write_to_file() with
 * dio_write_file as the handler, which queues the disk write for a dio
 * thread; storage_upload_file_done_callback runs when the write finishes.
 *
 * A non-appender file goes into a trunk file when trunk storage is enabled
 * and trunk_check_size() accepts its size. Otherwise it is written as a
 * standalone file after a reserved-space check.
 *
 * Returns the result of storage_write_to_file() on success, or an
 * errno-style code (EINVAL, ENOSPC, or one from a helper) on failure.
 */
static int storage_upload_file(struct fast_task_info *pTask, bool bAppenderFile)
{
	StorageClientInfo *pClientInfo;
	StorageFileContext *pFileContext;
	DisconnectCleanFunc clean_func;
	char *p;
	char filename[128];
	/* sized by the constant that bounds the copy below (the original used
	 * the unrelated FDFS_FILE_PREFIX_MAX_LEN) */
	char file_ext_name[FDFS_FILE_EXT_NAME_MAX_LEN + 1];
	int64_t nInPackLen;
	int64_t file_offset;
	int64_t file_bytes;
	int crc32;
	int store_path_index;
	int result;
	int filename_len;

	pClientInfo = (StorageClientInfo *)pTask->arg;
	pFileContext = &(pClientInfo->file_context);  /* per-connection file context */

	/* body length: the whole package minus the protocol header */
	nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader);
	if (nInPackLen < 1 + FDFS_PROTO_PKG_LEN_SIZE +
			FDFS_FILE_EXT_NAME_MAX_LEN)
	{
		logError("file: "__FILE__", line: %d, "
			"cmd=%d, client ip: %s, package size "
			"%"PRId64" is not correct, "
			"expect length >= %d", __LINE__,
			STORAGE_PROTO_CMD_UPLOAD_FILE,
			pTask->client_ip, nInPackLen,
			1 + FDFS_PROTO_PKG_LEN_SIZE +
			FDFS_FILE_EXT_NAME_MAX_LEN);
		return EINVAL;
	}

	p = pTask->data + sizeof(TrackerHeader);  /* skip the header */

	/*
	 * The index travels as one byte, with -1 meaning "server chooses".
	 * Read it through signed char: plain char is unsigned on some ABIs,
	 * where 0xFF would otherwise become 255 and be rejected as invalid.
	 */
	store_path_index = (signed char)*p++;
	if (store_path_index == -1)
	{
		if ((result=storage_get_storage_path_index(
				&store_path_index)) != 0)
		{
			logError("file: "__FILE__", line: %d, "
				"get_storage_path_index fail, "
				"errno: %d, error info: %s", __LINE__,
				result, STRERROR(result));
			return result;
		}
	}
	else if (store_path_index < 0 || store_path_index >=
			g_fdfs_store_paths.count)
	{
		logError("file: "__FILE__", line: %d, "
			"client ip: %s, store_path_index: %d "
			"is invalid", __LINE__,
			pTask->client_ip, store_path_index);
		return EINVAL;
	}

	/* the declared file size must equal exactly the rest of the body */
	file_bytes = buff2long(p);
	p += FDFS_PROTO_PKG_LEN_SIZE;
	if (file_bytes < 0 || file_bytes != nInPackLen -
			(1 + FDFS_PROTO_PKG_LEN_SIZE +
			 FDFS_FILE_EXT_NAME_MAX_LEN))
	{
		logError("file: "__FILE__", line: %d, "
			"client ip: %s, pkg length is not correct, "
			"invalid file bytes: %"PRId64
			", total body length: %"PRId64,
			__LINE__, pTask->client_ip, file_bytes, nInPackLen);
		return EINVAL;
	}

	/* fixed-width extension field: copy, then NUL-terminate */
	memcpy(file_ext_name, p, FDFS_FILE_EXT_NAME_MAX_LEN);
	*(file_ext_name + FDFS_FILE_EXT_NAME_MAX_LEN) = '\0';
	p += FDFS_FILE_EXT_NAME_MAX_LEN;
	if ((result=fdfs_validate_filename(file_ext_name)) != 0)
	{
		logError("file: "__FILE__", line: %d, "
			"client ip: %s, file_ext_name: %s "
			"is invalid!", __LINE__,
			pTask->client_ip, file_ext_name);
		return result;
	}

	pFileContext->calc_crc32 = true;
	/* hash the content only when duplicate-file checking is enabled */
	pFileContext->calc_file_hash = g_check_file_duplicate;
	pFileContext->extra_info.upload.start_time = g_current_time;
	strcpy(pFileContext->extra_info.upload.file_ext_name, file_ext_name);
	storage_format_ext_name(file_ext_name,
			pFileContext->extra_info.upload.formatted_ext_name);
	pFileContext->extra_info.upload.trunk_info.path.
		store_path_index = store_path_index;
	pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR;
	pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE;
	pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time;
	pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE;
	if (bAppenderFile)
	{
		pFileContext->extra_info.upload.file_type |=
			_FILE_TYPE_APPENDER;
	}
	else
	{
		/* non-appender files may be packed into a trunk file */
		if (g_if_use_trunk_file && trunk_check_size(
				TRUNK_CALC_SIZE(file_bytes)))
		{
			pFileContext->extra_info.upload.file_type |=
				_FILE_TYPE_TRUNK;
		}
	}

	if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK)
	{
		FDFSTrunkFullInfo *pTrunkInfo;

		pFileContext->extra_info.upload.if_sub_path_alloced = true;
		pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info);
		/* reserve room inside a trunk file; TRUNK_CALC_SIZE presumably
		 * adds the per-file trunk header to file_bytes -- confirm */
		if ((result=trunk_client_trunk_alloc_space(
				TRUNK_CALC_SIZE(file_bytes), pTrunkInfo)) != 0)
		{
			return result;
		}

		clean_func = dio_trunk_write_finish_clean_up;
		file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo));
		pFileContext->extra_info.upload.if_gen_filename = true;
		trunk_get_full_filename(pTrunkInfo, pFileContext->filename,
				sizeof(pFileContext->filename));
		pFileContext->extra_info.upload.before_open_callback =
				dio_check_trunk_file_when_upload;
		pFileContext->extra_info.upload.before_close_callback =
				dio_write_chunk_header;
		pFileContext->open_flags = O_RDWR | g_extra_open_file_flags;
	}
	else
	{
		char reserved_space_str[32];

		if (!storage_check_reserved_space_path(g_fdfs_store_paths.paths
			[store_path_index].total_mb, g_fdfs_store_paths.paths
			[store_path_index].free_mb - (file_bytes/FDFS_ONE_MB),
			g_avg_storage_reserved_mb))
		{
			logError("file: "__FILE__", line: %d, "
				"no space to upload file, "
				"free space: %d MB is too small, file bytes: "
				"%"PRId64", reserved space: %s",
				__LINE__, g_fdfs_store_paths.paths[store_path_index].
				free_mb, file_bytes,
				fdfs_storage_reserved_space_to_string_ex(
					g_storage_reserved_space.flag,
					g_avg_storage_reserved_mb,
					g_fdfs_store_paths.paths[store_path_index].
					total_mb, g_storage_reserved_space.rs.ratio,
					reserved_space_str));
			return ENOSPC;
		}

		/* random value passed as the crc32 argument of
		 * storage_get_filename; the real CRC32 is only computed while
		 * the content is written */
		crc32 = rand();
		*filename = '\0';
		filename_len = 0;
		pFileContext->extra_info.upload.if_sub_path_alloced = false;
		/* generate the file id / full path (uses the formatted ext name) */
		if ((result=storage_get_filename(pClientInfo,
				pFileContext->extra_info.upload.start_time,
				file_bytes, crc32, pFileContext->extra_info.upload.
				formatted_ext_name, filename, &filename_len,
				pFileContext->filename)) != 0)
		{
			return result;
		}

		clean_func = dio_write_finish_clean_up;
		file_offset = 0;
		pFileContext->extra_info.upload.if_gen_filename = true;
		pFileContext->extra_info.upload.before_open_callback = NULL;
		pFileContext->extra_info.upload.before_close_callback = NULL;
		pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC
				| g_extra_open_file_flags;
	}

	/* dio_write_file calls this after each partial write so the network
	 * thread receives the next chunk */
	pFileContext->continue_callback = storage_nio_notify;
	return storage_write_to_file(pTask, file_offset, file_bytes,
			p - pTask->data, dio_write_file,
			storage_upload_file_done_callback,
			clean_func, store_path_index);
}
4、storage_dio_init
主要是对dio线程进行初始化
线程的执行函数是dio_thread_entrance
/*
 * Initialize the dio (file I/O) threads ("..." marks code the article
 * omits).
 *
 * For every dio thread context: initialize its blocking task queue, then
 * start a thread running dio_thread_entrance on that context. Started
 * threads are counted in g_dio_thread_count under g_dio_thread_lock.
 *
 * Returns 0 on success, or the error from blocked_queue_init() or
 * pthread_create(). thread_attr is now destroyed on the early-return error
 * paths too (previously it leaked there).
 */
int storage_dio_init()
{
	...
	for (pThreadData=g_dio_thread_data; pThreadData<pDataEnd; pThreadData++)
	{
		...
		for (pContext=pThreadData->contexts; pContext<pContextEnd;
				pContext++)
		{
			if ((result=blocked_queue_init(&(pContext->queue))) != 0)
			{
				pthread_attr_destroy(&thread_attr);
				return result;
			}

			if ((result=pthread_create(&tid, &thread_attr,
					dio_thread_entrance, pContext)) != 0)
			{
				logError("file: "__FILE__", line: %d, "
					"create thread failed, "
					"startup threads: %d, "
					"errno: %d, error info: %s",
					__LINE__, g_dio_thread_count,
					result, STRERROR(result));
				pthread_attr_destroy(&thread_attr);
				return result;
			}

			/* thread started: count it */
			pthread_mutex_lock(&g_dio_thread_lock);
			g_dio_thread_count++;
			pthread_mutex_unlock(&g_dio_thread_lock);
		}
	}
	pthread_attr_destroy(&thread_attr);
	return result;
}
1)dio_thread_entrance
dio线程要做的事,就是从阻塞队列中,一旦获取数据,就执行回调函数
/*
 * dio (file I/O) thread body ("..." marks code the article omits; pContext
 * is presumably derived from arg there).
 * While g_continue_flag is set, pops tasks from the context's blocking queue
 * and runs each task's deal_func. For an upload this is dio_write_file.
 * NOTE(review): assumes blocked_queue_pop() blocks until a task arrives (or
 * the queue is woken up), since otherwise the outer loop would spin -- confirm.
 */
static void *dio_thread_entrance(void* arg)
{
...
while (g_continue_flag)
{
while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL)
{
((StorageClientInfo *)pTask->arg)->deal_func(pTask);// run the task's handler (e.g. dio_write_file)
}
}
...
}
2)dio_write_file
在dio线程中执行,将上传的文件写入磁盘中
/*
 * dio-thread handler that writes the current chunk of an upload to disk.
 *
 * Writes pTask->data[buff_offset .. length) to pFileContext->fd. If fd < 0,
 * it first runs before_open_callback (if set) and opens the file. It updates
 * the global write counters under g_dio_thread_lock, and folds the written
 * bytes into the running CRC32 and file hash / MD5 when those are enabled.
 *
 * If the file is not complete yet (offset < end), it resets buff_offset to 0
 * and calls continue_callback so the network side receives the next chunk.
 * Otherwise it finalizes the CRC32 and hash, runs before_close_callback,
 * closes the file, and calls done_callback with before_close_callback's
 * result. Both of these paths return 0.
 *
 * On a before_open_callback, open or write failure, it calls
 * pClientInfo->clean_func, then done_callback with the error code, and
 * returns that error code.
 */
int dio_write_file(struct fast_task_info *pTask)
{
StorageClientInfo *pClientInfo;
StorageFileContext *pFileContext;
int result;
int write_bytes;
char *pDataBuff;
pClientInfo = (StorageClientInfo *)pTask->arg;
pFileContext = &(pClientInfo->file_context);
result = 0;
do
{
if (pFileContext->fd < 0) // file not opened yet (first chunk)
{
if (pFileContext->extra_info.upload.before_open_callback!=NULL)
{
result = pFileContext->extra_info.upload. \
before_open_callback(pTask);
if (result != 0)
{
break;
}
}
if ((result=dio_open_file(pFileContext)) != 0)
{
break;
}
}
pDataBuff = pTask->data + pFileContext->buff_offset; // skip the header and request fields; buff_offset is set when the task is prepared (presumably by storage_write_to_file -- confirm)
write_bytes = pTask->length - pFileContext->buff_offset; // file-content bytes in this chunk
if (fc_safe_write(pFileContext->fd, pDataBuff, write_bytes) != write_bytes)
{
result = errno != 0 ? errno : EIO;
logError("file: "__FILE__", line: %d, " \
"write to file: %s fail, fd=%d, write_bytes=%d, " \
"errno: %d, error info: %s", \
__LINE__, pFileContext->filename, \
pFileContext->fd, write_bytes, \
result, STRERROR(result));
}
pthread_mutex_lock(&g_dio_thread_lock);
g_storage_stat.total_file_write_count++;
if (result == 0)
{
g_storage_stat.success_file_write_count++;
}
pthread_mutex_unlock(&g_dio_thread_lock);
if (result != 0)
{
break;
}
if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_ex(pDataBuff, write_bytes, \
pFileContext->crc32);
}
if (pFileContext->calc_file_hash)
{
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
{
CALC_HASH_CODES4(pDataBuff, write_bytes, \
pFileContext->file_hash_codes)
}
else
{
my_md5_update(&pFileContext->md5_context, \
(unsigned char *)pDataBuff, write_bytes);
}
}
/*
logInfo("###dio write bytes: %d, pTask->length=%d, buff_offset=%d", \
write_bytes, pTask->length, pFileContext->buff_offset);
*/
pFileContext->offset += write_bytes; // advance the file write position
if (pFileContext->offset < pFileContext->end) // more to write; end is presumably the target end offset (the file size for a standalone file) -- confirm
{
pFileContext->buff_offset = 0; // later chunks carry only file content, so there is no prefix to skip
pFileContext->continue_callback(pTask); // let the network side receive the next chunk (e.g. storage_nio_notify)
}
else // whole file written
{
if (pFileContext->calc_crc32)
{
pFileContext->crc32 = CRC32_FINAL( \
pFileContext->crc32);
}
if (pFileContext->calc_file_hash)
{
if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH)
{
FINISH_HASH_CODES4(pFileContext->file_hash_codes)
}
else
{
my_md5_final((unsigned char *)(pFileContext-> \
file_hash_codes), &pFileContext->md5_context);
}
}
if (pFileContext->extra_info.upload.before_close_callback != NULL)
{
result = pFileContext->extra_info.upload. \
before_close_callback(pTask);
}
/* file write done, close it */
close(pFileContext->fd);
pFileContext->fd = -1;
if (pFileContext->done_callback != NULL)
{
pFileContext->done_callback(pTask, result);// e.g. storage_upload_file_done_callback
}
}
return 0;
} while (0);
// error path: release resources, then report the failure
pClientInfo->clean_func(pTask);
if (pFileContext->done_callback != NULL)
{
pFileContext->done_callback(pTask, result);
}
return result;
}
如若转载,请注明出处:https://www.daxuejiayuan.com/44837.html