Skip to content
Snippets Groups Projects
Commit 179151f1 authored by Boris Grozev's avatar Boris Grozev
Browse files

Re-implements the VP8 depacketizer.

parent f2765c02
No related branches found
No related tags found
No related merge requests found
......@@ -11,27 +11,18 @@
import org.jitsi.util.*;
import javax.media.*;
import javax.media.format.*;
import java.util.*;
import java.util.concurrent.*;
/**
* A depacketizer from VP8.
* See {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-07"}
*
* It is not yet fully compliant with the draft above, it can't successfully
* process all valid streams.
* It works by concatenating packets' payload (stripping the payload
* descriptor), until it encounters a packet with it's Start of Partition bit
* set, at which point it outputs the concatenated data and starts again.
* See {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-11"}
*
* @author Boris Grozev
*/
public class DePacketizer
extends AbstractCodec2
{
/**
* Size of <tt>buffer</tt>
*/
private static final int BUFFER_SIZE = 100000;
/**
* The <tt>Logger</tt> used by the <tt>DePacketizer</tt> class and its
* instances for logging output.
......@@ -39,42 +30,120 @@ public class DePacketizer
private static final Logger logger = Logger.getLogger(DePacketizer.class);
/**
* Certain output will only be logged if this is set to true in addition to
* 'trace' being enable in the logger. This is because the output is long
* and would be rarely used and to let compiler optimize the conditionals.
* Whether trace logging is enabled.
*/
private static final boolean TRACE = logger.isTraceEnabled();
/**
* A <tt>Comparator</tt> implementation for RTP sequence numbers.
* Compares <tt>a</tt> and <tt>b</tt>, taking into account the wrap at 2^16.
*
* IMPORTANT: This is a valid <tt>Comparator</tt> implementation only if
* used for subsets of [0, 2^16) which don't span more than 2^15 elements.
*
* E.g. it works for: [0, 2^15-1] and ([50000, 2^16) u [0, 10000])
* Doesn't work for: [0, 2^15] and ([0, 2^15-1] u {2^16-1}) and [0, 2^16)
*/
private static final Comparator<? super Long> seqNumComparator
= new Comparator<Long>() {
@Override
public int compare(Long a, Long b)
{
if (a.equals(b))
return 0;
else if (a > b)
{
if (a - b < 32768)
return 1;
else
return -1;
}
else //a < b
{
if (b - a < 32768)
return -1;
else
return 1;
}
}
};
/**
* Stores the RTP payloads (VP8 payload descriptor stripped) from RTP packets
* belonging to a single VP8 compressed frame.
*/
private SortedMap<Long, Container> data
= new TreeMap<Long, Container>(seqNumComparator);
/**
* Stores unused <tt>Container</tt>'s.
*/
private Queue<Container> free = new ArrayBlockingQueue<Container>(100);
/**
* Stores the first (earliest) sequence number stored in <tt>data</tt>, or
* -1 if <tt>data</tt> is empty.
*/
private long firstSeq = -1;
/**
* Stores the last (latest) sequence number stored in <tt>data</tt>, or -1
* if <tt>data</tt> is empty.
*/
private long lastSeq = -1;
/**
* Stores the value of the <tt>PictureID</tt> field for the VP8 compressed
* frame, parts of which are currently stored in <tt>data</tt>, or -1 if
* the <tt>PictureID</tt> field is not in use or <tt>data</tt> is empty.
*/
private int pictureId = -1;
/**
* Stores the RTP timestamp of the packets stored in <tt>data</tt>, or -1
* if they don't have a timestamp set.
*/
private long timestamp = -1;
/**
* Whether we have stored any packets in <tt>data</tt>. Equivalent to
* <tt>data.isEmpty()</tt>.
*/
private static final boolean TRACE = false;
private boolean empty = true;
/**
* Buffer used to store the payload of packets
* Whether we have stored in <tt>data</tt> the last RTP packet of the VP8
* compressed frame, parts of which are currently stored in <tt>data</tt>.
*/
private byte[] buffer = new byte[BUFFER_SIZE];
private boolean haveEnd = false;
/**
* Pointer to the last byte used in buffer.
* Whether we have stored in <tt>data</tt> the first RTP packet of the VP8
* compressed frame, parts of which are currently stored in <tt>data</tt>.
*/
private int bufferPointer = 0;
private boolean haveStart = false;
/**
* Whether a frame has been output
* Stores the sum of the lengths of the data stored in <tt>data</tt>, that
* is the total length of the VP8 compressed frame to be constructed.
*/
private boolean haveSent = false;
private int frameLength = 0;
/**
* The buffer was corrupted for some reason, wait for a new 'start of
* partition' packet before resuming.
* The sequence number of the last RTP packet, which was included in the
* output.
*/
private boolean waitForNewStart = false;
private long lastSentSeq = -1;
/**
* Initializes a new <tt>JNIEncoder</tt> instance.
*/
public DePacketizer()
{
super("VP8 RTP DePacketizer",
super("VP8 RTP DePacketizer",
VideoFormat.class,
new VideoFormat[]{new VideoFormat(Constants.VP8)});
inputFormats = new VideoFormat[] {new VideoFormat(Constants.VP8_RTP)};
new VideoFormat[]{ new VideoFormat(Constants.VP8) });
inputFormats = new VideoFormat[] { new VideoFormat(Constants.VP8_RTP) };
}
/**
......@@ -83,7 +152,6 @@ public DePacketizer()
@Override
protected void doClose()
{
return;
}
/**
......@@ -92,11 +160,63 @@ protected void doClose()
@Override
protected void doOpen() throws ResourceUnavailableException
{
if(logger.isTraceEnabled())
logger.trace("Opened VP8 de-packetizer");
return;
if(logger.isInfoEnabled())
logger.info("Opened VP8 depacketizer");
}
/**
* Re-initializes the fields which store information about the currently
* held data. Empties <tt>data</tt>.
*/
private void reinit()
{
firstSeq = lastSeq = timestamp = -1;
pictureId = -1;
empty = true;
haveEnd = haveStart = false;
frameLength = 0;
Iterator<Map.Entry<Long,Container>> it = data.entrySet().iterator();
Map.Entry<Long, Container> e;
while (it.hasNext())
{
e = it.next();
free.offer(e.getValue());
it.remove();
}
}
/**
* Checks whether the currently held VP8 compressed frame is complete (e.g
* all its packets are stored in <tt>data</tt>).
* @return <tt>true</tt> if the currently help VP8 compressed frame is
* complete, <tt>false</tt> otherwise.
*/
private boolean frameComplete()
{
return haveStart && haveEnd && !haveMissing();
}
/**
* Checks whether there are packets with sequence numbers between
* <tt>firstSeq</tt> and <tt>lastSeq</tt> which are *not* stored in
* <tt>data</tt>.
* @return <tt>true</tt> if there are packets with sequence numbers between
* <tt>firstSeq</tt> and <tt>lastSeq</tt> which are *not* stored in
* <tt>data</tt>.
*/
private boolean haveMissing()
{
Set<Long> seqs = data.keySet();
long s = firstSeq;
while (s != lastSeq)
{
if (!seqs.contains(s))
return true;
s = (s+1) % (1<<16);
}
return false;
}
/**
* {@inheritDoc}
......@@ -104,145 +224,228 @@ protected void doOpen() throws ResourceUnavailableException
@Override
protected int doProcess(Buffer inBuffer, Buffer outBuffer)
{
int ret;
byte[] in = (byte[]) inBuffer.getData();
boolean start
= VP8PayloadDescriptor.isStartOfPartition(in, inBuffer.getOffset());
if(waitForNewStart)
{
if(start)
{
waitForNewStart = false;
}
else
{
outBuffer.setDiscard(true);
return BUFFER_PROCESSED_OK;
}
}
System.err.println("depack "+hashCode()+" in "+inBuffer.getSequenceNumber());
byte[] inData = (byte[])inBuffer.getData();
int inOffset = inBuffer.getOffset();
int pdSize;
try
if (!VP8PayloadDescriptor.isValid(inData, inOffset))
{
pdSize = VP8PayloadDescriptor.getSize(in, inBuffer.getOffset());
System.err.println("depack "+hashCode()+" invalid VP8 PD "+inBuffer.getSequenceNumber());
logger.warn("Invalid RTP/VP8 packet discarded.");
outBuffer.setDiscard(true);
return BUFFER_PROCESSED_FAILED; //XXX: FAILED or OK?
}
catch (Exception e)
long inSeq = inBuffer.getSequenceNumber();
long inRtpTimestamp = inBuffer.getRtpTimeStamp();
int inPictureId = VP8PayloadDescriptor.getPictureId(inData, inOffset);
boolean inMarker = (inBuffer.getFlags() & Buffer.FLAG_RTP_MARKER) != 0;
boolean inIsStartOfFrame
= VP8PayloadDescriptor.isStartOfFrame(inData, inOffset);
int inLength = inBuffer.getLength();
int inPdSize = VP8PayloadDescriptor.getSize(inData, inOffset);
int inPayloadLength = inLength - inPdSize;
if (empty
&& lastSentSeq != -1
&& seqNumComparator.compare(inSeq, lastSentSeq) != 1)
{
if (logger.isInfoEnabled())
logger.info("Discarding old packet (while empty) " + inSeq);
outBuffer.setDiscard(true);
return BUFFER_PROCESSED_FAILED;
System.err.println("depack "+hashCode()+" discard old "+inSeq);
return BUFFER_PROCESSED_OK;
}
if(TRACE && logger.isTraceEnabled())
if (!empty)
{
logger.trace(
"Packet: "+inBuffer.getSequenceNumber() + ", length="
+ inBuffer.getLength() + ", pdSize=" + pdSize
+ ", start=" + start + "\nPayload descriptor:" );
for(int i = inBuffer.getOffset();
i < inBuffer.getOffset() + pdSize + 1;
i++)
// if the incoming packet has a different PictureID or timestamp
// than those of the current frame, then it belongs to a different
// frame.
if ( (inPictureId != -1 && pictureId != -1
&& inPictureId != pictureId)
| (timestamp != -1 && inRtpTimestamp != -1
&& inRtpTimestamp != timestamp) )
{
logger.trace(
"\t\t"
+ ((in[i]&0x80)==0?"0":"1")
+ ((in[i]&0x40)==0?"0":"1")
+ ((in[i]&0x20)==0?"0":"1")
+ ((in[i]&0x10)==0?"0":"1")
+ ((in[i]&0x08)==0?"0":"1")
+ ((in[i]&0x04)==0?"0":"1")
+ ((in[i]&0x02)==0?"0":"1")
+ ((in[i]&0x01)==0?"0":"1"));
if (seqNumComparator
.compare(inSeq, firstSeq) != 1) //inSeq <= firstSeq
{
// the packet belongs to a previous frame. discard it
if (logger.isInfoEnabled())
logger.info("Discarding old packet " + inSeq);
outBuffer.setDiscard(true);
System.err.println("depack "+hashCode()+" discard old2 "+inBuffer.getSequenceNumber());
return BUFFER_PROCESSED_OK;
}
else //inSeq > firstSeq (and also presumably isSeq > lastSeq)
{
// the packet belongs to a subsequent frame (to the one
// currently being held). Drop the current frame.
if (logger.isInfoEnabled())
logger.info("Discarding saved packets on arrival of" +
" a packet for a subsequent frame: " + inSeq);
// TODO: this would be the place to complain about the
// not-well-received PictureID by sending a RTCP SLI or NACK.
reinit();
}
}
}
if(start && haveSent)
// a whole frame in a single packet. avoid the extra copy to
// this.data and output it immediately.
if (empty && inMarker && inIsStartOfFrame)
{
//start of a new frame, flush the buffer
if(logger.isTraceEnabled())
logger.trace("Sending a frame, size=" + bufferPointer);
byte[] outData
= validateByteArraySize(outBuffer, inPayloadLength, false);
System.arraycopy(
inData,
inOffset + inPdSize,
outData,
0,
inPayloadLength);
outBuffer.setOffset(0);
outBuffer.setLength(inPayloadLength);
outBuffer.setRtpTimeStamp(inBuffer.getRtpTimeStamp());
byte[] out = validateByteArraySize(outBuffer, bufferPointer, false);
if (TRACE)
logger.trace("Out PictureID=" + inPictureId);
System.arraycopy(buffer, 0, out, 0, bufferPointer);
outBuffer.setFormat(new VideoFormat(Constants.VP8));
outBuffer.setLength(bufferPointer);
outBuffer.setOffset(0);
lastSentSeq = inSeq;
bufferPointer = 0;
ret = BUFFER_PROCESSED_OK;
}
else
{
ret = OUTPUT_BUFFER_NOT_FILLED;
System.err.println("depack "+hashCode()+" out pictureID="+pictureId);
return BUFFER_PROCESSED_OK;
}
int len = inBuffer.getLength();
if(bufferPointer + len - pdSize >= BUFFER_SIZE)
// add to this.data
Container container = free.poll();
if (container == null)
container = new Container();
if (container.buf == null || container.buf.length < inPayloadLength)
container.buf = new byte[inPayloadLength];
if (data.get(inSeq) != null)
{
//our buffer is not big enough
bufferPointer = 0;
if (logger.isInfoEnabled())
logger.info("(Probable) duplicate packet detected, discarding "
+ inSeq);
outBuffer.setDiscard(true);
waitForNewStart = true;
return BUFFER_PROCESSED_FAILED;
System.err.println("depack "+hashCode()+" discard dup "+inBuffer.getSequenceNumber());
return BUFFER_PROCESSED_OK;
}
System.arraycopy(in,
inBuffer.getOffset() + pdSize,
buffer,
bufferPointer,
len - pdSize);
bufferPointer += len - pdSize;
if(TRACE && logger.isTraceEnabled())
System.arraycopy(
inData,
inOffset + inPdSize,
container.buf,
0,
inPayloadLength);
container.len = inPayloadLength;
data.put(inSeq, container);
// update fields
frameLength += inPayloadLength;
if (firstSeq == -1
|| (seqNumComparator.compare(firstSeq, inSeq) == 1))
firstSeq = inSeq;
if (lastSeq == -1
|| (seqNumComparator.compare(inSeq, lastSeq) == 1))
lastSeq = inSeq;
if (empty)
{
logger.trace(
"Saving payload to buffer, seq num:"
+ inBuffer.getSequenceNumber()
+ ", bufferPointer=" + bufferPointer);
// the first received packet for the current frame was just added
empty = false;
timestamp = inRtpTimestamp;
pictureId = inPictureId;
}
haveSent = true;
return ret;
if (inMarker)
haveEnd = true;
if (inIsStartOfFrame)
haveStart = true;
// check if we have a full frame
if (frameComplete())
{
byte[] outData
= validateByteArraySize(outBuffer, frameLength, false);
int ptr = 0;
Container b;
for (Map.Entry<Long, Container> entry : data.entrySet())
{
b = entry.getValue();
System.arraycopy(
b.buf,
0,
outData,
ptr,
b.len);
ptr += b.len;
}
outBuffer.setOffset(0);
outBuffer.setLength(frameLength);
outBuffer.setRtpTimeStamp(inBuffer.getRtpTimeStamp());
if (TRACE)
logger.trace("Out PictureID=" + inPictureId);
lastSentSeq = lastSeq;
// prepare for the next frame
reinit();
System.err.println("depack "+hashCode()+" out2 pictureID="+inPictureId);
return BUFFER_PROCESSED_OK;
}
else
{
// frame not complete yet
outBuffer.setDiscard(true);
return OUTPUT_BUFFER_NOT_FILLED;
}
}
/**
* A class that represents the VP8 Payload Descriptor structure defined
* in {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-07"}
* in {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-10"}
*/
static class VP8PayloadDescriptor
{
/**
* I bit from the X byte of the Payload Descriptor
* I bit from the X byte of the Payload Descriptor.
*/
private static final byte I_BIT = (byte) 0x80;
/**
* K bit from the X byte of the Payload Descriptor
* K bit from the X byte of the Payload Descriptor.
*/
private static final byte K_BIT = (byte) 0x10;
/**
* L bit from the X byte of the Payload Descriptor
* L bit from the X byte of the Payload Descriptor.
*/
private static final byte L_BIT = (byte) 0x40;
/**
* I bit from the I byte of the Payload Descriptor
* I bit from the I byte of the Payload Descriptor.
*/
private static final byte M_BIT = (byte) 0x80;
/**
* Maximum length of a VP8 Payload Descriptor
* Maximum length of a VP8 Payload Descriptor.
*/
public static final int MAX_LENGTH = 6;
/**
* S bit from the first byte of the Payload Descriptor
* S bit from the first byte of the Payload Descriptor.
*/
private static final byte S_BIT = (byte) 0x10;
/**
* T bit from the X byte of the Payload Descriptor
* T bit from the X byte of the Payload Descriptor.
*/
private static final byte T_BIT = (byte) 0x20;
/**
* X bit from the first byte of the Payload Descriptor
* X bit from the first byte of the Payload Descriptor.
*/
private static final byte X_BIT = (byte) 0x80;
......@@ -270,44 +473,118 @@ public static byte[] create(boolean startOfPartition)
* @param input input
* @param offset offset
* @return The size in bytes of the Payload Descriptor at offset
* <tt>offset</tt> in <tt>input</tt>. The size is between 1 and 6.
* @throws Exception if there isn't a valid Payload Descriptor structure
* at offset <tt>offset</tt> in <tt>input</tt>.
* <tt>offset</tt> in <tt>input</tt>, or -1 if the input is not a valid
* VP8 Payload Descriptor. The size is between 1 and 6.
*/
public static int getSize(byte[] input, int offset) throws Exception
public static int getSize(byte[] input, int offset)
{
if (!isValid(input, offset))
return -1;
if(input.length < offset+1)
throw new Exception("Invalid VP8 Payload Descriptor");
if((input[offset] & X_BIT) == 0)
if ((input[offset] & X_BIT) == 0)
return 1;
int size = 2;
if((input[offset+1] & I_BIT) != 0)
if ((input[offset+1] & I_BIT) != 0)
{
size++;
if((input[offset+2] & M_BIT) != 0)
if ((input[offset+2] & M_BIT) != 0)
size++;
}
if((input[offset+1] & L_BIT) != 0)
if ((input[offset+1] & L_BIT) != 0)
size++;
if((input[offset+1] & (T_BIT | K_BIT)) != 0)
if ((input[offset+1] & (T_BIT | K_BIT)) != 0)
size++;
return size;
}
/**
* Checks whether the 'start of partition' bit is set in the the
* Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>
* Gets the value of the PictureID field of a VP8 Payload Descriptor.
* @param input
* @param offset
* @return the value of the PictureID field of a VP8 Payload Descriptor,
* or -1 if the fields is not present.
*/
private static int getPictureId(byte[] input, int offset)
{
if (!isValid(input, offset))
return -1;
if ((input[offset] & X_BIT) == 0
|| (input[offset+1] & I_BIT) == 0)
return -1;
boolean isLong = (input[offset+2] & M_BIT) != 0;
if (isLong)
return (input[offset+2] & 0x7f) << 8
| (input[offset+3] & 0xff);
else
return input[offset+2] & 0x7f;
}
private static boolean isValid(byte[] input, int offset)
{
return true;
}
/**
* Checks whether the '<tt>start of partition</tt>' bit is set in the
* VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>.
* @param input input
* @param offset offset
* @return
* @return <tt>true</tt> if the '<tt>start of partition</tt>' bit is set,
* <tt>false</tt> otherwise.
*/
public static boolean isStartOfPartition(byte[] input, int offset)
{
return (input[offset] & S_BIT) != 0;
}
/**
* Returns <tt>true</tt> if both the '<tt>start of partition</tt>' bit
* is set and the <tt>PID</tt> fields has value 0 in the VP8 Payload
* Descriptor at offset <tt>offset</tt> in <tt>input</tt>.
* @param input
* @param offset
* @return <tt>true</tt> if both the '<tt>start of partition</tt>' bit
* is set and the <tt>PID</tt> fields has value 0 in the VP8 Payload
* Descriptor at offset <tt>offset</tt> in <tt>input</tt>.
*/
public static boolean isStartOfFrame(byte[] input, int offset)
{
return isStartOfPartition(input, offset)
&& getPartitionId(input, offset) == 0;
}
/**
* Returns the value of the <tt>PID</tt> (partition ID) field of the
* VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>.
* @param input
* @param offset
* @return the value of the <tt>PID</tt> (partition ID) field of the
* VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>.
*/
public static int getPartitionId(byte[] input, int offset)
{
return input[offset] & 0x07;
}
}
/**
* A simple container for a <tt>byte[]</tt> and an integer.
*/
private class Container
{
/**
* This <tt>Container</tt>'s data.
*/
private byte[] buf;
/**
* Length used.
*/
private int len = 0;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment