windows-nt/Source/XPSP1/NT/enduser/netmeeting/av/nac/rvstream.cpp

425 lines
10 KiB
C++
Raw Permalink Normal View History

2020-09-26 03:20:57 -05:00
/*
RVSTREAM.C
*/
#include "precomp.h"
#define PLAYOUT_DELAY_FACTOR 2
#ifndef MAX_MISORDER
#define MAX_MISORDER 30
#endif
void FreeNetBufList(NETBUF *pNB, IRTPRecv *pRTP)
{
NETBUF *pNBTemp;
while (pNB) {
pNBTemp = pNB;
pNB = pNB->next;
if (pRTP) pRTP->FreePacket(*(WSABUF **)(pNBTemp + 1));
pNBTemp->pool->ReturnBuffer(pNBTemp);
}
}
void AppendNetBufList(NETBUF *pFirstNB, NETBUF *pNB)
{
NETBUF *pNB1 = pFirstNB;
while (pNB1->next) {
ASSERT(pNB != pNB1);
pNB1 = pNB1->next;
}
ASSERT(pNB != pNB1);
pNB1->next = pNB;
}
int RVStream::Initialize(UINT flags, UINT size, IRTPRecv *pRTP, MEDIAPACKETINIT *papi, ULONG ulSamplesPerPacket, ULONG ulSamplesPerSec, VcmFilter *pVideoFilter)
{
m_pVideoFilter = pVideoFilter;
return ((RxStream*)this)->Initialize(flags, size, pRTP, papi, ulSamplesPerPacket, ulSamplesPerSec);
}
/*
Queues a received RTP packet.
The packet is described by pNetBuf.
This routine will take care of freeing pNetBuf (even in error cases)
*/
HRESULT
RVStream::PutNextNetIn(WSABUF *pWsaBuf, DWORD timestamp, UINT seq, UINT fMark, BOOL *pfSkippedData, BOOL *pfSyncPoint)
{
FX_ENTRY("RVStream::PutNextNetIn");
UINT pos;
MediaPacket *pAP;
NETBUF *pNB_Packet;
HRESULT hr;
NETBUF *pNetBuf = (NETBUF *)m_NetBufPool.GetBuffer();
ASSERT(pNetBuf);
EnterCriticalSection(&m_CritSect);
*pfSkippedData = FALSE;
*pfSyncPoint = FALSE;
if (pNetBuf == NULL)
{
hr = E_OUTOFMEMORY;
WARNING_OUT(("RVStream::PutNextNetIn - Out of memory in buffer pool"));
m_pRTP->FreePacket(pWsaBuf);
goto ErrorExit;
}
*(WSABUF **)(pNetBuf+1) = pWsaBuf; // cache the WSABUF pointer so it can be returned later
pNetBuf->data = (PBYTE) pWsaBuf->buf + sizeof(RTP_HDR);
pNetBuf->length = pWsaBuf->len - sizeof(RTP_HDR);
pNetBuf->next = NULL;
pNetBuf->pool = &m_NetBufPool;
hr = ReassembleFrame(pNetBuf, seq, fMark);
if (hr != DPR_SUCCESS)
{
// free pNetBuf since its not yet on m_NetBufList.
// m_NetBufList will be freed at ErrorExit
::FreeNetBufList(pNetBuf,m_pRTP);
goto ErrorExit;
}
// not the end of the frame
if (!fMark)
{
LeaveCriticalSection(&m_CritSect);
return S_FALSE; // success, but not a new frame yet
}
// If we get here we think we have a complete encoded video frame (fMark was
// set on the last packet)
// if the ring is full or the timestamp is earlier, dump everything.This may be too drastic
// and the reset action could be refined to dump only the older
// packets. However, need to make sure the ring doesnt get "stuck"
pos = ModRing(m_MaxPos+1);
if (pos == m_FreePos || TS_EARLIER(timestamp, m_MaxT)) {
Reset(seq,timestamp);
*pfSkippedData = TRUE;
pos = ModRing(m_MaxPos + 1); // check again
if (pos == m_FreePos) {
hr = DPR_OUT_OF_MEMORY;
m_LastGoodSeq -= MAX_MISORDER; //make sure we dont accidentally synchronize
goto ErrorExit;
}
}
// insert frame into ring
pAP = m_Ring[pos];
if (pAP->Busy() || pAP->GetState() != MP_STATE_RESET) {
hr = DPR_DUPLICATE_PACKET;
goto ErrorExit;
}
// new stuff
hr = RestorePacket(m_NetBufList, pAP, timestamp, seq, fMark, pfSyncPoint);
if (FAILED(hr))
{
goto ErrorExit;
}
if (*pfSyncPoint)
{
DEBUGMSG (ZONE_IFRAME, ("%s: Received a keyframe\r\n", _fx_));
}
::FreeNetBufList(m_NetBufList,m_pRTP);
m_NetBufList = NULL;
#ifdef DEBUG
if (!TS_LATER(timestamp, m_MaxT))
{
DEBUGMSG (ZONE_DP, ("PutNextNetIn(): Reconstructed frame's timestamp <= to previous frame's!\r\n"));
}
#endif
m_MaxT = timestamp;
m_MaxPos = pos; // advance m_MaxPos
// end new stuff
LeaveCriticalSection(&m_CritSect);
StartDecode();
return hr;
ErrorExit:
// if we're in the middle of assembling a frame, free buffers
if (m_NetBufList){
::FreeNetBufList(m_NetBufList,m_pRTP);
m_NetBufList = NULL;
}
LeaveCriticalSection(&m_CritSect);
return hr;
}
// Called to force the release of any accumulated NETBUFs back to the owner (RTP).
// This can be called at shutdown or to escape from a out-of-buffer situation
BOOL RVStream::ReleaseNetBuffers()
{
::FreeNetBufList(m_NetBufList, m_pRTP);
m_NetBufList = NULL;
return TRUE;
}
// Take a packet and reassemble it into a frame.
// Doesnt currently process out-of-order packets (ie) the entire frame is
// discarded
// The NETBUF is held onto, unless an error is returned
HRESULT
RVStream::ReassembleFrame(NETBUF *pNetBuf, UINT seq, UINT fMark)
{
++m_LastGoodSeq;
if (seq != m_LastGoodSeq) {
// dont handle out of sequence packets
if (fMark)
m_LastGoodSeq = (WORD)seq;
else
--m_LastGoodSeq; // LastGoodSeq left unchanged
return DPR_OUT_OF_SEQUENCE;
}
if (m_NetBufList ) {
// append to list of fragments
::AppendNetBufList(m_NetBufList,pNetBuf);
} else {
// start of frame
m_NetBufList = pNetBuf;
}
return DPR_SUCCESS;
}
HRESULT
RVStream::SetLastGoodSeq(UINT seq)
{
m_LastGoodSeq = seq ? (WORD)(seq-1) : (WORD)0xFFFF;
return DPR_SUCCESS;
}
// called when restarting after a pause (fSilenceOnly == FALSE) or
// to catch up when latency is getting too much (fSilenceOnly == TRUE)
// determine new play position by skipping any
// stale packets
HRESULT RVStream::FastForward( BOOL fSilenceOnly)
{
UINT pos;
DWORD timestamp = 0;
// restart the receive stream
EnterCriticalSection(&m_CritSect);
if (!TS_EARLIER(m_MaxT , m_PlayT)) {
// there are buffers waiting to be played
// dump them!
if (ModRing(m_MaxPos - m_PlayPos) <= m_DelayPos)
goto Exit; // not too many stale packets;
for (pos=m_PlayPos;pos != ModRing(m_MaxPos -m_DelayPos);pos = ModRing(pos+1),m_PlaySeq++) {
if (m_Ring[pos]->Busy()
|| (m_Ring[pos]->GetState() != MP_STATE_RESET
&& (fSilenceOnly ||ModRing(m_MaxPos-pos) <= m_MaxDelayPos)))
{ // non-empty packet
if (m_Ring[pos]->Busy()) // uncommon case
goto Exit; // bailing out
timestamp = m_Ring[pos]->GetTimestamp();
break;
}
m_Ring[pos]->Recycle(); // free NETBUF and Reset state
LOG((LOGMSG_RX_SKIP,pos));
}
if (timestamp) {// starting from non-empty packet
m_PlayT = timestamp;
//m_Ring[pos]->GetProp(MP_PROP_SEQNUM, &m_PlaySeq);
} else { // starting from (possibly) empty packet
m_PlayT++;
}
// probably also need to update FreePos
if (m_FreePos == ModRing(m_PlayPos-1))
m_FreePos = ModRing(pos-1);
m_PlayPos = pos;
/*
if (pos == ModRing(m_MaxPos+1)) {
DEBUGMSG(1,("Reset:: m_MaxT inconsisten!\n"));
}
*/
LOG((LOGMSG_RX_RESET2,m_MaxT,m_PlayT,m_PlayPos));
}
Exit:
LeaveCriticalSection(&m_CritSect);
return DPR_SUCCESS;
}
HRESULT
RVStream::Reset(UINT seq,DWORD timestamp)
{
UINT pos;
HRESULT hr;
// restart the receive stream
EnterCriticalSection(&m_CritSect);
LOG((LOGMSG_RX_RESET,m_MaxPos,m_PlayT,m_PlayPos));
/*if (!TS_EARLIER(m_MaxT , m_PlayT)) */
{
// there are buffers waiting to be played
// dump them!
// Empty the RVStream and set PlayT appropriately
for (pos = m_PlayPos;
pos != m_FreePos;
pos = ModRing(pos+1))
{
if (m_Ring[pos]->Busy ())
{
DEBUGMSG (1, ("RVStream::Reset: packet is busy, pos=%d\r\n", pos));
ASSERT(1);
hr = DPR_INVALID_PARAMETER;
goto Failed;
}
m_Ring[pos]->Recycle(); // free NETBUF and Reset state
}
}
m_MaxPos = ModRing(m_PlayPos-1);
m_PlayT = timestamp;
m_MaxT = m_PlayT -1; // m_MaxT must be less than m_PlayT
m_PlaySeq = seq;
LOG((LOGMSG_RX_RESET2,m_MaxPos,m_PlayT,m_PlayPos));
hr = DPR_SUCCESS;
Failed:
LeaveCriticalSection(&m_CritSect);
return hr;
}
MediaPacket *RVStream::GetNextPlay(void)
{
MediaPacket *pAP = NULL;
UINT pos,seq;
DWORD timestamp = 0, dwVal;
EnterCriticalSection(&m_CritSect);
pAP = m_Ring[m_PlayPos];
if (pAP->Busy() ||
(pAP->GetState() != MP_STATE_RESET && pAP->GetState() != MP_STATE_DECODED)
|| ModRing(m_PlayPos+1) == m_FreePos) {
LeaveCriticalSection(&m_CritSect);
return NULL;
} else {
// If there are empty buffer(s) at the head of the q followed
// by a talkspurt (non-empty buffers) and if the talkspurt is excessively
// delayed then squeeze out the silence.
//
if (pAP->GetState() == MP_STATE_RESET)
FastForward(TRUE); // skip silence packets if necessary
pAP = m_Ring[m_PlayPos]; // in case the play position changed
if (pAP->GetState() == MP_STATE_DECODED) {
timestamp = pAP->GetTimestamp();
seq = pAP->GetSeqNum();
}
}
pAP->Busy(TRUE);
m_PlayPos = ModRing(m_PlayPos+1);
if (timestamp) {
m_PlayT = timestamp+1;
m_PlaySeq = seq+1;
} else {
m_PlaySeq++;
// we dont really know the timestamp of the next frame to play
// without looking at it, and it may not have arrived
// so m_PlayT is just a lower bound
m_PlayT++;
}
LeaveCriticalSection(&m_CritSect);
return pAP;
}
RVStream::Destroy()
{
ASSERT (!m_NetBufList);
//::FreeNetBufList(m_NetBufList,m_pRTP);
m_NetBufList = NULL;
RxStream::Destroy();
return DPR_SUCCESS;
}
void RVStream::StartDecode()
{
MediaPacket *pVP;
MMRESULT mmr;
// if we have a separate decode thread this will signal it.
// for now we insert the decode loop here
while (pVP = GetNextDecode())
{
mmr = m_pVideoFilter->Convert((VideoPacket*)pVP, VP_DECODE);
if (mmr != MMSYSERR_NOERROR)
pVP->Recycle();
else
pVP->SetState(MP_STATE_DECODED);
Release(pVP);
}
}
HRESULT RVStream::RestorePacket(NETBUF *pNetBuf, MediaPacket *pVP, DWORD timestamp, UINT seq, UINT fMark, BOOL *pfReceivedKeyframe)
{
VOID *pNet;
UINT uSizeNet;
WSABUF bufDesc[MAX_VIDEO_FRAGMENTS]; // limit to at most 32 fragments
UINT i;
DWORD dwReceivedBytes=0;
NETBUF *pnb;
DWORD dwLength;
DWORD_PTR dwPropVal;
MMRESULT mmr;
i = 0;
pnb = pNetBuf;
while (pnb && i < MAX_VIDEO_FRAGMENTS) {
bufDesc[i].buf = (char *)pnb->data;
bufDesc[i].len = pnb->length;
dwReceivedBytes += pnb->length + sizeof(RTP_HDR) + IP_HEADER_SIZE + UDP_HEADER_SIZE;
pnb = pnb->next;
i++;
}
ASSERT(!pnb); // fail if we get a frame with more than MAX_VIDEO_FRAGMENTS
// Write the bits per second counter
UPDATE_COUNTER(g_pctrVideoReceiveBytes, dwReceivedBytes * 8);
pVP->GetNetData(&pNet, &uSizeNet);
// Initialize length to maximum reconstructed frame size
pVP->GetProp(MP_PROP_MAX_NET_LENGTH, &dwPropVal);
dwLength = (DWORD)dwPropVal;
if (pnb==NULL)
{
mmr = m_pVideoFilter->RestorePayload(bufDesc, i, (BYTE*)pNet, &dwLength, pfReceivedKeyframe);
if (mmr == MMSYSERR_NOERROR)
{
pVP->SetNetLength(dwLength);
pVP->Receive(NULL, timestamp, seq, fMark);
return S_OK;
}
}
return E_FAIL;
}