Об одном методе распределения памяти

в 15:37, , рубрики: lock-free, memory management, метки: ,

image
Не секрет, что иногда выделение памяти требует отдельных решений. Например — когда память выделяется и освобождается стремительным домкратом потоком, в параллельных задачах.

В результате стандартный консервативный аллокатор выстраивает все запросы в очередь на pthread_mutex / critical section. И наш многоядерный процессор медленно и печально едет на первой передаче.

И что с этим делать? Познакомимся поближе с деталями реализации метода Scalable Lock-Free Dynamic Memory Allocation. Maged M. Michael. IBM Thomas J. Watson Research Center.

Самый простой код что я сумел найти — написан под LGPL камрадами Scott Schneider и Christos Antonopoulos. Его и рассмотрим.

Начнем издалека

image
Итак — как нам избавиться от ненужных локов?

Ответ лежит на поверхности — надо выделять память в lock-free списках. Хорошая идея — но как строятся такие списки?

На помощь нам спешат атомарные операции. Те самые, которые InterlockedCompareExchange. Но постойте — максимум на что мы можем рассчитывать это long long, он же __int64. И что делать? А вот что — определим свой собственный указатель с тегом.

Ограничив размер адреса в 46 бит, и мы можем в 64бит целое спрятать нужные нам дополнения, а они — понадобятся далее.

#pragma pack(1)
typedef struct {
    volatile unsigned __int64 top:46, ocount:18;
} top_aba_t;

Кстати, с учетом выравнивания по 8 / 16 байт — мы можем получить не 2 в 46 степени, а несколько больше. Стандартный прием — выданный адрес не может быть нечетным и более того — должен быть выровнен для плавающей точки.

И еще момент — код становится очень длинным. То есть стандартная портянка

desc->Next = queue_head;
queue_head = desc;

превращается вот в такие макароны

    descriptor_queue old_queue, new_queue;
    do {
        old_queue = queue_head;
        desc->Next = (descriptor*)old_queue.DescAvail;
        new_queue.DescAvail = (unsigned __int64)desc;
        new_queue.tag = old_queue.tag + 1;
    } while (!compare_and_swap64k(queue_head, old_queue, new_queue));

что сильно удлиняет код и делает его менее читабельным. Поэтому очевидные вещи убраны под спойлеры.

Lock-free FIFO queue

image
Раз у нас теперь есть собственный указатель, можем заняться списком.

// Pseudostructure for lock-free list elements.
// The only requirement is that the 5th-8th byte of
// each element should be available to be used as
// the pointer for the implementation of a singly-linked
// list. 
struct queue_elem_t {
    char                *_dummy;
    volatile queue_elem_t    *next;
};

И в нашем случае — не забыть про выравнивание, например так

typedef struct {
    unsigned __int64  _pad0[8];
    top_aba_t       both;
    unsigned __int64  _pad1[8];
} lf_fifo_queue_t;
Оборачиваем работу с атомарными функциями

image
Теперь определим парочку абстракций, чтобы наш код мог быть переносим (для Win32 например это реализуется вот так):

Обертка над атомами для Win32

#define fetch_and_store(address, value) InterlockedExchange((PLONG)(address), (value))
#define atmc_add(address, value) InterlockedExchangeAdd((PLONG)(address), (value))
#define compare_and_swap32(address, old_value, new_value) 
     (InterlockedCompareExchange(
          (PLONG)(address), (new_value), (old_value))
       == (old_value))
#define compare_and_swap64(address, old_value, new_value) 
    (InterlockedCompareExchange64(
          (PLONGLONG)(address), (__int64)(new_value), (__int64)(old_value)) 
     == (__int64)(old_value))
#define compare_and_swap_ptr(address, old_value, new_value) 
    (InterlockedCompareExchangePointer((address),  
         (new_value), (old_value)) 
      == (old_value))

и добавим еще один метод, просто чтобы не отвлекаться на кастинг параметров к __int664 и передаче их по указателям.

#define compare_and_swap64k(a,b,c) 
     compare_and_swap64((volatile unsigned __int64*)&(a), 
           *((unsigned __int64*)&(b)), 
           *((unsigned __int64*)&(c)))

И вот мы уже готовы реализовать базовые функции (добавить и удалить).

Определяем базовые функции работы со списком

image

