Skip to content
Snippets Groups Projects
Commit 6bed1f06 authored by Lyubomir Marinov's avatar Lyubomir Marinov
Browse files

Fixes an issue which could cause an RTP packet to be read more than once.

parent 854a48d4
No related branches found
No related tags found
No related merge requests found
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import javax.media.protocol.*; import javax.media.protocol.*;
...@@ -37,6 +39,13 @@ public abstract class RTPConnectorInputStream ...@@ -37,6 +39,13 @@ public abstract class RTPConnectorInputStream
*/ */
public static final int PACKET_RECEIVE_BUFFER_LENGTH = 4 * 1024; public static final int PACKET_RECEIVE_BUFFER_LENGTH = 4 * 1024;
/**
* The <tt>Logger</tt> used by the <tt>RTPConnectorInputStream</tt> class
* and its instances to print debug information.
*/
private static final Logger logger
= Logger.getLogger(RTPConnectorInputStream.class);
/** /**
* Packet receive buffer * Packet receive buffer
*/ */
...@@ -48,6 +57,13 @@ public abstract class RTPConnectorInputStream ...@@ -48,6 +57,13 @@ public abstract class RTPConnectorInputStream
*/ */
protected boolean closed; protected boolean closed;
/**
* The <tt>DatagramPacketFilter</tt>s which allow dropping
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*/
private DatagramPacketFilter[] datagramPacketFilters;
/** /**
* Caught an IO exception during read from socket * Caught an IO exception during read from socket
*/ */
...@@ -57,12 +73,19 @@ public abstract class RTPConnectorInputStream ...@@ -57,12 +73,19 @@ public abstract class RTPConnectorInputStream
* The packet data to be read out of this instance through its * The packet data to be read out of this instance through its
* {@link #read(byte[], int, int)} method. * {@link #read(byte[], int, int)} method.
*/ */
protected RawPacket pkt; private RawPacket pkt;
/** /**
* SourceTransferHandler object which is used to read packets. * The <tt>Object</tt> which synchronizes the access to {@link #pkt}.
*/ */
private SourceTransferHandler transferHandler; private final Object pktSyncRoot = new Object();
/**
* The pool of <tt>RawPacket</tt> instances to reduce their allocations and
* garbage collection.
*/
private final Queue<RawPacket> rawPacketPool
= new LinkedBlockingQueue<RawPacket>();
/** /**
* The Thread receiving packets. * The Thread receiving packets.
...@@ -70,11 +93,9 @@ public abstract class RTPConnectorInputStream ...@@ -70,11 +93,9 @@ public abstract class RTPConnectorInputStream
protected Thread receiverThread = null; protected Thread receiverThread = null;
/** /**
* The <tt>DatagramPacketFilter</tt>s which allow dropping * SourceTransferHandler object which is used to read packets.
* <tt>DatagramPacket</tt>s before they are converted into
* <tt>RawPacket</tt>s.
*/ */
private DatagramPacketFilter[] datagramPacketFilters; private SourceTransferHandler transferHandler;
/** /**
* Initializes a new <tt>RTPConnectorInputStream</tt> which is to receive * Initializes a new <tt>RTPConnectorInputStream</tt> which is to receive
...@@ -135,17 +156,22 @@ public synchronized void close() ...@@ -135,17 +156,22 @@ public synchronized void close()
*/ */
protected RawPacket createRawPacket(DatagramPacket datagramPacket) protected RawPacket createRawPacket(DatagramPacket datagramPacket)
{ {
RawPacket pkt = rawPacketPool.poll();
if (pkt == null) if (pkt == null)
{ {
return pkt
new RawPacket( = new RawPacket(
datagramPacket.getData(), datagramPacket.getData(),
datagramPacket.getOffset(), datagramPacket.getOffset(),
datagramPacket.getLength()); datagramPacket.getLength());
} }
pkt.setBuffer(datagramPacket.getData()); else
pkt.setLength(datagramPacket.getLength()); {
pkt.setOffset(datagramPacket.getOffset()); pkt.setBuffer(datagramPacket.getData());
pkt.setLength(datagramPacket.getLength());
pkt.setOffset(datagramPacket.getOffset());
}
return pkt; return pkt;
} }
...@@ -182,7 +208,7 @@ public ContentDescriptor getContentDescriptor() ...@@ -182,7 +208,7 @@ public ContentDescriptor getContentDescriptor()
*/ */
public long getContentLength() public long getContentLength()
{ {
return pkt.getLength(); return LENGTH_UNKNOWN;
} }
/** /**
...@@ -242,21 +268,86 @@ public int getMinimumTransferSize() ...@@ -242,21 +268,86 @@ public int getMinimumTransferSize()
public int read(byte[] buffer, int offset, int length) public int read(byte[] buffer, int offset, int length)
throws IOException throws IOException
{ {
if (buffer == null)
throw new NullPointerException("buffer");
if (ioError) if (ioError)
return -1; return -1;
int pktLength = pkt.getLength(); RawPacket pkt;
if (length < pktLength) synchronized (pktSyncRoot)
{ {
throw new IOException( pkt = this.pkt;
"Input buffer not big enough for " + pktLength); this.pkt = null;
} }
System.arraycopy( int pktLength;
pkt.getBuffer(), pkt.getOffset(),
buffer, offset, if (pkt == null)
pktLength); {
pktLength = 0;
}
else
{
// By default, pkt will be returned to the pool after it was read.
boolean poolPkt = true;
try
{
pktLength = pkt.getLength();
if (length < pktLength)
{
/*
* If pkt is still the latest RawPacket made available to
* reading, reinstate it for the next invocation of read;
* otherwise, return it to the pool.
*/
poolPkt = false;
throw new IOException(
"Input buffer not big enough for " + pktLength);
}
else
{
byte[] pktBuffer = pkt.getBuffer();
if (pktBuffer == null)
{
throw new NullPointerException(
"pkt.buffer null, pkt.length " + pktLength
+ ", pkt.offset " + pkt.getOffset());
}
else
{
System.arraycopy(
pkt.getBuffer(), pkt.getOffset(),
buffer, offset,
pktLength);
}
}
}
finally
{
if (!poolPkt)
{
synchronized (pktSyncRoot)
{
if (this.pkt == null)
this.pkt = pkt;
else
poolPkt = true;
}
}
if (poolPkt)
{
// Return pkt to the pool because it was successfully read.
pkt.setBuffer(null);
pkt.setLength(0);
pkt.setOffset(0);
rawPacketPool.offer(pkt);
}
}
}
return pktLength; return pktLength;
} }
...@@ -335,15 +426,73 @@ public void run() ...@@ -335,15 +426,73 @@ public void run()
if (accept) if (accept)
{ {
pkt = createRawPacket(p); RawPacket pkt = createRawPacket(p);
/* /*
* If we got extended, the delivery of the packet may have been * If we got extended, the delivery of the packet may have been
* canceled. * canceled.
*/ */
if ((pkt != null) && (!pkt.isInvalid()) if (pkt != null)
&& (transferHandler != null) && !closed) {
transferHandler.transferData(this); if (pkt.isInvalid())
{
/*
* Return pkt to the pool because it is invalid and,
* consequently, will not be made available to reading.
*/
pkt.setBuffer(null);
pkt.setLength(0);
pkt.setOffset(0);
rawPacketPool.offer(pkt);
}
else
{
RawPacket oldPkt;
synchronized (pktSyncRoot)
{
oldPkt = this.pkt;
this.pkt = pkt;
}
if (oldPkt != null)
{
/*
* Return oldPkt to the pool because it was made
* available to reading and it was not read.
*/
oldPkt.setBuffer(null);
oldPkt.setLength(0);
oldPkt.setOffset(0);
rawPacketPool.offer(pkt);
}
if ((transferHandler != null) && !closed)
{
try
{
transferHandler.transferData(this);
}
catch (Throwable t)
{
/*
* XXX We cannot allow transferHandler to kill
* us.
*/
if (t instanceof ThreadDeath)
{
throw (ThreadDeath) t;
}
else
{
logger.warn(
"An RTP packet may have not been"
+ " fully handled.",
t);
}
}
}
}
}
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment