首頁 >> 新聞

在Vovida的基礎(chǔ)上實(shí)現(xiàn)自己的SIP協(xié)議棧(四)

在Vovida的基礎(chǔ)上實(shí)現(xiàn)自己的SIP協(xié)議棧(五)

盧政 2003/08/07

3.2.8.2處理RTP/RTCP包:

  前面說了ResGwDevice::processSessionMsg處理掛在設(shè)備處理隊(duì)列里的各個(gè)命令,我們具體來看具體的應(yīng)用程序處理過程:

a.處理用戶發(fā)出的終端消息,并且打開設(shè)備發(fā)送媒體包。
ResGwDevice::processSessionMsg( Sptr event ):
void ResGwDevice::processSessionMsg( Sptr event )
{
Sptr msg;
msg.dynamicCast( event );
if( msg != 0 )
{
cpLog( LOG_DEBUG, "Got message type: %d", msg->type );
switch( msg->type )
{
case HardwareSignalType://這個(gè)狀態(tài)是為Voicamail而設(shè)定的,在Feauture
//Server這章里面會(huì)說道
……
case HardwareAudioType:
switch ((msg->signalOrRequest).request.type)
{
case AudioStart://打開聲音設(shè)備建立RTP/RTCP會(huì)話
audioStart((msg->signalOrRequest).request);
break;
case AudioStop://停止聲音設(shè)備,并且釋設(shè)備占用的資源并且停止建立的
//話,將其資源釋放;
audioStop();
break;
case AudioSuspend:
audioSuspend();//暫停設(shè)備,但是資源不釋放,RTP會(huì)話也不停止。
break;
case AudioResume:
audioResume((msg->signalOrRequest).request);//重新啟動(dòng)設(shè)備
break;
default:
cpLog( LOG_ERR, "Unknown audio request: %d",
(msg->signalOrRequest).request.type );
}
break;
… …
}
}
}

b.根據(jù)遠(yuǎn)端和本地的SDP建立RTP會(huì)話(經(jīng)過簡化):
int SoundCardDevice::audioStart( const HardwareAudioRequest& request )
{
deviceMutex.lock();

// create new audioStack for this audio session
// 0 is rtpPayloadPCUM
// last paramter, -1, disables jitter buffer
if( audioStack == 0 )
{
int remoteRtcpPort = (request.remotePort > 0) ? request.remotePort + 1 : 0;
int localRtcpPort = (request.localPort > 0) ? request.localPort + 1 : 0;
cerr << "%%% Remote rtcp port : " << remoteRtcpPort << "\n";
cerr << "%%% Local rtcp port : " << localRtcpPort << "\n\n";
const char* remoteHost = 0;
if ( request.remotePort != 0 )
remoteHost = request.remoteHost;
//創(chuàng)建RTP會(huì)話,帶入的參數(shù)有:被地本地/遠(yuǎn)端的主機(jī)/RTP,RTCP,端口RTP的載荷類型,//網(wǎng)絡(luò)承載類型,創(chuàng)建接收/發(fā)送RTP/RTCP包的控制臺(tái),以及接受播放的緩沖區(qū)Inbuff
audioStack = new RtpSession( remoteHost, request.remotePort,
request.localPort, remoteRtcpPort,
localRtcpPort, rtpPayloadPCMU,
rtpPayloadPCMU, 0 );
}
else
{
… …
}
//決定是否開啟/關(guān)斷向遠(yuǎn)方回送的震鈴
if( request.sendRingback )
startSendRingback();
else
stopSendRingback();
… …
// apiFormat_clockRate
// apiFormat_payloadSize
//設(shè)置RTP包的承載類型,目前設(shè)置為PCMU方式,以及包的大小
audioStack->setApiFormat( rtpPayloadPCMU, request.rtpPacketSize*8 );
//傳輸/接收時(shí)的RTP包的大小,這里設(shè)置成和RTP包相同大小類型
audioStack->setNetworkFormat( rtpPayloadPCMU, request.rtpPacketSize*8 );
deviceMutex.unlock();
reopenAudioHardware();
return 0;
}

c.如何接收或者發(fā)送RTP/RTCP數(shù)據(jù)包:
我們在前面已經(jīng)看到了在SoundCardDevice::processRTP ()調(diào)用了RTPSession::Receive()以及RTPSession::TransimitterRAW()的方法,來接收/發(fā)送RTP,RTCP數(shù)據(jù)流.
1> RTP數(shù)據(jù)流的接收,它不會(huì)直接刪除數(shù)據(jù),但是會(huì)用替代的方式對inbuff數(shù)據(jù)做更新:
RtpPacket* RtpReceiver::receive ()
{
RtpPacket* p = NULL;
int len = 0;
int len1 = 0;
int silencePatched = 0;
bool faking = 0;


// empty network que
NtpTime arrival (0, 0);
while (1) // network empty or time to play return packet
{ //從網(wǎng)絡(luò)設(shè)備的緩沖隊(duì)列中取出數(shù)據(jù)
p = getPacket();
if (p == NULL) break;

// only play packets for valid sources
if (probation < 0)
{
cpLog(LOG_ERR, "****Packet from invalid source");
delete p;
p = NULL;
continue;
}
//獲取包的抵達(dá)時(shí)間
arrival = getNtpTime();
int packetTransit = 0;
int delay = 0;


rtp_ntohl(p);

// convert codec
if (p->getPayloadType() != apiFormat)
{
#ifndef __sparc
// 當(dāng)前接收到的包不符合目前的格式(假設(shè)為PCMU格式,那么下面做格式轉(zhuǎn)換)
//例如mono轉(zhuǎn)PCMU8K 16Bit具體的調(diào)用格式可以參看//convertCodec(RtpPayloadType fromType, RtpPayloadType toType,
// char* in_data, char* out_data, int len)
//它的主要作用在于將數(shù)據(jù)區(qū)內(nèi)的數(shù)據(jù)進(jìn)行格式轉(zhuǎn)換,至于帶入的參數(shù)我想就不//用過多解釋了。有興趣的同志可以參考
//unsigned char linear2ulaw( int pcm_val );
//int ulaw2linear( unsigned char u_val )這兩個(gè)函數(shù)的原代碼
RtpPacket* oldp = p;
p = convertRtpPacketCodec (apiFormat, oldp);
… …
#endif
}
//取得有效載荷的長度
len = p->getPayloadUsage();
if (len <= 0 || len > 1012)
{
delete p;
p = NULL;
continue;
}

// 重新調(diào)整接收到的RTP包的長度,使長度在網(wǎng)絡(luò)承載允許范圍內(nèi)
if (len > networkFormat_payloadSize )
{
int lenold = len;
len = ( len / networkFormat_payloadSize ) * networkFormat_payloadSize;
p->setPayloadUsage( len );
network_pktSampleSize = (lenold / networkFormat_payloadSize) * network_pktSampleSize;
}

… …
根據(jù)接收到RTP的分組序號(hào)和時(shí)間戳標(biāo)志的數(shù)據(jù)對Inbuff里的數(shù)據(jù)包進(jìn)行重排,
if (RtpSeqGreater(p->getSequence(), prevSeqRecv))
{
在這里是把包增加到數(shù)據(jù)的隊(duì)列尾
while (RtpSeqGreater(p->getSequence(), prevSeqRecv))
{
silencePatched = 0;
faking = 0;
//下面程序部分是在收到的分組頭部增加白燥聲。
while( RtpTimeGreater( p->getRtpTime() - network_pktSampleSize, prevPacketRtpTime ) && ((p->getSequence() - 1) == prevSeqRecv))
{
if( silenceCodec == 0 )//
{
cpLog( LOG_DEBUG_STACK, "Patching silence" );
if ((p->getPayloadType() >= rtpPayloadDynMin) &&
(p->getPayloadType() <= rtpPayloadDynMax) &&
(codecString[0] != '\0'))
{
silenceCodec = findSilenceCodecString(codecString, len);
}
else
{//添加白噪音
silenceCodec = findSilenceCodec( p->getPayloadType(), len );
}
if( silenceCodec == 0 )
{
if( len > rtpCodecInfo[ numRtpCodecInfo - 1 ].length )
{
assert( 0 );
}
silenceCodec = (char*)&rtpCodecInfo[ numRtpCodecInfo - 1 ].silence;
faking = 1;
}
}
assert( silenceCodec );

if ((inPos + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPos, silenceCodec, len);
inPos += len;
silencePatched++;
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPos;
memcpy (inBuff + inPos, silenceCodec, len1);
memcpy (inBuff, silenceCodec + len1, len - len1);
inPos = len - len1;
//printf("inPos S=%d\n", inPos);
silencePatched++;
}
prevPacketRtpTime += network_pktSampleSize;
}
if( prevPacketRtpTime != p->getRtpTime() - network_pktSampleSize)
{
prevPacketRtpTime = p->getRtpTime() - network_pktSampleSize;
}
//在inbuff隊(duì)列中插入已經(jīng)待播放的分組,


if ((inPos + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPos, p->getPayloadLoc(), len);
inPos += len;
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPos;
memcpy (inBuff + inPos, p->getPayloadLoc(), len1);
memcpy (inBuff, p->getPayloadLoc() + len1, len - len1);
inPos = len - len1;
}

//更新受到包的計(jì)數(shù)器
RtpSeqNumber tSeq = prevSeqRecv;
prevSeqRecv++;
if(prevSeqRecv > RTP_SEQ_MOD)
{
prevSeqRecv = 0;
}
if (prevSeqRecv < tSeq)
{
cpLog(LOG_DEBUG_STACK, "Recv cycle");
assert(prevSeqRecv == 0);
recvCycles += RTP_SEQ_MOD;
}
}
prevPacketRtpTime = p->getRtpTime();
if (silencePatched > 0)
cpLog(LOG_DEBUG_STACK, "silencePatched = %d", silencePatched);
if (faking)
silenceCodec = 0;
if (p->getSequence() != prevSeqRecv)
{
cpLog(LOG_DEBUG_STACK, "Unequal packet:%d stack:%d",
prevSeqRecv, p->getSequence());
}
}
else
{
RtpSeqNumber base_prevSeqRecv = prevSeqRecv;
int inSeqRecv = 1;
while (RtpSeqGreater(base_prevSeqRecv, p->getSequence()))
{
inSeqRecv++;
base_prevSeqRecv--;
}
int inPosTemp = inPos - inSeqRecv * len;
if (inPosTemp < 0) inPosTemp = IN_BUFFER_SIZE + inPosTemp;

if ((inPosTemp + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPosTemp, p->getPayloadLoc(), len);
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPosTemp;
memcpy (inBuff + inPosTemp, p->getPayloadLoc(), len1);
memcpy (inBuff, (p->getPayloadLoc()) + len1, len - len1);
}
}

// update packet received
packetReceived++;
payloadReceived += len;

// update jitter calculation
packetTransit = arrival - rtp2ntp(p->getRtpTime());
delay = packetTransit - transit;
transit = packetTransit;
if (delay < 0) delay = -delay;
jitter += delay - ((jitter + 8) >> 4);

// fractional
// s->jitterTime += (1./16.) * ((double)deley - s->jitterTime);
// integer
//jitterTime += delay - ((jitterTime+8) >> 4);


if (p)
{
delete p;
p = NULL;
}
}

int packetSize = apiFormat_payloadSize;
… …

//按照apiformat_playloadsize的長度,分割原有的數(shù)據(jù)包,重新構(gòu)造一個(gè)RTP數(shù)據(jù)包以便適
//合設(shè)備播放,當(dāng)然如果雙方把a(bǔ)piformat_playloadsize和networkFormat_payloadSize設(shè)置相
//同也可以。
assert (!p);
p = new RtpPacket (packetSize);
if ( (playPos + packetSize) < IN_BUFFER_SIZE)
{
memcpy (p->getPayloadLoc(), inBuff + playPos, packetSize);
playPos += packetSize;
}
else
{
len1 = IN_BUFFER_SIZE - playPos;
memcpy (p->getPayloadLoc(), inBuff + playPos, len1);
memcpy (p->getPayloadLoc() + len1, inBuff, packetSize - len1);
playPos = packetSize - len1;
}

//構(gòu)造RTP數(shù)據(jù)包的包頭部分
p->setSSRC (ssrc);
p->setPayloadType (apiFormat);
p->setPayloadUsage (packetSize);
p->setRtpTime (prevRtpTime + api_pktSampleSize);
p->setSequence (prevSeqPlay + 1);

if (probation > 0) probation --;
receiverError = recv_success;
prevRtpTime = p->getRtpTime();
prevNtpTime = getNtpTime();
gotime = rtp2ntp (p->getRtpTime() + api_pktSampleSize) + jitterTime;
//更新已經(jīng)播放的數(shù)據(jù)包的計(jì)數(shù)器
RtpSeqNumber sSeq = prevSeqPlay;
prevSeqPlay++;
if (prevSeqPlay < sSeq)
{
playCycles += RTP_SEQ_MOD;
}

return p;
}

2> RTP數(shù)據(jù)流的發(fā)送:
  RTPSession::TransimitterRAW方法,RTP數(shù)據(jù)流的發(fā)送方法,發(fā)送沒有接收這么復(fù)雜,不需要針對Buff中數(shù)據(jù)按照包序列號(hào)和NTP排序,最后再根據(jù)本地的包長度充足重組,只要寫入Outbuff中直接加上RTP頭就可以直接發(fā)送了。
int RtpTransmitter::transmitRaw (char* data, int len)
{
int len1;
//如果媒體設(shè)備所能接受的長度制式和網(wǎng)絡(luò)傳輸?shù)闹剖讲荒芟喾ヅ,那么調(diào)用轉(zhuǎn)換程序進(jìn)行轉(zhuǎn)//化,把本地播放制式長度轉(zhuǎn)換成網(wǎng)絡(luò)制式長度。ConvertCodec這個(gè)函數(shù)我們在前面已經(jīng)有過//介紹。
if( apiFormat != networkFormat)
{
char* buffer = new char[1012];
len = convertCodec(apiFormat, networkFormat, data, buffer, len);
data = buffer;
}
// 把發(fā)送的字節(jié)發(fā)送到Outbuff中準(zhǔn)備發(fā)送出去;
if( (outPos + len) < OUT_BUFFER_SIZE)
{
memcpy (outBuff + outPos, data, len);
outPos += len;
}
else
{
// circular memory copy
len1 = OUT_BUFFER_SIZE - outPos;
memcpy (outBuff + outPos, data, len1);
memcpy (outBuff, data + len1, len - len1);
outPos = len - len1;
}


// check if enough data to send out packet
int packetSize = networkFormat_payloadSize;
//發(fā)送新的RTP數(shù)據(jù)包
int result = 0;
//創(chuàng)建新的RTP數(shù)據(jù)包
RtpPacket* p = new RtpPacket (networkFormat_payloadSize);
assert (p);
//創(chuàng)建RTP包頭
p->setSSRC (ssrc);
p->setPayloadType (networkFormat);
p->setPayloadUsage (packetSize);

//使用Outbuff中的數(shù)據(jù),填充前面新創(chuàng)建的RTP包的內(nèi)容,,每填充一個(gè)Packet包的指//針recPos向前移動(dòng)一個(gè)Packet位置
while ( ((outPos + OUT_BUFFER_SIZE - recPos) % OUT_BUFFER_SIZE) >= packetSize )
{
if( (recPos + packetSize) < OUT_BUFFER_SIZE)
{ memcpy (p->getPayloadLoc(), outBuff + recPos, packetSize);
recPos += packetSize;
}
else
{ len1 = OUT_BUFFER_SIZE - recPos;
memcpy (p->getPayloadLoc(), outBuff + recPos, len1);
memcpy (p->getPayloadLoc() + len1, outBuff, packetSize - len1);
recPos = packetSize - len1;
}
//發(fā)送RTP包
result += transmit(p);
}
if( p) delete p;
p = NULL;
return result;
}

  3>上面說完了RTP包的發(fā)送和接收,現(xiàn)在該說說RTCP包的發(fā)送和接收了,我們知道RTCP包的目的在于向與參與會(huì)話者發(fā)送自量質(zhì)量的反饋消息,實(shí)現(xiàn)多媒體同步的功能,不過問題在于RTCP包的數(shù)量隨著參與者的數(shù)量增加而增加,所以一般說來點(diǎn)對點(diǎn)的話,沒有必要使用RTCP控制,另外隨著RSVP的普遍應(yīng)用,Qos的控制機(jī)制愈加完善,也許沒有必要用這么低級的Qos控制方式了。
我們可以看到在SoundCardDevice::ProcessRTP中調(diào)用了RTCP的發(fā)送和接收方法::

void RtpSession::processRTCP ()
{
if (rtcpTran)
{//這里的checkIntervalRTCP保證在固定間隔的時(shí)間內(nèi)發(fā)送RTCP分組
if (checkIntervalRTCP()) transmitRTCP();
}
if (rtcpRecv)
{
receiveRTCP();
}
return ;
}
定期發(fā)送SR報(bào)告(發(fā)送者報(bào)告):
int RtpSession::transmitRTCP ()
{
… …
RtcpPacket* p = new RtcpPacket();

// load with report packet
rtcpTran->addSR(p);
//增加源描述項(xiàng),在這里僅僅是發(fā)送方發(fā)送描述項(xiàng),而接收方不發(fā)送
if (tran) rtcpTran->addSDES(p);
//調(diào)用UdpStack::Trasmitto發(fā)送RTCP分組,
int ret = rtcpTran->transmit(p);

if (p) delete p;
return ret;
}
如何構(gòu)造一個(gè)發(fā)送者報(bào)告和接收者報(bào)告:
int RtcpTransmitter::addSR (RtcpPacket* p, int npadSize)
{
// 創(chuàng)建RTCP包的頭部
RtcpHeader* header = reinterpret_cast < RtcpHeader* > (p->freeData());
int usage = p->allocData (sizeof(RtcpHeader));
//填充RTCP包頭的各個(gè)項(xiàng)目/版本/填充位/長度記數(shù)/包類型(SR/RR)
header->version = RTP_VERSION;
header->padding = (npadSize > 0) ? 1 : 0;
header->count = 0;
header->type = (tran) ? rtcpTypeSR : rtcpTypeRR;
//獲取當(dāng)前時(shí)間戳
NtpTime nowNtp = getNtpTime();
//構(gòu)造一個(gè)SR包的記錄
if (tran)
{
RtcpSender* senderInfo = reinterpret_cast < RtcpSender* > (p->freeData());
usage += p->allocData (sizeof(RtcpSender));
int diffNtp = 0;
if (nowNtp > tran->seedNtpTime)
diffNtp = nowNtp - tran->seedNtpTime;
else
if (tran->seedNtpTime > nowNtp)
diffNtp = tran->seedNtpTime - nowNtp;
RtpTime diffRtp = (diffNtp * tran->networkFormat_clockRate) / 1000;
senderInfo->ssrc = htonl(tran->ssrc);//獲得發(fā)送方的SSRC
senderInfo->ntpTimeSec = htonl(nowNtp.getSeconds());
senderInfo->ntpTimeFrac = htonl(nowNtp.getFractional());//獲得NTP時(shí)間戳
senderInfo->rtpTime = htonl(tran->seedRtpTime + diffRtp);//獲得RTP時(shí)間戳
senderInfo->packetCount = htonl(tran->packetSent);//發(fā)送的包記數(shù)
senderInfo->octetCount = htonl(tran->payloadSent);//發(fā)送的字節(jié)記數(shù)
}
… …

// report blocks
if ((rtcpRecv) && (rtcpRecv->getTranInfoCount() > 0))
{
RtpTranInfo* tranInfo = NULL;
RtpReceiver* recvInfoSpec = NULL;
RtcpReport* reportBlock = NULL;
for (int i = 0; i < rtcpRecv->getTranInfoCount(); i++)
{
tranInfo = rtcpRecv->getTranInfoList(i);
recvInfoSpec = tranInfo->recv;
… …
//cpLog (LOG_DEBUG_STACK, "RTCP: Report block for src %d",
// recvInfoSpec->ssrc);
reportBlock = reinterpret_cast < RtcpReport* > (p->freeData());
usage += p->allocData (sizeof(RtcpReport));

reportBlock->ssrc = htonl(recvInfoSpec->ssrc);
reportBlock->fracLost = calcLostFrac(tranInfo);
// 根據(jù)RFC 1889的A.3 計(jì)算包丟失率,根據(jù)接收到的包和周期內(nèi)的期望接收值
//相比而得到。然后按照RTCP的頭安置要求將丟包率擺好。
u_int32_t lost = (calcLostCount(tranInfo)) & 0xffffff;
reportBlock->cumLost[2] = lost & 0xff;
reportBlock->cumLost[1] = (lost & 0xff00) >> 8;
reportBlock->cumLost[0] = (lost & 0xff0000) >> 16;
//累計(jì)丟失分組率
reportBlock->recvCycles = htons(recvInfoSpec->recvCycles);
//擴(kuò)展已接收的最高序號(hào)
reportBlock->lastSeqRecv = htons(recvInfoSpec->prevSeqRecv);
//到達(dá)的時(shí)延抖動(dòng)
reportBlock->jitter = htonl(recvInfoSpec->jitter >> 4);
//最末的SR時(shí)間戳
reportBlock->lastSRTimeStamp = htonl(tranInfo->lastSRTimestamp);
//最末的SR到達(dá)后的時(shí)延
if (tranInfo->lastSRTimestamp == 0)
reportBlock->lastSRDelay = 0;
else
{
NtpTime thenNtp = tranInfo->recvLastSRTimestamp;
reportBlock->lastSRDelay = 0;
if (nowNtp > thenNtp)
reportBlock->lastSRDelay = htonl(nowNtp - thenNtp);
else
reportBlock->lastSRDelay = 0;
}
// next known transmitter
header->count++;
}
}

… …
assert (usage % 4 == 0);
//定義整個(gè)RTCP包的長度。
header->length = htons((usage / 4) - 1);

return usage;
}
如何構(gòu)造一個(gè)源描述相:addSDES