static inline int lf_fifo_enqueue(lf_fifo_queue_t *queue, void *element) {
    top_aba_t old_top;
    top_aba_t new_top;
    
    new_top.ocount = 0;
    for(;;) {
        old_top.ocount = queue->both.ocount;
        old_top.top = queue->both.top;

        ((queue_elem_t *)element)->next = (queue_elem_t *)old_top.top;
        new_top.top = (unsigned __int64)element;
        new_top.ocount += 1;
        if (compare_and_swap64k(queue->both, old_top, new_top)) 
            return 0;
    }
}

На что тут надо обратить внимание — обычная операция по добавлению обернута циклом, выход из которого — мы успешно записали новое значение поверх старого, и при этом старое кто-то еще не поменял. Ну а если поменял — то повторяем все заново. И еще момент — в наш ocount записываем номер попытки, с которой нам это удалось. Пустячок, а каждая попытка выдает уникальное 64бит целое.

Именно на таком простом шаманстве и строятся lock-free структуры данных.

Аналогичным образом реализуется снятие с вершины нашего FIFO списка:

Аналогично lf_fifo_dequeue

static inline void *lf_fifo_dequeue(lf_fifo_queue_t *queue) {
    top_aba_t head;
    top_aba_t next;

    next.ocount = 0;
    for(;;) {
        head.top = queue->both.top;
        head.ocount = queue->both.ocount;
        if (head.top == 0) return NULL;
        next.top = (unsigned __int64)(((struct queue_elem_t *)head.top)->next);
        next.ocount += 1;
        if (compare_and_swap64k(queue->both, head, next)) 
            return ((void *)head.top);
    }
}

Тут мы видим ровно то же самое — в цикле пробуем снять, если есть что и мы снимаем так что старое значение все еще правильное — то отлично, а нет — пробуем еще.

И конечно же инициализация такого элемента списка банальна — вот она:

lf_fifo_queue_init

static inline void lf_fifo_queue_init(lf_fifo_queue_t *queue) {
    queue->both.top = 0;
    queue->both.ocount = 0;
}

Собственно идея аллокатора

image
Перейдем теперь непосредственно к аллокатору. Аллокатор должен быть быстрым — поэтому разделим память на классы. Большой (напрямую берем у системы), и маленький, побитый на много-много маленьких подклассов размером от 8 байт до 2 килобайт.

Такой ход конем позволит нам решить две задачи. Первая — маленькие кусочки будут выделяться супербыстро, и за счет того что они разделены в таблицу — не будут лежать в одном огромном списке. А большие куски памяти не будут мешаться под ногами и приводить к проблемам с фрагментацией. Более того — поскольку в каждом подклассе у нас все блоки одного размера, работа с ними очень упрощается.

И еще момент! Выделение маленьких кусочков привяжем к нитям (а освобождение — нет). Таким образом мы убьем двух зайцев сразу — упростится контроль за выделением, и острова памяти выделенные для треда локально — не будут перемешиваться лишний раз.

Опишем хип и дескриптор блока

image
Помолясь богине Техно, приступим.

Определим парочку констант

Балнальщина а-ля GRANULARITY и PAGE_SIZE

struct Descriptor;
typedef struct Descriptor descriptor;
struct Procheap;
typedef struct Procheap procheap;

#define TYPE_SIZE   sizeof(void*)
#define PTR_SIZE    sizeof(void*)
#define HEADER_SIZE (TYPE_SIZE + PTR_SIZE)

#define LARGE       0
#define SMALL       1

#define PAGESIZE    4096
#define SBSIZE      (16 * PAGESIZE)
#define DESCSBSIZE  (1024 * sizeof(descriptor))

#define ACTIVE      0
#define FULL        1
#define PARTIAL     2
#define EMPTY       3

#define MAXCREDITS  64 // 2^(bits for credits in active)
#define GRANULARITY 8

И займемся креативом, определим нужные типы данных. Итак, у нас будет много хипов — каждый в своем классе, да привязанных к текущему треду. В суперблоке будет два указателя — активный список и перераспределенный.

Вы спросите — а что это такое и почему так сложно?

Первое, оно же главное — выделять список поэлементно банально невыгодно. То есть — классический однонаправленный список при миллионах элементов превращается в ад и израиль. На каждый элемент списка надо выделить 8 / 16 байт, чтобы хранить два несчастных указателя на сами данные и на следующий элемент списка.

