Skip to content
Snippets Groups Projects
Commit 4afd087b authored by Glauber Costa's avatar Glauber Costa Committed by Pekka Enberg
Browse files

mempool: shrink memory when no longer used.


This patch introduces the memory reclaimer thread, which I hope to use to
dispose of unused memory when pressure kicks in. "Pressure" right now is
defined to be when we have only 20 % of total memory available. But that can be
revisited.

The way it will work is that each memory user that is able to dispose of its
memory will register a shrinker, and the reclaimer will loop through them.
However, the current "loop through all" only "works" because we have only one
shrinker being registered. When other appears, we need better policies to drive
how much to take, and from whom.

Memory allocation will now wait if memory is not available, instead of
aborting.  The decision of aborting should belong to the reclaimer and no one
else.

We should never expect to have an unbounded and more importantly, all opaque,
number of shrinkers like Linux does. We have control of who they are and how
they behave, so I expect that we will be able to make a lot better decisions
in the long run.

Signed-off-by: default avatarGlauber Costa <glommer@cloudius-systems.com>
Signed-off-by: default avatarPekka Enberg <penberg@cloudius-systems.com>
parent 21d9c318
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@
#include <cassert>
#include <cstdint>
#include <new>
#include <thread>
#include <boost/utility.hpp>
#include <string.h>
#include "libc/libc.hh"
......@@ -36,6 +37,7 @@ TRACEPOINT(trace_memory_realloc, "in=%p, newlen=%d, out=%p", void *, size_t, voi
TRACEPOINT(trace_memory_page_alloc, "page=%p", void*);
TRACEPOINT(trace_memory_page_free, "page=%p", void*);
TRACEPOINT(trace_memory_huge_failure, "page ranges=%d", unsigned long);
TRACEPOINT(trace_memory_reclaim, "shrinker %s, target=%d, delta=%d", const char *, long, long);
bool smp_allocator = false;
......@@ -393,6 +395,15 @@ bi::set<page_range,
// and eventually hotplug in an hypothetical future
static std::atomic<size_t> total_memory(0);
static std::atomic<size_t> free_memory(0);
static size_t watermark_lo(0);
// At least two (x86) huge pages worth of size;
static size_t constexpr min_emergency_pool_size = 4 << 20;
__thread bool allow_emergency_alloc = false;
reclaimer reclaimer_thread
__attribute__((init_priority((int)init_prio::reclaimer)));
static void on_free(size_t mem)
{
......@@ -402,11 +413,15 @@ static void on_free(size_t mem)
static void on_alloc(size_t mem)
{
free_memory.fetch_sub(mem);
if (stats::free() < watermark_lo) {
reclaimer_thread.wake();
}
}
static void on_new_memory(size_t mem)
{
total_memory.fetch_add(mem);
watermark_lo = stats::total() * 10 / 100;
}
namespace stats {
......@@ -414,35 +429,236 @@ namespace stats {
size_t total() { return total_memory.load(std::memory_order_relaxed); }
}
void reclaimer::wake()
{
if (_thread) {
_blocked.wake_one();
}
}
pressure reclaimer::pressure_level()
{
assert(mutex_owned(&free_page_ranges_lock));
if (stats::free() < watermark_lo) {
return pressure::PRESSURE;
}
return pressure::NORMAL;
}
ssize_t reclaimer::bytes_until_normal(pressure curr)
{
assert(mutex_owned(&free_page_ranges_lock));
if (curr == pressure::PRESSURE) {
return watermark_lo - stats::free();
} else {
return 0;
}
}
void oom()
{
abort("Out of memory: could not reclaim any further");
}
void reclaimer::wait_for_minimum_memory()
{
if (allow_emergency_alloc) {
return;
}
if (stats::free() < min_emergency_pool_size) {
// Nothing could possibly give us memory back, might as well use up
// everything in the hopes that we only need a tiny bit more..
if (!_active_shrinkers) {
return;
}
wait_for_memory(min_emergency_pool_size - stats::free());
}
}
// Allocating memory here can lead to a stack overflow. That is why we need
// to use boost::intrusive for the waiting lists.
//
// Also, if the reclaimer itself reaches a point in which it needs to wait for
// memory, there is very little hope and we would might as well give up.
void reclaimer::wait_for_memory(size_t mem)
{
if (!_thread) {
auto would_block = _oom_blocked.trywait(mem);
assert(!would_block); // Too early for this, and would go negative
return;
}
if (sched::thread::current() == _thread) {
oom();
}
DROP_LOCK(free_page_ranges_lock) {
reclaimer_thread.wake();
_oom_blocked.wait(mem);
}
}
static void* malloc_large(size_t size)
{
size = (size + page_size - 1) & ~(page_size - 1);
size += page_size;
WITH_LOCK(free_page_ranges_lock) {
for (auto i = free_page_ranges.begin(); i != free_page_ranges.end(); ++i) {
auto header = &*i;
page_range* ret_header;
if (header->size >= size) {
if (header->size == size) {
free_page_ranges.erase(i);
ret_header = header;
} else {
void *v = header;
header->size -= size;
ret_header = new (v + header->size) page_range(size);
while (true) {
WITH_LOCK(free_page_ranges_lock) {
reclaimer_thread.wait_for_minimum_memory();
for (auto i = free_page_ranges.begin(); i != free_page_ranges.end(); ++i) {
auto header = &*i;
page_range* ret_header;
if (header->size >= size) {
if (header->size == size) {
free_page_ranges.erase(i);
ret_header = header;
} else {
void *v = header;
header->size -= size;
ret_header = new (v + header->size) page_range(size);
}
on_alloc(size);
void* obj = ret_header;
obj += page_size;
trace_memory_malloc_large(obj, size);
return obj;
}
on_alloc(size);
void* obj = ret_header;
obj += page_size;
trace_memory_malloc_large(obj, size);
return obj;
}
reclaimer_thread.wait_for_memory(size);
}
}
}
shrinker::shrinker(std::string name)
: _name(name)
{
WITH_LOCK(reclaimer_thread._shrinkers_mutex) {
reclaimer_thread._shrinkers.push_back(this);
}
}
// We don't know from the outside of semaphore how many units we are waiting
// for. But when we free memory, that is done by an arbitrary quantity that
// depends on how much memory we were able to free, not on how much we were
// waiting for.
//
// For instance, if we have two waiters waiting for 2Mb each, and we've just
// freed 8Mb, the semaphore would now be 4Mb positive. That means that a next
// waiter will just go through smoothly, instead of waiting as it should.
//
// This specialization of the "post" method guarantees that this never happen.
// Note that there are two possible cases:
//
// 1) We free at least as much memory as we need. In that case, we will wake up
// everybody, and whatever would be left in the semaphore will just be capped.
// All waiters are gone, and new waiters will correctly stall on wait().
//
// 2) We free less than the total waited for. In that case, we will wake up as
// much waiters as we can, and the remaining memory still waited for is kept intact
// in the queue. Because _val is also 0 in this case, new waiters will correctly
// stall on wait().
//
// An alternative to that would be to initialize the semaphore with the amount
// of free memory and update it every time we alloc/free. But that would be too
// expensive. But more importantly, it would put us to sleep in random places.
void reclaimer_waiters::post(unsigned units)
{
WITH_LOCK(_mtx) {
post_unlocked(units);
_val = 0;
}
}
reclaimer::reclaimer()
: _oom_blocked(), _thread(NULL)
{
// Set the semaphore the the current amount of free memory. We don't do it
// in the constructor list so we can hold the lock and guarantee free
// memory is not wildly changing.
WITH_LOCK(free_page_ranges_lock) {
_oom_blocked.post(stats::free());
}
// This cannot be a sched::thread because it may call into JNI functions,
// if the JVM balloon is registered as a shrinker. It expects the full
// pthread API to be functional, and for sched::threads it is not.
// std::thread is implemented ontop of pthreads, so it is fine
std::thread tmp([&] {
_thread = sched::thread::current();
allow_emergency_alloc = true;
do {
_do_reclaim();
} while (true);
});
tmp.detach();
}
bool reclaimer::_can_shrink()
{
auto p = pressure_level();
// The active fields are protected by the _shrinkers_mutex lock, but there
// is no need to take it. Worst that can happen is that we either defer
// this pass, or take an extra pass without need for it.
if (p == pressure::PRESSURE) {
return _active_shrinkers != 0;
}
return false;
}
void reclaimer::_do_reclaim()
{
ssize_t target;
size_t memory_freed = 0;
WITH_LOCK(free_page_ranges_lock) {
_blocked.wait_until(free_page_ranges_lock,
// We should only try to shrink if there are available shrinkers.
// But if we have waiters, we need to wake up the reclaimer anyway.
// Of course, if there are no shrinkers we won't free anything. But
// we need to wake up to be able to at least notice that and OOM.
[=] { return _oom_blocked.has_waiters() ||
(_can_shrink() && (pressure_level() != pressure::NORMAL)); }
);
target = bytes_until_normal();
}
// FIXME: This simple loop works only because we have a single shrinker
// When we have more, we need to probe them and decide how much to take from
// each of them.
WITH_LOCK(_shrinkers_mutex) {
// We execute this outside the free_page_ranges lock, so the threads
// freeing memory (or allocating, for that matter) will have the chance
// to manipulate the free_page_ranges structure. Executing the
// shrinkers with the lock held would result in a deadlock.
for (auto s : _shrinkers) {
if (s->should_shrink(target)) {
size_t freed = s->request_memory(target);
trace_memory_reclaim(s->name().c_str(), target, freed);
memory_freed += freed;
}
}
}
WITH_LOCK(free_page_ranges_lock) {
if (target > 0) {
// Because we are not disposing of our waiters, we will be forced
// to enter this method again. Even if no waiters can be serviced,
// if we could free at least some memory at this stage, there is
// still hope. So we won't abort. But if we have waiters, and
// we're already using up all our reserves, then it is time to give
// up.
if (_oom_blocked.has_waiters() && !memory_freed) {
oom();
}
// Wake up all waiters that are waiting for an ammount of memory that is
// smaller than the one we've just freed.
_oom_blocked.post(memory_freed);
}
}
debug(fmt("malloc_large(): out of memory: can't find %d bytes. aborting.\n")
% size);
abort();
}
static page_range* merge(page_range* a, page_range* b)
......@@ -513,11 +729,18 @@ PERCPU(page_buffer, percpu_page_buffer);
static void refill_page_buffer()
{
WITH_LOCK(free_page_ranges_lock) {
reclaimer_thread.wait_for_minimum_memory();
if (free_page_ranges.empty()) {
// That is almost a guaranteed oom, but we can still have some hope
// if we the current allocation is a small one. Another advantage
// of waiting here instead of oom'ing directly is that we can have
// less points in the code where we can oom, and be more
// predictable.
reclaimer_thread.wait_for_memory(mmu::page_size);
}
auto total_size = 0;
WITH_LOCK(preempt_lock) {
if (free_page_ranges.empty()) {
debug("alloc_page(): out of memory\n");
abort();
}
auto& pbuf = *percpu_page_buffer;
auto limit = (pbuf.max + 1) / 2;
......@@ -529,7 +752,7 @@ static void refill_page_buffer()
auto p = &*it;
auto size = std::min(p->size, (limit - pbuf.nr) * page_size);
p->size -= size;
on_alloc(size);
total_size += size;
void* pages = static_cast<void*>(p) + p->size;
if (!p->size) {
free_page_ranges.erase(*p);
......@@ -541,6 +764,10 @@ static void refill_page_buffer()
}
}
}
// That will wake up the reclaimer, we can't do that while holding the preempt_lock
// condvar's wake() will take a mutex that may sleep, that will require preemption
// to be enabled.
on_alloc(total_size);
}
}
......@@ -696,6 +923,11 @@ void* alloc_huge_page(size_t N)
// pages allocated. However, this would be inefficient, and since we
// only use alloc_huge_page in one place, maybe not worth it.
}
// Definitely a sign we are somewhat short on memory. It doesn't *mean* we
// are, because that might be just fragmentation. But we wake up the reclaimer
// just to be sure, and if this is not real pressure, it will just go back to
// sleep
reclaimer_thread.wake();
trace_memory_huge_failure(free_page_ranges.size());
return nullptr;
}
......
......@@ -9,12 +9,16 @@
#define MEMPOOL_HH
#include <cstdint>
#include <functional>
#include <list>
#include <boost/intrusive/set.hpp>
#include <boost/intrusive/list.hpp>
#include <osv/mutex.h>
#include <arch.hh>
#include <osv/pagealloc.hh>
#include <osv/percpu.hh>
#include <osv/condvar.h>
#include <osv/semaphore.hh>
namespace memory {
......@@ -105,6 +109,65 @@ void enable_debug_allocator();
extern bool tracker_enabled;
enum class pressure { RELAXED, NORMAL, PRESSURE, EMERGENCY };
class shrinker {
public:
explicit shrinker(std::string name);
virtual ~shrinker() {}
virtual size_t request_memory(size_t n) = 0;
virtual size_t release_memory(size_t n) = 0;
std::string name() { return _name; };
bool should_shrink(ssize_t target) { return _enabled && (target > 0); }
private:
std::string _name;
int _enabled = 1;
};
class reclaimer_waiters: public semaphore {
public:
explicit reclaimer_waiters() : semaphore(0) { }
virtual ~reclaimer_waiters() {}
virtual void post(unsigned units = 1);
bool has_waiters() { WITH_LOCK(_mtx) { return !_waiters.empty(); } }
};
class reclaimer {
public:
reclaimer ();
void wake();
void wait_for_memory(size_t mem);
void wait_for_minimum_memory();
friend void start_reclaimer();
friend class shrinker;
private:
void _do_reclaim();
// We could just check if the semaphore's wait_list is empty. But since we
// don't control the internals of the primitives we use for the
// implementation of semaphore, we are concerned that unlocked access may
// mean mayhem. Locked access, OTOH, will possibly deadlock us if someone allocates
// memory locked, and then we try to verify if we have waiters and need to hold the
// same lock
//std::atomic<int> _waiters_count = { 0 };
//bool _has_waiters() { return _waiters_count.load() != 0; }
reclaimer_waiters _oom_blocked; // Callers are blocked due to lack of memory
condvar _blocked; // The reclaimer itself is blocked waiting for pressure condition
sched::thread *_thread;
std::vector<shrinker *> _shrinkers;
mutex _shrinkers_mutex;
unsigned int _active_shrinkers = 0;
bool _can_shrink();
pressure pressure_level();
ssize_t bytes_until_normal(pressure curr);
ssize_t bytes_until_normal() { return bytes_until_normal(pressure_level()); }
};
namespace stats {
size_t free();
size_t total();
......
......@@ -17,10 +17,12 @@ enum class init_prio : int {
pthread,
notifiers,
acpi,
apic,
vma_list,
reclaimer,
sched,
clock,
hpet,
vma_list,
tracepoint_base,
malloc_pools,
idt,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment