
I am deploying this sync service. I thought that, after the Python 3 migration, adding this would prevent such an error:

client = ndb.Client()

I am not sure how to interpret the error. Does that instruction need to be added along the controller execution paths as well?
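For reference, the pattern I took from the Cloud NDB docs pairs the client with an explicit context; a minimal sketch of my understanding (read_config is illustrative, not my actual handler):

from google.cloud import ndb

client = ndb.Client()

def read_config():
    # Every NDB operation has to run inside an active context;
    # creating the client alone does not establish one.
    with client.context():
        return Config.query().get()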

Is there someone familiar with the ndb packages who can help guide me?

Thanks,


import flask
import config
import util

app = flask.Flask(__name__)
# app.config.from_object(config)

from google.appengine.api import app_identity
from google.appengine.api import taskqueue, search, memcache
from apiclient.discovery import build, HttpError
from google.cloud import ndb
# from oauth2client.client import GoogleCredentials
from apiclient.http import MediaIoBaseUpload
from datetime import datetime, timedelta
from functools import partial
from io import BytesIO
import os
from os.path import splitext, basename
from model import Config
from model import VideosToCollections
from pytz import timezone
import httplib2
import iso8601
import time
import requests
import requests_toolbelt.adapters.appengine
requests_toolbelt.adapters.appengine.monkeypatch()
from operator import attrgetter
import model
from model import CallBack
import re

################################################################################
## Flush all caches, rebuild search index, and sync all videos
################################################################################
@app.route('/rx/', methods=['GET'])
def rx():
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    index = search.Index('general-index')
    while True:
        document_ids = [
            document.doc_id
            for document
            in index.get_range(ids_only=True)]
        # If no IDs were returned, we've deleted everything.
        if not document_ids:
            break
        # Delete the documents for the given IDs
        index.delete(document_ids)

    # Flush memcache
    memcache.flush_all()

    # Get/put all collections so they are reindexed
    collections_dbs_keys, cursor = model.Collection.get_dbs(keys_only=True, limit=-1)
    collections_dbs = ndb.get_multi(collections_dbs_keys)
    for collection in collections_dbs:
        collection.search_update_index()

    # Sync all videos
    taskqueue.add(url=flask.url_for('sync_video'),
                  params={'syncthumb': True},
                  method='GET')

    return 'ok. flushed everything and started video sync.'

################################################################################
## Sync video(s) task worker
################################################################################
@app.route('/sync/', methods=['GET'])
@app.route('/sync/<yt_video_id>', methods=['POST', 'GET'])
def sync_video(yt_video_id=None):
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    syncthumb = util.param('syncthumb', bool)
    if not syncthumb:
        syncthumb = False
    if yt_video_id:
        util.sync_video_worker(yt_video_id, syncthumb=syncthumb)
        return 'ok: synced ' + yt_video_id

    index = search.Index('general-index')
    while True:
        document_ids = [
            document.doc_id
            for document
            in index.get_range(ids_only=True)]
        # If no IDs were returned, we've deleted everything.
        if not document_ids:
            break
        # Delete the documents for the given IDs
        index.delete(document_ids)

    # Get/put all collections so they are reindexed
    collections_dbs_keys, cursor = model.Collection.get_dbs(keys_only=True, limit=-1)
    collections_dbs = ndb.get_multi(collections_dbs_keys)
    for collection in collections_dbs:
        collection.search_update_index()

    video_dbs, video_cursor = model.Video.get_dbs(limit=-1)
    tasks = [taskqueue.Task(
        url='/sync/' + video_db.yt_video_id,
        params={'syncthumb': syncthumb},
    ) for video_db in video_dbs]
    for batch in [tasks[i:i + 5] for i in range(0, len(tasks), 5)]:
        rpc = taskqueue.Queue('sync').add_async(batch)
        rpc.wait()
    return 'ok: dispatched ' + str(len(tasks)) + ' videos for sync tasks'

###############################################################################
# Populate Collections
###############################################################################
@app.route('/collectionsync/', methods=['GET'])
# @ndb.transactional
def collectionsync():
    GAE_APP_ID = os.environ['GOOGLE_CLOUD_PROJECT']
    vnew = model.Collection.query(model.Collection.slug == 'new').get()
    # Clear out the newest collection
    for p in model.VideosToCollections.query(model.VideosToCollections.collection_key == vnew.key):
        p.key.delete()

    # Populate the newest collection
    tot = 0
    ct = 0
    for p in model.Video.query().order(-model.Video.launch_date):
        if p.launch_date:
            if p.get_launch_date() > datetime.now(timezone("UTC")):
                continue
        if p.yt_date_added is not None:
            if p.yt_date_added > (datetime.today() - timedelta(days=30)):
                ct += 1
                vc = VideosToCollections()
                vc.video_key = p.key
                vc.collection_key = vnew.key
                vc.order = ct
                vc.launch_date = datetime.now(timezone("US/Eastern"))
                vc.put()
                if ct == 1:
                    vnew.featured_primary = p.key
                    vnew.put()
                if ct == 2:
                    vnew.featured_secondary = p.key
                    vnew.put()
        if ct >= 25:
            break
    tot += ct

    # Populate the highest-rated collection
    ct = 0
    vhighest = model.Collection.query(model.Collection.slug == 'highest-rated').get()
    for p in model.VideosToCollections.query(model.VideosToCollections.collection_key == vhighest.key):
        p.key.delete()
    for p in model.Video.query().order(-model.Video.approval):
        if p.launch_date:
            if p.get_launch_date() > datetime.now(timezone("UTC")):
                continue
        if p.yt_views > 25000:
            ct += 1
            vc = VideosToCollections()
            vc.video_key = p.key
            vc.collection_key = vhighest.key
            vc.launch_date = datetime.now(timezone("US/Eastern"))
            vc.order = ct
            vc.put()
            if ct == 1:
                vhighest.featured_primary = p.key
                vhighest.put()
            if ct == 2:
                vhighest.featured_secondary = p.key
                vhighest.put()
        if ct >= 25:
            break
    tot += ct

    # Flush memcache
    # memcache.flush_all()
    return 'ok: dispatched ' + str(tot) + ' videos into collections'

###############################################################################
# Called every 15 minutes to keep system from turning off
###############################################################################
@app.route('/keepalive/', methods=['GET'])
def keepalive():
    collection_dbs, collection_cursor = model.Collection.get_dbs(
        order='name'
    )
    normal_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'normal']
    sorted_normal_dbs = sorted(normal_dbs, key=attrgetter('display_rank'))
    manufacturer_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'manufacturer']
    popmachine_dbs = [collection_db for collection_db in collection_dbs if collection_db.collection_type == 'popmachine']
    featured_dbs_dict = model.Collection.get_all_featured_videos()
    featured_collection_db = sorted_normal_dbs[0]
    featured_video_db = featured_dbs_dict.get(featured_collection_db.key)[0]
    recs_dbs = [featured_dbs_dict.get(featured_collection_db.key)[0], featured_dbs_dict.get(featured_collection_db.key)[1]]
    num = 6
    collection_slugs = ['favorites', 'jackpot', 'popular']
    homepage_dbs = model.Video.get_random_video_dbs(collection_slug=collection_slugs, num=num)
    return 'ok'

import google.appengine.api

client = ndb.Client()

def ndb_wsgi_middleware(wsgi_app):
    # Wrap every WSGI request in a Cloud NDB context so handlers
    # can use NDB without managing contexts themselves.
    def middleware(environ, start_response):
        with client.context():
            return wsgi_app(environ, start_response)
    return middleware

app.wsgi_app = ndb_wsgi_middleware(google.appengine.api.wrap_wsgi_app(app.wsgi_app))
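My current understanding, which may well be wrong, is that the middleware above opens a context for each WSGI request, so route handlers (including task queue pushes, which arrive as HTTP requests) are covered, and only code running outside a request cycle would need its own context. A sketch of what I believe that looks like (background_job is illustrative, not in my app):

def background_job():
    # Hypothetical non-request code path: the middleware's context does
    # not carry over here, so this needs its own context block.
    with client.context():
        for video in model.Video.query().fetch(10):
            print(video.key)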

Error Produced

2025年05月09日 16:39:22 sync[20250425t195045] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025年05月09日 16:39:22 sync[20250425t195045] File "/layers/google.python.pip/pip/lib/python3.12/site-packages/google/cloud/ndb/tasklets.py", line 323, in _advance_tasklet
2025年05月09日 16:39:22 sync[20250425t195045] yielded = self.generator.send(send_value)
2025年05月09日 16:39:22 sync[20250425t195045] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2025年05月09日 16:39:22 sync[20250425t195045] File "/layers/google.python.pip/pip/lib/python3.12/site-packages/google/cloud/ndb/_retry.py", line 112, in retry_wrapper
util.py
# coding: utf-8
from __future__ import absolute_import
from urllib.parse import urlparse, urlunparse
from uuid import uuid4
import hashlib
import re
import unicodedata
import urllib
from oauth2client.contrib.appengine import StorageByKeyName
from apiclient.http import MediaIoBaseUpload
from google.appengine.ext import deferred
from google.appengine.datastore.datastore_query import Cursor
from webargs import fields as wf
from webargs.flaskparser import parser
import flask
from os.path import splitext, basename
import model
import simplejson
import base64
import time
import hmac
import pickle
import config
import datetime
import httplib2
import iso8601
import oauth2client
import requests
# import requests_toolbelt.adapters.appengine
# requests_toolbelt.adapters.appengine.monkeypatch()
from apiclient.discovery import build, HttpError
from createsend import Transactional, Subscriber
from functools import partial
from io import BytesIO
from google.cloud import ndb
...
###############################################################################
# Model manipulations
###############################################################################
def get_dbs(
    query, order=None, limit=None, cursor=None, prev_cursor=False,
    keys_only=None, **filters
):
    # Apply ordering and filters to `query` (and to a reversed copy used
    # for the previous-page cursor), then fetch one page and return
    # (model_dbs, {'next': ..., 'prev': ...}) with websafe cursors.
    model_class = ndb.Model._kind_map[query.kind]
    query_prev = query
    if order:
        for o in order.split(','):
            if o.startswith('-'):
                query = query.order(-model_class._properties[o[1:]])
                if prev_cursor:
                    query_prev = query_prev.order(model_class._properties[o[1:]])
            else:
                query = query.order(model_class._properties[o])
                if prev_cursor:
                    query_prev = query_prev.order(-model_class._properties[o])
    for prop, value in filters.items():
        if value is None:
            continue
        for val in value if isinstance(value, list) else [value]:
            query = query.filter(model_class._properties[prop] == val)
            if prev_cursor:
                query_prev = query_prev.filter(model_class._properties[prop] == val)
    limit = limit or config.DEFAULT_DB_LIMIT
    if limit == -1:
        return list(query.fetch(keys_only=keys_only)), {'next': None, 'prev': None}
    cursor = Cursor.from_websafe_string(cursor) if cursor else None
    model_dbs, next_cursor, more = query.fetch_page(
        limit, start_cursor=cursor, keys_only=keys_only,
    )
    next_cursor = next_cursor.to_websafe_string() if more else None
    if not prev_cursor:
        return list(model_dbs), {'next': next_cursor, 'prev': None}
    model_dbs_prev, prev_cursor, prev_more = query_prev.fetch_page(
        limit, start_cursor=cursor.reversed() if cursor else None, keys_only=True
    )
    prev_cursor = prev_cursor.reversed().to_websafe_string() \
        if prev_cursor and cursor else None
    return list(model_dbs), {'next': next_cursor, 'prev': prev_cursor}
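For reference, a typical call in my code looks roughly like this (a sketch; the order and limit values are illustrative):

# Fetch one page of videos, newest first; the second return value
# carries websafe cursors for the next and previous pages.
video_dbs, cursors = get_dbs(
    model.Video.query(), order='-launch_date', limit=20, prev_cursor=True,
)
next_cursor = cursors['next']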
asked May 10 at 17:59
  • What exactly are you trying to do? Where in your code do you think your error is occurring? Commented May 12 at 0:32
  • Thank you for the reply. It is getting an exception in get_dbs, which is an abstraction that eventually ends at model_dbs, next_cursor, more = query.fetch_page(limit, start_cursor=cursor, keys_only=keys_only). Beneath that, my understanding is from google.appengine.datastore.datastore_query import Cursor, which I haven't studied (yet?). Commented May 12 at 1:46
  • Maybe you should post the code for get_dbs. Also note that Datastore supports multiple databases, so your function/method name gives the impression that you're trying to get a list of your databases. If that isn't your aim, consider renaming the function for easier debugging. Commented May 12 at 13:27
  • Added, thank you. I was focused on the with client.context() we had to add for the migration. I don't have a good mental model of how that state is shared. Commented May 12 at 14:33
  • I saw that you have another post (Python Error in NDB after python 3 upgrade). Is it the same issue as the one you've mentioned here? Commented May 13 at 13:51

1 Answer


Since you're using App Engine bundled services (Datastore), migrating to Python 3 requires choosing a migration path. Here's an overview of the migration paths for App Engine bundled services, specifically Datastore, which might be useful as you transition to Python 3:

If your Python 2 app uses NDB to interact with Datastore, migrate to the Cloud NDB library. Cloud NDB is meant primarily as a transition tool for migrating Python 2 apps. We recommend that Python 3 apps use the Datastore mode client library.

For details, see Migrating to Cloud NDB. To simulate this migration with a sample app, see Migrating from App Engine ndb to Cloud NDB.
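As a rough illustration of the Datastore mode client library (a sketch, not tailored to your models; the kind and field names are examples):

from google.cloud import datastore

client = datastore.Client()

# Unlike Cloud NDB, the Datastore mode client needs no ambient
# context; every operation goes through the client explicitly.
entity = datastore.Entity(key=client.key('Video', 'example-id'))
entity.update({'title': 'Example'})
client.put(entity)

video = client.get(client.key('Video', 'example-id'))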

I hope this helps, at least.

answered May 13 at 15:38