Выгодно ли это? Очевидно что — нет! И как следует поступить? Правильно, а давайте мы сгруппируем наши описатели списка в группы (stripe) по 500 (к примеру) элементов. И получим список не элементов, но — групп. Экономично, практично, работать с элементами можно как и в классическом варианте. Весь вопрос только в нестандартном выделении памяти.

Более того, все Next в пределах блока просто указывают на соседний элемент массива, и мы можем их явно проинициализировать сразу при выделении страйпа. По факту, последний Next в страйпе будет указывать на следующий страйп, но с точки зрения работы со списками — ничего не меняется.

Легко догадаться, что списки memory block descriptors строятся именно так.

И еще — Active и есть наш активный страйп с заранее выделенными кусочками памяти по эн байт, в котором и ведем выдачу памяти по принципу FIFO. Если есть место в страйпе — берем оттуда. Если нет — ищем уже в классическом списке Partial. Если нет ни там ни там — отлично, выделяем новый страйп.

Второе, такая «полосатость» ведет к некоторому перерасходу памяти, т. к. мы можем выделить страйп под 8 байтные кусочки памяти в виде массива в 64К, а запрошено будет пара кусочков всего. И еще, активные страйпы для каждой нити свои, что еще больше усугубит проблему.

Однако, если у нас действительно идет активная работа с памятью, это даст хороший выигрыш по скорости.

Вот сам хип

struct Procheap {
    volatile active     Active;     // initially NULL
    volatile descriptor*    Partial;    // initially NULL
    sizeclass*      sc;     // pointer to parent sizeclass
};

А вот что ему надо:

Что это за Active / Partial такие

typedef struct {
    unsigned __int64  ptr:58, credits:6;
} active;

/* We need to squeeze this in 64-bits, but conceptually
 * this is the case:
 *  descriptor* DescAvail;
 */
typedef struct {
    unsigned __int64  DescAvail:46, tag:18;
} descriptor_queue;

/* Superblock descriptor structure. We bumped avail and count 
 * to 24 bits to support larger superblock sizes. */
typedef struct {
    unsigned __int64  avail:24,count:24, state:2, tag:14;
} anchor;

typedef struct {
    lf_fifo_queue_t    Partial;    // initially empty
    unsigned int        sz;            // block size
    unsigned int        sbsize;    // superblock size
} sizeclass;

и собственно дескриптор — описатель нашего фрагмента.

struct Descriptor {
    struct queue_elem_t lf_fifo_queue_padding;
    volatile anchor    Anchor;     // anchor to superblock exact place
    descriptor*          Next;          // next element in list
    void*                     sb;              // pointer to superblock
    procheap*           heap;         // pointer to owner procheap
    unsigned int       sz;               // block size
    unsigned int       maxcount; // superblock size / sz
};

Как видно — ничего сверхъестественного. Описание хипа и суперблока нам нужны в дескрипторе, т. к. выделять память можно в одном треде, а освобождать — в другом.

Приступим к реализации

image
Сначала нам надо определить наши локальные переменные — хипы, размеры и прочее. Вот примерно так:

Как же без глобальных переменных

/* This is large and annoying, but it saves us from needing an 
 * initialization routine. */
sizeclass sizeclasses[2048 / GRANULARITY] =
                {
                {LF_FIFO_QUEUE_STATIC_INIT, 8, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 16, SBSIZE},
                ...
                {LF_FIFO_QUEUE_STATIC_INIT, 2024, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2032, SBSIZE},
                {LF_FIFO_QUEUE_STATIC_INIT, 2040, SBSIZE}, {LF_FIFO_QUEUE_STATIC_INIT, 2048, SBSIZE},
                };

#define LF_FIFO_QUEUE_STATIC_INIT   {{0, 0, 0, 0, 0, 0, 0, 0}, {0, 0}, {0, 0, 0, 0, 0, 0, 0, 0}}

__declspec(thread) procheap* heaps[2048 / GRANULARITY]; // =  { };

static volatile descriptor_queue queue_head;

Тут мы видим все то, что рассматривали в предыдущем разделе:

__declspec(thread) procheap* heaps[2048 / GRANULARITY]; // =  { };
static volatile descriptor_queue queue_head;

Это и есть наши хипы per thread. И — список всех дескрипторов per-process.

Malloc — как это работает

