Using Python to implement a queue publisher and subscriber with a polling back-off delay algorithm.
I recently came across Python. It's a wonderful language with packages for everything from statistics to HDInsight to the natural language toolkit.
After using the Azure queue and table store separately from Python, I implemented a best practice for handling large messages using table store and the queue together. The flowchart on this page shows the logic commonly used when implementing a queue listener with a polling mechanism; that logic is implemented in the code below.
A single queue message can be up to 64 KB. So what if you have larger messages, or files, that you'd like to manage via a queue? You could put the large message in table store, or the file in blob store. Then add an entry to the queue that uniquely identifies that particular table store row or blob.
Note: Python 2.7.9 was used for the code below. All code below is unsupported and is for illustrative purposes only. Also, see the unhandled exceptions section below.
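As a sketch of the blob store option mentioned above (the rest of this post implements the table store option), the snippet below puts a large payload in blob store and enqueues just the blob name. It assumes the same-era azure package used below also exposes BlobService with create_container and put_block_blob_from_text; the account, container, and blob names are made up for illustration.

import base64
from azure.storage import BlobService, QueueService

# Hypothetical account, container, and queue names for illustration only.
blob_service = BlobService(account_name='add here', account_key='add here')
queue_service = QueueService(account_name='add here', account_key='add here')
blob_service.create_container('taskblobs')
queue_service.create_queue('taskqueue')

# Put the large payload in blob store...
blob_service.put_block_blob_from_text('taskblobs', 'task-1', 'Pretend this is a really large message...')

# ...then enqueue just the blob name so a subscriber can look the payload up.
queue_service.put_message('taskqueue', base64.b64encode('task-1'))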
Publisher
Below I implemented a publisher that adds an artificially "long" message to table store (when you see the message below, just play along and pretend it's long…). Next, the unique partition key and row key for that message are added to a queue message.
Subscriber
I implemented a subscriber to process the queue and table store entries added by the publisher above. The subscriber checks the queue for a message. If a message exists, it is retrieved and the unique partition and row key are parsed out of it. Next, the subscriber uses that partition and row key to retrieve the message from table store. Lastly, it deletes both the queue message and the table store entry.
Also, rather than checking the queue repeatedly at a fixed rate (which can be costly), I used a back-off delay algorithm. When a check finds nothing in the queue, the subscriber backs off for an ever-increasing duration until a maximum duration is reached. For example, the code below sleeps for 2, 4, 8, 16, 32, and 64 seconds, checking the queue between each sleep. If there is something in the queue, it is retrieved per the above and the sleep duration starts again at 2 seconds.
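If it helps to see that schedule in isolation, here is a minimal sketch of the doubling logic (next_delay is my own helper name, not something the subscriber below uses):

def next_delay(current_sec, max_sec=64):
    # Double the delay after an empty poll, but never exceed the cap.
    return min(current_sec * 2, max_sec)

delay = 2
for i in range(8):
    print delay,   # prints: 2 4 8 16 32 64 64 64
    delay = next_delay(delay)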
The publisher and subscriber below can be run in separate command windows so you can watch them exchange messages.
Also, if the queue and table store become empty, just run the publisher again in a separate command prompt. You'll see the subscriber pick up the messages and back off accordingly.
Publisher.py
import string
import random
import base64
from azure.storage import TableService, QueueService

# This generates random words so the messages in table store look kinda interesting.
def generate_random_word(wordLen):
    word = ''
    for i in range(wordLen):
        word += random.choice(string.ascii_letters + string.digits)
    return word

def main():
    # Add your account name and key below.
    Acct_name = 'add here'
    Acct_key = 'add here'

    # Names of the table store table and queue.
    Task_table = 'tasktable'
    Partition_key = 'Tasks'
    Task_queue = 'taskqueue'

    # Number of messages to add to table store and thus the queue.
    Max_messages = 3

    # Create the table if it doesn't exist.
    table_service = TableService(account_name=Acct_name, account_key=Acct_key)
    table_service.create_table(Task_table)

    # Create the queue if it doesn't exist.
    queue_service = QueueService(account_name=Acct_name, account_key=Acct_key)
    queue_service.create_queue(Task_queue)

    # Add a few entries to table store.
    for rowCnt in range(1, Max_messages + 1):
        print '----------'
        print 'Publisher'
        print ' '

        # Generate a random word between 10 and 25 chars long to put into the description.
        wordLen = random.randint(10, 25)
        randWord = 'Pretend this is a really long description... '
        randWord += generate_random_word(wordLen)
        print 'Random description is:', randWord

        # Add the data to the table. The priority field is just extra data and is not used for anything.
        task = {'PartitionKey': Partition_key, 'RowKey': str(rowCnt), 'description': randWord, 'priority': 200}
        table_service.insert_entity(Task_table, task)

        # For the data we just added to table store, add the unique partition and row key to the queue.
        # The queue message will look like PartitionKey-RowKey.
        PR_Key = Partition_key + '-' + str(rowCnt)
        print 'Partition and row key is ', PR_Key
        queue_service.put_message(Task_queue, base64.b64encode(PR_Key))

        # Show the approximate length of the queue.
        queue_metadata = queue_service.get_queue_metadata(Task_queue)
        print 'Messages in the queue:', queue_metadata['x-ms-approximate-messages-count']
        print '----------'

main()
-----
After the publisher runs, you'll see something like the following in table storage and the queue. I used Azure Storage Explorer to view this data.
-----
Subscriber.py
import base64
import time
from azure.storage import TableService, QueueService

# Add your account name and key here.
Acct_name = 'add here'
Acct_key = 'add here'

# Names of the table store table and queue.
Task_table = 'tasktable'
Task_queue = 'taskqueue'

# Params used to check the queue and slowly back off when it is empty. Start at the
# minimum/fastest rate and back off to the maximum/slowest rate.
Min_delay_sec = 2
Max_delay_sec = 64
Current_delay_sec = Min_delay_sec

# Create the table if it doesn't already exist.
table_service = TableService(account_name=Acct_name, account_key=Acct_key)
table_service.create_table(Task_table)

# Create the queue if it doesn't already exist.
queue_service = QueueService(account_name=Acct_name, account_key=Acct_key)
queue_service.create_queue(Task_queue)

# Start an infinite loop to read from the queue. Ctrl+C to kill it.
while True:
    # Dequeuing is a two-step process: get the message here, then delete it further below.
    messages = queue_service.get_messages(Task_queue, numofmessages=1, visibilitytimeout=1)
    if messages.queue_messages:
        # A message arrived, so reset the back-off delay to the minimum.
        Current_delay_sec = Min_delay_sec
        print '----------'
        print 'Subscriber'
        print ' '
        for message in messages:
            PR_Key = base64.b64decode(message.message_text)
            print 'Message text has partition and row key:', PR_Key

            # Parse the partition key and row key out of PR_Key. It looks like PartitionKey-RowKey.
            PR_Key = PR_Key.split('-', 1)
            print 'Partition key:', PR_Key[0]
            print 'Row key:', PR_Key[1]

            # Get the table store message using the unique partition key and row key.
            task = table_service.get_entity(Task_table, PR_Key[0], str(PR_Key[1]))
            print 'Message description:', task.description
            print 'Message priority:', task.priority
            print '----------'

            # I foolishly assume everything was retrieved just fine. Delete the queue entry,
            # then delete the table store entry using the partition and row key.
            queue_service.delete_message(Task_queue, message.message_id, message.pop_receipt)
            table_service.delete_entity(Task_table, PR_Key[0], str(PR_Key[1]))
    # However, if the queue was empty then back off per below: sleep at the current
    # delay, then double it (up to the max) for the next empty check.
    else:
        print 'The queue is empty. Will check the queue again in seconds:', Current_delay_sec
        time.sleep(Current_delay_sec)
        if Current_delay_sec < Max_delay_sec:
            Current_delay_sec = Current_delay_sec * 2
----------
When the subscriber runs, it looks like the following. Note the queue check retries backing off up to 64 seconds.
Unhandled exceptions?! Say it isn't so!
Yes, it's true! I have unhandled exceptions in the code above. For example, if you run the publisher twice in a row without running the subscriber to delete the entries from table store, the publisher will fail because it tries to add another entry to table store with a row key that already exists. You can dream up ways to work around this fault.
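For example, one sketch of a workaround is to give each table store entry a row key that is unique on every run instead of reusing the loop counter. The uuid4 call here is my own choice (not something the code above uses); the other names come from Publisher.py above.

import uuid

# Replace the loop-counter row key in the publisher with one that is unique
# on every run, so re-running the publisher can't collide with old entries.
row_key = str(uuid.uuid4())
task = {'PartitionKey': Partition_key, 'RowKey': row_key,
        'description': randWord, 'priority': 200}
table_service.insert_entity(Task_table, task)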
Below is some additional reading in case you'd like to learn more: