I am deploying this sync service. I thought that, after the Python 3 migration, adding the following would prevent this kind of error:
client = ndb.Client()
I am not sure how to interpret the error. Do the controller execution paths need that instruction added there as well?
Is anyone familiar with the ndb packages who can help guide me?
Thanks,
import flask
import config
import util
app = flask.Flask(__name__)
#app.config.from_object(config)
from google.appengine.api import app_identity
from google.appengine.api import taskqueue, search, memcache
from apiclient.discovery import build, HttpError
from google.cloud import ndb
#from oauth2client.client import GoogleCredentials
from apiclient.http import MediaIoBaseUpload
from datetime import datetime, timedelta
from functools import partial
from io import BytesIO
import os
from os.path import splitext, basename
from model import Config
from model import VideosToCollections
from pytz import timezone
import httplib2
import iso8601
import time
import requests
import requests_toolbelt.adapters.appengine
requests_toolbelt.adapters.appengine.monkeypatch()
from operator import attrgetter
import model
from model import CallBack
import re
#
################################################################################
## Flush all caches, rebuild search index, and sync all videos
################################################################################
#
@app.route('/rx/', methods=['GET'])
def rx():
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    index = search.Index('general-index')
    while True:
        document_ids = [
            document.doc_id
            for document
            in index.get_range(ids_only=True)]
        #
        # If no IDs were returned, we've deleted everything.
        if not document_ids:
            break
        #
        # Delete the documents for the given IDs
        index.delete(document_ids)
    #
    # flush memcache
    memcache.flush_all()
    #
    # get/put all collections so they are reindexed
    collections_dbs_keys, cursor = model.Collection.get_dbs(keys_only=True, limit=-1)
    collections_dbs = ndb.get_multi(collections_dbs_keys)
    for collection in collections_dbs:
        collection.search_update_index()
    #
    # sync all videos
    taskqueue.add(url=flask.url_for('sync_video'),
                  params={'syncthumb': True},
                  method='GET')
    #
    return 'ok. flushed everything and started video sync.'
#
#
################################################################################
## Sync video(s) task worker
################################################################################
#
@app.route('/sync/', methods=['GET'])
@app.route('/sync/<yt_video_id>', methods=['POST', 'GET'])
def sync_video(yt_video_id=None):
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    syncthumb = util.param('syncthumb', bool)
    if not syncthumb:
        syncthumb = False
    if yt_video_id:
        util.sync_video_worker(yt_video_id, syncthumb=syncthumb)
        success = 'ok: synced ' + yt_video_id
        return success
    index = search.Index('general-index')
    while True:
        document_ids = [
            document.doc_id
            for document
            in index.get_range(ids_only=True)]
        # If no IDs were returned, we've deleted everything.
        if not document_ids:
            break
        # Delete the documents for the given IDs
        index.delete(document_ids)
    # get/put all collections so they are reindexed
    collections_dbs_keys, cursor = model.Collection.get_dbs(keys_only=True, limit=-1)
    collections_dbs = ndb.get_multi(collections_dbs_keys)
    for collection in collections_dbs:
        collection.search_update_index()
    video_dbs, video_cursor = model.Video.get_dbs(limit=-1)
    tasks = [taskqueue.Task(
        url='/sync/' + video_db.yt_video_id,
        params={'syncthumb': syncthumb},
    ) for video_db in video_dbs]
    # Enqueue the sync tasks in batches of five.
    for batch in [tasks[i:i + 5] for i in range(0, len(tasks), 5)]:
        rpc = taskqueue.Queue('sync').add_async(batch)
        rpc.wait()
    success = 'ok: dispatched ' + str(len(tasks)) + ' videos for sync tasks'
    return success
###############################################################################
# Populate Collections
###############################################################################
@app.route('/collectionsync/', methods=['GET'])
#@ndb.transactional
def collectionsync():
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    vnew = model.Collection.query(model.Collection.slug == 'new').get()
    # clear out collections
    for p in model.VideosToCollections.query(model.VideosToCollections.collection_key == vnew.key):
        p.key.delete()
    # populate newest collection
    tot = 0
    ct = 0
    for p in model.Video.query().order(-model.Video.launch_date):
        if p.launch_date:
            if p.get_launch_date() > datetime.now(timezone('UTC')):
                continue
        if p.yt_date_added is not None:
            if p.yt_date_added > datetime.today() - timedelta(days=30):
                ct += 1
                vc = VideosToCollections()
                vc.video_key = p.key
                vc.collection_key = vnew.key
                vc.order = ct
                vc.launch_date = datetime.now(timezone('US/Eastern'))
                vc.put()
                if ct == 1:
                    vnew.featured_primary = p.key
                    vnew.put()
                if ct == 2:
                    vnew.featured_secondary = p.key
                    vnew.put()
                if ct >= 25:
                    break
    tot += ct
    # populate highest rated collection
    ct = 0
    vhighest = model.Collection.query(model.Collection.slug == 'highest-rated').get()
    for p in model.VideosToCollections.query(model.VideosToCollections.collection_key == vhighest.key):
        p.key.delete()
    for p in model.Video.query().order(-model.Video.approval):
        if p.launch_date:
            if p.get_launch_date() > datetime.now(timezone('UTC')):
                continue
        if p.yt_views > 25000:
            ct += 1
            vc = VideosToCollections()
            vc.video_key = p.key
            vc.collection_key = vhighest.key
            vc.launch_date = datetime.now(timezone('US/Eastern'))
            vc.order = ct
            vc.put()
            if ct == 1:
                vhighest.featured_primary = p.key
                vhighest.put()
            if ct == 2:
                vhighest.featured_secondary = p.key
                vhighest.put()
            if ct >= 25:
                break
    tot += ct
    # flush memcache
    #memcache.flush_all()
    success = 'ok: dispatched ' + str(tot) + ' videos into collections'
    return success
###############################################################################
# Called every 15 minutes to keep system from turning off
###############################################################################
@app.route('/keepalive/', methods=['GET'])
def keepalive():
    collection_dbs, collection_cursor = model.Collection.get_dbs(
        order='name'
    )
    normal_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'normal']
    sorted_normal_dbs = sorted(normal_dbs, key=attrgetter('display_rank'))
    manufacturer_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'manufacturer']
    popmachine_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'popmachine']
    featured_dbs_dict = model.Collection.get_all_featured_videos()
    featured_collection_db = sorted_normal_dbs[0]
    featured_video_db = featured_dbs_dict.get(featured_collection_db.key)[0]
    recs_dbs = [featured_dbs_dict.get(featured_collection_db.key)[0],
                featured_dbs_dict.get(featured_collection_db.key)[1]]
    num = 6
    collection_slugs = ['favorites', 'jackpot', 'popular']
    homepage_dbs = model.Video.get_random_video_dbs(collection_slug=collection_slugs, num=num)
    return 'ok'
import google.appengine.api

client = ndb.Client()

def ndb_wsgi_middleware(wsgi_app):
    def middleware(environ, start_response):
        with client.context():
            return wsgi_app(environ, start_response)
    return middleware

app.wsgi_app = ndb_wsgi_middleware(google.appengine.api.wrap_wsgi_app(app.wsgi_app))
Error Produced
2025年05月09日 16:39:22 sync[20250425t195045] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025年05月09日 16:39:22 sync[20250425t195045] File "/layers/google.python.pip/pip/lib/python3.12/site-packages/google/cloud/ndb/tasklets.py", line 323, in _advance_tasklet
2025年05月09日 16:39:22 sync[20250425t195045] yielded = self.generator.send(send_value)
2025年05月09日 16:39:22 sync[20250425t195045] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025年05月09日 16:39:22 sync[20250425t195045] File "/layers/google.python.pip/pip/lib/python3.12/site-packages/google/cloud/ndb/_retry.py", line 112, in retry_wrapper
util.py
# coding: utf-8
from __future__ import absolute_import
from urllib.parse import urlparse, urlunparse
from uuid import uuid4
import hashlib
import re
import unicodedata
import urllib
from oauth2client.contrib.appengine import StorageByKeyName
from apiclient.http import MediaIoBaseUpload
from google.appengine.ext import ndb, deferred
from google.appengine.datastore.datastore_query import Cursor
from webargs import fields as wf
from webargs.flaskparser import parser
import flask
from os.path import splitext, basename
import model
import simplejson
import base64
import time
import hmac
import pickle
import config
import datetime
import httplib2
import iso8601
import oauth2client
import requests
#import requests_toolbelt.adapters.appengine
#requests_toolbelt.adapters.appengine.monkeypatch()
from apiclient.discovery import build, HttpError
from createsend import Transactional, Subscriber
from functools import partial
from io import BytesIO
from google.cloud import ndb
...
###############################################################################
# Model manipulations
###############################################################################
def get_dbs(
        query, order=None, limit=None, cursor=None, prev_cursor=False,
        keys_only=None, **filters
):
    model_class = ndb.Model._kind_map[query.kind]
    # Base for the reverse-ordered query used to build the 'prev' cursor;
    # ndb queries are immutable, so order()/filter() below return new queries.
    query_prev = query
    if order:
        for o in order.split(','):
            if o.startswith('-'):
                query = query.order(-model_class._properties[o[1:]])
                if prev_cursor:
                    query_prev = query_prev.order(model_class._properties[o[1:]])
            else:
                query = query.order(model_class._properties[o])
                if prev_cursor:
                    query_prev = query_prev.order(-model_class._properties[o])
    for prop, value in filters.items():
        if value is None:
            continue
        for val in value if isinstance(value, list) else [value]:
            query = query.filter(model_class._properties[prop] == val)
            if prev_cursor:
                query_prev = query_prev.filter(model_class._properties[prop] == val)
    limit = limit or config.DEFAULT_DB_LIMIT
    if limit == -1:
        return list(query.fetch(keys_only=keys_only)), {'next': None, 'prev': None}
    cursor = Cursor.from_websafe_string(cursor) if cursor else None
    model_dbs, next_cursor, more = query.fetch_page(
        limit, start_cursor=cursor, keys_only=keys_only,
    )
    next_cursor = next_cursor.to_websafe_string() if more else None
    if not prev_cursor:
        return list(model_dbs), {'next': next_cursor, 'prev': None}
    model_dbs_prev, prev_cursor, prev_more = query_prev.fetch_page(
        limit, start_cursor=cursor.reversed() if cursor else None, keys_only=True
    )
    prev_cursor = prev_cursor.reversed().to_websafe_string() \
        if prev_cursor and cursor else None
    return list(model_dbs), {'next': next_cursor, 'prev': prev_cursor}
1 Answer
Since you're using App Engine bundled services (Datastore), migrating to Python 3 requires choosing a migration path. Here's an overview of the migration paths for App Engine bundled services, specifically Datastore, which may be useful as you transition to Python 3:
If your Python 2 app uses NDB to interact with Datastore, migrate to the Cloud NDB library. Cloud NDB is meant primarily as a transition tool for migrating Python 2 apps. We recommend that Python 3 apps use the Datastore mode client library.
For details, see Migrating to Cloud NDB. To simulate this migration with a sample app, see Migrating from App Engine ndb to Cloud NDB.
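To make that concrete, here is a minimal sketch of how Cloud NDB expects contexts to work (the model and function names below are illustrative, not from your app). Every code path that touches Datastore through Cloud NDB must execute inside `client.context()`: your WSGI middleware covers Flask-routed requests, including push-queue task handlers delivered over HTTP, but anything that runs outside the WSGI request cycle needs its own context:

    from google.cloud import ndb
    import flask

    app = flask.Flask(__name__)
    client = ndb.Client()

    class SomeModel(ndb.Model):  # hypothetical model, for illustration only
        name = ndb.StringProperty()

    def ndb_wsgi_middleware(wsgi_app):
        def middleware(environ, start_response):
            # Open an NDB context around every request this WSGI app serves.
            with client.context():
                return wsgi_app(environ, start_response)
        return middleware

    app.wsgi_app = ndb_wsgi_middleware(app.wsgi_app)

    @app.route('/example/')
    def example():
        # Covered by the middleware context above.
        return str(SomeModel.query().count())

    def standalone_job():
        # Runs outside the WSGI request cycle (a thread, a script),
        # so it must open its own context.
        with client.context():
            ndb.delete_multi(SomeModel.query().fetch(keys_only=True))

Since your util.py imports both the legacy `google.appengine.ext` ndb and `google.cloud` ndb, it's also worth confirming that every module ends up using the Cloud NDB binding, as only Cloud NDB honors `client.context()`.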
I hope this helps, at least.
Also note that Datastore supports multiple databases, so the name of your `get_dbs` function gives the impression that you're trying to get a list of your databases. If that isn't your aim, consider renaming the function for easier debugging.
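And since the quoted guidance recommends the Datastore mode client library for Python 3 apps, here is a minimal sketch of what that looks like (the `Task` kind and its properties are examples, not from your schema):

    from google.cloud import datastore

    client = datastore.Client()

    # Write an entity; kind and property names here are examples only.
    key = client.key('Task', 'sample-task')
    entity = datastore.Entity(key=key)
    entity.update({'description': 'finish the python 3 migration', 'done': False})
    client.put(entity)

    # Read it back; no NDB-style context manager is required.
    fetched = client.get(key)
    print(fetched['description'])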