diff --git a/src/org/jitsi/impl/neomedia/RTPConnectorInputStream.java b/src/org/jitsi/impl/neomedia/RTPConnectorInputStream.java index b7089188028d9375b68f1c004736435c755afc26..62690c6f65b9f19807d138304725ae5b935703fa 100755 --- a/src/org/jitsi/impl/neomedia/RTPConnectorInputStream.java +++ b/src/org/jitsi/impl/neomedia/RTPConnectorInputStream.java @@ -8,6 +8,8 @@ import java.io.*; import java.net.*; +import java.util.*; +import java.util.concurrent.*; import javax.media.protocol.*; @@ -37,6 +39,13 @@ public abstract class RTPConnectorInputStream */ public static final int PACKET_RECEIVE_BUFFER_LENGTH = 4 * 1024; + /** + * The <tt>Logger</tt> used by the <tt>RTPConnectorInputStream</tt> class + * and its instances to print debug information. + */ + private static final Logger logger + = Logger.getLogger(RTPConnectorInputStream.class); + /** * Packet receive buffer */ @@ -48,6 +57,13 @@ public abstract class RTPConnectorInputStream */ protected boolean closed; + /** + * The <tt>DatagramPacketFilter</tt>s which allow dropping + * <tt>DatagramPacket</tt>s before they are converted into + * <tt>RawPacket</tt>s. + */ + private DatagramPacketFilter[] datagramPacketFilters; + /** * Caught an IO exception during read from socket */ @@ -57,12 +73,19 @@ public abstract class RTPConnectorInputStream * The packet data to be read out of this instance through its * {@link #read(byte[], int, int)} method. */ - protected RawPacket pkt; + private RawPacket pkt; /** - * SourceTransferHandler object which is used to read packets. + * The <tt>Object</tt> which synchronizes the access to {@link #pkt}. */ - private SourceTransferHandler transferHandler; + private final Object pktSyncRoot = new Object(); + + /** + * The pool of <tt>RawPacket</tt> instances to reduce their allocations and + * garbage collection. + */ + private final Queue<RawPacket> rawPacketPool + = new LinkedBlockingQueue<RawPacket>(); /** * The Thread receiving packets. @@ -70,11 +93,9 @@ public abstract class RTPConnectorInputStream protected Thread receiverThread = null; /** - * The <tt>DatagramPacketFilter</tt>s which allow dropping - * <tt>DatagramPacket</tt>s before they are converted into - * <tt>RawPacket</tt>s. + * SourceTransferHandler object which is used to read packets. */ - private DatagramPacketFilter[] datagramPacketFilters; + private SourceTransferHandler transferHandler; /** * Initializes a new <tt>RTPConnectorInputStream</tt> which is to receive @@ -135,17 +156,22 @@ public synchronized void close() */ protected RawPacket createRawPacket(DatagramPacket datagramPacket) { + RawPacket pkt = rawPacketPool.poll(); + if (pkt == null) { - return - new RawPacket( + pkt + = new RawPacket( datagramPacket.getData(), datagramPacket.getOffset(), datagramPacket.getLength()); } - pkt.setBuffer(datagramPacket.getData()); - pkt.setLength(datagramPacket.getLength()); - pkt.setOffset(datagramPacket.getOffset()); + else + { + pkt.setBuffer(datagramPacket.getData()); + pkt.setLength(datagramPacket.getLength()); + pkt.setOffset(datagramPacket.getOffset()); + } return pkt; } @@ -182,7 +208,7 @@ public ContentDescriptor getContentDescriptor() */ public long getContentLength() { - return pkt.getLength(); + return LENGTH_UNKNOWN; } /** @@ -242,21 +268,86 @@ public int getMinimumTransferSize() public int read(byte[] buffer, int offset, int length) throws IOException { + if (buffer == null) + throw new NullPointerException("buffer"); + if (ioError) return -1; - int pktLength = pkt.getLength(); + RawPacket pkt; - if (length < pktLength) + synchronized (pktSyncRoot) { - throw new IOException( - "Input buffer not big enough for " + pktLength); + pkt = this.pkt; + this.pkt = null; } - System.arraycopy( - pkt.getBuffer(), pkt.getOffset(), - buffer, offset, - pktLength); + int pktLength; + + if (pkt == null) + { + pktLength = 0; + } + else + { + // By default, pkt will be returned to the pool after it was read. + boolean poolPkt = true; + + try + { + pktLength = pkt.getLength(); + if (length < pktLength) + { + /* + * If pkt is still the latest RawPacket made available to + * reading, reinstate it for the next invocation of read; + * otherwise, return it to the pool. + */ + poolPkt = false; + throw new IOException( + "Input buffer not big enough for " + pktLength); + } + else + { + byte[] pktBuffer = pkt.getBuffer(); + + if (pktBuffer == null) + { + throw new NullPointerException( + "pkt.buffer null, pkt.length " + pktLength + + ", pkt.offset " + pkt.getOffset()); + } + else + { + System.arraycopy( + pkt.getBuffer(), pkt.getOffset(), + buffer, offset, + pktLength); + } + } + } + finally + { + if (!poolPkt) + { + synchronized (pktSyncRoot) + { + if (this.pkt == null) + this.pkt = pkt; + else + poolPkt = true; + } + } + if (poolPkt) + { + // Return pkt to the pool because it was successfully read. + pkt.setBuffer(null); + pkt.setLength(0); + pkt.setOffset(0); + rawPacketPool.offer(pkt); + } + } + } return pktLength; } @@ -335,15 +426,73 @@ public void run() if (accept) { - pkt = createRawPacket(p); + RawPacket pkt = createRawPacket(p); /* * If we got extended, the delivery of the packet may have been * canceled. */ - if ((pkt != null) && (!pkt.isInvalid()) - && (transferHandler != null) && !closed) - transferHandler.transferData(this); + if (pkt != null) + { + if (pkt.isInvalid()) + { + /* + * Return pkt to the pool because it is invalid and, + * consequently, will not be made available to reading. + */ + pkt.setBuffer(null); + pkt.setLength(0); + pkt.setOffset(0); + rawPacketPool.offer(pkt); + } + else + { + RawPacket oldPkt; + + synchronized (pktSyncRoot) + { + oldPkt = this.pkt; + this.pkt = pkt; + } + if (oldPkt != null) + { + /* + * Return oldPkt to the pool because it was made + * available to reading and it was not read. + */ + oldPkt.setBuffer(null); + oldPkt.setLength(0); + oldPkt.setOffset(0); + rawPacketPool.offer(pkt); + } + + if ((transferHandler != null) && !closed) + { + try + { + transferHandler.transferData(this); + } + catch (Throwable t) + { + /* + * XXX We cannot allow transferHandler to kill + * us. + */ + if (t instanceof ThreadDeath) + { + throw (ThreadDeath) t; + } + else + { + logger.warn( + "An RTP packet may have not been" + + " fully handled.", + t); + } + } + } + } + } } } }