定期接收RTCP包的程序:
int RtpSession::receiveRTCP ()
{
… …
//通過GetPacket的方法從Udp通道中讀出RTCP分組,注:這個(gè)方法和RTP的讀分組方法基本//一樣
RtcpPacket* p = rtcpRecv->getPacket();
… …
if (rtcpRecv->readRTCP(p) == 1)
{
ret = 1;
}

if (p) delete p;
return ret;
}
我們下面來看一下,每一種RTCP包的處理過程:
int RtcpReceiver::readRTCP (RtcpPacket* p)
{
//begin和end均為RTCP隊(duì)列接收的頭尾。
char* begin = reinterpret_cast < char* > (p->getPacketData());
char* end = reinterpret_cast < char* > (begin + p->getTotalUsage());
RtcpHeader* middle = NULL;
int ret = 0;
//掃描整個(gè)隊(duì)列處理RTCP分組
while (begin < end)
{
middle = reinterpret_cast < RtcpHeader* > (begin);
switch (middle->type)
{
case (rtcpTypeSR):
case (rtcpTypeRR):
readSR (middle);//處理SR分組
break;
case (rtcpTypeSDES):
readSDES (middle);//處理SDES分組
break;
case (rtcpTypeBYE):
if ( readBYE (middle) == 0)//處理Bye分組
{
ret = 1;
}
break;
case (rtcpTypeAPP):
readAPP (middle);//處理App分組
break;
default:
break;
}
begin += (ntohs(middle->length) + 1) * sizeof(u_int32_t);
}
return ret;
}
我們以處理SR/RR分組為例子,看一下如何處理RTCP分組消息的:
void RtcpReceiver::readSR (RtcpHeader* head)
{
char* middle = NULL;

NtpTime nowNtp = getNtpTime();
if (head->type == rtcpTypeSR)
{
RtcpSender* senderBlock = reinterpret_cast < RtcpSender* >
((char*)head + sizeof(RtcpHeader));
RtpTranInfo* s = findTranInfo(ntohl(senderBlock->ssrc));
s->lastSRTimestamp = (ntohl(senderBlock->ntpTimeSec) << 16 |
ntohl(senderBlock->ntpTimeFrac) >> 16);
s->recvLastSRTimestamp = nowNtp;
packetReceived++;//包接收記數(shù)增加一

NtpTime thenNtp ( ntohl(senderBlock->ntpTimeSec),
ntohl(senderBlock->ntpTimeFrac) );
//下面兩個(gè)數(shù)值都可以被應(yīng)用層直接調(diào)用,是應(yīng)用層了解目前的RTP流的傳輸狀況
accumOneWayDelay += (nowNtp - thenNtp);//在時(shí)間區(qū)段內(nèi)RTP包抵達(dá)的總延遲
avgOneWayDelay = accumOneWayDelay / packetReceived;//平均延遲
middle = (char*)head + sizeof(RtcpHeader) + sizeof(RtcpSender);
}
else
{
middle = (char*)head + sizeof(RtcpHeader);

RtpSrc* sender = reinterpret_cast < RtpSrc* > (middle);
RtpSrc ssrc;

ssrc = ntohl(*sender);
middle += sizeof(RtpSrc);

packetReceived++;
}
RtcpReport* block = reinterpret_cast < RtcpReport* > (middle);
for (int i = head->count; i > 0; i--)
{
//下面兩個(gè)數(shù)值都可以被應(yīng)用層直接調(diào)用,是應(yīng)用層了解目前的RTP流的接收狀況
NtpTime thenNtp (ntohl(block->lastSRTimeStamp) >> 16,
ntohl(block->lastSRTimeStamp) << 16 );

NtpTime nowNtp1 (nowNtp.getSeconds() & 0x0000FFFF,
nowNtp.getFractional() & 0xFFFF0000);
accumRoundTripDelay += ((nowNtp1 - thenNtp)
- ntohl(block->lastSRDelay)); 在時(shí)間區(qū)段內(nèi)RTP包接收的總延遲
avgRoundTripDelay = accumRoundTripDelay / packetReceived;// 在時(shí)間區(qū)段內(nèi)RTP包接收// 的平均延遲
++block;
}
}

