Newer
Older
/*
* Jitsi, the OpenSource Java VoIP and Instant Messaging client.
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jitsi.impl.neomedia;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import javax.media.rtp.*;

Lyubomir Marinov
committed
import org.jitsi.service.libjitsi.*;
import org.jitsi.service.packetlogging.*;

Lyubomir Marinov
committed
import org.jitsi.util.*;

Lyubomir Marinov
committed
/**
* @author Bing SU (nova.su@gmail.com)
* @author Lyubomir Marinov
*/
public abstract class RTPConnectorOutputStream
implements OutputDataStream
{

Lyubomir Marinov
committed
/**
* The <tt>Logger</tt> used by the <tt>RTPConnectorOutputStream</tt> class
* and its instances for logging output.
*/
private static final Logger logger
= Logger.getLogger(RTPConnectorOutputStream.class);
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/**
* The maximum number of packets to be sent to be kept in the queue of
* <tt>MaxPacketsPerMillisPolicy</tt>. When the maximum is reached, the next
* attempt to write a new packet in the queue will block until at least one
* packet from the queue is sent. Defined in order to prevent
* <tt>OutOfMemoryError</tt>s which, technically, may arise if the capacity
* of the queue is unlimited.
*/
public static final int
MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY
= 256;
/**
* The functionality which allows this <tt>OutputDataStream</tt> to control
* how many RTP packets it sends through its <tt>DatagramSocket</tt> per a
* specific number of milliseconds.
*/
private MaxPacketsPerMillisPolicy maxPacketsPerMillisPolicy;
/**
* Stream targets' IP addresses and ports.
*/
protected final List<InetSocketAddress> targets
= new LinkedList<InetSocketAddress>();
/**
* The pool of <tt>RawPacket</tt> instances which reduces the number of
* allocations performed by {@link #createRawPacket(byte[], int, int)}.
*/
private final LinkedBlockingQueue<RawPacket> rawPacketPool
= new LinkedBlockingQueue<RawPacket>();
/**
* Used for debugging. As we don't log every packet
* we must count them and decide which to log.
*/
private long numberOfPackets = 0;
/**
* Initializes a new <tt>RTPConnectorOutputStream</tt> which is to send
* packet data out through a specific socket.
*/
public RTPConnectorOutputStream()
{
}
/**
* Add a target to stream targets list
*
* @param remoteAddr target ip address
* @param remotePort target port
*/
public void addTarget(InetAddress remoteAddr, int remotePort)
{
InetSocketAddress target
= new InetSocketAddress(remoteAddr, remotePort);
if (!targets.contains(target))
targets.add(target);
}
/**
* Close this output stream.
*/
public void close()
{
if (maxPacketsPerMillisPolicy != null)
{
maxPacketsPerMillisPolicy.close();
maxPacketsPerMillisPolicy = null;
}
removeTargets();
}
/**
* Creates a new <tt>RawPacket</tt> from a specific <tt>byte[]</tt> buffer
* in order to have this instance send its packet data through its
* {@link #write(byte[], int, int)} method. Allows extenders to intercept
* the packet data and possibly filter and/or modify it.
*
* @param buffer the packet data to be sent to the targets of this instance
* @param offset the offset of the packet data in <tt>buffer</tt>
* @param length the length of the packet data in <tt>buffer</tt>
* @return a new <tt>RawPacket</tt> containing the packet data of the
* specified <tt>byte[]</tt> buffer or possibly its modification;
* <tt>null</tt> to ignore the packet data of the specified <tt>byte[]</tt>
* buffer and not send it to the targets of this instance through its
* {@link #write(byte[], int, int)} method
*/
protected RawPacket createRawPacket(byte[] buffer, int offset, int length)
{
RawPacket pkt = rawPacketPool.poll();
if ((pkt == null) || (pkt.getBuffer().length < length))
pkt = new RawPacket(new byte[length], 0, 0);
System.arraycopy(buffer, offset, pkt.getBuffer(), 0, length);
pkt.setLength(length);
pkt.setOffset(0);
return pkt;
}
/**
* Remove a target from stream targets list
*
* @param remoteAddr target ip address
* @param remotePort target port
* @return <tt>true</tt> if the target is in stream target list and can be
* removed; <tt>false</tt>, otherwise
*/
public boolean removeTarget(InetAddress remoteAddr, int remotePort)
{
for (Iterator<InetSocketAddress> targetIter = targets.iterator();
targetIter.hasNext();)
{
InetSocketAddress target = targetIter.next();
if (target.getAddress().equals(remoteAddr)
&& (target.getPort() == remotePort))
{
targetIter.remove();
return true;
}
}
return false;
}
/**
* Remove all stream targets from this session.
*/
public void removeTargets()
{
targets.clear();
}
/**
* Determines whether a <tt>RawPacket</tt> which has a specific number in
* the total number of sent <tt>RawPacket</tt>s is to be logged by
* {@link PacketLoggingService}.
*
* @param numOfPacket the number of the <tt>RawPacket</tt> in the total
* number of sent <tt>RawPacket</tt>s
* @return <tt>true</tt> if the <tt>RawPacket</tt> with the specified
* <tt>numOfPacket</tt> is to be logged by <tt>PacketLoggingService</tt>;
* otherwise, <tt>false</tt>
*/
static boolean logPacket(long numOfPacket)
{
return
(numOfPacket == 1)
|| (numOfPacket == 300)
|| (numOfPacket == 500)
|| (numOfPacket == 1000)
|| ((numOfPacket % 5000) == 0);
}
/**
* Sends a specific <tt>RawPacket</tt> through this
* <tt>OutputDataStream</tt> to a specific <tt>InetSocketAddress</tt>.
*
* @param packet the <tt>RawPacket</tt> to send through this
* <tt>OutputDataStream</tt> to the specified <tt>target</tt>
* @param target the <tt>InetSocketAddress</tt> to which the specified
* <tt>packet</tt> is to be sent through this <tt>OutputDataStream</tt>
* @throws IOException if anything goes wrong while sending the specified
* <tt>packet</tt> through this <tt>OutputDataStream</tt> to the specified
* <tt>target</tt>
*/
protected abstract void sendToTarget(
RawPacket packet,
InetSocketAddress target)
throws IOException;
/**
* Logs a specific <tt>RawPacket</tt> associated with a specific remote
* address.
*
* @param packet packet to log
* @param target the remote address associated with the <tt>packet</tt>
*/
protected abstract void doLogPacket(
RawPacket packet,
InetSocketAddress target);
/**
* Returns whether or not this <tt>RTPConnectorOutputStream</tt> has a valid
* socket.
*
* @return <tt>true</tt> if this <tt>RTPConnectorOutputStream</tt> has a
* valid socket; <tt>false</tt>, otherwise
*/
protected abstract boolean isSocketValid();
/**
* Sends a specific RTP packet through the <tt>DatagramSocket</tt> of this
* <tt>OutputDataSource</tt>.
*
* @param packet the RTP packet to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>
* @return <tt>true</tt> if the specified <tt>packet</tt> was successfully
* sent; otherwise, <tt>false</tt>
*/
private boolean send(RawPacket packet)
{
if(!isSocketValid())
return false;
numberOfPackets++;
for (InetSocketAddress target : targets)
{
try
{
sendToTarget(packet, target);
if(logPacket(numberOfPackets))
{
PacketLoggingService packetLogging

Lyubomir Marinov
committed
= LibJitsi.getPacketLoggingService();
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
if ((packetLogging != null)
&& packetLogging.isLoggingEnabled(
PacketLoggingService.ProtocolName.RTP))
doLogPacket(packet, target);
}
}
catch (IOException ioe)
{
rawPacketPool.offer(packet);
// TODO error handling
return false;
}
}
rawPacketPool.offer(packet);
return true;
}
/**
* Sets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per the
* specified number of milliseconds; <tt>-1</tt> if no maximum is to be set
* @param perMillis the number of milliseconds per which <tt>maxPackets</tt>
* are to be sent by this <tt>OutputDataStream</tt> through its
* <tt>DatagramSocket</tt>
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPacketsPerMillisPolicy == null)
{
if (maxPackets > 0)
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
maxPacketsPerMillisPolicy
= new MaxPacketsPerMillisPolicy(maxPackets, perMillis);
}
}
else
{
maxPacketsPerMillisPolicy
.setMaxPacketsPerMillis(maxPackets, perMillis);
}
}
/**
* Implements {@link OutputDataStream#write(byte[], int, int)}.
*
* @param buffer the <tt>byte[]</tt> that we'd like to copy the content
* of the packet to.
* @param offset the position where we are supposed to start writing in
* <tt>buffer</tt>.
* @param length the number of <tt>byte</tt>s available for writing in
* <tt>inBuffer</tt>.
*
* @return the number of bytes read
*/
public int write(byte[] buffer, int offset, int length)
{

Lyubomir Marinov
committed
/*
* While calling write without targets can be carried out without a
* problem, such a situation may be a symptom of a problem. For example,
* it was discovered during testing that RTCP was seemingly-endlessly
* sent after hanging up a call.
*/
if (logger.isDebugEnabled() && targets.isEmpty())
logger.debug("Write called without targets!", new Throwable());
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
RawPacket packet = createRawPacket(buffer, offset, length);
/*
* If we got extended, the delivery of the packet may have been
* canceled.
*/
if (packet != null)
{
if (maxPacketsPerMillisPolicy == null)
{
if (!send(packet))
return -1;
}
else
maxPacketsPerMillisPolicy.write(packet);
}
return length;
}
/**
* Changes current thread priority.
* @param priority the new priority.
*/
public void setPriority(int priority)
{
// currently no priority is set

Lyubomir Marinov
committed
// if ((maxPacketsPerMillisPolicy != null)
// && (maxPacketsPerMillisPolicy.sendThread != null))
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
// maxPacketsPerMillisPolicy.sendThread.setPriority(priority);
}
/**
* Implements the functionality which allows this <tt>OutputDataStream</tt>
* to control how many RTP packets it sends through its
* <tt>DatagramSocket</tt> per a specific number of milliseconds.
*/
private class MaxPacketsPerMillisPolicy
{
/**
* The maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* {@link #perNanos} nanoseconds.
*/
private int maxPackets = -1;
/**
* The time stamp in nanoseconds of the start of the current
* <tt>perNanos</tt> interval.
*/
private long millisStartTime = 0;
/**
* The list of RTP packets to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
*/
private final ArrayBlockingQueue<RawPacket> packetQueue
= new ArrayBlockingQueue<RawPacket>(
MAX_PACKETS_PER_MILLIS_POLICY_PACKET_QUEUE_CAPACITY);
/**
* The number of RTP packets already sent during the current
* <tt>perNanos</tt> interval.
*/
private long packetsSentInMillis = 0;
/**
* The time interval in nanoseconds during which {@link #maxPackets}
* number of RTP packets are to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataSource</tt>.
*/
private long perNanos = -1;
/**
* The <tt>Thread</tt> which is to send the RTP packets in
* {@link #packetQueue} through the <tt>DatagramSocket</tt> of this
* <tt>OutputDataSource</tt>.
*/
private Thread sendThread;
/**
* To signal run or stop condition to send thread.
*/
private boolean sendRun = true;
/**
* Initializes a new <tt>MaxPacketsPerMillisPolicy</tt> instance which
* is to control how many RTP packets this <tt>OutputDataSource</tt> is
* to send through its <tt>DatagramSocket</tt> per a specific number of
* milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent per
* <tt>perMillis</tt> milliseconds through the <tt>DatagramSocket</tt>
* of this <tt>OutputDataStream</tt>
* @param perMillis the number of milliseconds per which a maximum of
* <tt>maxPackets</tt> RTP packets are to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
*/
public MaxPacketsPerMillisPolicy(int maxPackets, long perMillis)
{
setMaxPacketsPerMillis(maxPackets, perMillis);
synchronized (this) {
if (sendThread == null)
{
sendThread
= new Thread(getClass().getName())
{
@Override
public void run()
{
runInSendThread();
}
};
sendThread.setDaemon(true);
sendThread.start();
}
}
}
/**
* Closes the connector.
*/
synchronized void close()
{
if (!sendRun)
return;
sendRun = false;
// just offer a new packet to wakeup thread in case it waits for
// a packet.
packetQueue.offer(new RawPacket(null, 0, 0));
}
/**
* Sends the RTP packets in {@link #packetQueue} in accord with
* {@link #maxPackets} and {@link #perNanos}.
*/
private void runInSendThread()
{
try
{
while (sendRun)
{
RawPacket packet = null;
while (true)
{
try
{
packet = packetQueue.take();
break;
}
catch (InterruptedException iex)
{
continue;
}
}
if (!sendRun)
break;
long time = System.nanoTime();
long millisRemainingTime = time - millisStartTime;
if ((perNanos < 1)
|| (millisRemainingTime >= perNanos))
{
millisStartTime = time;
packetsSentInMillis = 0;
}
else if ((maxPackets > 0)
&& (packetsSentInMillis >= maxPackets))
{
while (true)
{
millisRemainingTime = System.nanoTime()
- millisStartTime;
if (millisRemainingTime >= perNanos)
break;
LockSupport.parkNanos(millisRemainingTime);
}
millisStartTime = System.nanoTime();
packetsSentInMillis = 0;
}
send(packet);
packetsSentInMillis++;
}
}
finally
{
packetQueue.clear();
synchronized (packetQueue)
{
if (Thread.currentThread().equals(sendThread))
sendThread = null;
}
}
}
/**
* Sets the maximum number of RTP packets to be sent by this
* <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt> per
* a specific number of milliseconds.
*
* @param maxPackets the maximum number of RTP packets to be sent by
* this <tt>OutputDataStream</tt> through its <tt>DatagramSocket</tt>
* per the specified number of milliseconds; <tt>-1</tt> if no maximum
* is to be set
* @param perMillis the number of milliseconds per which
* <tt>maxPackets</tt> are to be sent by this <tt>OutputDataStream</tt>
* through its <tt>DatagramSocket</tt>
*/
public void setMaxPacketsPerMillis(int maxPackets, long perMillis)
{
if (maxPackets < 1)
{
this.maxPackets = -1;
this.perNanos = -1;
}
else
{
if (perMillis < 1)
throw new IllegalArgumentException("perMillis");
this.maxPackets = maxPackets;
this.perNanos = perMillis * 1000000;
}
}
/**
* Queues a specific RTP packet to be sent through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>.
*
* @param packet the RTP packet to be queued for sending through the
* <tt>DatagramSocket</tt> of this <tt>OutputDataStream</tt>
*/
public void write(RawPacket packet)
{
while (true)
{
try
{
packetQueue.put(packet);
break;
}
catch (InterruptedException iex)
{
}
}
}
}
}