image
Рассмотрим собственно процесс выделения памяти подробнее. Легко увидеть несколько особенностей, а именно:

  1. Если запрошенный размер не имеет нашего хипа для малых размеров — просто запрашиваем у системы
  2. Выделяем память по очереди — сначала из активного списка, затем из списка фрагментов, и наконец — пробуем запросить у системы новый кусок.
  3. У нас всегда есть память в системе, а если нет — надо только подождать в цикле

Вот такое решение.

void* my_malloc(size_t sz) { 
    procheap *heap;
    void* addr;

    // Use sz and thread id to find heap.
    heap = find_heap(sz);

    if (!heap) 
        // Large block
        return alloc_large_block(sz);

    for(;;) { 
        addr = MallocFromActive(heap);
        if (addr) return addr;

        addr = MallocFromPartial(heap);
        if (addr) return addr;

        addr = MallocFromNewSB(heap);
        if (addr) return addr;
    } 
}

Углубимся в тему, рассмотрим каждую часть в отдельности. Начнем с начала — с добавления нового куска из системы.

MallocFromNewSB

static void* MallocFromNewSB(procheap* heap) {
    descriptor* desc;
    void* addr;
    active newactive, oldactive;

    *((unsigned __int64*)&oldactive) = 0;
    desc = DescAlloc();
    desc->sb = AllocNewSB(heap->sc->sbsize, SBSIZE);

    desc->heap = heap;
    desc->Anchor.avail = 1;
    desc->sz = heap->sc->sz;
    desc->maxcount = heap->sc->sbsize / desc->sz;

    // Organize blocks in a linked list starting with index 0.
    organize_list(desc->sb, desc->maxcount, desc->sz);

    *((unsigned __int64*)&newactive) = 0;
    newactive.ptr = (__int64)desc;
    newactive.credits = min(desc->maxcount - 1, MAXCREDITS) - 1;

    desc->Anchor.count = max(((signed long)desc->maxcount - 1 ) - 
           ((signed long)newactive.credits + 1), 0); // max added by Scott
    desc->Anchor.state = ACTIVE;

    // memory fence.
    if (compare_and_swap64k(heap->Active, oldactive, newactive)) { 
        addr = desc->sb;
        *((char*)addr) = (char)SMALL; 
        addr = (char*) addr + TYPE_SIZE;
        *((descriptor **)addr) = desc; 
        return (void *)((char*)addr + PTR_SIZE);
    } 
    else {
        //Free the superblock desc->sb.
        munmap(desc->sb, desc->heap->sc->sbsize);
        DescRetire(desc); 
        return NULL;
    }
}

Никаких чудес — всего лишь создание суперблока, дескриптора, и инициализация пустого списка. И добавление в список активных нового блока. Тут прошу обратить внимание на такой факт, что нет цикла с присваиванием. Если не удалось — значит не удалось. Почему так? Потому что функция и так вызывается из цикла, а если не удалось вставить в список — то это значит что вставил кто-то другой и надо проводить выделение памяти сначала.

Перейдем теперь к выделению блока из списка активных — ведь мы уже научились выделять суперблок и прочее.

MallocFromActive

static void* MallocFromActive(procheap *heap)  {
    active newactive, oldactive;
    descriptor* desc;
    anchor oldanchor, newanchor;
    void* addr;
    unsigned __int64 morecredits = 0;
    unsigned long next = 0;

    // First step: reserve block
    do { 
        newactive = oldactive = heap->Active;
        if (!(*((unsigned __int64*)(&oldactive)))) return NULL;
        if (oldactive.credits == 0)  *((unsigned __int64*)(&newactive)) = 0;
        else --newactive.credits;
    } while (!compare_and_swap64k(heap->Active, oldactive, newactive));

    // Second step: pop block
    desc = mask_credits(oldactive);
    do {
        // state may be ACTIVE, PARTIAL or FULL
        newanchor = oldanchor = desc->Anchor;
        addr = (void *)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz);
        next = *(unsigned long *)addr;
        newanchor.avail = next;
        ++newanchor.tag;

        if (oldactive.credits == 0) {
            // state must be ACTIVE
            if (oldanchor.count == 0) newanchor.state = FULL;
            else { 
                morecredits = min(oldanchor.count, MAXCREDITS);
                newanchor.count -= morecredits;
            }
        } 
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (oldactive.credits == 0 && oldanchor.count > 0) 
        UpdateActive(heap, desc, morecredits);

    *((char*)addr) = (char)SMALL; 
    addr = (char*) addr + TYPE_SIZE;
    *((descriptor**)addr) = desc; 
    return ((void*)((char*)addr + PTR_SIZE));
}