3.2.8.3 ACK消息的處理過程OpAck:

1.3 OpAck,這個(gè)操作主要是在被叫收到ACK消息后的處理過程,我們在這里先期做介紹。
const Sptr < State >
OpAck::process( const Sptr < SipProxyEvent > event )
{
... ...
if ( sipMsg->getType() != SIP_ACK )
{
return 0;
}

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
... ...
//接收到SDP以后從UaCallInfo中提取出對端的SDP,打開聲音通道的RTP/RTCP,這個(gè)過程的處理機(jī)制可以參看//OpStartAudioDuplex::process
Sptr < SipSdp > remoteSdp = call->getRemoteSdp();
startAudio( localSdp, remoteSdp );
... ...
return 0;
}

3.2.8.4 OpConfTargetOk多方會(huì)議檢測:

  OpConfTargetOk,表示多方會(huì)議時(shí)候的檢測機(jī)制,這個(gè)機(jī)制在目前的設(shè)定中沒有使用,所以沒有必要介紹

  OpFwdDigit,在打開RTP/RTCP媒體通道以后,如果這個(gè)時(shí)候定義了通話轉(zhuǎn)接呼叫的方式,那么按下0-9的按紐,那么該方法通過以下的流程:

UaDevice::getDeviceQueue()->add( signal )-->ResGwDevice::processSessionMsg-->
CaseHardwareSignalType:...-->provideSignal-->provideDtmf-->OpAddDigit ::process--> UaDevice::instance()->getDigitCollector()->addDigit( digit )

  將所輸入的號(hào)碼存儲(chǔ)在DigitCollector中,如果通話繼續(xù)呼叫方式有效,那么在操作隊(duì)列中增加addOperator( new OpSecondCall ),在這個(gè)新增加的操作符中重新開始向新的一端發(fā)送Invite消息(根據(jù)輸入的Digit形成被叫的Url)從而實(shí)現(xiàn)呼叫從一端轉(zhuǎn)接到另外一端的方式。

3.2.9呼叫等待:

  呼叫等待是SIP電話系統(tǒng)中一個(gè)比較有用的應(yīng)用,在 RFC2543對這個(gè)應(yīng)用也做了一些描述,主要的方法是向在通話過程中向等待方發(fā)送一個(gè)INVITE消息,消息中包括了一個(gè)將本地的SDP的C=選項(xiàng)的地址改變成"0.0.0.0"同時(shí)為了和上一個(gè)INVITE消息區(qū)分Cseq項(xiàng)增加1,通過這樣實(shí)現(xiàn)抑制本地的媒體流。

我們看一下流程:


(點(diǎn)擊放大)

3.2.9.1 呼叫等待的詳細(xì)描述:(以Diagram.17為例)

