blob: 372fbe979df05fa09022da55af728802c83c68ce [file] [log] [blame]
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "ARTPConnection"
#include <utils/Log.h>
#include "ARTPAssembler.h"
#include "ARTPConnection.h"
#include "ARTPSource.h"
#include "ASessionDescription.h"
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/foundation/AString.h>
#include <media/stagefright/foundation/hexdump.h>
#include <arpa/inet.h>
#include <sys/socket.h>
namespace android {
static const size_t kMaxUDPSize = 1500;
static uint16_t u16at(const uint8_t *data) {
return data[0] << 8 | data[1];
}
static uint32_t u32at(const uint8_t *data) {
return u16at(data) << 16 | u16at(&data[2]);
}
static uint64_t u64at(const uint8_t *data) {
return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
}
// static
const int64_t ARTPConnection::kSelectTimeoutUs = 1000ll;
struct ARTPConnection::StreamInfo {
int mRTPSocket;
int mRTCPSocket;
sp<ASessionDescription> mSessionDesc;
size_t mIndex;
sp<AMessage> mNotifyMsg;
KeyedVector<uint32_t, sp<ARTPSource> > mSources;
int64_t mNumRTCPPacketsReceived;
int64_t mNumRTPPacketsReceived;
struct sockaddr_in mRemoteRTCPAddr;
bool mIsInjected;
};
ARTPConnection::ARTPConnection(uint32_t flags)
: mFlags(flags),
mPollEventPending(false),
mLastReceiverReportTimeUs(-1) {
}
ARTPConnection::~ARTPConnection() {
}
void ARTPConnection::addStream(
int rtpSocket, int rtcpSocket,
const sp<ASessionDescription> &sessionDesc,
size_t index,
const sp<AMessage> &notify,
bool injected) {
sp<AMessage> msg = new AMessage(kWhatAddStream, id());
msg->setInt32("rtp-socket", rtpSocket);
msg->setInt32("rtcp-socket", rtcpSocket);
msg->setObject("session-desc", sessionDesc);
msg->setSize("index", index);
msg->setMessage("notify", notify);
msg->setInt32("injected", injected);
msg->post();
}
void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
sp<AMessage> msg = new AMessage(kWhatRemoveStream, id());
msg->setInt32("rtp-socket", rtpSocket);
msg->setInt32("rtcp-socket", rtcpSocket);
msg->post();
}
static void bumpSocketBufferSize(int s) {
int size = 256 * 1024;
CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
}
// static
void ARTPConnection::MakePortPair(
int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
*rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
CHECK_GE(*rtpSocket, 0);
bumpSocketBufferSize(*rtpSocket);
*rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
CHECK_GE(*rtcpSocket, 0);
bumpSocketBufferSize(*rtcpSocket);
/* rand() * 1000 may overflow int type, use long long */
unsigned start = (unsigned)((rand()* 1000ll)/RAND_MAX) + 15550;
start &= ~1;
for (unsigned port = start; port < 65536; port += 2) {
struct sockaddr_in addr;
memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port);
if (bind(*rtpSocket,
(const struct sockaddr *)&addr, sizeof(addr)) < 0) {
continue;
}
addr.sin_port = htons(port + 1);
if (bind(*rtcpSocket,
(const struct sockaddr *)&addr, sizeof(addr)) == 0) {
*rtpPort = port;
return;
}
}
TRESPASS();
}
void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) {
case kWhatAddStream:
{
onAddStream(msg);
break;
}
case kWhatRemoveStream:
{
onRemoveStream(msg);
break;
}
case kWhatPollStreams:
{
onPollStreams();
break;
}
case kWhatInjectPacket:
{
onInjectPacket(msg);
break;
}
default:
{
TRESPASS();
break;
}
}
}
void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
mStreams.push_back(StreamInfo());
StreamInfo *info = &*--mStreams.end();
int32_t s;
CHECK(msg->findInt32("rtp-socket", &s));
info->mRTPSocket = s;
CHECK(msg->findInt32("rtcp-socket", &s));
info->mRTCPSocket = s;
int32_t injected;
CHECK(msg->findInt32("injected", &injected));
info->mIsInjected = injected;
sp<RefBase> obj;
CHECK(msg->findObject("session-desc", &obj));
info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
CHECK(msg->findSize("index", &info->mIndex));
CHECK(msg->findMessage("notify", &info->mNotifyMsg));
info->mNumRTCPPacketsReceived = 0;
info->mNumRTPPacketsReceived = 0;
memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
if (!injected) {
postPollEvent();
}
}
void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
int32_t rtpSocket, rtcpSocket;
CHECK(msg->findInt32("rtp-socket", &rtpSocket));
CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()
&& (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
++it;
}
if (it == mStreams.end()) {
return;
}
mStreams.erase(it);
}
void ARTPConnection::postPollEvent() {
if (mPollEventPending) {
return;
}
sp<AMessage> msg = new AMessage(kWhatPollStreams, id());
msg->post();
mPollEventPending = true;
}
void ARTPConnection::onPollStreams() {
mPollEventPending = false;
if (mStreams.empty()) {
return;
}
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = kSelectTimeoutUs;
fd_set rs;
FD_ZERO(&rs);
int maxSocket = -1;
for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) {
if ((*it).mIsInjected) {
continue;
}
FD_SET(it->mRTPSocket, &rs);
FD_SET(it->mRTCPSocket, &rs);
if (it->mRTPSocket > maxSocket) {
maxSocket = it->mRTPSocket;
}
if (it->mRTCPSocket > maxSocket) {
maxSocket = it->mRTCPSocket;
}
}
if (maxSocket == -1) {
return;
}
int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
if (res > 0) {
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
if ((*it).mIsInjected) {
++it;
continue;
}
status_t err = OK;
if (FD_ISSET(it->mRTPSocket, &rs)) {
err = receive(&*it, true);
}
if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
err = receive(&*it, false);
}
if (err == -ECONNRESET) {
// socket failure, this stream is dead, Jim.
ALOGW("failed to receive RTP/RTCP datagram.");
it = mStreams.erase(it);
continue;
}
++it;
}
}
int64_t nowUs = ALooper::GetNowUs();
if (mLastReceiverReportTimeUs <= 0
|| mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
StreamInfo *s = &*it;
if (s->mIsInjected) {
++it;
continue;
}
if (s->mNumRTCPPacketsReceived == 0) {
// We have never received any RTCP packets on this stream,
// we don't even know where to send a report.
++it;
continue;
}
buffer->setRange(0, 0);
for (size_t i = 0; i < s->mSources.size(); ++i) {
sp<ARTPSource> source = s->mSources.valueAt(i);
source->addReceiverReport(buffer);
if (mFlags & kRegularlyRequestFIR) {
source->addFIR(buffer);
}
}
if (buffer->size() > 0) {
ALOGV("Sending RR...");
ssize_t n;
do {
n = sendto(
s->mRTCPSocket, buffer->data(), buffer->size(), 0,
(const struct sockaddr *)&s->mRemoteRTCPAddr,
sizeof(s->mRemoteRTCPAddr));
} while (n < 0 && errno == EINTR);
if (n <= 0) {
ALOGW("failed to send RTCP receiver report (%s).",
n == 0 ? "connection gone" : strerror(errno));
it = mStreams.erase(it);
continue;
}
CHECK_EQ(n, (ssize_t)buffer->size());
mLastReceiverReportTimeUs = nowUs;
}
++it;
}
}
if (!mStreams.empty()) {
postPollEvent();
}
}
status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
CHECK(!s->mIsInjected);
sp<ABuffer> buffer = new ABuffer(65536);
socklen_t remoteAddrLen =
(!receiveRTP && s->mNumRTCPPacketsReceived == 0)
? sizeof(s->mRemoteRTCPAddr) : 0;
ssize_t nbytes;
do {
nbytes = recvfrom(
receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
buffer->data(),
buffer->capacity(),
0,
remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
remoteAddrLen > 0 ? &remoteAddrLen : NULL);
} while (nbytes < 0 && errno == EINTR);
if (nbytes <= 0) {
return -ECONNRESET;
}
buffer->setRange(0, nbytes);
// ALOGI("received %d bytes.", buffer->size());
status_t err;
if (receiveRTP) {
err = parseRTP(s, buffer);
} else {
err = parseRTCP(s, buffer);
}
return err;
}
status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
if (s->mNumRTPPacketsReceived++ == 0) {
sp<AMessage> notify = s->mNotifyMsg->dup();
notify->setInt32("first-rtp", true);
notify->post();
}
size_t size = buffer->size();
if (size < 12) {
// Too short to be a valid RTP header.
return -1;
}
const uint8_t *data = buffer->data();
if ((data[0] >> 6) != 2) {
// Unsupported version.
return -1;
}
if (data[0] & 0x20) {
// Padding present.
size_t paddingLength = data[size - 1];
if (paddingLength + 12 > size) {
// If we removed this much padding we'd end up with something
// that's too short to be a valid RTP header.
return -1;
}
size -= paddingLength;
}
int numCSRCs = data[0] & 0x0f;
size_t payloadOffset = 12 + 4 * numCSRCs;
if (size < payloadOffset) {
// Not enough data to fit the basic header and all the CSRC entries.
return -1;
}
if (data[0] & 0x10) {
// Header eXtension present.
if (size < payloadOffset + 4) {
// Not enough data to fit the basic header, all CSRC entries
// and the first 4 bytes of the extension header.
return -1;
}
const uint8_t *extensionData = &data[payloadOffset];
size_t extensionLength =
4 * (extensionData[2] << 8 | extensionData[3]);
if (size < payloadOffset + 4 + extensionLength) {
return -1;
}
payloadOffset += 4 + extensionLength;
}
uint32_t srcId = u32at(&data[8]);
sp<ARTPSource> source = findSource(s, srcId);
uint32_t rtpTime = u32at(&data[4]);
sp<AMessage> meta = buffer->meta();
meta->setInt32("ssrc", srcId);
meta->setInt32("rtp-time", rtpTime);
meta->setInt32("PT", data[1] & 0x7f);
meta->setInt32("M", data[1] >> 7);
buffer->setInt32Data(u16at(&data[2]));
buffer->setRange(payloadOffset, size - payloadOffset);
source->processRTPPacket(buffer);
return OK;
}
status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
if (s->mNumRTCPPacketsReceived++ == 0) {
sp<AMessage> notify = s->mNotifyMsg->dup();
notify->setInt32("first-rtcp", true);
notify->post();
}
const uint8_t *data = buffer->data();
size_t size = buffer->size();
while (size > 0) {
if (size < 8) {
// Too short to be a valid RTCP header
return -1;
}
if ((data[0] >> 6) != 2) {
// Unsupported version.
return -1;
}
if (data[0] & 0x20) {
// Padding present.
size_t paddingLength = data[size - 1];
if (paddingLength + 12 > size) {
// If we removed this much padding we'd end up with something
// that's too short to be a valid RTP header.
return -1;
}
size -= paddingLength;
}
size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
if (size < headerLength) {
// Only received a partial packet?
return -1;
}
switch (data[1]) {
case 200:
{
parseSR(s, data, headerLength);
break;
}
case 201: // RR
case 202: // SDES
case 204: // APP
break;
case 205: // TSFB (transport layer specific feedback)
case 206: // PSFB (payload specific feedback)
// hexdump(data, headerLength);
break;
case 203:
{
parseBYE(s, data, headerLength);
break;
}
default:
{
ALOGW("Unknown RTCP packet type %u of size %zu",
(unsigned)data[1], headerLength);
break;
}
}
data += headerLength;
size -= headerLength;
}
return OK;
}
status_t ARTPConnection::parseBYE(
StreamInfo *s, const uint8_t *data, size_t size) {
size_t SC = data[0] & 0x3f;
if (SC == 0 || size < (4 + SC * 4)) {
// Packet too short for the minimal BYE header.
return -1;
}
uint32_t id = u32at(&data[4]);
sp<ARTPSource> source = findSource(s, id);
source->byeReceived();
return OK;
}
status_t ARTPConnection::parseSR(
StreamInfo *s, const uint8_t *data, size_t size) {
size_t RC = data[0] & 0x1f;
if (size < (7 + RC * 6) * 4) {
// Packet too short for the minimal SR header.
return -1;
}
uint32_t id = u32at(&data[4]);
uint64_t ntpTime = u64at(&data[8]);
uint32_t rtpTime = u32at(&data[16]);
#if 0
ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
id,
rtpTime,
(ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
#endif
sp<ARTPSource> source = findSource(s, id);
source->timeUpdate(rtpTime, ntpTime);
return 0;
}
sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
sp<ARTPSource> source;
ssize_t index = info->mSources.indexOfKey(srcId);
if (index < 0) {
index = info->mSources.size();
source = new ARTPSource(
srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
info->mSources.add(srcId, source);
} else {
source = info->mSources.valueAt(index);
}
return source;
}
void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
msg->setInt32("index", index);
msg->setBuffer("buffer", buffer);
msg->post();
}
void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
int32_t index;
CHECK(msg->findInt32("index", &index));
sp<ABuffer> buffer;
CHECK(msg->findBuffer("buffer", &buffer));
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()
&& it->mRTPSocket != index && it->mRTCPSocket != index) {
++it;
}
if (it == mStreams.end()) {
TRESPASS();
}
StreamInfo *s = &*it;
status_t err;
if (it->mRTPSocket == index) {
err = parseRTP(s, buffer);
} else {
err = parseRTCP(s, buffer);
}
}
} // namespace android