В данной статье мне хотелось бы поделится опытом использования Microsoft Azure для интеграции двух облачных CRM систем. В рамках задачи необходимо построить простое облачное приложение, осуществляющее обмен сообщениями между двумя имплементациями Dynamics CRM Online, находящимися в разных подписках Office 365. Мы рассмотрим специфику использования Azure Service Bus в контексте Dynamics CRM Online, не много поговорим о поддерживаемых механизмах взаимодействия и воспользуемся облачной рабочей ролью для осуществления процесса анализа и обработки сообщений.
При реализации интеграции, обеспечивающей два филиала компании важными и актуальными данными, нам необходимо обеспечить доставку сообщений, даже в том случае, если одна из систем не отвечает на запросы. Естественно, что в таком случае мы прибегаем к услугам систем обмена сообщениями, которые могут обеспечить хранение и отложенную доставку в случае отсутствия подключения или каких-либо других проблем.
Говоря про облако Microsoft, мне бы хотелось заметить, что корпорация сегодня уделяет достаточно внимания интеграции различных продуктов в одну однородную систему, что позволяет упросить и ускорить процесс построения и развертывания решений, а также избежать некоторых досадных ошибок.
Если говорить про Dynamics CRM, то этот продукт из коробки поддерживает работу с Azure Service Bus, что позволяет без единой строчки кода отправлять Ваши данные в очередь или раздел.
1. Настройка Azure Service Bus для работы с Dynamics CRM Online.
Тут есть своя специфика. Для того, чтобы настроить интеграцию между этими двумя системами, Service Bus должен что-то знать про CRM, а CRM должна корректно аутентифицироваться, используя соответствующие сервисы облачной шины. На сегодняшний день, Dynamics CRM поддерживает аутентификацию через ACS (Azure Active Directory Access Control). Подробнее почитать о том, что такое ACS, можно в следующие статье: Что такое ACS?
Итак, первое что нам нужно сделать, это собственно создать Service Bus, который мы будем использовать для работы с нашей очередью сообщений, но создать его простым способом через портал, к сожалению, не получится, так как в этом случае Service Bus не будет поддерживать аутентификацию через ACS. Для того, чтобы создать Service Bus с поддержкой ACS, воспользуемся Azure Power Shell. Подробнее о том, что такое Azure Power Shell и как им пользоваться можно почитать в следующей статье: Что такое Azure PowerShell?
[CmdletBinding(PositionalBinding=$True)]
Param(
# [Parameter(Mandatory = $true)]
# [ValidatePattern("^[a-z0-9]*$")]
[String]$Path = "q4depa2depb", # required needs to be alphanumeric
[Bool]$EnableDeadLetteringOnMessageExpiration = $True , # optional default to false
[Int]$LockDuration = 30, # optional default to 30
[Int]$MaxDeliveryCount = 10, # optional default to 10
[Int]$MaxSizeInMegabytes = 1024, # optional default to 1024
[Bool]$SupportOrdering = $True, # optional default to true
# [Parameter(Mandatory = $true)]
# [ValidatePattern("^[a-z0-9]*$")]
[String]$Namespace = "sb4crm2crm", # required needs to be alphanumeric
[Bool]$CreateACSNamespace = $True, # optional default to $false
[String]$Location = "West Europe" # optional default to "West Europe"
)
# Create Azure Service Bus namespace
$CurrentNamespace = Get-AzureSBNamespace -Name $Namespace
if ($CurrentNamespace)
{
Write-Output "The namespace [$Namespace] already exists in the [$($CurrentNamespace.Region)] region."
}
else
{
Write-Host "The [$Namespace] namespace does not exist."
Write-Output "Creating the [$Namespace] namespace in the [$Location] region..."
New-AzureSBNamespace -Name $Namespace -Location $Location -CreateACSNamespace $CreateACSNamespace -NamespaceType Messaging
$CurrentNamespace = Get-AzureSBNamespace -Name $Namespace
Write-Host "The [$Namespace] namespace in the [$Location] region has been successfully created."
}
$NamespaceManager = [Microsoft.ServiceBus.NamespaceManager]::CreateFromConnectionString($CurrentNamespace.ConnectionString);
if ($NamespaceManager.QueueExists($Path))
{
Write-Output "The [$Path] queue already exists in the [$Namespace] namespace."
}
else
{
Write-Output "Creating the [$Path] queue in the [$Namespace] namespace..."
$QueueDescription = New-Object -TypeName Microsoft.ServiceBus.Messaging.QueueDescription -ArgumentList $Path
$QueueDescription.EnableDeadLetteringOnMessageExpiration = $EnableDeadLetteringOnMessageExpiration
if ($LockDuration -gt 0)
{
$QueueDescription.LockDuration = [System.TimeSpan]::FromSeconds($LockDuration)
}
$QueueDescription.MaxDeliveryCount = $MaxDeliveryCount
$QueueDescription.MaxSizeInMegabytes = $MaxSizeInMegabytes
$QueueDescription.SupportOrdering = $SupportOrdering
$NamespaceManager.CreateQueue($QueueDescription);
Write-Host "The [$Path] queue in the [$Namespace] namespace has been successfully created."
}
Полный вариант использованного мной скрипта доступен тут.
Скрипт достаточно простой, параметры, которые могут использоваться для создания очереди подробно описаны в следующей статье: Azure Service Bus – As I Understand It: Part II (Queues & Messages). Добавлю, что для корректной работы нашей интеграции необходимо иметь четкую последовательность сообщений, так как мы будем обрабатывать сообщения как на создание, так и на изменение записей, и не хотелось бы обрабатывать сообщение об изменении записи до ее создания. В следствии чего не забываем проставить поле SupportOrdering в соответствующее значение, в этом случае очередь будет работать по принципу FIFO (First In First Out).
После того как скрипт успешно отработал на своем экране Вы должны получить нечто подобное
Теперь, после того как все готово мы можем убедиться, что очередь и шина корректно создались и доступны на портале.
2. Подключение Dynamics CRM Online к Azure Service Bus.
Итак, для того что бы подключить Dynamics CRM к Azure Service Bus необходимо открыть Plugin Registration Tool и установить соединение с CRM системой. После того, как откроется список Plugins, выбираем пункт Register и Register New Service Endpoint.
Далее, в открывшемся окне заполняем параметры подключения.
Name – это название нашего события. Как пример: ContactIntegration.
Description – описание вызова.
Solution Namespace – название нашей сервисной шины. В моем случае: sb4crm2crm
Path – название очереди, которая будет принимать сообщения. В моем случае: q4depa2depb
Contract – контракт передачи сообщений. Тут есть несколько вариантов: Queue, Topic, One — way, two – way, REST. Мы будем рассматривать Queue и Topic. Подробнее про каждый из этих контрактов можно прочитать в следующей статье: Write a listener for a Microsoft Azure solution. Для нашей интеграции выбираем Persistent Queue.
Claim – в качестве дополнительной информации в контексте сообщения можно отправить ID пользователя.
ID – уникальный идентификатор созданной конфигурации.
После того, как все поля заполнены, можно переходить к конфигурированию ACS. Для этого нажимам на кнопку Save & Configure ACS.
Management Key – этот ключ можно получить из портала Azure. Для этого нужно перейти в раздел сервисных шин.
Выбрать созданную нами шину и щелкнуть по кнопке Connection Information.
Откроется окно, в котором Вы сможете найти всю необходимую информацию.
Нам нужен Default Key из раздела ACS.
Certificate File – публичный сертификат, который использовался при конфигурировании Dynamics CRM для интеграции с Azure.
Issuer Name – Наименование эмитента. Имя должно быть то же, которое использовалось при конфигурировании Dynamics CRM для интеграции c Azure.
Certificate File и Issuer Name можно найти в Dynamics CRM в разделе Settings -> Customizations -> Developer Resources. Выглядит следующим образом.
Загружаем сертификат, заполняем все необходимые поля и нажимаем на кнопку Configure ACS. Если все указали корректно, то через короткий интервал времени, Вы увидите следующие сообщения:
После чего окно можно закрыть, нажав на кнопку Close.
Далее нажимаем на кнопку Save & Verify Authentication. Получаем сообщение следующего вида:
Verifying Authentication: Success
Закрываем окно, нажимаем кнопку Save и готово.
Теперь остается только зарегистрировать, какие конкретно события мы хотим обрабатывать и отсылать в нашу сервисную шину.
Для этого необходимо зарегистрировать Plugin Step, как Вы обычно это делаете для Ваших плагинов. Я регистрирую Create и Update сообщения для сущности Contact. Для этого достаточно вызвать контекстное меню, на только что созданном Service Endpoint, и выбрать Register New Step. Заполнение интуитивно понятно.
Теперь наши созданные контакты будут отправляться в Service Bus.
Для того что бы отследить успешность или не успешность отправки сообщений из Dynamics CRM, достаточно открыть систему и перейти в раздел Settings -> System Jobs. Выбрать интересующую сущность и загрузить представление.
Ниже привожу скриншот с потенциальной ошибкой:
3. Разработка Worker Role для обработки сообщений.
Дело остается за малым, разработать код, который будет обрабатывать наши сообщения, заливать их в другую систему и корректно реагировать на потенциальные ошибки.
Любой рабочий процесс должен где-то выполняется и в нашем случае это Azure Cloud Service.
Давайте создадим новый Azure Cloud Service в Visual Studio.
Далее указываем, что в контексте нашего Azure Cloud Service, мы хотим создать Worker Role.
Теперь, когда у нас есть Azure Cloud Service и Azure Worker Role, можно реализовать код, который сможет получать сообщения из нашей очереди. Самый простой вариант получения сообщений приведен ниже.
Любая Worker Role содержит три обязательных метода — это OnStart, Run и OnStop. Давайте рассмотрим их реализацию в самом общем виде. В методе OnStart определяем параметры подключения к нашей шине, здесь так же можно инициировать подключение к системе, в которую планируется заливка данных.
public override bool OnStart()
{
Trace.WriteLine("Creating Queue");
string connectionString = "*** provide your connection string here***";
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
// Инициализация подключения к служебной шине
Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
return base.OnStart();
}
Метод Run, наиболее интересный, так как тут мы подписываемся на получение сообщений из нашей очереди и производим настройку метода получения данных.
public override void Run()
{
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = true; // Сообщение будет автоматически помечаться как отработанное и удаляться из очереди после завершения выполнения метода receivedMessage
options.MaxConcurrentCalls = 1; // Указывает максимальное число одновременных вызовов функции обратного вызова
options.ExceptionReceived += LogErrors; // Обработчик ошибок
// Start receiveing messages
Client.OnMessage((receivedMessage) => // Данный метод будет вызываться для каждого полученного сообщения
{
try
{
// Выполняем обработку сообщения
Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
}
catch
{
// Перехватываем ошибки возникшие в процессе обработки сообщения
}
}, options);
CompletedEvent.WaitOne();
}
Код снабжен достаточно подробными комментариями, так что дополнительно тут комментировать ничего не буду.
Ну, и на последок, посмотрим как выглядит метод OnStop.
public override void OnStop()
{
Client.Close();
CompletedEvent.Set(); //Завершаем выполнение Run функции
base.OnStop();
}
Тут мы закрываем все возможные подключения и заканчиваем выполнение функции Run. Подробнее про Azure Cloud Service, Вы можете прочитать в следующей статье: Подробное описание возможностей разработки с Microsoft Azure Cloud Services
Так же стоит заметить, что публикацию рабочей роли можно выполнить в двух вариациях: Staging и Production. Если Вы публикуете Вашу роль с типом сборки Debug, то в результате получите — Staging развертывание, если используете тип сборки Release, то Production. Даже при условии, что роль опубликована и находится в облаке, ее все равно можно отлаживать. Для того, чтобы подробнее узнать о возможностях публикации и отладки рабочей роли в облака, я предлагаю обратиться к следующей статье: Debugging an Azure cloud service or virtual machine in Visual Studio
4. Архитектура интеграции двух CRM систем.
В кратце опишу, как устроен процесс обработки сообщений в очереди, который мы разработали и применили в интеграции двух систем. Работа начинается с класса CRMQueueProcessor, в его обязанности входит инициализация подключений, создание и конфигурирование класса процессора сообщений “CrmMessageProcessor ”, а также подписка на получение сообщений от шины. Как только весь процесс инициализации завершен, и от шины получено сообщение, которое необходимо обработать, в работу вступает CrmMessageProcessor.
CrmMessageProcessor является реализацией паттерна “Наблюдатель”. Его задача наблюдать за изменениями, происходящими в системе и уведомлять об этих изменениях своих подписчиков. Подписчиков может быть сколько угодно много, каждый подписчик решает сам обрабатывать ему сообщение или нет. Все подписчики унаследованы от базового класса CrmBaseIntegrationHandler. CrmBaseIntegrationHandler являясь абстрактным классом, предлагает к реализации несколько методов:
getProcessingEntityName() – должен быть переопределен, возвращает имя сущности, например, contact.
getProcessingAction() – должен быть переопределен, возвращает действие или совокупность действий, на которые должен реагировать обработчик. Например: это создание записи.
HandleCrmMessage(string entityLogicalNameValue, string requestNameValue, Entity entity) – принимает само сообщение, а так же сущность и тип действия, вызывает переопределенный обработчик события, если событие имеет место быть
Entity OnProcessCreateEntity(Entity sourceEntity) – Обработчик создания записи, он принимает сущность пришедшую из очереди и формирует сущность, которая будет создана.
Entity OnProcessUpdateEntity(Entity sourceEntity) – Обработчик изменения записи, он принимает сущность пришедшую из очереди и формирует сущность, которая будет изменена.
public class ContactIntegrationHandler : CrmBaseIntegrationHandler
{
public override string getProcessingEntityName()
{
return "contact";
}
public override CrmMessageType getProcessingAction()
{
return CrmMessageType.Create | CrmMessageType.Update;
}
public override Entity OnProcessCreateEntity(Entity sourceEntity)
{
Entity output = new Entity("contact");
output["new_integrationid"] = sourceEntity.Id.ToString();
output["firstname"] = sourceEntity.GetAttributeValue<string>("firstname");
output["lastname"] = sourceEntity.GetAttributeValue<string>("lastname");
output["jobtitle"] = sourceEntity.GetAttributeValue<string>("jobtitle");
return output;
}
public override Entity OnProcessUpdateEntity(Entity sourceEntity)
{
Entity output = new Entity("contact");
output.Id = sourceEntity.Id;
if (sourceEntity.Contains("firstname"))
{
output["firstname"] = sourceEntity.GetAttributeValue<string>("firstname");
}
if (sourceEntity.Contains("lastname"))
{
output["lastname"] = sourceEntity.GetAttributeValue<string>("lastname");
}
if (sourceEntity.Contains("jobtitle"))
{
output["jobtitle"] = sourceEntity.GetAttributeValue<string>("jobtitle");
}
return output;
}
}
Класс CrmMessageProcessor выглядит следующим образом:
public class CrmMessageProcessor
{
List<CrmBaseIntegrationHandler> integrationSubscribers;
public CrmMessageProcessor(List<CrmBaseIntegrationHandler> subscribers)
{
this.integrationSubscribers = subscribers;
}
public void Subscribe(CrmBaseIntegrationHandler observer)
{
integrationSubscribers.Add(observer);
}
public void Unsubscribe(CrmBaseIntegrationHandler observer)
{
integrationSubscribers.Remove(observer);
}
public bool ProcessMessage(BrokeredMessage receivedMessage)
{
object entityLogicalNameValue, requestNameValue;
ExtractCrmProperties(receivedMessage, out entityLogicalNameValue, out requestNameValue);
if (entityLogicalNameValue == null || requestNameValue == null)
{
return false;
}
var context = receivedMessage.GetBody<RemoteExecutionContext>();
Entity entity = (Entity)context.InputParameters["Target"];
foreach (var handler in integrationSubscribers)
{
var status = handler.HandleCrmMessage((string)entityLogicalNameValue, (string)requestNameValue, entity);
if (status.ProcessMessgae)
{
switch (status.MessageType)
{
case CrmMessageType.Create:
{
CrmConnector.Instance.CreateEntity(status.EntityToProcess);
return true;
}
case CrmMessageType.Update:
{
var guid = CrmConnector.Instance.checkEntityForExistance(status.EntityToProcess);
if (guid != Guid.Empty)
{
status.EntityToProcess.Id = guid;
CrmConnector.Instance.UpdateEntity(status.EntityToProcess);
return true;
}
break;
}
default:
{
break;
}
}
}
}
return false;
}
/// <summary>
/// Извлекает специфичные для CRM параметры сообщения
/// </summary>
/// <param name="receivedMessage">Сообщение пришедшее из шины</param>
/// <param name="entityLogicalNameValue">out: Название сущности</param>
/// <param name="requestNameValue">out: Тип действия</param>
private void ExtractCrmProperties(BrokeredMessage receivedMessage,
out object entityLogicalNameValue, out object requestNameValue)
{
string keyRoot = "http://schemas.microsoft.com/xrm/2011/Claims/";
string entityLogicalNameKey = "EntityLogicalName";
string requestNameKey = "RequestName";
receivedMessage.Properties.TryGetValue(keyRoot + entityLogicalNameKey, out entityLogicalNameValue);
receivedMessage.Properties.TryGetValue(keyRoot + requestNameKey, out requestNameValue);
}
}
Если ни один из обработчиков не обработал сообщение, то оно помещается в очередь необработанных сообщений c соответствующей пометкой. Если во время обработки сообщения произошла ошибка, то мы разблокируем сообщение в очереди и пытаемся обработать его повторно, и так пока не будет достигнут лимит попыток, после чего сообщение попадает в очередь необработанных сообщений. Далее выдержка из класса CrmQueueProcessor.
public void OnMessageRecieved(BrokeredMessage receivedMessage)
{
try
{
if (processor.ProcessMessage(receivedMessage))
receivedMessage.Complete();
else
receivedMessage.DeadLetter("Canceled", "No event handler found");
}
catch (Exception ex)
{
receivedMessage.Abandon();
logger.LogCrmMessageException(receivedMessage, ex);
}
}
Для того чтобы получить путь до сообщений из очереди необработанных сообщений нужно вызывать метод FormatDeadLetterPath на существующем экземпляре объекта QueueClient и передать название рабочей очереди в качестве аргумента:
QueueClient.FormatDeadLetterPath(queueName)
Эта строка сформирует соответствующий путь, затем можно смело подписываться на получение сообщений и обрабатывать их.
5. Заключение
В примере, который мы разобрали, используется очередь и все сообщения обрабатываются одним рабочим процессом. В качестве альтернативы использования очереди, Вы так же можете использовать разделы (топики), их можно сконфигурировать таким образом, чтобы сообщения от разных сущностей обрабатывались разными потоками в рамках одной рабочей роли или в разных. Для каждого подписчика будет работать своя очередь, а корректно настроенный фильтр будет получать только те сообщения, которые должны обрабатываться данным экземпляром. Если при этом необходимо синхронизировать работу нескольких рабочих ролей, то для этого Вы можете использовать Blob leasing, более подробно о синхронизации рабочих ролей в Azure Вы можете прочитать в следующей статье: Preventing Jobs From Running Simultaneously on Multiple Role Instances.
Список статей на которые делал сноски:
Что такое ACS?
Что такое Azure PowerShell?
How to create service bus queues, topics and subscriptions using a powershell script .
Azure Service Bus – As I Understand It: Part II (Queues & Messages)
Подробное описание возможностей разработки с Microsoft Azure Cloud Services
Debugging an Azure cloud service or virtual machine in Visual Studio
Preventing Jobs From Running Simultaneously on Multiple Role Instances.
Автор: LrdSpr