// blob: d0f3bc2f68f94eed995c21447e24741c56560716 [file] [log] [blame]
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "LiveSession"
#include <utils/Log.h>
#include "LiveSession.h"
#include "M3UParser.h"
#include "PlaylistFetcher.h"
#include "include/HTTPBase.h"
#include "mpeg2ts/AnotherPacketSource.h"
#include <cutils/properties.h>
#include <media/IMediaHTTPConnection.h>
#include <media/IMediaHTTPService.h>
#include <media/stagefright/foundation/hexdump.h>
#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/MediaHTTP.h>
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>
#include <utils/Mutex.h>
#include <ctype.h>
#include <inttypes.h>
#include <openssl/aes.h>
#include <openssl/md5.h>
namespace android {
// Number of recently-read bytes to use for bandwidth estimation (200 KiB).
// The constructor divides this by PlaylistFetcher::kDownloadBlockSize to
// derive how many history items the HTTP data source should keep.
const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
// Builds a session in its "preparation" state.
//
// |notify| is stored for later notifications, |flags| is stored verbatim and
// |httpService| is used right away to create the HTTP connection backing
// mHTTPDataSource. For every stream slot three packet queues are created
// (discontinuities, primary and secondary packet sources) and the buffering
// flag is cleared. Finally the bandwidth history size of the HTTP source is
// configured.
LiveSession::LiveSession(
        const sp<AMessage> &notify, uint32_t flags,
        const sp<IMediaHTTPService> &httpService)
    : mNotify(notify),
      mFlags(flags),
      mHTTPService(httpService),
      mInPreparationPhase(true),
      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
      mCurBandwidthIndex(-1),
      mStreamMask(0),
      mNewStreamMask(0),
      mSwapMask(0),
      mCheckBandwidthGeneration(0),
      mSwitchGeneration(0),
      mSubtitleGeneration(0),
      mLastDequeuedTimeUs(0ll),
      mRealTimeBaseUs(0ll),
      mReconfigurationInProgress(false),
      mSwitchInProgress(false),
      mDisconnectReplyID(0),
      mSeekReplyID(0),
      mFirstTimeUsValid(false),
      mFirstTimeUs(0),
      mLastSeekTimeUs(0) {
    // One StreamItem per elementary stream kind.
    mStreams[kAudioIndex] = StreamItem("audio");
    mStreams[kVideoIndex] = StreamItem("video");
    mStreams[kSubtitleIndex] = StreamItem("subtitles");

    // Every stream slot gets its own set of queues, keyed by stream type.
    for (size_t slot = 0; slot < kMaxStreams; ++slot) {
        mDiscontinuities.add(
                indexToType(slot), new AnotherPacketSource(NULL /* meta */));
        mPacketSources.add(
                indexToType(slot), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(
                indexToType(slot), new AnotherPacketSource(NULL /* meta */));
        mBuffering[slot] = false;
    }

    // Keep enough download-block samples to cover kBandwidthHistoryBytes,
    // but never fewer than five.
    size_t historyItems =
            kBandwidthHistoryBytes / PlaylistFetcher::kDownloadBlockSize + 1;
    mHTTPDataSource->setBandwidthHistorySize(
            historyItems < 5 ? 5 : historyItems);
}
// No explicit teardown is performed here.
LiveSession::~LiveSession() {}
// Creates an empty (zero-capacity) buffer whose meta data marks it as a
// format-change discontinuity. The meta data records whether the packet
// sources should be swapped when the marker is dequeued (|swap|), the current
// switch generation, and a "timeUs" of -1.
sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
    sp<ABuffer> marker = new ABuffer(0);
    sp<AMessage> markerMeta = marker->meta();

    markerMeta->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
    markerMeta->setInt32("swapPacketSource", swap);
    markerMeta->setInt32("switchGeneration", mSwitchGeneration);
    markerMeta->setInt64("timeUs", -1);

    return marker;
}
// Exchanges the primary and secondary packet sources of |stream| and then
// clears the source that ends up in the secondary slot (i.e. the one that was
// primary before the call).
void LiveSession::swapPacketSource(StreamType stream) {
    sp<AnotherPacketSource> &primary = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &secondary = mPacketSources2.editValueFor(stream);

    const sp<AnotherPacketSource> formerPrimary = primary;
    primary = secondary;
    secondary = formerPrimary;

    secondary->clear();
}
// Dequeues the next access unit of |stream| into |*accessUnit|.
//
// Results produced by this function:
//   -EWOULDBLOCK        |stream| is not part of mStreamMask.
//   INFO_DISCONTINUITY  a discontinuity buffer was dequeued, either from the
//                       per-stream discontinuity queue or from the packet
//                       source itself.
//   -EAGAIN             no buffer is available yet, the stream is still
//                       (re)buffering, the counterpart audio/video source has
//                       no buffer, or a subtitle buffer carries a stale
//                       "subtitleGeneration".
//   OK                  a media buffer was dequeued; for audio/video its
//                       "timeUs" meta value has been rewritten to the
//                       session's timeline (see below).
//   anything else       the final result reported by a packet source, or the
//                       error returned by its dequeueAccessUnit().
status_t LiveSession::dequeueAccessUnit(
StreamType stream, sp<ABuffer> *accessUnit) {
if (!(mStreamMask & stream)) {
// return -EWOULDBLOCK to avoid halting the decoder
// when switching between audio/video and audio only.
return -EWOULDBLOCK;
}
status_t finalResult;
// Discontinuities queued explicitly (seek, track switch) take priority over
// regular media data.
sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream);
if (discontinuityQueue->hasBufferAvailable(&finalResult)) {
discontinuityQueue->dequeueAccessUnit(accessUnit);
// seeking, track switching
sp<AMessage> extra;
int64_t timeUs;
if ((*accessUnit)->meta()->findMessage("extra", &extra)
&& extra != NULL
&& extra->findInt64("timeUs", &timeUs)) {
// seeking only
// A seek restarts the timeline: remember the seek target and forget all
// per-discontinuity bookkeeping collected so far.
mLastSeekTimeUs = timeUs;
mDiscontinuityOffsetTimesUs.clear();
mDiscontinuityAbsStartTimesUs.clear();
}
return INFO_DISCONTINUITY;
}
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
// NOTE(review): typeToIndex() returns -1 for unknown types; idx is used to
// index mBuffering below without a range check. The switch further down
// TRESPASS()es on unknown types, but only after these accesses.
ssize_t idx = typeToIndex(stream);
if (!packetSource->hasBufferAvailable(&finalResult)) {
if (finalResult == OK) {
// Source ran dry without reaching its end: enter buffering state.
mBuffering[idx] = true;
return -EAGAIN;
} else {
return finalResult;
}
}
// Determine how much data must be buffered before leaving buffering state.
int32_t targetDuration = 0;
sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
if (meta != NULL) {
meta->findInt32("targetDuration", &targetDuration);
}
// "targetDuration" is multiplied by 1e6 to obtain microseconds.
int64_t targetDurationUs = targetDuration * 1000000ll;
if (targetDurationUs == 0 ||
targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
// Fetchers limit buffering to
// min(3 * targetDuration, kMinBufferedDurationUs)
targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
}
if (mBuffering[idx]) {
// Leave buffering state during a bandwidth switch, once the source is
// finished, or once enough data has accumulated.
if (mSwitchInProgress
|| packetSource->isFinished(0)
|| packetSource->getEstimatedDurationUs() > targetDurationUs) {
mBuffering[idx] = false;
}
}
if (mBuffering[idx]) {
return -EAGAIN;
}
// wait for counterpart
// Audio is only handed out while video also has data (and vice versa),
// provided the other stream is active in both the current and the new stream
// mask and is served by one of the registered fetchers.
sp<AnotherPacketSource> otherSource;
uint32_t mask = mNewStreamMask & mStreamMask;
uint32_t fetchersMask = 0;
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
fetchersMask |= fetcherMask;
}
mask &= fetchersMask;
if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
} else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
}
if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
return finalResult == OK ? -EAGAIN : finalResult;
}
status_t err = packetSource->dequeueAccessUnit(accessUnit);
// Map the stream type to its mStreams slot and a short name for logging.
size_t streamIdx;
const char *streamStr;
switch (stream) {
case STREAMTYPE_AUDIO:
streamIdx = kAudioIndex;
streamStr = "audio";
break;
case STREAMTYPE_VIDEO:
streamIdx = kVideoIndex;
streamStr = "video";
break;
case STREAMTYPE_SUBTITLES:
streamIdx = kSubtitleIndex;
streamStr = "subs";
break;
default:
TRESPASS();
}
StreamItem& strm = mStreams[streamIdx];
if (err == INFO_DISCONTINUITY) {
// adaptive streaming, discontinuities in the playlist
int32_t type;
CHECK((*accessUnit)->meta()->findInt32("discontinuity", &type));
sp<AMessage> extra;
if (!(*accessUnit)->meta()->findMessage("extra", &extra)) {
extra.clear();
}
ALOGI("[%s] read discontinuity of type %d, extra = %s",
streamStr,
type,
extra == NULL ? "NULL" : extra->debugString().c_str());
int32_t swap;
if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
// Marker created by createFormatChangeBuffer(true): swap to the packet
// source of the new variant, unless the switch it belongs to has been
// superseded (generation mismatch).
int32_t switchGeneration;
CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
{
Mutex::Autolock lock(mSwapMutex);
if (switchGeneration == mSwitchGeneration) {
swapPacketSource(stream);
sp<AMessage> msg = new AMessage(kWhatSwapped, id());
msg->setInt32("stream", stream);
msg->setInt32("switchGeneration", switchGeneration);
msg->post();
}
}
} else {
// Playlist discontinuity: compute the time offset to apply to the next
// discontinuity sequence (seq + 1). It is the offset of the current
// sequence plus the span played within it (last dequeued time minus the
// sequence's first absolute time, when known) plus the duration of the
// last sample.
size_t seq = strm.mCurDiscontinuitySeq;
int64_t offsetTimeUs;
if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
offsetTimeUs = mDiscontinuityOffsetTimesUs.valueFor(seq);
} else {
offsetTimeUs = 0;
}
seq += 1;
if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
int64_t firstTimeUs;
firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
offsetTimeUs += strm.mLastDequeuedTimeUs - firstTimeUs;
offsetTimeUs += strm.mLastSampleDurationUs;
} else {
offsetTimeUs += strm.mLastSampleDurationUs;
}
mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
}
} else if (err == OK) {
if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
// Rewrite the buffer's timestamp:
//   timeUs = (raw time - first raw time of this discontinuity sequence,
//             clamped at 0)
//            + last seek time
//            + accumulated offset of this discontinuity sequence (if any)
int64_t timeUs;
int32_t discontinuitySeq = 0;
CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
(*accessUnit)->meta()->findInt32("discontinuitySeq", &discontinuitySeq);
strm.mCurDiscontinuitySeq = discontinuitySeq;
int32_t discard = 0;
int64_t firstTimeUs;
if (mDiscontinuityAbsStartTimesUs.indexOfKey(strm.mCurDiscontinuitySeq) >= 0) {
int64_t durUs; // approximate sample duration
// Absolute difference to the previously dequeued raw timestamp.
if (timeUs > strm.mLastDequeuedTimeUs) {
durUs = timeUs - strm.mLastDequeuedTimeUs;
} else {
durUs = strm.mLastDequeuedTimeUs - timeUs;
}
strm.mLastSampleDurationUs = durUs;
firstTimeUs = mDiscontinuityAbsStartTimesUs.valueFor(strm.mCurDiscontinuitySeq);
} else if ((*accessUnit)->meta()->findInt32("discard", &discard) && discard) {
// Buffers flagged "discard" do not establish the sequence's start time.
firstTimeUs = timeUs;
} else {
// First regular buffer of this sequence defines its absolute start time.
mDiscontinuityAbsStartTimesUs.add(strm.mCurDiscontinuitySeq, timeUs);
firstTimeUs = timeUs;
}
strm.mLastDequeuedTimeUs = timeUs;
if (timeUs >= firstTimeUs) {
timeUs -= firstTimeUs;
} else {
timeUs = 0;
}
timeUs += mLastSeekTimeUs;
if (mDiscontinuityOffsetTimesUs.indexOfKey(discontinuitySeq) >= 0) {
timeUs += mDiscontinuityOffsetTimesUs.valueFor(discontinuitySeq);
}
ALOGV("[%s] read buffer at time %" PRId64 " us", streamStr, timeUs);
(*accessUnit)->meta()->setInt64("timeUs", timeUs);
mLastDequeuedTimeUs = timeUs;
// Real-time base later attached to subtitle buffers as "baseUs".
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
} else if (stream == STREAMTYPE_SUBTITLES) {
int32_t subtitleGeneration;
// Note: at this point the buffer has already been removed from the
// packet source, so a stale-generation buffer is effectively dropped.
if ((*accessUnit)->meta()->findInt32("subtitleGeneration", &subtitleGeneration)
&& subtitleGeneration != mSubtitleGeneration) {
return -EAGAIN;
};
(*accessUnit)->meta()->setInt32(
"trackIndex", mPlaylist->getSelectedIndex());
(*accessUnit)->meta()->setInt64("baseUs", mRealTimeBaseUs);
}
} else {
ALOGI("[%s] encountered error %d", streamStr, err);
}
return err;
}
// Converts the format of |stream|'s current packet source into |*format|.
// Returns UNKNOWN_ERROR when the stream is not part of mStreamMask, -EAGAIN
// while the packet source has no format yet, and otherwise whatever
// convertMetaDataToMessage() returns.
status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
    if ((mStreamMask & stream) == 0) {
        return UNKNOWN_ERROR;
    }

    sp<MetaData> sourceFormat = mPacketSources.valueFor(stream)->getFormat();
    if (sourceFormat != NULL) {
        return convertMetaDataToMessage(sourceFormat, format);
    }

    return -EAGAIN;
}
void LiveSession::connectAsync(
const char *url, const KeyedVector<String8, String8> *headers) {
sp<AMessage> msg = new AMessage(kWhatConnect, id());
msg->setString("url", url);
if (headers != NULL) {
msg->setPointer(
"headers",
new KeyedVector<String8, String8>(*headers));
}
msg->post();
}
// Posts kWhatDisconnect and blocks until a reply arrives. Only the status of
// the post-and-await call is returned; the reply's contents are not inspected.
status_t LiveSession::disconnect() {
    sp<AMessage> request = new AMessage(kWhatDisconnect, id());

    sp<AMessage> reply;
    return request->postAndAwaitResponse(&reply);
}
// Posts kWhatSeek with the target position |timeUs| and blocks until a reply
// arrives. Only the status of the post-and-await call is returned; the
// reply's contents are not inspected.
status_t LiveSession::seekTo(int64_t timeUs) {
    sp<AMessage> request = new AMessage(kWhatSeek, id());
    request->setInt64("timeUs", timeUs);

    sp<AMessage> reply;
    return request->postAndAwaitResponse(&reply);
}
// Central message dispatcher of the session.
//
// Most message kinds are forwarded to a dedicated on*() handler. Handled
// inline are:
//   kWhatDisconnect    records the reply ID and starts the disconnect unless a
//                      reconfiguration is in progress.
//   kWhatSeek          records the reply ID and calls onSeek(); when that
//                      fails the message is re-posted with a delay of 50000.
//   kWhatFetcherNotify notifications from PlaylistFetcher instances, selected
//                      by the nested "what" value.
// Unknown message kinds (outer or nested) hit TRESPASS().
void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatConnect:
        {
            onConnect(msg);
            break;
        }

        case kWhatDisconnect:
        {
            CHECK(msg->senderAwaitsResponse(&mDisconnectReplyID));

            // While reconfiguring only the reply ID is remembered here; the
            // disconnect is not started from this handler in that case.
            if (mReconfigurationInProgress) {
                break;
            }

            finishDisconnect();
            break;
        }

        case kWhatSeek:
        {
            uint32_t seekReplyID;
            CHECK(msg->senderAwaitsResponse(&seekReplyID));
            mSeekReplyID = seekReplyID;
            mSeekReply = new AMessage;

            status_t err = onSeek(msg);

            if (err != OK) {
                // onSeek() refuses while a reconfiguration is in progress;
                // retry later by re-posting the same message.
                msg->post(50000);
            }
            break;
        }

        case kWhatFetcherNotify:
        {
            int32_t what;
            CHECK(msg->findInt32("what", &what));

            switch (what) {
                case PlaylistFetcher::kWhatStarted:
                    break;

                case PlaylistFetcher::kWhatPaused:
                case PlaylistFetcher::kWhatStopped:
                {
                    if (what == PlaylistFetcher::kWhatStopped) {
                        AString uri;
                        CHECK(msg->findString("uri", &uri));
                        if (mFetcherInfos.removeItem(uri) < 0) {
                            // ignore duplicated kWhatStopped messages.
                            break;
                        }

                        if (mSwitchInProgress) {
                            tryToFinishBandwidthSwitch();
                        }
                    }

                    // Each pause/stop acknowledgement counts down the pending
                    // continuation; the last one posts it and answers a
                    // pending seek request, if any.
                    if (mContinuation != NULL) {
                        CHECK_GT(mContinuationCounter, 0);
                        if (--mContinuationCounter == 0) {
                            mContinuation->post();

                            if (mSeekReplyID != 0) {
                                CHECK(mSeekReply != NULL);
                                mSeekReply->setInt32("err", OK);
                                mSeekReply->postReply(mSeekReplyID);
                                mSeekReplyID = 0;
                                mSeekReply.clear();
                            }
                        }
                    }
                    break;
                }

                case PlaylistFetcher::kWhatDurationUpdate:
                {
                    AString uri;
                    CHECK(msg->findString("uri", &uri));

                    int64_t durationUs;
                    CHECK(msg->findInt64("durationUs", &durationUs));

                    // The fetcher may already have been removed (see
                    // kWhatStopped above); editValueFor() must not be called
                    // with a key that is no longer present, so late updates
                    // are ignored.
                    if (mFetcherInfos.indexOfKey(uri) >= 0) {
                        FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
                        info->mDurationUs = durationUs;
                    }
                    break;
                }

                case PlaylistFetcher::kWhatError:
                {
                    status_t err;
                    CHECK(msg->findInt32("err", &err));

                    ALOGE("XXX Received error %d from PlaylistFetcher.", err);

                    // handle EOS on subtitle tracks independently
                    AString uri;
                    if (err == ERROR_END_OF_STREAM && msg->findString("uri", &uri)) {
                        ssize_t i = mFetcherInfos.indexOfKey(uri);
                        if (i >= 0) {
                            const sp<PlaylistFetcher> &fetcher =
                                    mFetcherInfos.valueAt(i).mFetcher;
                            if (fetcher != NULL) {
                                uint32_t type = fetcher->getStreamTypeMask();
                                if (type == STREAMTYPE_SUBTITLES) {
                                    mPacketSources.valueFor(
                                            STREAMTYPE_SUBTITLES)->signalEOS(err);
                                    break;
                                }
                            }
                        }
                    }

                    if (mInPreparationPhase) {
                        postPrepared(err);
                    }

                    cancelBandwidthSwitch();

                    // Propagate the error to every stream and to the client.
                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);

                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);

                    mPacketSources.valueFor(
                            STREAMTYPE_SUBTITLES)->signalEOS(err);

                    sp<AMessage> notify = mNotify->dup();
                    notify->setInt32("what", kWhatError);
                    notify->setInt32("err", err);
                    notify->post();
                    break;
                }

                case PlaylistFetcher::kWhatTemporarilyDoneFetching:
                {
                    AString uri;
                    CHECK(msg->findString("uri", &uri));

                    if (mFetcherInfos.indexOfKey(uri) < 0) {
                        ALOGE("couldn't find uri");
                        break;
                    }
                    FetcherInfo *info = &mFetcherInfos.editValueFor(uri);
                    info->mIsPrepared = true;

                    // Preparation completes once every fetcher has reported in.
                    if (mInPreparationPhase) {
                        bool allFetchersPrepared = true;
                        for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
                            if (!mFetcherInfos.valueAt(i).mIsPrepared) {
                                allFetchersPrepared = false;
                                break;
                            }
                        }

                        if (allFetchersPrepared) {
                            postPrepared(OK);
                        }
                    }
                    break;
                }

                case PlaylistFetcher::kWhatStartedAt:
                {
                    int32_t switchGeneration;
                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));

                    // Ignore notifications that belong to a superseded switch.
                    if (switchGeneration != mSwitchGeneration) {
                        break;
                    }

                    // Resume fetcher for the original variant; the resumed fetcher should
                    // continue until the timestamps found in msg, which is stored by the
                    // new fetcher to indicate where the new variant has started buffering.
                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
                        const FetcherInfo &info = mFetcherInfos.valueAt(i);
                        if (info.mToBeRemoved) {
                            info.mFetcher->resumeUntilAsync(msg);
                        }
                    }
                    break;
                }

                default:
                    TRESPASS();
            }

            break;
        }

        case kWhatCheckBandwidth:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            // Stale bandwidth checks (cancelled in the meantime) are dropped.
            if (generation != mCheckBandwidthGeneration) {
                break;
            }

            onCheckBandwidth(msg);
            break;
        }

        case kWhatChangeConfiguration:
        {
            onChangeConfiguration(msg);
            break;
        }

        case kWhatChangeConfiguration2:
        {
            onChangeConfiguration2(msg);
            break;
        }

        case kWhatChangeConfiguration3:
        {
            onChangeConfiguration3(msg);
            break;
        }

        case kWhatFinishDisconnect2:
        {
            onFinishDisconnect2();
            break;
        }

        case kWhatSwapped:
        {
            onSwapped(msg);
            break;
        }

        case kWhatCheckSwitchDown:
        {
            onCheckSwitchDown();
            break;
        }

        case kWhatSwitchDown:
        {
            onSwitchDown();
            break;
        }

        default:
            TRESPASS();
            break;
    }
}
// static
// Three-way comparator ordering BandwidthItems by ascending mBandwidth:
// returns -1, 0 or 1 when |a| is smaller than, equal to or larger than |b|.
int LiveSession::SortByBandwidth(const BandwidthItem *a, const BandwidthItem *b) {
    if (a->mBandwidth == b->mBandwidth) {
        return 0;
    }
    return (a->mBandwidth < b->mBandwidth) ? -1 : 1;
}
// static
// Maps a stream slot index to its stream type bit (1 << idx). Aborts via
// CHECK when |idx| is outside [0, kMaxStreams).
LiveSession::StreamType LiveSession::indexToType(int idx) {
    CHECK(idx >= 0 && idx < kMaxStreams);

    const int typeBit = 1 << idx;
    return (StreamType)typeBit;
}
// static
// Maps a stream type to its slot index: audio -> 0, video -> 1,
// subtitles -> 2. Any other value yields -1.
ssize_t LiveSession::typeToIndex(int32_t type) {
    if (type == STREAMTYPE_AUDIO) {
        return 0;
    }
    if (type == STREAMTYPE_VIDEO) {
        return 1;
    }
    if (type == STREAMTYPE_SUBTITLES) {
        return 2;
    }
    return -1;
}
void LiveSession::onConnect(const sp<AMessage> &msg) {
AString url;
CHECK(msg->findString("url", &url));
KeyedVector<String8, String8> *headers = NULL;
if (!msg->findPointer("headers", (void **)&headers)) {
mExtraHeaders.clear();
} else {
mExtraHeaders = *headers;
delete headers;
headers = NULL;
}
// TODO currently we don't know if we are coming here from incognito mode
ALOGI("onConnect %s", uriDebugString(url).c_str());
mMasterURL = url;
bool dummy;
mPlaylist = fetchPlaylist(url.c_str(), NULL /* curPlaylistHash */, &dummy);
if (mPlaylist == NULL) {
ALOGE("unable to fetch master playlist %s.", uriDebugString(url).c_str());
postPrepared(ERROR_IO);
return;
}
// We trust the content provider to make a reasonable choice of preferred
// initial bandwidth by listing it first in the variant playlist.
// At startup we really don't have a good estimate on the available
// network bandwidth since we haven't tranferred any data yet. Once
// we have we can make a better informed choice.
size_t initialBandwidth = 0;
size_t initialBandwidthIndex = 0;
if (mPlaylist->isVariantPlaylist()) {
for (size_t i = 0; i < mPlaylist->size(); ++i) {
BandwidthItem item;
item.mPlaylistIndex = i;
sp<AMessage> meta;
AString uri;
mPlaylist->itemAt(i, &uri, &meta);
CHECK(meta->findInt32("bandwidth", (int32_t *)&item.mBandwidth));
if (initialBandwidth == 0) {
initialBandwidth = item.mBandwidth;
}
mBandwidthItems.push(item);
}
CHECK_GT(mBandwidthItems.size(), 0u);
mBandwidthItems.sort(SortByBandwidth);
for (size_t i = 0; i < mBandwidthItems.size(); ++i) {
if (mBandwidthItems.itemAt(i).mBandwidth == initialBandwidth) {
initialBandwidthIndex = i;
break;
}
}
} else {
// dummy item.
BandwidthItem item;
item.mPlaylistIndex = 0;
item.mBandwidth = 0;
mBandwidthItems.push(item);
}
mPlaylist->pickRandomMediaItems();
changeConfiguration(
0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */);
}
void LiveSession::finishDisconnect() {
// No reconfiguration is currently pending, make sure none will trigger
// during disconnection either.
cancelCheckBandwidthEvent();
// Protect mPacketSources from a swapPacketSource race condition through disconnect.
// (finishDisconnect, onFinishDisconnect2)
cancelBandwidthSwitch();
// cancel switch down monitor
mSwitchDownMonitor.clear();
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
}
sp<AMessage> msg = new AMessage(kWhatFinishDisconnect2, id());
mContinuationCounter = mFetcherInfos.size();
mContinuation = msg;
if (mContinuationCounter == 0) {
msg->post();
}
}
void LiveSession::onFinishDisconnect2() {
mContinuation.clear();
mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(ERROR_END_OF_STREAM);
mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(ERROR_END_OF_STREAM);
mPacketSources.valueFor(
STREAMTYPE_SUBTITLES)->signalEOS(ERROR_END_OF_STREAM);
sp<AMessage> response = new AMessage;
response->setInt32("err", OK);
response->postReply(mDisconnectReplyID);
mDisconnectReplyID = 0;
}
// Creates, registers and records a PlaylistFetcher for |uri|.
// Returns NULL when a fetcher for that URI already exists; otherwise the new
// fetcher, whose notifications arrive as kWhatFetcherNotify tagged with the
// URI and the current switch generation.
sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
    if (mFetcherInfos.indexOfKey(uri) >= 0) {
        return NULL;
    }

    sp<AMessage> fetcherNotify = new AMessage(kWhatFetcherNotify, id());
    fetcherNotify->setString("uri", uri);
    fetcherNotify->setInt32("switchGeneration", mSwitchGeneration);

    FetcherInfo newInfo;
    newInfo.mFetcher =
            new PlaylistFetcher(fetcherNotify, this, uri, mSubtitleGeneration);
    newInfo.mDurationUs = -1ll;
    newInfo.mIsPrepared = false;
    newInfo.mToBeRemoved = false;

    looper()->registerHandler(newInfo.mFetcher);
    mFetcherInfos.add(uri, newInfo);

    return newInfo.mFetcher;
}
/*
 * Illustration of parameters:
 *
 * 0      `range_offset`
 * +------------+-------------------------------------------------------+--+--+
 * |            |                                 | next block to fetch |  |  |
 * |            | `source` handle => `out` buffer |                     |  |  |
 * | `url` file |<--------- buffer size --------->|<--- `block_size` -->|  |  |
 * |            |<----------- `range_length` / buffer capacity ----------->|  |
 * |<------------------------------ file_size ------------------------------->|
 *
 * Special parameter values:
 * - range_length == -1 means entire file
 * - block_size == 0 means entire range
 *
 * Downloads (part of) `url` into `*out`.
 *
 * `*out` may be NULL, in which case a new buffer is allocated, or an existing
 * buffer to which data is appended. `source` may be NULL; otherwise `*source`
 * is reused when non-NULL and receives the opened source when NULL.
 * `actualUrl`, when non-NULL, receives the source's URI, falling back to
 * `url` if that is empty.
 *
 * Returns the number of bytes read by this call, ERROR_UNSUPPORTED for URL
 * schemes other than file://, http:// and https://, or the negative error
 * reported while connecting or reading.
 */
