Организация очереди обращений к native библиотеке из управляемого кода средствами Dispatcher

в 13:03, , рубрики: .net, native code, метки: ,

В недавнем прошлом столкнулся со следующей проблемой: в реализуемом проекте (под .net) необходимо было организовать взаимодействие с внешними ресурсами (аппаратное обеспечение, специфическая полнотекстовая БД). Доступ к этим ресурсам осуществлялся средствами библиотек содержащих API функции, которые были написаны с использованием различных языков (С++, Delphi), и объединяло их одно свойство: они не поддерживали вызовы из различных потоков. В то время как архитектура разрабатываемого приложения, продиктованная функциональными требованиями, подразумевала необходимость доступа к этим ресурсам из различных потоков. Наиболее логичным выходом из ситуации было создание отдельного потока для доступа к каждой библиотеке (ресурсу) и передача всех запросов этому потоку с постановкой в очередь по необходимости.

Постановка задачи

Таким образом, решаемую задачу можно сформулировать следующим образом: требуется разработать механизм позволяющий выполнить участок кода (элемент выполнения) в контексте заданного (целевого) потока, вне зависимости от того в контексте какого потока было инициировано выполнение. Производным требованием является организация очереди элементов выполнения, так как целевой поток может исполнять не более одного элемента выполнения одновременно.

Варианты реализации

Первым приходящим на ум вариантом реализации такого механизма является использование очереди объектов класса Action:

ConcurrentQueue<Action> _actions;

При этом клиентские потоки добавляют элементы выполнения в нее:

_actions.Enqueue(() => { <Выполняемый код> });

А целевой поток выполняет их в порядке поступления:

Action action;
if (_actions.TryDequeue())
{
    action();
}

Этот вариант реализации, безусловно, имеет право на существование, однако лобовая реализация в данном случае будет далеко не оптимальна с точки зрения использования ресурсов и производительности, а более продвинутые техники потребуют значительно большего времени на разработку и тестирование и, вероятно, будут являться изобретением велосипеда. Поэтому в данной статье будет рассмотрен более элегантный способ решения поставленной задачи, с использованием средств предоставляемых .NET Framework. А именно System.Windows.Threading.Dispatcher, которые вероятно работает аналогично описанию выше и эксклюзивные блокировки.

Реализация

Предположим, что у нас есть класс ExternalInterface, реализующий интерфейс к внешней библиотеке, вызов методов которой должен происходить в отдельном потоке:

internal static class ExternalInterface
{
    public static void Method1()
    {
        throw new NotImplementedException();
    }

    public static int Method2()
    {
        throw new NotImplementedException();
    }

    public static int Method3()
    {
        throw new NotImplementedException();
    }

    private static void ExternalMethod1()
    {
        // Имитация бурной деятельности
        Thread.Sleep(1000);
    }

    private static int ExternalMethod2()
    {
        // Имитация бурной деятельности
        Thread.Sleep(1000);
        return 1;
    }

    private static void ExternalMethod3()
    {
        // Имитация бурной деятельности
        Thread.Sleep(1000);
    }
}

Где ExternalMethod1, ExternalMethod2 и ExternalMethod3 являются импортируемыми методами внешней библиотеки, а методы Method1, Method2 и Method3 представляют собой интерфейсные методы класса, единственной задачей которых является выполнение импортируемых методов в контексте одного и того же выделенного потока.
Так же есть клиентский класс (в простейшем случае класс Program консольного приложения), в котором из различных потоков происходит вызов методов ExternalInterface:

class Program
{
    static void ThreadMethod1()
    {
        ExternalInterface.Method1();
    }

    static void ThreadMethod2()
    {
        ExternalInterface.Method2();           
    }

    static void ThreadMethod3()
    {
        ExternalInterface.Method3();
    }
        
    static void Main(string[] args){  }
}

Теперь перейдем к организации вызовов импортируемых методов в одном потоке. Для начала создадим поток в контексте, которого будет проходить вызов импортируемых методов, и подготовим его к ожиданию вызовов:

// Поток для выполнения запросов
private static Thread _dispatchingThread;
// Dispatcher для передачи элемента выполнения потоку
private static Dispatcher _dispatchObject;

public static void Open()
{
    // Создание и запуск потока
    _dispatchingThread = new Thread(DispatchingThreadMethod);
    _dispatchingThread.Start();
}

private static void DispatchingThreadMethod()
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    // Код инициализации, например открытие соединения
    // ...

    // Получение Dispatcher'а потока
    _dispatchObject = Dispatcher.CurrentDispatcher;

    // Ожидание поступления запросов на выполнение
    Dispatcher.Run();

    // Код финализации, например закрытие соединения
    // ...
}

Далее упростим передачу элемента выполнения в целевой поток:

private void Dispatch(Action action)
{
    if (_dispatchObject != null)
    {
        _dispatchObject.Invoke(action);
    }
}

Однако такая реализация позволяет нескольким потокам передавать элементы выполнения целевому потоку, что приведет к взаимоблокировке взывающих потоков (такое поведение наблюдается только при использовании конкретных native библиотек и не воспроизводится в данном примере, то есть в ряде случаев этот шаг может быть пропущен). Организовать очередь запросов можно с использованием эксклюзивной блокировки. Изменим код следующим образом:

private void Dispatch(Action action)
{
    if (_dispatchObject != null)
    {
        lock (_dispatchObject)
        {
            _dispatchObject.Invoke(action);
        }
    }
}   

Теперь реализуем интерфейсные методы:

public static void Method1()
{
    Dispatch(() => ExternalMethod1());
}

public static int Method2()
{
    int result = 0;
    Dispatch(() => 
    {
        result = ExternalMethod2();
    });
    return result;
}

public static void Method3()
{
    Dispatch(() => ExternalMethod3());
}

Для корректного завершения потока_dispatchingThread добавим еще один метод в класс ExternalInterface:

public static void Close()
{
    _dispatchObject.InvokeShutdown();
}

В доказательство того что вызов всех методов пройдет в одном и том же потоке выполним следующий код:

static void Main(string[] args)
{
    ExternalInterface.Open();

    // Требуется для того что бы получить Dispatcher 
    // до старта остальных потоков
    Thread.Sleep(1000);

    new Thread(ThreadMethod1).Start();
    new Thread(ThreadMethod2).Start();
    new Thread(ThreadMethod3).Start();

    Console.ReadKey();

    ExternalInterface.Close();
}

Результатом будет четырехкратная печать одного и того же идентификатора потока, что означает что все вызовы методов выполнялись в контексте одного потока.

В заключение хотелось бы сказать, что использование данного кода в серьезных проектах потребует некоторых доработок, так как целью статьи было показать лишь механизм решения поставленной задачи. Так же приведенный пример не ограничивает сферу применения данного механизма. Такой подход может применяться в любой ситуации, где требуется работа из одного потока, но при этом невозможно работать из основного потока программы. Схожий подход подойдет и при организации очереди из элементов выполнения для некоторого потока.

Автор: ievgeniy88

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js