Собствено алгоритм прост, лишь слегка путает нас громоздкая форма записи. На самом же деле — если есть что брать, то берем новый кусок из суперблока, и отмечаем сей факт. Попутно проверяем — а не забрали ли мы последний кусочек, и если да — отмечаем и это.

Есть тут роано одна тонкость, а именно — если вдруг выяснилось, что мы взяли последний кусочек из суперблока, то вслед за нами следующий запрос приведет к добавлению нового суперблока взамен уже использованного. И как только мы это обнаружили — нам негде записать факт того что блок выделен. Поэтому мы заносим только что выделенный кусочек в partial list.

UpdateActive

static void UpdateActive(procheap* heap, descriptor* desc, unsigned __int64 morecredits) { 
    active oldactive, newactive;
    anchor oldanchor, newanchor;

    *((unsigned __int64*)&oldactive) = 0;
    newactive.ptr = (__int64)desc;
    newactive.credits = morecredits - 1;
    if (compare_and_swap64k(heap->Active, oldactive, newactive)) 
        return;

    // Someone installed another active sb
    // Return credits to sb and make it partial
    do { 
        newanchor = oldanchor = desc->Anchor;
        newanchor.count += morecredits;
        newanchor.state = PARTIAL;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));
    HeapPutPartial(desc);
}

Настала пора рассмотреть работу с дескрипторами, прежде чем мы перейдем к заключительной части этого эссе.

Memory Block Descriptors

image
Для начала, научимся дескриптор создавать. Но где? Вообще-то, если кто забыл — мы как раз пишем выделение памяти. Очевидным и красивым решением было бы использование тех же механизмов, что и для generic allocation, но увы — это будет всем известный анекдот pkunzip.zip. Поэтому принцип тот же — выделяем большой блок содержащий массив дескрипторов, и как только массив переполняется — создаем новвый и объединяем его с предыдущим в список.

DescAlloc

static descriptor* DescAlloc() {
    descriptor_queue old_queue, new_queue;
    descriptor* desc;
  
    for(;;) {
        old_queue = queue_head;
        if (old_queue.DescAvail) {
            new_queue.DescAvail = (unsigned __int64)((descriptor*)old_queue.DescAvail)->Next;
            new_queue.tag = old_queue.tag + 1;
            if (compare_and_swap64k(queue_head, old_queue, new_queue)) {
                desc = (descriptor*)old_queue.DescAvail;
                break;
            }
        }
        else {
            desc = AllocNewSB(DESCSBSIZE, sizeof(descriptor));
            organize_desc_list((void *)desc, DESCSBSIZE / sizeof(descriptor), sizeof(descriptor));

            new_queue.DescAvail = (unsigned long)desc->Next;
            new_queue.tag = old_queue.tag + 1;
            if (compare_and_swap64k(queue_head, old_queue, new_queue)) 
                break;
            munmap((void*)desc, DESCSBSIZE);   
        }
    }
    return desc;
}

Ну а теперь дело за не менее мощным колдунством — надо еще и научиться дескриптор обратно возвращать. Однако, возвращать мы будем в том же FIFO — потому что возвращать нам понадобится только если мы его взяли по ошибке, а этот факт обнаруживается сразу. Так что дело оказывается гораздо проще

DescRetire

void DescRetire(descriptor* desc) {
    descriptor_queue old_queue, new_queue;

    do {
        old_queue = queue_head;
        desc->Next = (descriptor*)old_queue.DescAvail;
        new_queue.DescAvail = (unsigned __int64)desc;
        new_queue.tag = old_queue.tag + 1;
    } while (!compare_and_swap64k(queue_head, old_queue, new_queue));
}
Helpers

image
Приведем также вспомогательные функции по инициализации списков и т. д. Функции настолько самоочевидны, что их даже описывать как-то смысла нет.

organize_list

static void organize_list(void* start, unsigned long count, unsigned long stride) {
    char* ptr;
    unsigned long i;
  
    ptr = (char*)start; 
    for (i = 1; i < count - 1; i++) {
        ptr += stride;
        *((unsigned long*)ptr) = i + 1;
    }
}

organize_desc_list

static void organize_desc_list(descriptor* start, unsigned long count, unsigned long stride) {
    char* ptr;
    unsigned int i;
 
    start->Next = (descriptor*)(start + stride);
    ptr = (char*)start; 
    for (i = 1; i < count - 1; i++) {
        ptr += stride;
        ((descriptor*)ptr)->Next = (descriptor*)((char*)ptr + stride);
    }
    ptr += stride;
    ((descriptor*)ptr)->Next = NULL;
}

mask_credits

static descriptor* mask_credits(active oldactive) {
    return (descriptor*)oldactive.ptr;
}

Суперблок просто запрашиваем у системы:

static void* AllocNewSB(size_t size, unsigned long alignement) {
    return VirtualAlloc(NULL, size, MEM_COMMIT, PAGE_READWRITE);
}

Точно так же получаем большие блоки, запросив чуть больше под заголовок блока:

alloc_large_block

static void* alloc_large_block(size_t sz) {
    void* addr = VirtualAlloc(NULL, sz + HEADER_SIZE, MEM_COMMIT, PAGE_READWRITE);

    // If the highest bit of the descriptor is 1, then the object is large 
    // (allocated / freed directly from / to the OS)
    *((char*)addr) = (char)LARGE;
    addr = (char*) addr + TYPE_SIZE;
    *((unsigned long *)addr) = sz + HEADER_SIZE;
    return (void*)((char*)addr + PTR_SIZE); 
}

А это — поиск в нашей таблице хипа адаптированного под нужный размер (bkb ноль если размер слишком велик):

find_heap

static procheap* find_heap(size_t sz) {
    procheap* heap;
  
    // We need to fit both the object and the descriptor in a single block
    sz += HEADER_SIZE;
    if (sz > 2048) return NULL;
  
    heap = heaps[sz / GRANULARITY];
    if (heap == NULL) {
        heap = VirtualAlloc(NULL, sizeof(procheap), MEM_COMMIT, PAGE_READWRITE);
        *((unsigned __int64*)&(heap->Active)) = 0;
        heap->Partial = NULL;
        heap->sc = &sizeclasses[sz / GRANULARITY];
        heaps[sz / GRANULARITY] = heap;
    }
    
    return heap;
}

А вот обертки для списков. Они добавлены исключительно для наглядности — кода с макаронами вокруг атомиков у нас более чем достаточно, чтобы в одну функцию yt складывать по надцать циклов вокруг compare and swap

ListGetPartial

static descriptor* ListGetPartial(sizeclass* sc) {
    return (descriptor*)lf_fifo_dequeue(&sc->Partial);
}

ListPutPartial

static void ListPutPartial(descriptor* desc) {
    lf_fifo_enqueue(&desc->heap->sc->Partial, (void*)desc);  
}

Удаление строится банальнейше — перестройкой списка во временный и обратно:

ListRemoveEmptyDesc

static void ListRemoveEmptyDesc(sizeclass* sc) {
    descriptor *desc;
    lf_fifo_queue_t temp = LF_FIFO_QUEUE_STATIC_INIT;

    while (desc = (descriptor *)lf_fifo_dequeue(&sc->Partial)) {
        lf_fifo_enqueue(&temp, (void *)desc);
        if (desc->sb == NULL) DescRetire(desc);
        else break;
    }
    while (desc = (descriptor *)lf_fifo_dequeue(&temp)) 
        lf_fifo_enqueue(&sc->Partial, (void *)desc);
}

И несколько оберток вокруг partial lists

RemoveEmptyDesc

static void RemoveEmptyDesc(procheap* heap, descriptor* desc) {
    if (compare_and_swap_ptr(&heap->Partial, desc, NULL)) 
        DescRetire(desc);
    else 
        ListRemoveEmptyDesc(heap->sc);
}

HeapGetPartial

static descriptor* HeapGetPartial(procheap* heap) { 
    descriptor* desc;
    do {
        desc = *((descriptor**)&heap->Partial); // casts away the volatile
        if (desc == NULL) return ListGetPartial(heap->sc);
    } while (!compare_and_swap_ptr(&heap->Partial, desc, NULL));
    return desc;
}

HeapPutPartial

static void HeapPutPartial(descriptor* desc) {
    descriptor* prev;
    do {
        prev = (descriptor*)desc->heap->Partial; // casts away volatile
    } while (!compare_and_swap_ptr(&desc->heap->Partial, prev, desc));
    if (prev) ListPutPartial(prev); 
}
Последний рывок — и выделять / освобождать готов!

image
И наконец мы готовы реализовать выделение памяти не в страйпе, все возможности для этого у нас уже есть.

Алгоритм прост — находим наш список, резервируем в нем место (попутно освобождая пустые блоки), и возвращаем его клиенту.

MallocFromPartial

static void* MallocFromPartial(procheap* heap) {
    descriptor* desc;
    anchor oldanchor, newanchor;
    unsigned __int64 morecredits;
    void* addr;
  
retry:
    desc = HeapGetPartial(heap);
    if (!desc) return NULL;

    desc->heap = heap;
    do {
        // reserve blocks
        newanchor = oldanchor = desc->Anchor;
        if (oldanchor.state == EMPTY) 
            DescRetire(desc); 
            goto retry;
        }

        // oldanchor state must be PARTIAL
        // oldanchor count must be > 0
        morecredits = min(oldanchor.count - 1, MAXCREDITS);
        newanchor.count -= morecredits + 1;
        newanchor.state = morecredits > 0 ? ACTIVE : FULL;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    do { 
        // pop reserved block
        newanchor = oldanchor = desc->Anchor;
        addr = (void*)((unsigned __int64)desc->sb + oldanchor.avail * desc->sz);

        newanchor.avail = *(unsigned long*)addr;
        ++newanchor.tag;
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (morecredits > 0) 
        UpdateActive(heap, desc, morecredits);

    *((char*)addr) = (char)SMALL; 
    addr = (char*) addr + TYPE_SIZE;
    *((descriptor**)addr) = desc; 
    return ((void *)((char*)addr + PTR_SIZE));
}

Теперь рассмотрим как вернуть нам память обратно в список. В общем — классический алгоритм: по переданному нам указателю восстанавливаем дескриптор, и по якорю запомненному в дескрипторе — попадаем в нужное нам место суперблока и отмечаем нужный кусочек как свободный, а кто до сих дочитал тому пиво. И конечно же пара проверок — не надо ли нам освободить весь суперблок, а то в нем он последний неосвобожденный.

void my_free(void* ptr) {
    descriptor* desc;
    void* sb;
    anchor oldanchor, newanchor;
    procheap* heap = NULL;

    if (!ptr) return;
    
    // get prefix
    ptr = (void*)((char*)ptr - HEADER_SIZE);  
    if (*((char*)ptr) == (char)LARGE) {
        munmap(ptr, *((unsigned long *)((char*)ptr + TYPE_SIZE)));
        return;
    }
    desc = *((descriptor**)((char*)ptr + TYPE_SIZE));
    
    sb = desc->sb;
    do { 
        newanchor = oldanchor = desc->Anchor;

        *((unsigned long*)ptr) = oldanchor.avail;
        newanchor.avail = ((char*)ptr - (char*)sb) / desc->sz;

        if (oldanchor.state == FULL) newanchor.state = PARTIAL;

        if (oldanchor.count == desc->maxcount - 1) {
            heap = desc->heap;
            // instruction fence.
            newanchor.state = EMPTY;
        } 
        else 
            ++newanchor.count;

        // memory fence.
    } while (!compare_and_swap64k(desc->Anchor, oldanchor, newanchor));

    if (newanchor.state == EMPTY) {
        munmap(sb, heap->sc->sbsize);
        RemoveEmptyDesc(heap, desc);
    } 
    else if (oldanchor.state == FULL) 
        HeapPutPartial(desc);
}

На что надо обратить внимание — освобожденные кусочки попадают в partial list, и было бы неплохо добавить проверку — а не попадает ли наш кусочек на вершину Active страйпа, это бы неплохо повысило эффективность вырожденного случая «выделяем и освобождаем в цикле». Но это уже в качестве домашнего задания.

Выводы

image
В данной крайне занудной и длинной работе мы рассмотрели как можно построить свой аллокатор на lock-free FIFO lists, узнали что это вообще такое, и освоили немало трюков по работе с атомиками. Надеюсь, умение группировать списки в страйпы поможет не только в деле написания своего менеджера памяти.

Дополнительные материалы

  1. Hoard memory allocator
  2. Scalable Locality-ConsciousMultithreaded Memory Allocation

Автор: viklequick

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js