import time
from contextlib import contextmanager
from hashlib import md5

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache

from djangofeeds.models import Feed

# Module-level task logger (celery's get_task_logger adds task name/id context).
logger = get_task_logger(__name__)
LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    """Distributed, best-effort mutex built on the Django cache backend.

    Yields ``True`` if this caller acquired the lock (the ``lock_id`` key
    did not exist), ``False`` otherwise.  The lock auto-expires after
    ``LOCK_EXPIRE`` seconds so a crashed holder cannot block forever.

    :param lock_id: cache key naming the lock.
    :param oid: owner identifier stored as the key's value (e.g. app oid).
    """
    # Release deadline: stop trusting our ownership 3 seconds before the
    # cache entry itself would expire, to leave a safety margin.
    timeout_at = time.monotonic() + LOCK_EXPIRE - 3
    # cache.add is atomic and fails if the key already exists — this is
    # what makes it usable as a lock.
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if time.monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)
@task(bind=True)
def import_feed(self, feed_url):
    """Import a feed, guaranteeing one concurrent import per URL.

    Uses :func:`memcache_lock` keyed on the task name plus the URL digest
    so that two workers never import the same feed simultaneously.

    :param feed_url: URL of the feed to import.
    :returns: the imported feed's URL, or ``None`` if another worker
        already holds the lock for this URL.
    """
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.  hashlib.md5 requires bytes on Python 3, so the
    # URL string must be encoded first.
    feed_url_hexdigest = md5(feed_url.encode('utf-8')).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
    logger.debug('Importing feed: %s', feed_url)
    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            return Feed.objects.import_feed(feed_url).url
    # Lock was held by someone else: skip quietly rather than double-import.
    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)