blob: 2f030f3e026575243c66c35e079edfc81523969d [file] [log] [blame]
#region Copyright notice and license
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Threading;
using System.Collections.Generic;
using Grpc.Core.Utils;
namespace Grpc.Core.Internal
{
/// <summary>
/// Pool of objects that combines a shared pool and a thread local pool.
/// </summary>
internal class DefaultObjectPool<T> : IObjectPool<T>
where T : class, IDisposable
{
readonly object myLock = new object();
readonly Func<T> itemFactory;
// Queue shared between threads, access needs to be synchronized.
readonly Queue<T> sharedQueue;
readonly int sharedCapacity;
readonly ThreadLocal<ThreadLocalData> threadLocalData;
readonly int threadLocalCapacity;
readonly int rentLimit;
bool disposed;
/// <summary>
/// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity.
/// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately
/// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected
/// after the thread that owns them has finished).
/// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease
/// operations.
/// </summary>
public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)
{
GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
this.sharedQueue = new Queue<T>(sharedCapacity);
this.sharedCapacity = sharedCapacity;
this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false);
this.threadLocalCapacity = threadLocalCapacity;
this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1;
}
/// <summary>
/// Leases an item from the pool or creates a new instance if the pool is empty.
/// Attempts to retrieve the item from the thread local pool first.
/// If the thread local pool is empty, the item is taken from the shared pool
/// along with more items that are moved to the thread local pool to avoid
/// prevent acquiring the lock for shared pool too often.
/// The methods should not be called after the pool is disposed, but it won't
/// results in an error to do so (after depleting the items potentially left
/// in the thread local pool, it will continue returning new objects created by the factory).
/// </summary>
public T Lease()
{
var localData = threadLocalData.Value;
if (localData.Queue.Count > 0)
{
return localData.Queue.Dequeue();
}
if (localData.CreateBudget > 0)
{
localData.CreateBudget --;
return itemFactory();
}
int itemsMoved = 0;
T leasedItem = null;
lock(myLock)
{
if (sharedQueue.Count > 0)
{
leasedItem = sharedQueue.Dequeue();
}
while (sharedQueue.Count > 0 && itemsMoved < rentLimit)
{
localData.Queue.Enqueue(sharedQueue.Dequeue());
itemsMoved ++;
}
}
// If the shared pool didn't contain all rentLimit items,
// next time we try to lease we will just create those
// instead of trying to grab them from the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.CreateBudget = rentLimit - itemsMoved;
return leasedItem ?? itemFactory();
}
/// <summary>
/// Returns an item to the pool.
/// Attempts to add the item to the thread local pool first.
/// If the thread local pool is full, item is added to a shared pool,
/// along with half of the items for the thread local pool, which
/// should prevent acquiring the lock for shared pool too often.
/// If called after the pool is disposed, we make best effort not to
/// add anything to the thread local pool and we guarantee not to add
/// anything to the shared pool (items will be disposed instead).
/// </summary>
public void Return(T item)
{
GrpcPreconditions.CheckNotNull(item);
var localData = threadLocalData.Value;
if (localData.Queue.Count < threadLocalCapacity && !disposed)
{
localData.Queue.Enqueue(item);
return;
}
if (localData.DisposeBudget > 0)
{
localData.DisposeBudget --;
item.Dispose();
return;
}
int itemsReturned = 0;
int returnLimit = rentLimit + 1;
lock (myLock)
{
if (sharedQueue.Count < sharedCapacity && !disposed)
{
sharedQueue.Enqueue(item);
itemsReturned ++;
}
while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed)
{
sharedQueue.Enqueue(localData.Queue.Dequeue());
itemsReturned ++;
}
}
// If the shared pool could not accomodate all returnLimit items,
// next time we try to return we will just dispose the item
// instead of trying to return them to the shared queue.
// This is to guarantee we won't be accessing the shared queue too often.
localData.DisposeBudget = returnLimit - itemsReturned;
if (itemsReturned == 0)
{
localData.DisposeBudget --;
item.Dispose();
}
}
public void Dispose()
{
lock (myLock)
{
if (!disposed)
{
disposed = true;
while (sharedQueue.Count > 0)
{
sharedQueue.Dequeue().Dispose();
}
}
}
}
class ThreadLocalData
{
public ThreadLocalData(int capacity)
{
this.Queue = new Queue<T>(capacity);
}
public Queue<T> Queue { get; }
public int CreateBudget { get; set; }
public int DisposeBudget { get; set; }
}
}
}