From 179151f1ab6b9a0d2303a92c4a7606b333537325 Mon Sep 17 00:00:00 2001 From: Boris Grozev <boris@jitsi.org> Date: Tue, 3 Jun 2014 19:23:29 +0200 Subject: [PATCH] Re-implements the VP8 depacketizer. --- .../codec/video/vp8/DePacketizer.java | 537 +++++++++++++----- 1 file changed, 407 insertions(+), 130 deletions(-) diff --git a/src/org/jitsi/impl/neomedia/codec/video/vp8/DePacketizer.java b/src/org/jitsi/impl/neomedia/codec/video/vp8/DePacketizer.java index 3e8afe44..3b17a8ad 100644 --- a/src/org/jitsi/impl/neomedia/codec/video/vp8/DePacketizer.java +++ b/src/org/jitsi/impl/neomedia/codec/video/vp8/DePacketizer.java @@ -11,27 +11,18 @@ import org.jitsi.util.*; import javax.media.*; import javax.media.format.*; +import java.util.*; +import java.util.concurrent.*; /** * A depacketizer from VP8. - * See {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-07"} - * - * It is not yet fully compliant with the draft above, it can't successfully - * process all valid streams. - * It works by concatenating packets' payload (stripping the payload - * descriptor), until it encounters a packet with it's Start of Partition bit - * set, at which point it outputs the concatenated data and starts again. + * See {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-11"} * * @author Boris Grozev */ public class DePacketizer extends AbstractCodec2 { - /** - * Size of <tt>buffer</tt> - */ - private static final int BUFFER_SIZE = 100000; - /** * The <tt>Logger</tt> used by the <tt>DePacketizer</tt> class and its * instances for logging output. @@ -39,42 +30,120 @@ public class DePacketizer private static final Logger logger = Logger.getLogger(DePacketizer.class); /** - * Certain output will only be logged if this is set to true in addition to - * 'trace' being enable in the logger. This is because the output is long - * and would be rarely used and to let compiler optimize the conditionals. + * Whether trace logging is enabled. + */ + private static final boolean TRACE = logger.isTraceEnabled(); + + /** + * A <tt>Comparator</tt> implementation for RTP sequence numbers. + * Compares <tt>a</tt> and <tt>b</tt>, taking into account the wrap at 2^16. + * + * IMPORTANT: This is a valid <tt>Comparator</tt> implementation only if + * used for subsets of [0, 2^16) which don't span more than 2^15 elements. + * + * E.g. it works for: [0, 2^15-1] and ([50000, 2^16) u [0, 10000]) + * Doesn't work for: [0, 2^15] and ([0, 2^15-1] u {2^16-1}) and [0, 2^16) + */ + private static final Comparator<? super Long> seqNumComparator + = new Comparator<Long>() { + @Override + public int compare(Long a, Long b) + { + if (a.equals(b)) + return 0; + else if (a > b) + { + if (a - b < 32768) + return 1; + else + return -1; + } + else //a < b + { + if (b - a < 32768) + return -1; + else + return 1; + } + } + }; + + /** + * Stores the RTP payloads (VP8 payload descriptor stripped) from RTP packets + * belonging to a single VP8 compressed frame. + */ + private SortedMap<Long, Container> data + = new TreeMap<Long, Container>(seqNumComparator); + + /** + * Stores unused <tt>Container</tt>'s. + */ + private Queue<Container> free = new ArrayBlockingQueue<Container>(100); + + /** + * Stores the first (earliest) sequence number stored in <tt>data</tt>, or + * -1 if <tt>data</tt> is empty. + */ + private long firstSeq = -1; + + /** + * Stores the last (latest) sequence number stored in <tt>data</tt>, or -1 + * if <tt>data</tt> is empty. + */ + private long lastSeq = -1; + + /** + * Stores the value of the <tt>PictureID</tt> field for the VP8 compressed + * frame, parts of which are currently stored in <tt>data</tt>, or -1 if + * the <tt>PictureID</tt> field is not in use or <tt>data</tt> is empty. + */ + private int pictureId = -1; + + /** + * Stores the RTP timestamp of the packets stored in <tt>data</tt>, or -1 + * if they don't have a timestamp set. + */ + private long timestamp = -1; + + /** + * Whether we have stored any packets in <tt>data</tt>. Equivalent to + * <tt>data.isEmpty()</tt>. */ - private static final boolean TRACE = false; + private boolean empty = true; /** - * Buffer used to store the payload of packets + * Whether we have stored in <tt>data</tt> the last RTP packet of the VP8 + * compressed frame, parts of which are currently stored in <tt>data</tt>. */ - private byte[] buffer = new byte[BUFFER_SIZE]; + private boolean haveEnd = false; /** - * Pointer to the last byte used in buffer. + * Whether we have stored in <tt>data</tt> the first RTP packet of the VP8 + * compressed frame, parts of which are currently stored in <tt>data</tt>. */ - private int bufferPointer = 0; + private boolean haveStart = false; /** - * Whether a frame has been output + * Stores the sum of the lengths of the data stored in <tt>data</tt>, that + * is the total length of the VP8 compressed frame to be constructed. */ - private boolean haveSent = false; + private int frameLength = 0; /** - * The buffer was corrupted for some reason, wait for a new 'start of - * partition' packet before resuming. + * The sequence number of the last RTP packet, which was included in the + * output. */ - private boolean waitForNewStart = false; + private long lastSentSeq = -1; /** * Initializes a new <tt>JNIEncoder</tt> instance. */ public DePacketizer() { - super("VP8 RTP DePacketizer", + super("VP8 RTP DePacketizer", VideoFormat.class, - new VideoFormat[]{new VideoFormat(Constants.VP8)}); - inputFormats = new VideoFormat[] {new VideoFormat(Constants.VP8_RTP)}; + new VideoFormat[]{ new VideoFormat(Constants.VP8) }); + inputFormats = new VideoFormat[] { new VideoFormat(Constants.VP8_RTP) }; } /** @@ -83,7 +152,6 @@ public DePacketizer() @Override protected void doClose() { - return; } /** @@ -92,11 +160,63 @@ protected void doClose() @Override protected void doOpen() throws ResourceUnavailableException { - if(logger.isTraceEnabled()) - logger.trace("Opened VP8 de-packetizer"); - return; + if(logger.isInfoEnabled()) + logger.info("Opened VP8 depacketizer"); } + /** + * Re-initializes the fields which store information about the currently + * held data. Empties <tt>data</tt>. + */ + private void reinit() + { + firstSeq = lastSeq = timestamp = -1; + pictureId = -1; + empty = true; + haveEnd = haveStart = false; + frameLength = 0; + + Iterator<Map.Entry<Long,Container>> it = data.entrySet().iterator(); + Map.Entry<Long, Container> e; + while (it.hasNext()) + { + e = it.next(); + free.offer(e.getValue()); + it.remove(); + } + } + + /** + * Checks whether the currently held VP8 compressed frame is complete (e.g + * all its packets are stored in <tt>data</tt>). + * @return <tt>true</tt> if the currently help VP8 compressed frame is + * complete, <tt>false</tt> otherwise. + */ + private boolean frameComplete() + { + return haveStart && haveEnd && !haveMissing(); + } + + /** + * Checks whether there are packets with sequence numbers between + * <tt>firstSeq</tt> and <tt>lastSeq</tt> which are *not* stored in + * <tt>data</tt>. + * @return <tt>true</tt> if there are packets with sequence numbers between + * <tt>firstSeq</tt> and <tt>lastSeq</tt> which are *not* stored in + * <tt>data</tt>. + */ + private boolean haveMissing() + { + Set<Long> seqs = data.keySet(); + long s = firstSeq; + while (s != lastSeq) + { + if (!seqs.contains(s)) + return true; + s = (s+1) % (1<<16); + } + return false; + } /** * {@inheritDoc} @@ -104,145 +224,228 @@ protected void doOpen() throws ResourceUnavailableException @Override protected int doProcess(Buffer inBuffer, Buffer outBuffer) { - int ret; - byte[] in = (byte[]) inBuffer.getData(); - boolean start - = VP8PayloadDescriptor.isStartOfPartition(in, inBuffer.getOffset()); - if(waitForNewStart) - { - if(start) - { - waitForNewStart = false; - } - else - { - outBuffer.setDiscard(true); - return BUFFER_PROCESSED_OK; - } - } + System.err.println("depack "+hashCode()+" in "+inBuffer.getSequenceNumber()); + byte[] inData = (byte[])inBuffer.getData(); + int inOffset = inBuffer.getOffset(); - int pdSize; - try + if (!VP8PayloadDescriptor.isValid(inData, inOffset)) { - pdSize = VP8PayloadDescriptor.getSize(in, inBuffer.getOffset()); + System.err.println("depack "+hashCode()+" invalid VP8 PD "+inBuffer.getSequenceNumber()); + logger.warn("Invalid RTP/VP8 packet discarded."); + outBuffer.setDiscard(true); + return BUFFER_PROCESSED_FAILED; //XXX: FAILED or OK? } - catch (Exception e) + + long inSeq = inBuffer.getSequenceNumber(); + long inRtpTimestamp = inBuffer.getRtpTimeStamp(); + int inPictureId = VP8PayloadDescriptor.getPictureId(inData, inOffset); + boolean inMarker = (inBuffer.getFlags() & Buffer.FLAG_RTP_MARKER) != 0; + boolean inIsStartOfFrame + = VP8PayloadDescriptor.isStartOfFrame(inData, inOffset); + int inLength = inBuffer.getLength(); + int inPdSize = VP8PayloadDescriptor.getSize(inData, inOffset); + int inPayloadLength = inLength - inPdSize; + + if (empty + && lastSentSeq != -1 + && seqNumComparator.compare(inSeq, lastSentSeq) != 1) { + if (logger.isInfoEnabled()) + logger.info("Discarding old packet (while empty) " + inSeq); outBuffer.setDiscard(true); - return BUFFER_PROCESSED_FAILED; + System.err.println("depack "+hashCode()+" discard old "+inSeq); + return BUFFER_PROCESSED_OK; } - if(TRACE && logger.isTraceEnabled()) + if (!empty) { - logger.trace( - "Packet: "+inBuffer.getSequenceNumber() + ", length=" - + inBuffer.getLength() + ", pdSize=" + pdSize - + ", start=" + start + "\nPayload descriptor:" ); - for(int i = inBuffer.getOffset(); - i < inBuffer.getOffset() + pdSize + 1; - i++) + // if the incoming packet has a different PictureID or timestamp + // than those of the current frame, then it belongs to a different + // frame. + if ( (inPictureId != -1 && pictureId != -1 + && inPictureId != pictureId) + | (timestamp != -1 && inRtpTimestamp != -1 + && inRtpTimestamp != timestamp) ) { - logger.trace( - "\t\t" - + ((in[i]&0x80)==0?"0":"1") - + ((in[i]&0x40)==0?"0":"1") - + ((in[i]&0x20)==0?"0":"1") - + ((in[i]&0x10)==0?"0":"1") - + ((in[i]&0x08)==0?"0":"1") - + ((in[i]&0x04)==0?"0":"1") - + ((in[i]&0x02)==0?"0":"1") - + ((in[i]&0x01)==0?"0":"1")); + if (seqNumComparator + .compare(inSeq, firstSeq) != 1) //inSeq <= firstSeq + { + // the packet belongs to a previous frame. discard it + if (logger.isInfoEnabled()) + logger.info("Discarding old packet " + inSeq); + outBuffer.setDiscard(true); + System.err.println("depack "+hashCode()+" discard old2 "+inBuffer.getSequenceNumber()); + return BUFFER_PROCESSED_OK; + } + else //inSeq > firstSeq (and also presumably isSeq > lastSeq) + { + // the packet belongs to a subsequent frame (to the one + // currently being held). Drop the current frame. + + if (logger.isInfoEnabled()) + logger.info("Discarding saved packets on arrival of" + + " a packet for a subsequent frame: " + inSeq); + + // TODO: this would be the place to complain about the + // not-well-received PictureID by sending a RTCP SLI or NACK. + reinit(); + } } } - if(start && haveSent) + // a whole frame in a single packet. avoid the extra copy to + // this.data and output it immediately. + if (empty && inMarker && inIsStartOfFrame) { - //start of a new frame, flush the buffer - if(logger.isTraceEnabled()) - logger.trace("Sending a frame, size=" + bufferPointer); + byte[] outData + = validateByteArraySize(outBuffer, inPayloadLength, false); + System.arraycopy( + inData, + inOffset + inPdSize, + outData, + 0, + inPayloadLength); + outBuffer.setOffset(0); + outBuffer.setLength(inPayloadLength); + outBuffer.setRtpTimeStamp(inBuffer.getRtpTimeStamp()); - byte[] out = validateByteArraySize(outBuffer, bufferPointer, false); + if (TRACE) + logger.trace("Out PictureID=" + inPictureId); - System.arraycopy(buffer, 0, out, 0, bufferPointer); - outBuffer.setFormat(new VideoFormat(Constants.VP8)); - outBuffer.setLength(bufferPointer); - outBuffer.setOffset(0); + lastSentSeq = inSeq; - bufferPointer = 0; - ret = BUFFER_PROCESSED_OK; - } - else - { - ret = OUTPUT_BUFFER_NOT_FILLED; + System.err.println("depack "+hashCode()+" out pictureID="+pictureId); + return BUFFER_PROCESSED_OK; } - int len = inBuffer.getLength(); - if(bufferPointer + len - pdSize >= BUFFER_SIZE) + // add to this.data + Container container = free.poll(); + if (container == null) + container = new Container(); + if (container.buf == null || container.buf.length < inPayloadLength) + container.buf = new byte[inPayloadLength]; + + if (data.get(inSeq) != null) { - //our buffer is not big enough - bufferPointer = 0; + if (logger.isInfoEnabled()) + logger.info("(Probable) duplicate packet detected, discarding " + + inSeq); outBuffer.setDiscard(true); - waitForNewStart = true; - return BUFFER_PROCESSED_FAILED; + System.err.println("depack "+hashCode()+" discard dup "+inBuffer.getSequenceNumber()); + return BUFFER_PROCESSED_OK; } - System.arraycopy(in, - inBuffer.getOffset() + pdSize, - buffer, - bufferPointer, - len - pdSize); - bufferPointer += len - pdSize; - - if(TRACE && logger.isTraceEnabled()) + + System.arraycopy( + inData, + inOffset + inPdSize, + container.buf, + 0, + inPayloadLength); + container.len = inPayloadLength; + data.put(inSeq, container); + + // update fields + frameLength += inPayloadLength; + if (firstSeq == -1 + || (seqNumComparator.compare(firstSeq, inSeq) == 1)) + firstSeq = inSeq; + if (lastSeq == -1 + || (seqNumComparator.compare(inSeq, lastSeq) == 1)) + lastSeq = inSeq; + + if (empty) { - logger.trace( - "Saving payload to buffer, seq num:" - + inBuffer.getSequenceNumber() - + ", bufferPointer=" + bufferPointer); + // the first received packet for the current frame was just added + empty = false; + timestamp = inRtpTimestamp; + pictureId = inPictureId; } - haveSent = true; - return ret; + if (inMarker) + haveEnd = true; + if (inIsStartOfFrame) + haveStart = true; + + // check if we have a full frame + if (frameComplete()) + { + byte[] outData + = validateByteArraySize(outBuffer, frameLength, false); + int ptr = 0; + Container b; + for (Map.Entry<Long, Container> entry : data.entrySet()) + { + b = entry.getValue(); + System.arraycopy( + b.buf, + 0, + outData, + ptr, + b.len); + ptr += b.len; + } + + outBuffer.setOffset(0); + outBuffer.setLength(frameLength); + outBuffer.setRtpTimeStamp(inBuffer.getRtpTimeStamp()); + + if (TRACE) + logger.trace("Out PictureID=" + inPictureId); + lastSentSeq = lastSeq; + + // prepare for the next frame + reinit(); + + System.err.println("depack "+hashCode()+" out2 pictureID="+inPictureId); + return BUFFER_PROCESSED_OK; + } + else + { + // frame not complete yet + outBuffer.setDiscard(true); + return OUTPUT_BUFFER_NOT_FILLED; + } } /** * A class that represents the VP8 Payload Descriptor structure defined - * in {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-07"} + * in {@link "http://tools.ietf.org/html/draft-ietf-payload-vp8-10"} */ static class VP8PayloadDescriptor { /** - * I bit from the X byte of the Payload Descriptor + * I bit from the X byte of the Payload Descriptor. */ private static final byte I_BIT = (byte) 0x80; /** - * K bit from the X byte of the Payload Descriptor + * K bit from the X byte of the Payload Descriptor. */ private static final byte K_BIT = (byte) 0x10; /** - * L bit from the X byte of the Payload Descriptor + * L bit from the X byte of the Payload Descriptor. */ private static final byte L_BIT = (byte) 0x40; /** - * I bit from the I byte of the Payload Descriptor + * I bit from the I byte of the Payload Descriptor. */ private static final byte M_BIT = (byte) 0x80; /** - * Maximum length of a VP8 Payload Descriptor + * Maximum length of a VP8 Payload Descriptor. */ public static final int MAX_LENGTH = 6; /** - * S bit from the first byte of the Payload Descriptor + * S bit from the first byte of the Payload Descriptor. */ private static final byte S_BIT = (byte) 0x10; /** - * T bit from the X byte of the Payload Descriptor + * T bit from the X byte of the Payload Descriptor. */ private static final byte T_BIT = (byte) 0x20; /** - * X bit from the first byte of the Payload Descriptor + * X bit from the first byte of the Payload Descriptor. */ private static final byte X_BIT = (byte) 0x80; @@ -270,44 +473,118 @@ public static byte[] create(boolean startOfPartition) * @param input input * @param offset offset * @return The size in bytes of the Payload Descriptor at offset - * <tt>offset</tt> in <tt>input</tt>. The size is between 1 and 6. - * @throws Exception if there isn't a valid Payload Descriptor structure - * at offset <tt>offset</tt> in <tt>input</tt>. + * <tt>offset</tt> in <tt>input</tt>, or -1 if the input is not a valid + * VP8 Payload Descriptor. The size is between 1 and 6. */ - public static int getSize(byte[] input, int offset) throws Exception + public static int getSize(byte[] input, int offset) { + if (!isValid(input, offset)) + return -1; - if(input.length < offset+1) - throw new Exception("Invalid VP8 Payload Descriptor"); - - if((input[offset] & X_BIT) == 0) + if ((input[offset] & X_BIT) == 0) return 1; int size = 2; - if((input[offset+1] & I_BIT) != 0) + if ((input[offset+1] & I_BIT) != 0) { size++; - if((input[offset+2] & M_BIT) != 0) + if ((input[offset+2] & M_BIT) != 0) size++; } - if((input[offset+1] & L_BIT) != 0) + if ((input[offset+1] & L_BIT) != 0) size++; - if((input[offset+1] & (T_BIT | K_BIT)) != 0) + if ((input[offset+1] & (T_BIT | K_BIT)) != 0) size++; return size; } /** - * Checks whether the 'start of partition' bit is set in the the - * Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt> + * Gets the value of the PictureID field of a VP8 Payload Descriptor. + * @param input + * @param offset + * @return the value of the PictureID field of a VP8 Payload Descriptor, + * or -1 if the fields is not present. + */ + private static int getPictureId(byte[] input, int offset) + { + if (!isValid(input, offset)) + return -1; + + if ((input[offset] & X_BIT) == 0 + || (input[offset+1] & I_BIT) == 0) + return -1; + + boolean isLong = (input[offset+2] & M_BIT) != 0; + if (isLong) + return (input[offset+2] & 0x7f) << 8 + | (input[offset+3] & 0xff); + else + return input[offset+2] & 0x7f; + + } + + private static boolean isValid(byte[] input, int offset) + { + return true; + } + + /** + * Checks whether the '<tt>start of partition</tt>' bit is set in the + * VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>. * @param input input * @param offset offset - * @return + * @return <tt>true</tt> if the '<tt>start of partition</tt>' bit is set, + * <tt>false</tt> otherwise. */ public static boolean isStartOfPartition(byte[] input, int offset) { return (input[offset] & S_BIT) != 0; } + + /** + * Returns <tt>true</tt> if both the '<tt>start of partition</tt>' bit + * is set and the <tt>PID</tt> fields has value 0 in the VP8 Payload + * Descriptor at offset <tt>offset</tt> in <tt>input</tt>. + * @param input + * @param offset + * @return <tt>true</tt> if both the '<tt>start of partition</tt>' bit + * is set and the <tt>PID</tt> fields has value 0 in the VP8 Payload + * Descriptor at offset <tt>offset</tt> in <tt>input</tt>. + */ + public static boolean isStartOfFrame(byte[] input, int offset) + { + return isStartOfPartition(input, offset) + && getPartitionId(input, offset) == 0; + } + + /** + * Returns the value of the <tt>PID</tt> (partition ID) field of the + * VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>. + * @param input + * @param offset + * @return the value of the <tt>PID</tt> (partition ID) field of the + * VP8 Payload Descriptor at offset <tt>offset</tt> in <tt>input</tt>. + */ + public static int getPartitionId(byte[] input, int offset) + { + return input[offset] & 0x07; + } + } + + /** + * A simple container for a <tt>byte[]</tt> and an integer. + */ + private class Container + { + /** + * This <tt>Container</tt>'s data. + */ + private byte[] buf; + + /** + * Length used. + */ + private int len = 0; } } -- GitLab