本文共 4636 字,大约阅读时间需要 15 分钟。
大家都清楚,如果用TCP传输文件的话,是很简单的,根本都不用操心会丢包,除非是网络坏了,就得重来。用UDP的话,因为UDP是不可靠的,所以用它传输文件,要保证不丢包,就得我们自己写额外的来保障了。本文就说说如果保证可靠传输。
要实现无差错的传输数据,我们可以采用重发请求(ARQ),它又可分为连续ARQ、选择重发ARQ、滑动窗口。本文重点介绍滑动窗口,其它的两种有兴趣的可参考相关的网络通信之类的书。
采用滑动窗口,限制已发送出去但未被确认的数据帧的数目。循环重复使用已收到的那些数据帧的序号。具体实现是在发送端和接收端分别设定发送窗口和接收窗口。 (1)发送窗口 发送窗口用来对发送端进行流量控制。发送窗口的大小Wt代表在还没有收到对方确认的条件下,发送端最多可以发送的数据帧的个数。具体意思请参考下图:(2)接收窗口 接收窗口用来控制接收数据帧。只有当接收到的数据帧的发送序号落在接收窗口内,才允许将该数据帧收下,否则一律丢弃。接收窗口的大小用Wr来表示,在连续ARQ中,Wr = 1。接收窗口的意义可参考下图:
在接收窗口和发送窗口间存在着这样的关系:接收窗口发生旋转后,发送窗口才可能向前旋转,接收窗口保持不动时,发送窗口是不会旋转的。这种收发窗口按如此规律顺时钟方向不断旋转的就犯法为滑动窗口。
好了,在上面对滑动窗口有大致了解后,我们还是进入正题吧:)
发送端的发送线程:
int ret; int nPacketCount = 0; DWORD dwRet; SendBuf sendbuf; DWORD dwRead; DWORD dwReadSize;SendBuf* pushbuf;
//计算一共要读的文件次数,若文件已读完,但客户端没有接收完,
//则要发送的内容不再从文件里读取,而从mbufqueue里提取 nPacketCount = mdwFileSize / sizeof(sendbuf.buf); //若不能整除,则应加1 if(mdwFileSize % sizeof(sendbuf.buf) != 0) ++nPacketCount;SetEvent(mhEvent);
CHtime htime;
//若已发送大小小于文件大小并且发送窗口前沿等于后沿,则继续发送
//否则退出循环if(mdwSend < mdwFileSize) // 文件没有传输完时才继续传输
{ while(1) { dwRet = WaitForSingleObject(mhEvent, 1000); if(dwRet == WAITFAILED) { return false; } else if(dwRet == WAITTIMEOUT) { //重发 ::EnterCriticalSection(&mcsQueue); // 进入mbufqueue的排斥区 ret = mhsocket.hsendto((char*)mbufqueue.front(), sizeof(sendbuf)); ::LeaveCriticalSection(&mcsQueue); // 退出mbufqueue的排斥区 if(ret == SOCKETERROR) { cout << "重发失败,继续重发" << endl; continue; }ResetEvent(mhEvent);
continue; }//若发送窗口大小 < 预定大小 && 已读文件次数(nReadIndex) < 需要读文件的次数(nReadCount),则继续读取发送
//否则,要发送的内容从mbufqueue里提取 if(mdwSend < mdwFileSize) { dwReadSize = mdwFileSize - mdwSend; dwReadSize = dwReadSize < MAXBUFSIZE ? dwReadSize : MAXBUFSIZE;memset(sendbuf.buf, 0, sizeof(sendbuf.buf));
if(!ReadFile(mhFile, sendbuf.buf, dwReadSize, &dwRead, NULL)) { //AfxMessageBox("读取文件失败,请确认文件存在或有读取权限."); cout << "读取文件失败,请确认文件存在或有读取权限." << endl; return false; }mdwSend += dwRead;
sendbuf.index = mnSendIndexHead;
mnSendIndexHead = (mnSendIndexHead + 1) % SlidingWindowSize; // 发送窗口前沿向前移一格sendbuf.dwLen = dwRead;
//保存发送过的数据,以便重发
::EnterCriticalSection(&mcsQueue); // 进入mbufqueue的排斥区 pushbuf = GetBufFromLookaside();memcpy(pushbuf, &sendbuf, sizeof(sendbuf));
mbufqueue.push(pushbuf);
if(mdwSend >= mdwFileSize) // 文件已读完,在队列中加一FileEnd标志,以便判断是否需要继续发送 { pushbuf = GetBufFromLookaside();pushbuf->index = FileEnd;
pushbuf->dwLen = FileEnd; memset(pushbuf->buf, 0, sizeof(pushbuf->buf));mbufqueue.push(pushbuf);
} ::LeaveCriticalSection(&mcsQueue); // 退出mbufqueue的排斥区 }::EnterCriticalSection(&mcsQueue); // 进入mbufqueue的排斥区
if(mbufqueue.front()->index == FileEnd) // 所有数据包已发送完毕,退出循环 { ::LeaveCriticalSection(&mcsQueue); // 退出mbufqueue的排斥区 break; } else if(mbufqueue.size() <= SendWindowSize) // 发送窗口小于指定值,继续发送 { ret = mhsocket.hsendto((char*)mbufqueue.front(), sizeof(sendbuf)); if(ret == SOCKETERROR) { ::LeaveCriticalSection(&mcsQueue); // 退出mbufqueue的排斥区 cout << "发送失败,重发" << endl; continue; }//延时,防止丢包
Sleep(50); } else // 发送窗口大于指定值,等持接收线程接收确认消息 { ResetEvent(mhEvent); } ::LeaveCriticalSection(&mcsQueue); // 退出mbufqueue的排斥区 } }发送端的接收线程:
int ret; RecvBuf recvbuf;while(mhFile != NULL)
{ ret = mhsocket.hrecvfrom((char*)&recvbuf, sizeof(recvbuf)); if(ret == SOCKETERROR) { //AfxMessageBox("接收确认消息出错"); ::EnterCriticalSection(&mcsQueue); if(mbufqueue.front()->index == FileEnd) // 文件传输完毕 { ::LeaveCriticalSection(&mcsQueue); break; } ::LeaveCriticalSection(&mcsQueue);cout << "接收确认消息出错: " << GetLastError() << endl;
return false; }if(recvbuf.flag == FlagAck && recvbuf.index == mnSendIndexTail)
{ mnSendIndexTail = (mnSendIndexTail + 1) % SlidingWindowSize; //该结点已得到确认,将其加入旁视列表,以备再用 ::EnterCriticalSection(&mcsQueue); mbufLookaside.push(mbufqueue.front()); mbufqueue.pop(); ::LeaveCriticalSection(&mcsQueue);SetEvent(mhEvent);
} }接收端的接收线程:
int ret; DWORD dwWritten; SendBuf recvbuf; RecvBuf sendbuf; int nerror = 0;// 设置文件指针位置,指向上次已发送的大小
SetFilePointer(mhFile, 0, NULL, FILEEND);//若已接收文件大小小于需要接收的大小,则继续
while(mdwSend < mdwFileSize) { //接收 memset(&recvbuf, 0, sizeof(recvbuf)); ret = mhsocket.hrecvfrom((char*)&recvbuf, sizeof(recvbuf)); if(ret == SOCKETERROR) { return false; }//不是希望接收的,丢弃,继续接收
if(recvbuf.index != (mnRecvIndex) % SlidingWindowSize) { nerror++; cout << recvbuf.index << "error?" << mnRecvIndex << endl; continue; }if(!WriteFile(mhFile, recvbuf.buf, recvbuf.dwLen, &dwWritten, NULL))
{ //AfxMessageBox("写入文件失败"); cout << "写入文件失败" << endl; return false; }//已接收文件大小
mdwSend += dwWritten;//发送确认消息
sendbuf.flag = FlagAck; sendbuf.index = mnRecvIndex; ret = mhsocket.hsendto((char*)&sendbuf, sizeof(sendbuf)); if(ret == SOCKETERROR) { return false; }//接收窗口前移一格
mnRecvIndex = (mnRecvIndex + 1) % SlidingWindowSize; }本文引用自:
转载地址:http://bacyb.baihongyu.com/