Producer Consumer Parallelism in PowerShell

Wed, Sep 5, 2018 5-minute read

After it’s done indexing your data, Scour is blazingly fast at local content searches – far faster than Select-String, grep, or anything else that rips through files when you search. It accomplishes this through the power of the popular Lucene search engine. Similar to the way that publishers have made searching physical books fast, Lucene must first create an index of your content before you can search it.

So while Scour was incredibly fast at searching content, the indexing process took much longer than I wanted it to. In one directory (20,000 files), it took about 13 minutes. Most of this was my fault – I had put in a call to [GC]::Collect() after every file. This was temporary code I had put in when I was investigating some memory issues, and it destroyed performance. After removing that mistake, indexing my test directory was now down to 45 seconds. After spending an afternoon on further tweaks, the final performance measurement was down to 9 seconds. That’s a 90x improvement if you include the sandbag, and merely a 5x improvement if you don’t :) The majority of this performance came from introducing multi-threading into the indexing process. There are lots of great threading techniques in PowerShell – both built-in, and from the community: Jobs, PoshRSJob, and Thread Jobs, to name a few. All of these do an excellent job of running multiple blobs of code in parallel.

Managing Parallelism

One thing that’s not often talked about is how you manage this parallelism. Once you have half a dozen threads running, what do you do with them? If you have hundreds of thousands of work items, how do you dispatch them to the threads and receive the output? How do you track overall status? One approach to this is the Producer-Consumer pattern. Many cloud architectures use components like Azure Service Bus to accomplish this at a large scale, but it’s equally helpful for work management among threads. In the Producer-Consumer pattern, you have one thread generating work. In this situation, it was a PowerShell script generating lists of file names for Lucene to index. Then, you have multiple threads consuming this shared pipeline – fetching whatever work is available, doing the work itself, and then submitting the output back to a shared output pipeline. In this situation, these were background threads receiving file names and then doing the work to add those files to the index. This pattern can be successful when the description of the work (i.e.: a filename, or coordinate of an image) is much smaller than the work itself (indexing that filename, or rendering that portion of the image).

Example Implementation

You can see the implementation of this in Scour if you are interested, but the details are somewhat obscured by the details and context of the actual implementation. Instead, here is a heavily-commented example of implementing this pattern in PowerShell, simply passing around GUIDs and printing them out.

image

I’ve also shared this on the PowerShell Gallery:

Install-Script Invoke-ProducerConsumer –Scope CurrentUser

And the script:

<#PSScriptInfo
.VERSION 1.0
.GUID bfb939b9-03f0-433e-ad0f-e4e12f4a009c
.AUTHOR Lee Holmes
#>
<#
.DESCRIPTION
Example implementation of producer / consumer parallelism in PowerShell
#>

## The script block we want to run in parallel. Threads will all
## retrieve work from $InputQueue, and send results to $OutputQueue
$parallelScript = {
    param(
        ## An Input queue of work to do
        $InputQueue,
       
        ## The output buffer to write responses to
        $OutputQueue,
       
        ## State tracking, to help threads communicate
        ## how much progress they've made
        $OutputProgress, $ThreadId, $ShouldExit
    )

    ## Continually try to fetch work from the input queue, until
    ## the 'ShouldExit' flag is set
    $processed = 0
    $workItem = $null
    while(! $ShouldExit.Value)
    {
        if($InputQueue.TryDequeue([ref] $workItem))
        {
            ## If we got a work item, do something with it. In this
            ## situation, we just create a string and sleep a bit.
            $workItemResult = "Processing $workItem in thread $ThreadId"
            Start-Sleep -Seconds (Get-Random -Max 3)

            ## Add the result to the output queue
            $OutputQueue.Enqueue($workItemResult)

            ## Update our progress
            $processed++
            $OutputProgress[$ThreadId] = $processed
        }
        else
        {
            ## If there was no work, wait a bit for more.
            Start-Sleep -m 100
        }
    }
}

## Create a set of background PowerShell instances to do work, based on the
## number of available processors.
$threads = Get-WmiObject Win32_Processor | Foreach-Object NumberOfLogicalProcessors
$runspaces = 1..$threads | Foreach-Object { [PowerShell]::Create() }
$outputProgress = New-Object 'Int[]' $threads
$inputQueue = New-Object 'System.Collections.Concurrent.ConcurrentQueue[String]'
$outputQueue = New-Object 'System.Collections.Concurrent.ConcurrentQueue[String]'
$shouldExit = $false

## Spin up each of our PowerShell runspaces. Once invoked, these are actively
## waiting for work and consuming once available.
for($counter = 0; $counter -lt $threads; $counter++)
{  
    $null = $runspaces[$counter].AddScript($parallelScript).
        AddParameter("InputQueue", $inputQueue).
        AddParameter("OutputQueue", $outputQueue).
        AddParameter("OutputProgress", $outputProgress).
        AddParameter("ThreadId", $counter).
        AddParameter("ShouldExit", [ref] $shouldExit).BeginInvoke()
}

## Some fake work - send 50 GUIDs into our worker threads
$estimated = 50
1..$estimated | Foreach-Object {
    $currentInput = New-Guid
    $inputQueue.Enqueue($currentInput)
}

## Wait for our worker threads to complete processing the
## work.
try
{
    do
    {
        ## Update the status of how many items we've processed, based on adding up the
        ## output progress from each of the worker threads
        $totalProcessed = $outputProgress | Measure-Object -Sum | Foreach-Object Sum
        Write-Progress "Processed $totalProcessed of $estimated" -PercentComplete ($totalProcessed * 100 / $estimated)
  
        ## If there were any results, output them.
        $scriptOutput = $null
        while($outputQueue.TryDequeue([ref] $scriptOutput))
        {
            $scriptOutput
        } 

        ## If the threads are done processing the input we gave them, let them know they can exit
        if($inputQueue.Count -eq 0)
        {
            $shouldExit = $true
        }      
       
        Start-Sleep -m 100

        ## See if we still have any busy runspaces. If not, exit the loop.
        $busyRunspaces = $runspaces | Where-Object { $_.InvocationStateInfo.State -ne 'Complete' }
    } while($busyRunspaces)
}
finally
{
    ## Clean up our PowerShell instances
    foreach($runspace in $runspaces)
    {
        $runspace.Stop()
        $runspace.Dispose()
    }
}