Source code for gaetk2.taskqueue

#!/usr/bin/env python
# encoding: utf-8
"""
gaetk2.taskqueue

Created by Maximillian Dornseif on 2011-01-07.
Copyright (c) 2011, 2012, 2016, 2017 Cyberlogi/HUDORA. All rights reserved.
"""
import logging
import re
import zlib

import google.appengine.ext.deferred.deferred
from google.appengine.api import taskqueue
from google.appengine.ext import deferred

from .config import is_production
from .tools.unicode import slugify

logger = logging.getLogger(__name__)


def taskqueue_add_multi(qname, url, paramlist, **kwargs):
    """Enqueue many tasks for the same queue and URL with few API calls.

    One ``taskqueue.Task`` is built per entry of `paramlist` (used as the
    task's ``params``); any extra keyword arguments are passed on to every
    ``Task``. Tasks are handed to the queue named `qname` in batches of
    up to 50.

    Usage pattern::

        tasks = []
        for kdnnr in kunden.get_changed():
            tasks.append(dict(kundennr=kdnnr))
        taskqueue_add_multi('softmq', '/some/path', tasks)
    """

    def _flush(batch):
        # The queue handle is only created when there is something to send.
        taskqueue.Queue(name=qname).add(batch)

    pending = []
    for params in paramlist:
        pending.append(taskqueue.Task(url=url, params=params, **kwargs))
        # Send a full batch right away so the pending list stays small.
        if len(pending) >= 50:
            _flush(pending)
            pending = []
    # Send whatever is left over after the last full batch.
    if pending:
        _flush(pending)
def taskqueue_add_multi_payload(name, url, payloadlist, **kwargs):
    """Like taskqueue_add_multi(), but send a compressed JSON payload per task.

    Every element of `payloadlist` is serialised with
    ``huTools.hujson.dumps`` and then zlib-compressed; the result is used as
    the task payload instead of query parameters. Extra keyword arguments
    are passed on to every ``taskqueue.Task``. Tasks are added to the queue
    named `name` in batches of up to 50.

    Because the payload is compressed, the task handler has to decompress
    it before decoding, e.g.
    ``data = json.loads(zlib.decompress(self.request.body))``.

    `payloadlist` may be any iterable, including a generator.

    See http://code.google.com/appengine/docs/python/taskqueue/tasks.html
    """
    import huTools.hujson

    tasks = []
    # Count as we go: `payloadlist` may be an iterator without len().
    queued = 0
    for payload in payloadlist:
        body = zlib.compress(huTools.hujson.dumps(payload))
        tasks.append(taskqueue.Task(url=url, payload=body, **kwargs))
        queued += 1
        # Batch addition to the task queue
        if len(tasks) >= 50:
            taskqueue.Queue(name=name).add(tasks)
            tasks = []
    if tasks:
        taskqueue.Queue(name=name).add(tasks)
    logger.debug(u'%d tasks queued to %s', queued, url)
# See also https://github.com/freshplanet/AppEngine-Deferred
# and https://medium.com/the-infinite-machine/problems-with-deferred-bad13cac3216
# and https://pypi.python.org/pypi/appenginetaskutils
def defer(obj, *args, **kwargs):
    """Defer a callable for later execution.

    Works like https://cloud.google.com/appengine/articles/deferred
    but appends the callable's name and a shortened rendering of its
    arguments to the task URL for easier debugging.

    Add this to `app.yaml`::

        handlers:
            # needed to allow arbitrary postfixes and better error handling
            - url: /_ah/queue/deferred(.*)
              script: gaetk2.views.default.application
              login: admin

    Keyword arguments starting with ``_`` are left out of the URL suffix
    but are still passed on to ``deferred.defer``. An explicitly given
    ``_url`` overrides the generated URL.

    Returns whatever ``deferred.defer`` returns. If the task already exists
    or is tombstoned, this is logged at info level and ``None`` is returned.
    """
    # functools.partial objects and callable instances have no __name__,
    # so fall back to the type name instead of raising AttributeError.
    name = getattr(obj, '__name__', None) or type(obj).__name__
    positional = ','.join(_to_str(arg) for arg in args)
    keywords = ','.join(
        '%s=%s' % (key, _to_str(value))
        for (key, value) in kwargs.items()
        if not key.startswith('_'))
    suffix = '{0}({1!s},{2!r})'.format(name, positional, keywords)

    # Make the suffix URL-safe: spaces become single dashes, then every
    # character outside the allowed set is dropped.
    suffix = re.sub(r'-+', '-', suffix.replace(' ', '-'))
    suffix = re.sub(r'[^/A-Za-z0-9_,.:@&+$\(\)\-]+', '', suffix)
    url = google.appengine.ext.deferred.deferred._DEFAULT_URL + '/' + suffix[:200]
    kwargs["_url"] = kwargs.pop("_url", url)
    # kwargs["_queue"] = kwargs.pop("_queue", 'workersq')
    if is_production():
        # we only route to the workers backend/module on production machines
        # NOTE: routing is currently disabled, this branch is a placeholder.
        pass
        # kwargs["_target"] = kwargs.pop("_target", 'workers')
    try:
        return deferred.defer(obj, *args, **kwargs)
    except taskqueue.TaskAlreadyExistsError:
        logger.info('Task already exists')
    except taskqueue.TombstonedTaskError:
        logger.info('Task did already run')
def _to_str(value):
    """Render `value` as a short ``str``.

    Strings are slugified from their ``repr()``; everything else goes
    through ``str()``. Results longer than 20 characters are cut to their
    first 20 characters followed by ``...``.
    """
    text = str(slugify(repr(value)) if isinstance(value, basestring) else value)
    if len(text) <= 20:
        return text
    return '{}...'.format(text[:20])