ssize_t LiveSession::fetchFile(
        const char *url, sp<ABuffer> *out,
        int64_t range_offset, int64_t range_length,
        uint32_t block_size, /* download block size */
        sp<DataSource> *source, /* to return and reuse source */
        String8 *actualUrl) {
    off64_t size;
    sp<DataSource> temp_source;
    if (source == NULL) {
        source = &temp_source;
    }

    if (*source == NULL) {
        if (!strncasecmp(url, "file://", 7)) {
            *source = new FileSource(url + 7);
        } else if (strncasecmp(url, "http://", 7)
                && strncasecmp(url, "https://", 8)) {
            return ERROR_UNSUPPORTED;
        } else {
            KeyedVector<String8, String8> headers = mExtraHeaders;
            if (range_offset > 0 || range_length >= 0) {
                // Request "bytes=<first>-<last>", or the open-ended
                // "bytes=<first>-" when the length is unknown. PRId64 keeps
                // the format in sync with the int64_t arguments.
                headers.add(
                        String8("Range"),
                        String8(
                            AStringPrintf(
                                "bytes=%" PRId64 "-%s",
                                range_offset,
                                range_length < 0
                                    ? "" : AStringPrintf("%" PRId64,
                                            range_offset + range_length - 1).c_str()).c_str()));
            }
            status_t err = mHTTPDataSource->connect(url, &headers);

            if (err != OK) {
                return err;
            }

            *source = mHTTPDataSource;
        }
    }

    // Without a known size start with 64 KiB and grow on demand below.
    status_t getSizeErr = (*source)->getSize(&size);
    if (getSizeErr != OK) {
        size = 65536;
    }

    sp<ABuffer> buffer = *out != NULL ? *out : new ABuffer(size);
    if (*out == NULL) {
        buffer->setRange(0, 0);
    }

    ssize_t bytesRead = 0;
    // adjust range_length if only reading partial block
    if (block_size > 0 && (range_length == -1 || (int64_t)(buffer->size() + block_size) < range_length)) {
        range_length = buffer->size() + block_size;
    }
    for (;;) {
        // Only resize when we don't know the size.
        size_t bufferRemaining = buffer->capacity() - buffer->size();
        if (bufferRemaining == 0 && getSizeErr != OK) {
            // Grow by 50% of the current fill level, but by at least 32 KiB.
            size_t bufferIncrement = buffer->size() / 2;
            if (bufferIncrement < 32768) {
                bufferIncrement = 32768;
            }
            bufferRemaining = bufferIncrement;

            ALOGV("increasing download buffer to %zu bytes",
                 buffer->size() + bufferRemaining);

            sp<ABuffer> copy = new ABuffer(buffer->size() + bufferRemaining);
            memcpy(copy->data(), buffer->data(), buffer->size());
            copy->setRange(0, buffer->size());

            buffer = copy;
        }

        size_t maxBytesToRead = bufferRemaining;
        if (range_length >= 0) {
            int64_t bytesLeftInRange = range_length - buffer->size();
            if (bytesLeftInRange < (int64_t)maxBytesToRead) {
                // Nothing left to read. This also covers a buffer that
                // already holds more than the requested range: a negative
                // value must never be converted to size_t, as it would turn
                // into a huge read length and overflow the buffer.
                if (bytesLeftInRange <= 0) {
                    break;
                }

                maxBytesToRead = bytesLeftInRange;
            }
        }

        // The DataSource is responsible for informing us of error (n < 0) or eof (n == 0)
        // to help us break out of the loop.
        ssize_t n = (*source)->readAt(
                buffer->size(), buffer->data() + buffer->size(),
                maxBytesToRead);

        if (n < 0) {
            return n;
        }

        if (n == 0) {
            break;
        }

        buffer->setRange(0, buffer->size() + (size_t)n);
        bytesRead += n;
    }

    *out = buffer;
    if (actualUrl != NULL) {
        *actualUrl = (*source)->getUri();
        if (actualUrl->isEmpty()) {
            *actualUrl = url;
        }
    }

    return bytesRead;
}
// Downloads and parses the playlist at |url|.
//
// When |curPlaylistHash| is non-NULL (and MD5 is available, i.e.
// HAVE_ANDROID_OS is defined) it is compared against the MD5 of the
// downloaded data: on a match |*unchanged| is set to true and NULL is
// returned, otherwise the hash is updated in place.
//
// Returns NULL as well when the download yields no data or fails, or when the
// playlist does not parse; |*unchanged| is false in those cases.
sp<M3UParser> LiveSession::fetchPlaylist(
        const char *url, uint8_t *curPlaylistHash, bool *unchanged) {
    ALOGV("fetchPlaylist '%s'", url);

    *unchanged = false;

    sp<ABuffer> playlistData;
    String8 resolvedUrl;
    const ssize_t fetched =
            fetchFile(url, &playlistData, 0, -1, 0, NULL, &resolvedUrl);
    if (fetched <= 0) {
        return NULL;
    }

    // MD5 functionality is not available on the simulator, treat all
    // playlists as changed.
#if defined(HAVE_ANDROID_OS)
    uint8_t hash[16];

    MD5_CTX m;
    MD5_Init(&m);
    MD5_Update(&m, playlistData->data(), playlistData->size());
    MD5_Final(hash, &m);

    if (curPlaylistHash != NULL) {
        if (memcmp(hash, curPlaylistHash, 16) == 0) {
            // playlist unchanged
            *unchanged = true;
            return NULL;
        }

        memcpy(curPlaylistHash, hash, sizeof(hash));
    }
#endif

    sp<M3UParser> parsed = new M3UParser(
            resolvedUrl.string(), playlistData->data(), playlistData->size());

    if (parsed->initCheck() == OK) {
        return parsed;
    }

    ALOGE("failed to parse .m3u8 playlist");
    return NULL;
}
// Compiled out. Returns rand() scaled by RAND_MAX, i.e. a value in [0, 1].
// Only referenced by the disabled experimental branches of
// getBandwidthIndex().
#if 0
static double uniformRand() {
return (double)rand() / RAND_MAX;
}
#endif
// Picks the index into mBandwidthItems (sorted by ascending bandwidth) that
// should be used next.
//
// Only the first preprocessor branch (#if 1) is compiled; the remaining
// #elif branches are alternative selection strategies kept for reference.
//
// Active strategy:
//  1. If the system property "media.httplive.bw-index" is set, it must be a
//     decimal number (CHECKed). A non-negative value is used as the index,
//     clamped to the last item.
//  2. Otherwise (or if that value is negative) the bandwidth estimate of the
//     HTTP data source is used; without an estimate, index 0 is returned.
//     The estimate may be capped by "media.httplive.max-bw". The highest
//     item whose bandwidth fits into 80% of the estimate (70% for items above
//     the current index) is chosen, falling back to index 0.
// Returns 0 when there are no bandwidth items at all.
size_t LiveSession::getBandwidthIndex() {
if (mBandwidthItems.size() == 0) {
return 0;
}
#if 1
char value[PROPERTY_VALUE_MAX];
ssize_t index = -1;
if (property_get("media.httplive.bw-index", value, NULL)) {
char *end;
index = strtol(value, &end, 10);
// A malformed property value aborts here.
CHECK(end > value && *end == '\0');
if (index >= 0 && (size_t)index >= mBandwidthItems.size()) {
index = mBandwidthItems.size() - 1;
}
}
if (index < 0) {
int32_t bandwidthBps;
if (mHTTPDataSource != NULL
&& mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
} else {
ALOGV("no bandwidth estimate.");
return 0; // Pick the lowest bandwidth stream by default.
}
// NOTE(review): this declaration shadows the outer |value| buffer.
char value[PROPERTY_VALUE_MAX];
if (property_get("media.httplive.max-bw", value, NULL)) {
char *end;
// NOTE(review): the result of strtoul() is stored in a signed long; values
// beyond LONG_MAX would turn negative and be ignored by the check below.
long maxBw = strtoul(value, &end, 10);
if (end > value && *end == '\0') {
if (maxBw > 0 && bandwidthBps > maxBw) {
ALOGV("bandwidth capped to %ld bps", maxBw);
bandwidthBps = maxBw;
}
}
}
// Pick the highest bandwidth stream below or equal to estimated bandwidth.
index = mBandwidthItems.size() - 1;
while (index > 0) {
// consider only 80% of the available bandwidth, but if we are switching up,
// be even more conservative (70%) to avoid overestimating and immediately
// switching back.
size_t adjustedBandwidthBps = bandwidthBps;
if (index > mCurBandwidthIndex) {
adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
} else {
adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
}
if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
break;
}
--index;
}
}
#elif 0
// Change bandwidth at random()
size_t index = uniformRand() * mBandwidthItems.size();
#elif 0
// There's a 50% chance to stay on the current bandwidth and
// a 50% chance to switch to the next higher bandwidth (wrapping around
// to lowest)
const size_t kMinIndex = 0;
static ssize_t mCurBandwidthIndex = -1;
size_t index;
if (mCurBandwidthIndex < 0) {
index = kMinIndex;
} else if (uniformRand() < 0.5) {
index = (size_t)mCurBandwidthIndex;
} else {
index = mCurBandwidthIndex + 1;
if (index == mBandwidthItems.size()) {
index = kMinIndex;
}
}
mCurBandwidthIndex = index;
#elif 0
// Pick the highest bandwidth stream below or equal to 1.2 Mbit/sec
size_t index = mBandwidthItems.size() - 1;
while (index > 0 && mBandwidthItems.itemAt(index).mBandwidth > 1200000) {
--index;
}
#elif 1
// Never compiled: the leading "#if 1" branch always wins.
char value[PROPERTY_VALUE_MAX];
size_t index;
if (property_get("media.httplive.bw-index", value, NULL)) {
char *end;
index = strtoul(value, &end, 10);
CHECK(end > value && *end == '\0');
if (index >= mBandwidthItems.size()) {
index = mBandwidthItems.size() - 1;
}
} else {
index = 0;
}
#else
size_t index = mBandwidthItems.size() - 1; // Highest bandwidth stream
#endif
CHECK_GE(index, 0);
return index;
}
// Returns the smaller of the "segmentStartTimeUs" values found in the meta
// data of the most recently dequeued audio and video buffers. A stream that
// provides no such value is ignored (an audio value that is negative is
// treated the same way); -1 is returned when neither stream provides one.
int64_t LiveSession::latestMediaSegmentStartTimeUs() {
    int64_t earliestStartUs = -1;

    sp<AMessage> audioMeta =
            mPacketSources.valueFor(STREAMTYPE_AUDIO)->getLatestDequeuedMeta();
    if (audioMeta != NULL) {
        audioMeta->findInt64("segmentStartTimeUs", &earliestStartUs);
    }

    sp<AMessage> videoMeta =
            mPacketSources.valueFor(STREAMTYPE_VIDEO)->getLatestDequeuedMeta();
    int64_t videoStartUs = -1;
    const bool haveVideoStart = videoMeta != NULL
            && videoMeta->findInt64("segmentStartTimeUs", &videoStartUs);

    if (haveVideoStart
            && (earliestStartUs < 0 || videoStartUs < earliestStartUs)) {
        earliestStartUs = videoStartUs;
    }

    return earliestStartUs;
}
// Starts a reconfiguration at the position "timeUs" carried by |msg|, keeping
// the current bandwidth index. Returns -EWOULDBLOCK without doing anything
// while another reconfiguration is in progress, OK otherwise.
status_t LiveSession::onSeek(const sp<AMessage> &msg) {
    int64_t timeUs;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (mReconfigurationInProgress) {
        return -EWOULDBLOCK;
    }

    changeConfiguration(timeUs, mCurBandwidthIndex);
    return OK;
}
// Stores the largest duration reported by any fetcher in |*durationUs|, or -1
// when there are no fetchers or none reports a larger value. Always returns
// OK.
status_t LiveSession::getDuration(int64_t *durationUs) const {
    int64_t longestUs = -1ll;

    const size_t fetcherCount = mFetcherInfos.size();
    for (size_t n = 0; n < fetcherCount; ++n) {
        const int64_t candidateUs = mFetcherInfos.valueAt(n).mDurationUs;
        if (longestUs < candidateUs) {
            longestUs = candidateUs;
        }
    }

    *durationUs = longestUs;
    return OK;
}
// The session is seekable when getDuration() succeeds and reports a
// non-negative duration.
bool LiveSession::isSeekable() const {
    int64_t durationUs;
    if (getDuration(&durationUs) != OK) {
        return false;
    }
    return durationUs >= 0;
}
// Always reports false.
bool LiveSession::hasDynamicDuration() const {
    const bool dynamicDuration = false;
    return dynamicDuration;
}
// Number of tracks reported by the playlist; 0 while no playlist is loaded.
size_t LiveSession::getTrackCount() const {
    return mPlaylist == NULL ? 0 : mPlaylist->getTrackCount();
}
// Track description for |trackIndex| as provided by the playlist; NULL while
// no playlist is loaded.
sp<AMessage> LiveSession::getTrackInfo(size_t trackIndex) const {
    if (mPlaylist != NULL) {
        return mPlaylist->getTrackInfo(trackIndex);
    }
    return NULL;
}
// Selects or deselects track |index| in the playlist.
//
// Returns INVALID_OPERATION while no playlist is loaded. Otherwise the
// subtitle generation is bumped (regardless of the outcome), the playlist's
// result is returned and, on success, a kWhatChangeConfiguration message is
// posted for the current bandwidth index with "pickTrack" set to |select|.
status_t LiveSession::selectTrack(size_t index, bool select) {
    if (mPlaylist == NULL) {
        return INVALID_OPERATION;
    }

    ++mSubtitleGeneration;

    const status_t result = mPlaylist->selectTrack(index, select);
    if (result != OK) {
        return result;
    }

    sp<AMessage> reconfigure = new AMessage(kWhatChangeConfiguration, id());
    reconfigure->setInt32("bandwidthIndex", mCurBandwidthIndex);
    reconfigure->setInt32("pickTrack", select);
    reconfigure->post();

    return result;
}
// Index of the selected track of the given |type| as reported by the
// playlist; -1 while no playlist is loaded.
ssize_t LiveSession::getSelectedTrack(media_track_type type) const {
    if (mPlaylist != NULL) {
        return mPlaylist->getSelectedTrack(type);
    }
    return -1;
}
bool LiveSession::canSwitchUp() {
// Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
status_t err = OK;
for (size_t i = 0; i < mPacketSources.size(); ++i) {
sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
int64_t dur = source->getBufferedDurationUs(&err);
if (err == OK && dur > 10000000) {
return true;
}
}
return false;
}
// Starts a reconfiguration of the session.
//
// |timeUs|         seek target; a negative value means "not seeking" (e.g. a
//                  bandwidth or track change).
// |bandwidthIndex| index into mBandwidthItems to switch to; must be in range
//                  (CHECKed).
// |pickTrack|      forwarded in the continuation message; when not seeking it
//                  also decides whether fetchers that are no longer needed
//                  are stopped right away.
//
// Must not be called while another reconfiguration is in progress (CHECKed).
// All current fetchers are asked to stop or pause; once every one of them has
// acknowledged, the continuation message is posted: kWhatChangeConfiguration2
// when seeking, kWhatChangeConfiguration3 otherwise.
void LiveSession::changeConfiguration(
int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
// Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
// (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
cancelBandwidthSwitch();
CHECK(!mReconfigurationInProgress);
mReconfigurationInProgress = true;
mCurBandwidthIndex = bandwidthIndex;
ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
timeUs, bandwidthIndex, pickTrack);
CHECK_LT(bandwidthIndex, mBandwidthItems.size());
const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);
uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher
// Collect, per stream slot, the URI the selected variant provides for it.
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (mPlaylist->getTypeURI(item.mPlaylistIndex, mStreams[i].mType, &URIs[i])) {
streamMask |= indexToType(i);
}
}
// Step 1, stop and discard fetchers that are no longer needed.
// Pause those that we'll reuse.
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
bool discardFetcher = true;
// When seeking (timeUs >= 0) all current fetchers are discarded. Otherwise
// fetchers whose URI is still needed are kept and moved to resumeMask.
if (timeUs < 0ll) {
// delay fetcher removal if not picking tracks
discardFetcher = pickTrack;
for (size_t j = 0; j < kMaxStreams; ++j) {
StreamType type = indexToType(j);
if ((streamMask & type) && uri == URIs[j]) {
resumeMask |= type;
streamMask &= ~type;
discardFetcher = false;
}
}
}
if (discardFetcher) {
mFetcherInfos.valueAt(i).mFetcher->stopAsync();
} else {
mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
}
}
sp<AMessage> msg;
if (timeUs < 0ll) {
// skip onChangeConfiguration2 (decoder destruction) if not seeking.
msg = new AMessage(kWhatChangeConfiguration3, id());
} else {
msg = new AMessage(kWhatChangeConfiguration2, id());
}
msg->setInt32("streamMask", streamMask);
msg->setInt32("resumeMask", resumeMask);
msg->setInt32("pickTrack", pickTrack);
msg->setInt64("timeUs", timeUs);
// Attach the URI of every stream that will be fetched or resumed.
for (size_t i = 0; i < kMaxStreams; ++i) {
if ((streamMask | resumeMask) & indexToType(i)) {
msg->setString(mStreams[i].uriKey().c_str(), URIs[i].c_str());
}
}
// Every time a fetcher acknowledges the stopAsync or pauseAsync request
// we'll decrement mContinuationCounter, once it reaches zero, i.e. all
// fetchers have completed their asynchronous operation, we'll post
// mContinuation, which then is handled in onChangeConfiguration2 (seeking)
// or onChangeConfiguration3 (not seeking).
mContinuationCounter = mFetcherInfos.size();
mContinuation = msg;
if (mContinuationCounter == 0) {
// No fetchers to wait for: continue immediately and answer a pending seek
// request, mirroring what the fetcher notification handler does.
msg->post();
if (mSeekReplyID != 0) {
CHECK(mSeekReply != NULL);
mSeekReply->setInt32("err", OK);
mSeekReply->postReply(mSeekReplyID);
mSeekReplyID = 0;
mSeekReply.clear();
}
}
}
void LiveSession::onChangeConfiguration(const sp<AMessage> &msg) {
if (!mReconfigurationInProgress) {
int32_t pickTrack = 0, bandwidthIndex = mCurBandwidthIndex;
msg->findInt32("pickTrack", &pickTrack);
msg->findInt32("bandwidthIndex", &bandwidthIndex);
changeConfiguration(-1ll /* timeUs */, bandwidthIndex, pickTrack);
} else {
msg->post(1000000ll); // retry in 1 sec
}
}
void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
mContinuation.clear();
// All fetchers are either suspended or have been removed now.
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
// currently onChangeConfiguration2 is only called for seeking;
// remove the following CHECK if using it else where.
CHECK_EQ(resumeMask, 0);
streamMask |= resumeMask;
AString URIs[kMaxStreams];
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
const AString &uriKey = mStreams[i].uriKey();
CHECK(msg->findString(uriKey.c_str(), &URIs[i]));
ALOGV("%s = '%s'", uriKey.c_str(), URIs[i].c_str());
}
}
// Determine which decoders to shutdown on the player side,
// a decoder has to be shutdown if either
// 1) its streamtype was active before but now longer isn't.
// or
// 2) its streamtype was already active and still is but the URI
// has changed.
uint32_t changedMask = 0;
for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
if (((mStreamMask & streamMask & indexToType(i))
&& !(URIs[i] == mStreams[i].mUri))
|| (mStreamMask & ~streamMask & indexToType(i))) {
changedMask |= indexToType(i);
}
}
if (changedMask == 0) {
// If nothing changed as far as the audio/video decoders
// are concerned we can proceed.
onChangeConfiguration3(msg);
return;
}
// Something changed, inform the player which will shutdown the
// corresponding decoders and will post the reply once that's done.
// Handling the reply will continue executing below in
// onChangeConfiguration3.
sp<AMessage> notify = mNotify->dup();
notify->setInt32("what", kWhatStreamsChanged);
notify->setInt32("changedMask", changedMask);
msg->setWhat(kWhatChangeConfiguration3);
msg->setTarget(id());
notify->setMessage("reply", msg);
notify->post();
}
// Final stage of a configuration change.
//
// Reads "streamMask", "resumeMask", "timeUs" and "pickTrack" from |msg|, then:
//  1. records the requested URI of every stream in streamMask (into mNewUri
//     when switching, into mUri otherwise),
//  2. resumes existing fetchers whose URI matches a stream in resumeMask and
//     marks all other existing fetchers for removal,
//  3. creates and starts one new fetcher per distinct URI still left in
//     streamMask,
//  4. restarts the bandwidth check timer and updates the session state flags.
//
// "switching" below means timeUs < 0 and pickTrack == 0 (the branch the
// inline comments call "adapting"); timeUs >= 0 supplies an explicit start
// time for the new fetchers.
void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
mContinuation.clear();
// All remaining fetchers are still suspended, the player has shutdown
// any decoders that needed it.
uint32_t streamMask, resumeMask;
CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));
int64_t timeUs;
int32_t pickTrack;
bool switching = false;
CHECK(msg->findInt64("timeUs", &timeUs));
CHECK(msg->findInt32("pickTrack", &pickTrack));
// Re-anchor mRealTimeBaseUs: against the last dequeued timestamp when no
// target time was given, against the requested time otherwise.
if (timeUs < 0ll) {
if (!pickTrack) {
switching = true;
}
mRealTimeBaseUs = ALooper::GetNowUs() - mLastDequeuedTimeUs;
} else {
mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;
}
// Store the requested URIs. While switching they are kept in mNewUri so
// that mUri keeps describing the stream that is currently in use.
for (size_t i = 0; i < kMaxStreams; ++i) {
if (streamMask & indexToType(i)) {
if (switching) {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mNewUri));
} else {
CHECK(msg->findString(mStreams[i].uriKey().c_str(), &mStreams[i].mUri));
}
}
}
mNewStreamMask = streamMask | resumeMask;
if (switching) {
// Every currently active stream type that is not simply resumed is
// recorded in mSwapMask.
mSwapMask = mStreamMask & ~resumeMask;
}
// Of all existing fetchers:
// * Resume fetchers that are still needed and assign them original packet sources.
// * Mark otherwise unneeded fetchers for removal.
ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
const AString &uri = mFetcherInfos.keyAt(i);
// Packet sources this fetcher will keep feeding, indexed by stream.
sp<AnotherPacketSource> sources[kMaxStreams];
for (size_t j = 0; j < kMaxStreams; ++j) {
if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
if (j != kSubtitleIndex) {
ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j));
sp<AnotherPacketSource> discontinuityQueue;
discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
discontinuityQueue->queueDiscontinuity(
ATSParser::DISCONTINUITY_NONE,
NULL,
true);
}
}
}
FetcherInfo &info = mFetcherInfos.editValueAt(i);
// A fetcher that serves at least one resumed stream is restarted;
// any other fetcher is only flagged here, not removed.
if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
|| sources[kSubtitleIndex] != NULL) {
info.mFetcher->startAsync(
sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
} else {
info.mToBeRemoved = true;
}
}
// streamMask now only contains the types that need a new fetcher created.
if (streamMask != 0) {
ALOGV("creating new fetchers for mask 0x%08x", streamMask);
}
// Find out when the original fetchers have buffered up to and start the new fetchers
// at a later timestamp.
for (size_t i = 0; i < kMaxStreams; i++) {
if (!(indexToType(i) & streamMask)) {
continue;
}
AString uri;
uri = switching ? mStreams[i].mNewUri : mStreams[i].mUri;
sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
CHECK(fetcher != NULL);
// Start parameters for the new fetcher; negative values mean "not
// determined" (see the startAsync() call at the end of this iteration).
int64_t startTimeUs = -1;
int64_t segmentStartTimeUs = -1ll;
int32_t discontinuitySeq = -1;
sp<AnotherPacketSource> sources[kMaxStreams];
if (i == kSubtitleIndex) {
segmentStartTimeUs = latestMediaSegmentStartTimeUs();
}
// TRICKY: looping from i as earlier streams are already removed from streamMask
for (size_t j = i; j < kMaxStreams; ++j) {
const AString &streamUri = switching ? mStreams[j].mNewUri : mStreams[j].mUri;
// Every remaining stream that shares this URI is served by the same
// new fetcher.
if ((streamMask & indexToType(j)) && uri == streamUri) {
sources[j] = mPacketSources.valueFor(indexToType(j));
if (timeUs >= 0) {
// Explicit start time: drop what is buffered and queue a time
// discontinuity carrying the new position.
sources[j]->clear();
startTimeUs = timeUs;
sp<AnotherPacketSource> discontinuityQueue;
sp<AMessage> extra = new AMessage;
extra->setInt64("timeUs", timeUs);
discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
discontinuityQueue->queueDiscontinuity(
ATSParser::DISCONTINUITY_TIME, extra, true);
} else {
int32_t type;
sp<AMessage> meta;
if (pickTrack) {
// selecting
meta = sources[j]->getLatestDequeuedMeta();
} else {
// adapting
meta = sources[j]->getLatestEnqueuedMeta();
}
// Only metadata without a "discontinuity" entry is used to derive
// the start position.
if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
int64_t tmpUs;
int64_t tmpSegmentUs;
CHECK(meta->findInt64("timeUs", &tmpUs));
CHECK(meta->findInt64("segmentStartTimeUs", &tmpSegmentUs));
// Keep the earliest (segment start, time) pair seen across the
// streams that share this fetcher.
if (startTimeUs < 0 || tmpSegmentUs < segmentStartTimeUs) {
startTimeUs = tmpUs;
segmentStartTimeUs = tmpSegmentUs;
} else if (tmpSegmentUs == segmentStartTimeUs && tmpUs < startTimeUs) {
startTimeUs = tmpUs;
}
int32_t seq;
CHECK(meta->findInt32("discontinuitySeq", &seq));
// Likewise keep the smallest discontinuity sequence number.
if (discontinuitySeq < 0 || seq < discontinuitySeq) {
discontinuitySeq = seq;
}
}
if (pickTrack) {
// selecting track, queue discontinuities before content
sources[j]->clear();
if (j == kSubtitleIndex) {
// NOTE(review): this break leaves the inner loop before the
// "streamMask &= ~indexToType(j)" statement below, so the subtitle
// bit stays set in streamMask -- confirm that this is intended.
break;
}
sp<AnotherPacketSource> discontinuityQueue;
discontinuityQueue = mDiscontinuities.valueFor(indexToType(j));
discontinuityQueue->queueDiscontinuity(
ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
} else {
// adapting, queue discontinuities after resume
// The new fetcher writes into the secondary packet source set.
sources[j] = mPacketSources2.valueFor(indexToType(j));
sources[j]->clear();
// Stream types that were not active before get a format change
// buffer queued ahead of their content.
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
if (extraStreams & indexToType(j)) {
sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
}
}
}
// This stream type is now handled; later outer iterations skip it.
streamMask &= ~indexToType(j);
}
}
// Fall back to mLastSeekTimeUs when no start time could be determined.
fetcher->startAsync(
sources[kAudioIndex],
sources[kVideoIndex],
sources[kSubtitleIndex],
startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
segmentStartTimeUs,
discontinuitySeq,
switching);
}
// All fetchers have now been started, the configuration change
// has completed.
cancelCheckBandwidthEvent();
scheduleCheckBandwidthEvent();
ALOGV("XXX configuration change completed.");
mReconfigurationInProgress = false;
if (switching) {
// mStreamMask is left untouched here while a switch is in progress.
mSwitchInProgress = true;
} else {
mStreamMask = mNewStreamMask;
}
// A non-zero mDisconnectReplyID triggers finishDisconnect() now that the
// reconfiguration is over.
if (mDisconnectReplyID != 0) {
finishDisconnect();
}
}
void LiveSession::onSwapped(const sp<AMessage> &msg) {
int32_t switchGeneration;
CHECK(msg->findInt32("switchGeneration", &switchGeneration));
if (switchGeneration != mSwitchGeneration) {
return;
}
int32_t stream;
CHECK(msg->findInt32("stream", &stream));
ssize_t idx = typeToIndex(stream);
CHECK(idx >= 0);
if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
mSwapMask &= ~stream;
if (mSwapMask != 0) {
return;
}
// Check if new variant contains extra streams.
uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
while (extraStreams) {
StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
swapPacketSource(extraStream);
extraStreams &= ~extraStream;
idx = typeToIndex(extraStream);
CHECK(idx >= 0);
if (mStreams[idx].mNewUri.empty()) {
ALOGW("swapping extra stream type %d %s to empty stream",
extraStream, mStreams[idx].mUri.c_str());
}
mStreams[idx].mUri = mStreams[idx].mNewUri;
mStreams[idx].mNewUri.clear();
}
tryToFinishBandwidthSwitch();
}
void LiveSession::onCheckSwitchDown() {
if (mSwitchDownMonitor == NULL) {
return;
}
if (mSwitchInProgress || mReconfigurationInProgress) {
ALOGV("Switch/Reconfig in progress, defer switch down");
mSwitchDownMonitor->post(1000000ll);
return;
}
for (size_t i = 0; i < kMaxStreams; ++i) {
int32_t targetDuration;
sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i));
sp<AMessage> meta = packetSource->getLatestDequeuedMeta();
if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) {
int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs();
int64_t targetDurationUs = targetDuration * 1000000ll;
if (bufferedDurationUs < targetDurationUs / 3) {
(new AMessage(kWhatSwitchDown, id()))->post();
break;
}
}
}
mSwitchDownMonitor->post(1000000ll);
}
void LiveSession::onSwitchDown() {
if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) {
return;
}
ssize_t bandwidthIndex = getBandwidthIndex();
if (bandwidthIndex < mCurBandwidthIndex) {
changeConfiguration(-1, bandwidthIndex, false);
return;
}
}
// Mark the switch as done when both of the following hold:
// 1. all old buffers are swapped out (mSwapMask is empty), and
// 2. no fetcher is still flagged for removal.
// Finishing the switch commits mNewStreamMask to mStreamMask. Does nothing
// if no switch is in progress.
void LiveSession::tryToFinishBandwidthSwitch() {
    if (!mSwitchInProgress) {
        return;
    }

    if (mSwapMask != 0) {
        // Some stream types have not been swapped yet.
        return;
    }

    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
            // An old fetcher is still pending removal.
            return;
        }
    }

    ALOGI("mSwitchInProgress = false");
    mStreamMask = mNewStreamMask;
    mSwitchInProgress = false;
}
void LiveSession::scheduleCheckBandwidthEvent() {
sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
msg->setInt32("generation", mCheckBandwidthGeneration);
msg->post(10000000ll);
}
// Advances mCheckBandwidthGeneration, so that bandwidth check messages
// scheduled earlier carry a generation value that is no longer current.
void LiveSession::cancelCheckBandwidthEvent() {
    mCheckBandwidthGeneration++;
}
void LiveSession::cancelBandwidthSwitch() {
Mutex::Autolock lock(mSwapMutex);
mSwitchGeneration++;
mSwitchInProgress = false;
mSwapMask = 0;
for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
FetcherInfo& info = mFetcherInfos.editValueAt(i);
if (info.mToBeRemoved) {
info.mToBeRemoved = false;
}
}
for (size_t i = 0; i < kMaxStreams; ++i) {
if (!mStreams[i].mNewUri.empty()) {
ssize_t j = mFetcherInfos.indexOfKey(mStreams[i].mNewUri);
if (j < 0) {
mStreams[i].mNewUri.clear();
continue;
}
const FetcherInfo &info = mFetcherInfos.valueAt(j);
info.mFetcher->stopAsync();
mFetcherInfos.removeItemsAt(j);
mStreams[i].mNewUri.clear();
}
}
}
// Returns whether a switch to |bandwidthIndex| may be started now.
//
// Never while a reconfiguration or switch is in progress; always when no
// bandwidth index has been chosen yet (mCurBandwidthIndex < 0). Otherwise a
// switch to the current index is refused, a switch to a higher index is
// allowed only if canSwitchUp() agrees, and a switch to a lower index is
// always allowed.
bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
    if (mReconfigurationInProgress || mSwitchInProgress) {
        return false;
    }

    if (mCurBandwidthIndex < 0) {
        return true;
    }

    // Known to be non-negative from here on.
    const size_t currentIndex = static_cast<size_t>(mCurBandwidthIndex);
    if (bandwidthIndex == currentIndex) {
        return false;
    }
    if (bandwidthIndex > currentIndex) {
        return canSwitchUp();
    }
    return true;
}
void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) {
size_t bandwidthIndex = getBandwidthIndex();
if (canSwitchBandwidthTo(bandwidthIndex)) {
changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
} else {
// Come back and check again 10 seconds later in case there is nothing to do now.
// If we DO change configuration, once that completes it'll schedule a new
// check bandwidth event with an incremented mCheckBandwidthGeneration.
msg->post(10000000ll);
}
}
// Reports the outcome of the preparation phase to the client and leaves it.
//
// OK and ERROR_END_OF_STREAM are both reported as kWhatPrepared; any other
// |err| is reported as kWhatPreparationFailed together with the error code.
// Must only be called while mInPreparationPhase is set. Afterwards the
// switch down monitor message is created and posted for the first time.
void LiveSession::postPrepared(status_t err) {
    CHECK(mInPreparationPhase);

    const bool prepared = (err == OK) || (err == ERROR_END_OF_STREAM);

    sp<AMessage> notify = mNotify->dup();
    if (!prepared) {
        notify->setInt32("what", kWhatPreparationFailed);
        notify->setInt32("err", err);
    } else {
        notify->setInt32("what", kWhatPrepared);
    }
    notify->post();

    mInPreparationPhase = false;

    mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, id());
    mSwitchDownMonitor->post();
}
} // namespace android