Building Reusable Queues Part 2

The Tenant


In part 1, we designed a table for our process to work off of. Now we’re going to look at how we can write a procedure that works off of the table effectively.

For the table, our main priorities were indexing to make sure we can find work easily, and not allowing duplicate items.

For our worker proc, the main goals are going to be

  • Looping until we run out of work
  • Finding and reserving work with minimal locking
  • Making that process atomic

Thankfully, this is a lot easier than it sounds. The table design does a lot of the work for us.

Queue Ball


I’m going to step through the code, and then show you the fully assembled babby at the end.

First up, making a loop happen:

WHILE EXISTS
(
  SELECT 1/0
  FROM dbo.four_queue AS fq WITH(READPAST)
  WHERE fq.in_process = 0
)

While any rows that aren’t in process exist in the table, we want this to keep picking up work. If you need an eternal loop, just use 1 = 1 or something.

We’re using readpast here, because we won’t want to hit any blocking every time we check to see if we need to go back into the loop.

Second up, finding and reserving work

WITH q4 AS
(
    SELECT TOP (1) fq.*
    FROM dbo.four_queue AS fq WITH(READPAST, ROWLOCK, UPDLOCK)
    WHERE fq.in_process = 0
    ORDER BY fq.id
)
UPDATE q4
    SET q4.in_process = 1,
        q4.start_date = SYSDATETIME(),
        @id = q4.id,
        @reputation = q4.reputation
FROM q4;

This is where all the cool stuff is, to be honest.

In our CTE, we isolate a single row. The TOP (1) with an ORDER BY gives us the lowest id that hasn’t been processed yet.

With the SELECT, we’re using some locking hints:

  • Readpast: Means we can skip over locked rows, but not ignore locks like NOLOCK would do
  • Rowlock: We nicely ask the optimizer to only lock a single row
  • Updlock: Even though this is a SELECT, take an UPDATE lock (LCK_M_U)

The UPDATE that occurs outside of the SELECT is pretty nifty, too. I tried to get OUTPUT to work here, but the variable assignment seemed to disagree with it.

A lot of people don’t know that you can assign a value to a variable like this, but anyway.

  • Mark the row as being processed
  • Get the processing start time
  • Assign the table id to a parameter
  • Assign the reputation we want to find to a parameter

Thirdly and thusly

DECLARE @sql NVARCHAR(MAX) = N'
SELECT COUNT_BIG(*) AS records
FROM dbo.Users AS u
WHERE u.Reputation = @i_reputation;
';

RAISERROR(@sql, 0, 1) WITH NOWAIT;
EXEC sys.sp_executesql @sql, N'@i_reputation int', @reputation;

We’ll pass the @reputation to some dynamic SQL and execute it. Like I mentioned before, this is just to keep the example brief. There’s absolutely no need to do this.

A fifth of fifthly

UPDATE fq
    SET fq.end_date = SYSDATETIME()
FROM dbo.four_queue AS fq
WHERE id = @id
AND   in_process = 1;

We’ll use the id parameter to update the end time, so we can find any processes that took a long time and look into them.

Out Of The Way


In real life, I’d probably want some error handling in here, but for a quick blog post example, this is good enough.

Now I can use a tool like SQL Query Stress to throw a bunch of workers at it and see if I run into any blocking or deadlocking.

We know there aren’t any deadlocks, because SQS doesn’t have any errors in it.

CHEERS

All ~20k rows get processed in about 26 seconds.

We can even use sp UNDERSCORE HumanEvents to look at wait stats for the duration.

EXEC sp_HumanEvents @event_type = 'waits', @seconds_sample = 30, @database_name = 'StackOverflow2013';
Humanly

Not too shabby, eh?

Thanks for reading!

CREATE OR ALTER PROCEDURE dbo.queue_four(@id INT, @reputation INT)
AS
BEGIN
SET NOCOUNT, XACT_ABORT ON;

WHILE EXISTS
(
  SELECT 1/0
  FROM dbo.four_queue AS fq WITH(READPAST)
  WHERE fq.in_process = 0
)
    BEGIN
    
        WITH q4 AS
        (
            SELECT TOP (1) fq.*
            FROM dbo.four_queue AS fq WITH(READPAST, ROWLOCK, UPDLOCK)
            WHERE fq.in_process = 0
            ORDER BY fq.id
        )
        UPDATE q4
            SET q4.in_process = 1,
                q4.start_date = SYSDATETIME(),
                @id = q4.id,
                @reputation = q4.reputation
        FROM q4;
        
        DECLARE @sql NVARCHAR(MAX) = N'
        SELECT COUNT_BIG(*) AS records
        FROM dbo.Users AS u
        WHERE u.Reputation = @i_reputation;
        ';
        
        RAISERROR(@sql, 0, 1) WITH NOWAIT;
        EXEC sys.sp_executesql @sql, N'@i_reputation int', @reputation;
    
        UPDATE fq
            SET fq.end_date = SYSDATETIME()
        FROM dbo.four_queue AS fq
        WHERE id = @id
        AND   in_process = 1;
    
    END;

END;

 



Leave a Reply

Your email address will not be published. Required fields are marked *