#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
gaetk2.datastore - Helper for ndb datastore usage.
Created by Maximillian Dornseif on 2011-01-07.
Copyright (c) 2011, 2012, 2016, 2017, 2018 Cyberlogi/HUDORA. All rights reserved.
"""
from __future__ import unicode_literals
import logging
import warnings
from google.appengine.ext import ndb
from gaetk2.taskqueue import defer
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
class Model(ndb.Model):
    """Base model with creation/update timestamps and key-based identity.

    Entities are hashable and two entities compare equal when they have the
    same complete datastore key (kind, id, parent, ...). Entities without a
    key, or with an incomplete key, are only equal to themselves.
    """

    created_at = ndb.DateTimeProperty(auto_now_add=True, indexed=True)
    created_at.__doc__ = 'Zeitpunkt, zu dem der Datensatz erstellt wurde (UTC).'
    updated_at = ndb.DateTimeProperty(auto_now=True, indexed=True)
    updated_at.__doc__ = 'Zeitpunkt, zu dem der Datensatz zuletzt gespeichert wurde (UTC).'

    # see https://docs.python.org/2/glossary.html#term-hashable
    def __hash__(self):
        """Hash by key id.

        Consistent with :meth:`__eq__`: equal keys always have equal ids.
        Raises AttributeError for entities that have no key at all.
        """
        return hash(self.key.id())

    def __eq__(self, other):
        """Compare entities by their complete datastore key.

        Returns NotImplemented for non-entities (so ``entity == None`` is
        simply False instead of raising). Entities whose key is missing or
        incomplete have no datastore identity yet and are only equal to
        themselves.
        """
        if not isinstance(other, ndb.Model):
            return NotImplemented
        own_key, other_key = self.key, other.key
        if own_key is None or other_key is None or own_key.id() is None:
            return self is other
        # Full key comparison: same id but different kind/parent is NOT equal.
        return own_key == other_key

    def __ne__(self, other):
        """Inverse of :meth:`__eq__` (Python 2 does not derive it automatically)."""
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal

    def as_dict(self):
        """Return a dict representation of the entity (deprecated, use ``to_dict()``)."""
        warnings.warn('`as_dict` is deprecated, use `to_dict()`', DeprecationWarning, stacklevel=2)
        return self.to_dict()
class DeletableModel(Model):
    """:class:`Model` with a ``deleted`` flag for implementing soft deletion."""

    # Soft-delete marker; defaults to False and is indexed so queries can filter on it.
    deleted = ndb.BooleanProperty(indexed=True, default=False)
class AuditedModel(Model):
    """:class:`Model` that also records which user created and last saved an entity."""

    # Filled in automatically only when the user is logged in via Google's
    # user infrastructure.
    created_by = ndb.UserProperty(auto_current_user_add=True, required=False, indexed=True)
    updated_by = ndb.UserProperty(auto_current_user=True, required=False, indexed=True)
class DeletableAuditedModel(Model):
    """:class:`Model` with both a soft-delete flag and creator/updater audit fields."""

    # Soft-delete marker; defaults to False and is indexed so queries can filter on it.
    deleted = ndb.BooleanProperty(indexed=True, default=False)
    # Filled in automatically only when the user is logged in via Google's
    # user infrastructure.
    created_by = ndb.UserProperty(auto_current_user_add=True, required=False, indexed=True)
    updated_by = ndb.UserProperty(auto_current_user=True, required=False, indexed=True)
def query_iterator(query, limit=50):
    """Yield every entity matched by `query`, fetching it page by page.

    Pages of up to `limit` entities are requested with ``fetch_page`` and the
    returned cursor is handed to the next request, so no single datastore
    call has to deliver the whole result set (intended to avoid timeouts,
    e.g. in backend jobs).

    Iteration ends when a page comes back empty or ``fetch_page`` reports
    that there are no more results.
    """
    more = True
    cursor = None
    while more:
        page, cursor, more = query.fetch_page(limit, start_cursor=cursor)
        if not page:
            return
        for entity in page:
            yield entity
def copy_entity(e, **extra_args):
    """Return an unsaved copy of entity `e`, with `extra_args` overriding values.

    Every property registered on ``e``'s class (``klass._properties``) is
    copied, except computed properties (``ndb.ComputedProperty`` and its
    subclasses), whose values are derived rather than assigned. The key is
    not copied, so pass e.g. ``id=`` (or any other constructor argument) in
    `extra_args` to choose it.

    Usage::

        b = copy_entity(a, id='new_id_here')
        b.put()
    """
    # see https://stackoverflow.com/a/2712401
    klass = e.__class__
    props = {
        prop._code_name: prop.__get__(e, klass)
        for prop in klass._properties.values()
        # isinstance (not `type(...) is`) so subclasses of ComputedProperty
        # are skipped as well; passing them to the constructor would fail.
        if not isinstance(prop, ndb.ComputedProperty)}
    props.update(extra_args)
    return klass(**props)
@ndb.transactional
def get_or_insert_if_new(cls, id, **kwds):
    """Like ``ndb.Model.get_or_insert()`` but return ``(entity, created)``.

    Within one transaction the entity with the root key ``ndb.Key(cls, id)``
    is looked up. If it does not exist, ``cls(**kwds)`` is built, given that
    key and stored. `kwds` are only used when a new entity is created.

    This lets the caller tell whether the entity was just created::

        >>> get_or_insert_if_new(Model, 'newid')
        (<instance>, True)
        >>> get_or_insert_if_new(Model, 'newid')
        (<instance>, False)

    Returns:
        ``(entity, True)`` if the entity was created, ``(entity, False)`` if
        it already existed.
    """
    # from https://stackoverflow.com/a/14549493/49407
    # See https://cloud.google.com/appengine/docs/standard/python/ndb/modelclass#Model_get_or_insert
    wanted_key = ndb.Key(cls, id)
    found = wanted_key.get()
    if found is None:
        fresh = cls(**kwds)
        fresh.key = wanted_key
        fresh.put()
        return fresh, True
    return found, False
def write_on_change2(obj, data):
    """Apply new data to an entity and write it only if something changed.

    Each ``key: value`` pair of `data` is compared with the current value
    ``getattr(obj, key, None)``. Differing values are assigned with
    ``setattr``, and ``obj.put()`` is called once, and only if at least one
    attribute changed. Attributes not mentioned in `data` are left untouched.
    This avoids datastore writes when nothing changed.

    Usage::

        instance = MyModel.get_by_id(123)
        dirty = write_on_change2(instance, dict(amount_open=500, score=5))

    Args:
        obj: the entity (anything with attributes and a ``put()`` method).
        data: mapping of attribute names to new values.

    Returns:
        True if `obj` was modified and written, False otherwise.

    Raises:
        AssertionError: if `obj` is falsy (e.g. ``None``). Raised explicitly
            rather than via ``assert`` so the check survives ``python -O``.
    """
    if not obj:
        raise AssertionError('write_on_change2() needs an object, got %r' % (obj,))
    dirty = False
    # items() works for any mapping, on Python 2 and 3 alike.
    for key, value in data.items():
        if value != getattr(obj, key, None):
            setattr(obj, key, value)
            dirty = True
    if dirty:
        obj.put()
    return dirty
def update_obj(obj, **kwargs):
    """Keyword-argument interface to :func:`write_on_change2`.

    ``update_obj(entity, a=1, b=2)`` behaves like
    ``write_on_change2(entity, {'a': 1, 'b': 2})``: changed attributes are
    set and the entity is written only if something changed.

    Returns:
        True if `obj` was modified and written, False otherwise.
    """
    new_values = kwargs
    return write_on_change2(obj, data=new_values)
def reload_obj(obj):
    """Fetch a fresh copy of `obj` from the datastore.

    Calls ``obj.key.get()`` with ``use_cache`` and ``use_memcache`` both
    disabled and returns the result. `obj` itself is not modified.
    """
    bypass_caches = dict(use_cache=False, use_memcache=False)
    return obj.key.get(**bypass_caches)
# Datastore property name -> attributes that make it update itself on put().
# The underscored names are the ones toggled for ndb; the public names are
# toggled as well, as this module always did.
_AUTO_UPDATE_ATTRIBUTES = (
    ('updated_at', ('auto_now', '_auto_now')),
    ('updated_by', ('auto_current_user', '_auto_current_user')),
)
# Marker for "attribute did not exist before we touched it".
_ABSENT = object()


def _suspend_auto_updates(properties):
    """Disable automatic `updated_at`/`updated_by` handling; return undo info.

    `properties` is an entity's ``_properties`` mapping. NOTE: these Property
    objects are typically shared by all entities of the model class, so the
    change is visible process-wide until :func:`_restore_auto_updates` runs.

    Returns a list of ``(property, attribute, previous_value)`` tuples, where
    ``previous_value`` is ``_ABSENT`` if the attribute did not exist.
    """
    saved = []
    for prop_name, attr_names in _AUTO_UPDATE_ATTRIBUTES:
        prop = properties.get(prop_name)
        if prop is None:
            continue
        for attr_name in attr_names:
            saved.append((prop, attr_name, getattr(prop, attr_name, _ABSENT)))
            setattr(prop, attr_name, False)
    return saved


def _restore_auto_updates(saved):
    """Undo :func:`_suspend_auto_updates`, restoring the exact previous state."""
    for prop, attr_name, previous in reversed(saved):
        if previous is _ABSENT:
            delattr(prop, attr_name)
        else:
            setattr(prop, attr_name, previous)


def apply_to_all_entities(func, model, batch_size=0, num_updated=0, num_processed=0, cursor=None):
    """Apply `func` to every entity of `model` and save those it changed.

    Scans the entities of `model` page by page, calls ``func(entity)`` on each
    and ``put()``s the entity if `func` returns a true value. While `func`
    runs and the entity is written, the automatic updating of `updated_at`
    (``auto_now``) and `updated_by` (``auto_current_user``) is switched off,
    so those fields keep their stored values. Afterwards the previous
    settings are restored, even if `func` or ``put()`` raises.

    Only one page is processed per call. If more entities remain, the
    function re-queues itself via ``defer`` with the next cursor and the
    running totals. NOTE: `func` and `model` are passed to ``defer`` and
    presumably must be serializable by it (e.g. module-level functions).
    Confirm against ``gaetk2.taskqueue.defer``.

    Args:
        func: callable taking one entity, returning true if it modified it.
        model: the model class whose entities are processed.
        batch_size: page size. ``0`` (the default) processes a small first
            page of 5 entities in the calling request and pages of 100 in
            the deferred continuations. Any other value is used for every page.
        num_updated: running total of written entities (for continuations).
        num_processed: running total of scanned entities (for continuations).
        cursor: query cursor to resume from (for continuations).

    Example::

        def _fixup_mymodel_updatefunc(obj):
            if obj.wert_eur is not None:
                obj.wert_eur = int(obj.wert_eur)
                return True
            return False

        def fixup_mymodel():
            apply_to_all_entities(_fixup_mymodel_updatefunc, MyModel)

        # or
        def execute(_now):
            datastore.apply_to_all_entities(
                _fixup_bestandsbuch_updatefunc,
                ic_bestandsbuch.ic_BestandsbuchEintrag)

        def _fixup_bestandsbuch_updatefunc(obj):
            changed = False
            # normalize attributes stored both as string and as text
            for attrname in 'ausloeser vorhergehender_bestandsbucheintrag info'.split():
                if getattr(obj, attrname, None) is not None:
                    setattr(obj, attrname, unicode(getattr(obj, attrname)))
                    changed = True
            return changed
    """
    # from https://cloud.google.com/appengine/articles/update_schema
    # The caller's batch_size is honoured; 0 means "small probe page first".
    page_size = batch_size or 5
    objects, next_cursor, more = model.query().fetch_page(
        page_size, start_cursor=cursor)

    updated_now = 0
    for obj in objects:
        num_processed += 1
        saved_flags = _suspend_auto_updates(obj._properties)
        try:
            if func(obj):
                obj.put()
                num_updated += 1
                updated_now += 1
        finally:
            # Always restore: the flags live on shared Property objects.
            _restore_auto_updates(saved_flags)

    logger.debug(
        'Put %s entities to Datastore for a total of %s/%s', updated_now,
        num_updated, num_processed)
    # If there are more entities, re-queue this task for the next page.
    if more:
        defer(
            apply_to_all_entities, func, model,
            batch_size=batch_size or 100, cursor=next_cursor,
            num_updated=num_updated, num_processed=num_processed)
    else:
        logger.info(
            'update_schema_task complete with %s entities resulting in %s updates!',
            num_processed, num_updated)