Create a distributed worker

Abstract

How to create a distributed worker to remove identifiers with a particular source from batches of contacts.

The following sample distributed worker uses the xConnect Client API to remove identifiers with a particular source from batches of contacts. The worker accepts a custom worker options dictionary (SampleDistributedWorkerOptionsDictionary) which defines:

  • The identifier source of the identifiers to be removed

  • The fully qualified name of the worker

You can use the DistributedWorkerOptionsDictionary class directly to register a task for a custom distributed worker. However, you must then know the fully qualified name of the worker and the correct name and format of each dictionary entry. To reduce the risk of error, we recommend that you create a custom worker options dictionary for every distributed worker.

To create a custom worker options dictionary:

  • Create a class named SampleDistributedWorkerOptionsDictionary that inherits from DistributedWorkerOptionsDictionary as shown:

using System.Collections.Generic;
using Newtonsoft.Json;
using Sitecore.Processing.Engine.Abstractions;

namespace Sitecore.Documentation.Examples
{
    public class SampleDistributedWorkerOptionsDictionary : DistributedWorkerOptionsDictionary
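    {
        // Passes the fully qualified name of the worker and the option entries to the base class.
        public SampleDistributedWorkerOptionsDictionary(string identifierSource) : base("Sitecore.Documentation.Examples.SampleDistributedWorker, Sitecore.Documentation.Examples", CreateDictionary(identifierSource))
        {
        }

        // Required: the Processing Engine uses this constructor to deserialize the options from JSON.
        [JsonConstructor]
        protected SampleDistributedWorkerOptionsDictionary(IDictionary<string, string> dictionary) : base(dictionary)
        {
        }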

        // Converts the raw options passed to the worker into a typed options dictionary.
        public static SampleDistributedWorkerOptionsDictionary Parse(IReadOnlyDictionary<string, string> options)
        {
            return new SampleDistributedWorkerOptionsDictionary(options["IdentifierSource"]);
        }

        // Builds the underlying dictionary entries from the option values.
        private static IDictionary<string, string> CreateDictionary(string identifierSource)
        {
            var dictionary = new Dictionary<string, string>
            {
                {
                    "IdentifierSource", identifierSource
                }
            };

            return dictionary;
        }
    }
}

When creating a distributed worker options dictionary:

  • You must include a constructor that accepts an IDictionary<string, string> parameter. The constructor must be decorated with the [JsonConstructor] attribute. Without this constructor, the Processing Engine throws the following error when it tries to execute the worker: "System.FormatException: Could not deserialize JSON text."

  • It is recommended that you create helper methods that:

    • Convert an IReadOnlyDictionary<string, string> into an options dictionary (Parse())

    • Build the underlying dictionary entries from the individual option values (CreateDictionary())
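The helper pattern above can be tried without any Sitecore dependencies. The following is a minimal sketch that uses a hypothetical stand-in class, IdentifierOptionsSketch, in place of SampleDistributedWorkerOptionsDictionary. It does not derive from DistributedWorkerOptionsDictionary and is not part of the Processing Engine API; it only shows how defining the entry name once lets CreateDictionary() and Parse() round-trip the same value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SampleDistributedWorkerOptionsDictionary. It does not
// derive from DistributedWorkerOptionsDictionary; it only shows the helper pattern.
public sealed class IdentifierOptionsSketch
{
    // Defining the key once avoids mismatched spellings between the two helpers.
    public const string IdentifierSourceKey = "IdentifierSource";

    public IdentifierOptionsSketch(string identifierSource)
    {
        IdentifierSource = identifierSource ?? throw new ArgumentNullException(nameof(identifierSource));
    }

    public string IdentifierSource { get; }

    // Builds the underlying dictionary entries from the option values.
    public static IDictionary<string, string> CreateDictionary(string identifierSource)
    {
        return new Dictionary<string, string>
        {
            { IdentifierSourceKey, identifierSource }
        };
    }

    // Converts a raw IReadOnlyDictionary back into typed options and
    // reports a missing entry with a descriptive message.
    public static IdentifierOptionsSketch Parse(IReadOnlyDictionary<string, string> options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (!options.TryGetValue(IdentifierSourceKey, out string source))
        {
            throw new KeyNotFoundException($"Missing option '{IdentifierSourceKey}'.");
        }

        return new IdentifierOptionsSketch(source);
    }
}
```

For example, Parse(new Dictionary<string, string> { ["IdentifierSource"] = "RemoveMe" }) returns options whose IdentifierSource is "RemoveMe", while an empty dictionary raises a KeyNotFoundException that names the missing entry.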

To create a distributed worker:

  • Create a class named SampleDistributedWorker that implements IDistributedWorker<Contact> as shown:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging;
    using Sitecore.Processing.Engine.Abstractions;
    using Sitecore.XConnect;
    
    namespace Sitecore.Documentation.Examples
    {
        public class SampleDistributedWorker : IDistributedWorker<Contact>
        {
            private readonly ILogger _logger;
            private readonly IServiceProvider _serviceProvider;
            private readonly SampleDistributedWorkerOptionsDictionary _options;
    
            public SampleDistributedWorker(ILogger logger, SampleDistributedWorkerOptionsDictionary options, IServiceProvider serviceProvider)
            {
                _logger = logger;
                _serviceProvider = serviceProvider;
                _options = options;
            }
    
            // Required: without this constructor, the Processing Engine cannot create the worker.
            public SampleDistributedWorker(ILogger logger, IReadOnlyDictionary<string, string> options, IServiceProvider serviceProvider) : this(logger, SampleDistributedWorkerOptionsDictionary.Parse(options), serviceProvider)
            {
            }
    
            public async Task ProcessBatchAsync(IReadOnlyList<Contact> batch, CancellationToken token)
            {
                _logger.LogInformation($"Run Worker: {GetType().Name}");
    
                if (batch == null)
                {
                    throw new ArgumentNullException(nameof(batch));
                }
    
                var identifierSource = _options["IdentifierSource"];
    
                // Only contacts with at least one identifier from the configured source need updating.
                var contactsToUpdate = batch.Where(c => c.Identifiers.Any(x => x.Source == identifierSource));
    
                using (IServiceScope scope = _serviceProvider.CreateScope())
                {
                    using (var xdbContext = scope.ServiceProvider.GetRequiredService<IXdbContext>())
                    {
                        foreach (var contact in contactsToUpdate)
                        {
                            // Queue removal of the first identifier that matches the source.
                            xdbContext.RemoveContactIdentifier(new ContactReference(contact.Id.Value), contact.Identifiers.FirstOrDefault(x => x.Source == identifierSource));
                        }
    
                        // Submit all queued operations to xConnect as one batch.
                        await xdbContext.SubmitAsync(token);
                    }
                }
            }
    
            public void Dispose()
            {
            }
        }
    }
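To see exactly which identifiers ProcessBatchAsync removes, its selection logic can be exercised on its own. The sketch below replaces Contact and ContactIdentifier with hypothetical records (IdentifierSketch and ContactSketch, C# 9 or later) that keep only the members the worker reads; they are not the xConnect types. Like the worker, it picks only the first matching identifier per contact, so a contact that has two identifiers from the same source still keeps one of them after a single run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for xConnect's ContactIdentifier and Contact, reduced to the
// members that ProcessBatchAsync reads. They are not the xConnect types.
public sealed record IdentifierSketch(string Source, string Identifier);
public sealed record ContactSketch(Guid Id, IReadOnlyList<IdentifierSketch> Identifiers);

public static class BatchFilterSketch
{
    // Mirrors the worker's selection step: for every contact with at least one identifier
    // from identifierSource, return the first matching identifier, which is the one the
    // worker passes to RemoveContactIdentifier.
    public static List<(Guid ContactId, IdentifierSketch Identifier)> SelectRemovals(
        IReadOnlyList<ContactSketch> batch, string identifierSource)
    {
        if (batch == null)
        {
            throw new ArgumentNullException(nameof(batch));
        }

        return batch
            .Where(c => c.Identifiers.Any(x => x.Source == identifierSource))
            .Select(c => (ContactId: c.Id, Identifier: c.Identifiers.First(x => x.Source == identifierSource)))
            .ToList();
    }
}
```

If removing every identifier from the source is the goal, the loop would need to queue one RemoveContactIdentifier call per matching identifier rather than only the first.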

When creating a distributed worker:

  • You must include a constructor that accepts an IReadOnlyDictionary<string, string> parameter. If you do not include this constructor, the Processing Engine will not start and you will get the following error: "System.InvalidOperationException: A suitable constructor for type could not be located."
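One way to catch a missing constructor before deployment is a unit test that inspects the worker type with reflection. The helper below, WorkerConstructorCheck.HasOptionsConstructor, is a hypothetical sketch and not part of the Processing Engine; it only checks that some public constructor has an IReadOnlyDictionary<string, string> parameter and does not reproduce how the engine resolves the remaining parameters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper for a unit test: checks that a worker type has a public
// constructor taking an IReadOnlyDictionary<string, string>. It does not
// reproduce how the Processing Engine resolves the other parameters.
public static class WorkerConstructorCheck
{
    public static bool HasOptionsConstructor(Type workerType)
    {
        if (workerType == null)
        {
            throw new ArgumentNullException(nameof(workerType));
        }

        return workerType
            .GetConstructors(BindingFlags.Public | BindingFlags.Instance)
            .Any(ctor => ctor.GetParameters()
                .Any(p => p.ParameterType == typeof(IReadOnlyDictionary<string, string>)));
    }
}
```

For the sample worker, HasOptionsConstructor(typeof(SampleDistributedWorker)) should return true, because its second constructor accepts the options as an IReadOnlyDictionary<string, string>.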

To register the worker in the Cortex Processing Engine:

  1. Create a file named C:\<PathToxConnect>\App_data\jobs\continuous\ProcessingEngine\App_Data\Config\Global\Processing\sc.Processing.Documentation.Examples.xml and add the following configuration:

    <Settings>
        <Sitecore>
            <Processing>
                <Services>
                    <TaskServicesFactory>
                        <Options>
                            <SampleDistributedWorker>
                                <Type>Sitecore.Documentation.Examples.SampleDistributedWorker, Sitecore.Documentation.Examples</Type>
                            </SampleDistributedWorker>
                        </Options>
                    </TaskServicesFactory>
                </Services>
            </Processing>
        </Sitecore>
    </Settings>

  2. Restart the Processing Engine so that it loads the new configuration.

To use the custom worker in a core role environment (such as Content Management):

  • Register a distributed task using the SampleDistributedWorkerOptionsDictionary class as shown:

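    using System;
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Sitecore.DependencyInjection;
    using Sitecore.Processing.Engine.Abstractions;
    using Sitecore.Processing.Tasks.Options.DataSources.Search;
    using Sitecore.XConnect;
    
    namespace Sitecore.Documentation.Examples
    {
        public class Example
        {
            // Returns Task<Guid> instead of async void so that callers can await the registration and observe errors.
            public async Task<Guid> RegisterSampleTaskAsync()
            {
                var taskManager = ServiceLocator.ServiceProvider.GetRequiredService<ITaskManager>();
    
                // Contact data source options, worker options for the sample worker,
                // no prerequisite tasks (null), and an expiry of one day.
                Guid taskId = await taskManager.RegisterDistributedTaskAsync(
                    new ContactDataSourceOptionsDictionary(new ContactExpandOptions(), 20, 20),
                    new SampleDistributedWorkerOptionsDictionary("RemoveMe"),
                    null,
                    TimeSpan.FromDays(1))
                    .ConfigureAwait(false);
    
                return taskId;
            }
        }
    }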