a. A,B兩個(gè)端點(diǎn)通過RTP/RTCP進(jìn)行語音通訊;
b. B接收到了C的一個(gè)呼叫(Invite消息),這個(gè)時(shí)候B處于OpRing的狀態(tài)中,B向C發(fā)送Ring表示已經(jīng)收到C的呼叫,并且讓C處于等待B摘機(jī)的狀態(tài);
c.這個(gè)時(shí)候B進(jìn)入OpStartCallWaitting狀態(tài),在這個(gè)狀態(tài)里,捕捉終端接收的DeviceEventFlash信號(hào),也就是Flash信號(hào),這樣把當(dāng)前的A,B RTP會(huì)話陷入Hold狀態(tài),也就是保持狀態(tài),B把當(dāng)前的會(huì)話的ID號(hào)放置入CallWaitingID的隊(duì)列中去進(jìn)行等待;
d. B在OpStartCallWaiting中向A發(fā)送Reinvite消息,這個(gè)INVITE消息的SDP的C=選項(xiàng)的地址改變成"0.0.0.0",這個(gè)時(shí)候A在OpReinvite狀態(tài)中, B的通話暫時(shí)陷入停止,進(jìn)入StateOnHold狀態(tài)中
c. B和C開始進(jìn)行通訊;
d. C掛機(jī)發(fā)送Bye消息給B這個(gè)時(shí)候B進(jìn)入OpEndCall狀態(tài);
e. B在這個(gè)狀態(tài)的時(shí)候檢測到在呼叫等待,B進(jìn)入到OpConvertCW中,并且把等待隊(duì)列中的CallID帶入myevent隊(duì)列中準(zhǔn)備執(zhí)行;如果這個(gè)時(shí)候捕捉到終端接收的DeviceEventFlash信號(hào),OpRrevert操作向A發(fā)送Reinvite消息,恢復(fù)和A的通訊;
f.A,B之間開始通訊;

3.2.9.2操作之間存在的競爭:

  從上面來看在操作中存在這一定的競爭,A,B之間通訊進(jìn)入終止以后,是進(jìn)入的StateOnHold狀態(tài),同樣在B,C之間的通訊,在在StateInCall狀態(tài)的時(shí)候,用戶也有可能發(fā)出DeviceEventFlash消息,迫使B重入StateOnHold狀態(tài),而不是在對方發(fā)出Bye消息以后,這樣的結(jié)果就是B在StateOnHold狀態(tài)無法返回,修改的方法其實(shí)非常簡單,只要這樣就可以了:

  addOperator( new OpRevert )改成
  addEntryOperator(new OpRevert);
  為什這么改呢?把OpRevert放在不同的隊(duì)列中,這樣,從StateInCall狀態(tài)轉(zhuǎn)入StateOnHold的時(shí)候,就不是只有一個(gè)FlashEvent的條件提供判斷了,狀態(tài)的變化需要通過State:::Process來執(zhí)行,這樣就增加了一個(gè)約束的條件,大家不明白的話可以細(xì)看一下State::Process(…)的代碼。

3.2.9.3 呼叫中所涉及模塊介紹:

  以下對呼叫等待所涉及到的一些模塊和方法的簡單介紹:
a.OpStartCallWaiting的應(yīng)用:
OpStartCallWaitting主要是檢驗(yàn)是否有進(jìn)入呼叫等待DeviceEventFlash信號(hào),并且把當(dāng)前的對話切換到等待狀態(tài),而當(dāng)前的等待切換為當(dāng)前對話,并且向等待的一方發(fā)送Re-Invite的hold消息。
OpStartCallWaiting::process( const Sptr < SipProxyEvent > event )
{
//如果這個(gè)時(shí)刻,出現(xiàn)C端呼叫B端的情況,如果B端要轉(zhuǎn)移呼叫到C那么按下"f"代表呼叫轉(zhuǎn)//移。
if ( deviceEvent->type != DeviceEventFlash )
{
return 0;
}
//注意這個(gè)時(shí)候C呼叫的CallID已經(jīng)被裝入callwaitinglist中準(zhǔn)備調(diào)用;
Sptr < SipCallId > call2Id = UaDevice::instance()->getCallWaitingId();
if ( call2Id == 0 )
{
// no call on call waiting
return 0;
}


if ( UaConfiguration::instance()->getCallWaitingOn() )
{
//通知當(dāng)前的等待隊(duì)列中的消息準(zhǔn)備準(zhǔn)備開始和B進(jìn)行通訊,主要方式是把它的Call ID掛入//myeventQ中,由SipThread處理在隊(duì)列里的消息。
Sptr < UaCallContainer > calls;
calls.dynamicCast( event->getCallContainer() );
assert( calls != 0 );
Sptr < UaCallInfo > call2 = calls->findCall( *call2Id );

Sptr < Fifo < Sptr < SipProxyEvent > > > eventQ = deviceEvent->getFifo();
Sptr < UaDeviceEvent > event = new UaDeviceEvent( eventQ );
event->type = DeviceEventFlash;
event->callId = call2Id;
eventQ->add( event );

// 把當(dāng)前的A-B通訊裝入等待隊(duì)列中等待;
Sptr < SipCallId > callId = UaDevice::instance()->getCallId();
UaDevice::instance()->setCallId( 0 );
UaDevice::instance()->addCallWaitingId( callId );
}
//準(zhǔn)備回送給A端發(fā)送SDP的"C"為0.0.0.0的Invite消息,并且在Cseq為上一個(gè)的累加;
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );

// Put current contact on hold
Sptr < InviteMsg > reInvite;

Sptr < Contact > contact = call->getContact();
assert( contact != 0 );

int status = contact->getStatus();
if ( status == 200 )
{
//在作為呼叫方的時(shí)候,這個(gè)Invite消息非常好制作,大部分只要復(fù)制上一個(gè)的一些內(nèi)//容就可以了。
const StatusMsg& msg = contact->getStatusMsg();
if ( &msg != 0 )
{
reInvite = new InviteMsg( msg );

//add SDP
Sptr < SipSdp > localSdp = call->getLocalSdp();
assert( localSdp != 0 );
SipSdp sipSdp = *localSdp;
reInvite->setContentData( &sipSdp );
}
… …
}
else
{
//在作為被叫方的時(shí)候,這個(gè)Invite消息就比較麻煩了,基本上要重新創(chuàng)立。

const InviteMsg& msg = contact->getInviteMsg();
if ( &msg != 0 )
{
string sipPort = UaConfiguration::instance()->getLocalSipPort();
reInvite = new InviteMsg( msg.getFrom().getUrl(),
atoi( sipPort.c_str() ) );
SipFrom from( msg.getTo().getUrl() );
reInvite->setFrom( from );

reInvite->setCallId( msg.getCallId() );

// Convert RecordRoute to reverse Route
int numRecordRoute = msg.getNumRecordRoute();
SipRecordRoute recordroute;
SipRoute route;

for ( int i = 0; i < numRecordRoute; i++ )
{
recordroute = msg.getRecordRoute( i );
route.setUrl( recordroute.getUrl() );
reInvite->setRoute( route ); // to beginning
}

int numContact = msg.getNumContact();
if ( numContact )
{
SipContact contact = msg.getContact( numContact - 1 );
route.setUrl( contact.getUrl() );
reInvite->setRoute( route ); // to beginning
}

}

}
assert( reInvite != 0 );

SipVia sipVia;
sipVia.setprotoVersion( "2.0" );
sipVia.setHost( Data( theSystem.gethostAddress() ) );
sipVia.setPort( atoi( UaConfiguration::instance()->getLocalSipPort().c_str() ) );
reInvite->flushViaList();
reInvite->setVia( sipVia, 0 );

// Set Contact: header
Sptr< SipUrl > myUrl = new SipUrl;
myUrl->setUserValue( UaConfiguration::instance()->getUserName(), "phone" );
myUrl->setHost( Data( theSystem.gethostAddress() ) );
myUrl->setPort( atoi( UaConfiguration::instance()->getLocalSipPort().c_str() ) );
SipContact me;
me.setUrl( myUrl );
reInvite->setNumContact( 0 ); // Clear
reInvite->setContact( me );

//TODO Is it going to be a problem if the other side also use the next
//TODO CSeq at the same time?
unsigned int cseq = contact->getCSeqNum();
contact->setCSeqNum( ++cseq );
SipCSeq sipCSeq = reInvite->getCSeq();
sipCSeq.setCSeq( cseq );
reInvite->setCSeq( sipCSeq );

Sptr<SipSdp> sipSdp;
sipSdp.dynamicCast ( reInvite->getContentData( 0 ) );
assert ( sipSdp != 0 );
SdpSession sdpDesc = sipSdp->getSdpDescriptor();
//在這里把SDP的C=0.0.0.0(hold項(xiàng))設(shè)定;
sdpDesc.setHold();
sipSdp->setSdpDescriptor( sdpDesc );
//發(fā)送Reinvite消息到A端。
deviceEvent->getSipStack()->sendAsync( *reInvite );
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
//轉(zhuǎn)程序到StateOnhold
return stateMachine->findState( "StateOnhold" );
}
b.OpReinvite:
OpReinvite在接收到由通訊的對端Invite消息以后,把消息內(nèi)的 RemoteSDP放在本地的UaCallInfo中然后回送一個(gè)OK消息給對端;
c.OpEndCall:
OpEndCall在檢測到Bye消息發(fā)送以后,讓程序回送OK消息并且進(jìn)入StateCallEnded狀態(tài)中;
d.OpRevert:
OpRevert檢測到再次有DeviceEventFlash消息的時(shí)候本地開始發(fā)送INVITE消息,把等待方`處于等待的呼叫喚起;
e.StateCallEnded狀態(tài):
StateCallEnded同樣是檢測DeviceEventFlash的消息,檢測到以后把調(diào)用OpConvertCw的操作,把處于等待隊(duì)列里的呼叫喚起。

(未完待續(xù))

在Vovida的基礎(chǔ)上實(shí)現(xiàn)自己的SIP協(xié)議棧(六)

作者聯(lián)系方法:lu_zheng@21cn.com

作者供稿 CTI論壇編輯


分類信息:     文摘