Build a RAG chatbot with GKE and Cloud Storage
This tutorial shows you how to integrate a large language model (LLM) application based on retrieval-augmented generation (RAG) with PDF files that you upload to a Cloud Storage bucket.
This guide uses a vector database as the storage and semantic search engine that holds the representations (embeddings) of the uploaded documents. You use the Langchain framework to interact with the embeddings, and you use Gemini models available through Vertex AI.
Langchain is a popular open-source Python framework that simplifies many machine learning tasks and has interfaces to integrate with different vector databases and AI services.
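Embeddings map text to numeric vectors so that semantically similar texts end up close together, and the vector database compares those vectors with a similarity measure such as cosine similarity. The following toy sketch uses hand-made three-dimensional vectors (hypothetical values for illustration only; real models such as text-embedding-005 return hundreds of dimensions) to show that comparison:

```python
import math

def cosine_similarity(a, b):
    # Cosine similarity: dot(a, b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Hypothetical 3-dimensional "embeddings" for three document chunks.
doc_solar = [0.9, 0.1, 0.0]
doc_wind = [0.8, 0.2, 0.1]
doc_mars = [0.0, 0.1, 0.9]

# A query about carbon-free energy lands closer to the energy chunks
# than to the unrelated one.
query = [0.85, 0.15, 0.05]
print(cosine_similarity(query, doc_solar) > cosine_similarity(query, doc_mars))  # True
```

A real embedding model replaces the hand-made vectors, but the ranking step the database performs is the same.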
This tutorial is intended for cloud platform administrators and architects, ML engineers, and MLOps (DevOps) professionals interested in deploying RAG LLM applications to GKE and Cloud Storage.
Objectives
In this tutorial, you learn how to:
- Build and deploy an application to create and store document embeddings in a vector database.
- Automate the application to trigger new document uploads to a Cloud Storage bucket.
- Deploy a chatbot application that uses semantic search to answer questions based on the document content.
Deployment architecture
In this tutorial, you create a Cloud Storage bucket, an Eventarc trigger, and the following Services:
- embed-docs: Eventarc triggers this Service every time a user uploads a new document to the Cloud Storage bucket. The Service starts a Kubernetes Job that creates embeddings for the uploaded document and inserts the embeddings into a vector database.
- chatbot: This Service answers natural language questions about the uploaded documents using semantic search and the Gemini API.
The following diagram shows the process of uploading and vectorizing documents:
In the diagram, the user uploads files to the Cloud Storage bucket. Eventarc subscribes to object metadataUpdated events for the bucket and uses Eventarc's event forwarder, which is a Kubernetes workload, to call the embed-docs Service when you upload a new document. The Service then creates embeddings for the uploaded document using the Vertex AI embedding model and stores the embeddings in a vector database.
The following diagram shows the process of asking questions about the uploaded
document content using the chatbot Service:
Users can ask questions using natural language, and the chatbot generates answers based solely on the content of the uploaded files. The chatbot retrieves context from the vector database using semantic search, then sends the question and context to Gemini.
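The retrieve-then-prompt step described above can be sketched in plain Python. The in-memory list below is a hypothetical stand-in for the vector database, and the final Gemini call is omitted; only the ranking and prompt-assembly logic is shown:

```python
import math

def cosine(a, b):
    # Cosine similarity between two vectors.
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

# Hypothetical stand-in for the vector database: (embedding, chunk) pairs
# that the embed-docs Service would have stored.
store = [
    ([0.9, 0.1], "Google intends to run on carbon-free energy by 2030."),
    ([0.1, 0.9], "The bucket stores uploaded PDF documents."),
]

def retrieve(query_embedding, k=1):
    # Semantic search: rank stored chunks by similarity to the query.
    ranked = sorted(store, key=lambda item: cosine(item[0], query_embedding), reverse=True)
    return [text for _, text in ranked[:k]]

def build_prompt(question, query_embedding):
    # Combine the retrieved context with the question; the real chatbot
    # then sends this prompt to Gemini.
    context = "\n".join(retrieve(query_embedding))
    return f"Answer using only this context:\n{context}\n\nQuestion: {question}"

prompt = build_prompt("What are Google's energy plans?", [0.8, 0.2])
```

Because the prompt restricts the model to the retrieved context, questions outside the uploaded documents get an "I don't know" style answer.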
Costs
In this document, you use the following billable components of Google Cloud:
To generate a cost estimate based on your projected usage,
use the pricing calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
In this tutorial, you use Cloud Shell to run commands. Cloud Shell is a shell environment for managing resources hosted on Google Cloud. Cloud Shell comes preinstalled with the Google Cloud CLI, kubectl, and Terraform command-line tools. If you don't use Cloud Shell, install the Google Cloud CLI.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get 300ドル in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  Roles required to select or create a project
  - Select a project: Selecting a project doesn't require a specific IAM role; you can select any project that you've been granted a role on.
  - Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Verify that billing is enabled for your Google Cloud project.
- Enable the Vertex AI, Cloud Build, Eventarc, and Artifact Registry APIs:
  Roles required to enable APIs
  To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.
  gcloud services enable aiplatform.googleapis.com cloudbuild.googleapis.com eventarc.googleapis.com artifactregistry.googleapis.com
Grant roles to your user account. Run the following command once for each of the following IAM roles:
  eventarc.admin

  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

  Replace the following:
  - PROJECT_ID: Your project ID.
  - USER_IDENTIFIER: The identifier for your user account. For example, myemail@example.com.
  - ROLE: The IAM role that you grant to your user account.
Create a cluster
Create a Qdrant, Elasticsearch, or Postgres cluster:
Qdrant
Follow the instructions in Deploy a Qdrant vector database on GKE to create a Qdrant cluster running on an Autopilot mode or Standard mode GKE cluster.
Elasticsearch
Follow the instructions in Deploy an Elasticsearch vector database on GKE to create an Elasticsearch cluster running on an Autopilot mode or Standard mode GKE cluster.
PGVector
Follow the instructions in Deploy a PostgreSQL vector database on GKE to create a Postgres cluster with PGVector running on an Autopilot mode or Standard mode GKE cluster.
Weaviate
Follow the instructions in Deploy a Weaviate vector database on GKE to create a Weaviate cluster running on an Autopilot mode or Standard mode GKE cluster.
Set up your environment
Set up your environment with Cloud Shell:
Set environment variables for your project:
Qdrant

export PROJECT_ID=PROJECT_ID
export KUBERNETES_CLUSTER_PREFIX=qdrant
export CONTROL_PLANE_LOCATION=us-central1
export REGION=us-central1
export DB_NAMESPACE=qdrant

Replace PROJECT_ID with your Google Cloud project ID.

Elasticsearch

export PROJECT_ID=PROJECT_ID
export KUBERNETES_CLUSTER_PREFIX=elasticsearch
export CONTROL_PLANE_LOCATION=us-central1
export REGION=us-central1
export DB_NAMESPACE=elastic

Replace PROJECT_ID with your Google Cloud project ID.

PGVector

export PROJECT_ID=PROJECT_ID
export KUBERNETES_CLUSTER_PREFIX=postgres
export CONTROL_PLANE_LOCATION=us-central1
export REGION=us-central1
export DB_NAMESPACE=pg-ns

Replace PROJECT_ID with your Google Cloud project ID.

Weaviate

export PROJECT_ID=PROJECT_ID
export KUBERNETES_CLUSTER_PREFIX=weaviate
export CONTROL_PLANE_LOCATION=us-central1
export REGION=us-central1
export DB_NAMESPACE=weaviate

Replace PROJECT_ID with your Google Cloud project ID.

Verify that your GKE cluster is running:

gcloud container clusters list --project=${PROJECT_ID} --location=${CONTROL_PLANE_LOCATION}

The output is similar to the following:

NAME LOCATION MASTER_VERSION MASTER_IP MACHINE_TYPE NODE_VERSION NUM_NODES STATUS
[KUBERNETES_CLUSTER_PREFIX]-cluster us-central1 1.30.1-gke.1329003 <EXTERNAL IP> e2-standard-2 1.30.1-gke.1329003 6 RUNNING

Clone the sample code repository from GitHub:

git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples

Navigate to the databases directory:

cd kubernetes-engine-samples/databases
Prepare your infrastructure
Create an Artifact Registry repository, build Docker images, and push Docker images to Artifact Registry:
Create an Artifact Registry repository:

gcloud artifacts repositories create ${KUBERNETES_CLUSTER_PREFIX}-images \
    --repository-format=docker \
    --location=${REGION} \
    --description="Vector database images repository" \
    --async

Set the storage.objectAdmin and artifactregistry.admin permissions on the Compute Engine service account to use Cloud Build to build and push Docker images for the embed-docs and chatbot Services:

export PROJECT_NUMBER=PROJECT_NUMBER
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
    --role="roles/artifactregistry.admin"

Replace PROJECT_NUMBER with your Google Cloud project number.

Build Docker images for the embed-docs and chatbot Services. The embed-docs image contains Python code for both the application that receives Eventarc forwarder requests and the embedding job.

Qdrant

export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
gcloud builds submit qdrant/docker/chatbot --region=${REGION} \
    --tag ${DOCKER_REPO}/chatbot:1.0 --async
gcloud builds submit qdrant/docker/embed-docs --region=${REGION} \
    --tag ${DOCKER_REPO}/embed-docs:1.0 --async

Elasticsearch

export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
gcloud builds submit elasticsearch/docker/chatbot --region=${REGION} \
    --tag ${DOCKER_REPO}/chatbot:1.0 --async
gcloud builds submit elasticsearch/docker/embed-docs --region=${REGION} \
    --tag ${DOCKER_REPO}/embed-docs:1.0 --async

PGVector

export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
gcloud builds submit postgres-pgvector/docker/chatbot --region=${REGION} \
    --tag ${DOCKER_REPO}/chatbot:1.0 --async
gcloud builds submit postgres-pgvector/docker/embed-docs --region=${REGION} \
    --tag ${DOCKER_REPO}/embed-docs:1.0 --async

Weaviate

export DOCKER_REPO="${REGION}-docker.pkg.dev/${PROJECT_ID}/${KUBERNETES_CLUSTER_PREFIX}-images"
gcloud builds submit weaviate/docker/chatbot --region=${REGION} \
    --tag ${DOCKER_REPO}/chatbot:1.0 --async
gcloud builds submit weaviate/docker/embed-docs --region=${REGION} \
    --tag ${DOCKER_REPO}/embed-docs:1.0 --async

Verify the images:

gcloud artifacts docker images list $DOCKER_REPO \
    --project=$PROJECT_ID \
    --format="value(IMAGE)"

The output is similar to the following:

$REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/chatbot
$REGION-docker.pkg.dev/$PROJECT_ID/${KUBERNETES_CLUSTER_PREFIX}-images/embed-docs

Deploy a Kubernetes Service Account with permissions to run Kubernetes Jobs:

Qdrant

sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" qdrant/manifests/05-rag/service-account.yaml | kubectl -n qdrant apply -f -

Elasticsearch

sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" elasticsearch/manifests/05-rag/service-account.yaml | kubectl -n elastic apply -f -

PGVector

sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" postgres-pgvector/manifests/03-rag/service-account.yaml | kubectl -n pg-ns apply -f -

Weaviate

sed "s/<PROJECT_ID>/$PROJECT_ID/;s/<CLUSTER_PREFIX>/$KUBERNETES_CLUSTER_PREFIX/" weaviate/manifests/04-rag/service-account.yaml | kubectl -n weaviate apply -f -

When you use Terraform to create the GKE cluster with create_service_account set to true, a separate service account is created and used by the cluster and its nodes. Grant the artifactregistry.serviceAgent role to this Compute Engine service account so that the nodes can pull images from the Artifact Registry repository created for embed-docs and chatbot:

export CLUSTER_SERVICE_ACCOUNT=$(gcloud container clusters describe ${KUBERNETES_CLUSTER_PREFIX}-cluster \
    --location=${CONTROL_PLANE_LOCATION} \
    --format="value(nodeConfig.serviceAccount)")
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
    --member="serviceAccount:${CLUSTER_SERVICE_ACCOUNT}" \
    --role="roles/artifactregistry.serviceAgent"

If you don't grant access to the service account, your nodes might experience permission issues when they try to pull images from Artifact Registry while deploying the embed-docs and chatbot Services.

Deploy a Kubernetes Deployment for the embed-docs and chatbot Services. A Deployment is a Kubernetes API object that lets you run multiple replicas of Pods that are distributed among the nodes in a cluster:

Qdrant

sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/chatbot.yaml | kubectl -n qdrant apply -f -
sed "s|<DOCKER_REPO>|$DOCKER_REPO|" qdrant/manifests/05-rag/docs-embedder.yaml | kubectl -n qdrant apply -f -

Elasticsearch

sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/chatbot.yaml | kubectl -n elastic apply -f -
sed "s|<DOCKER_REPO>|$DOCKER_REPO|" elasticsearch/manifests/05-rag/docs-embedder.yaml | kubectl -n elastic apply -f -

PGVector

sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/chatbot.yaml | kubectl -n pg-ns apply -f -
sed "s|<DOCKER_REPO>|$DOCKER_REPO|" postgres-pgvector/manifests/03-rag/docs-embedder.yaml | kubectl -n pg-ns apply -f -

Weaviate

sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/chatbot.yaml | kubectl -n weaviate apply -f -
sed "s|<DOCKER_REPO>|$DOCKER_REPO|" weaviate/manifests/04-rag/docs-embedder.yaml | kubectl -n weaviate apply -f -

Enable Eventarc triggers for GKE:

gcloud eventarc gke-destinations init

When prompted, enter y.

Deploy the Cloud Storage bucket and create an Eventarc trigger using Terraform:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=vector-database/terraform/cloud-storage init
terraform -chdir=vector-database/terraform/cloud-storage apply \
    -var project_id=${PROJECT_ID} \
    -var region=${REGION} \
    -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
    -var db_namespace=${DB_NAMESPACE}

When prompted, type yes. It might take several minutes for the command to complete.

Terraform creates the following resources:

- A Cloud Storage bucket to upload the documents
- An Eventarc trigger
- A Google Cloud Service Account named service_account_eventarc_name with permission to use Eventarc
- A Google Cloud Service Account named service_account_bucket_name with permission to read the bucket and access Vertex AI models

The output is similar to the following:

... # Several lines of output omitted
Apply complete! Resources: 15 added, 0 changed, 0 destroyed.
... # Several lines of output omitted
Load documents and run chatbot queries
Upload the demo documents and run queries to search over the demo documents using the chatbot:
Upload the example carbon-free-energy.pdf document to your bucket:

gcloud storage cp vector-database/documents/carbon-free-energy.pdf gs://${PROJECT_ID}-${KUBERNETES_CLUSTER_PREFIX}-training-docs

Verify that the document embedder Job completed successfully:

kubectl get job -n ${DB_NAMESPACE}

The output is similar to the following:

NAME COMPLETIONS DURATION AGE
docs-embedder1716570453361446 1/1 32s 71s

Get the external IP address of the load balancer:

export EXTERNAL_IP=$(kubectl -n ${DB_NAMESPACE} get svc chatbot --output jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo http://${EXTERNAL_IP}:80

Open the external IP address in your web browser:

http://EXTERNAL_IP

The chatbot responds with a message similar to the following:

How can I help you?

Ask questions about the content of the uploaded documents. If the chatbot can't find anything, it answers I don't know. For example, you could ask the following:

You: Hi, what are Google plans for the future?

An example output from the chatbot is similar to the following:

Bot: Google intends to run on carbon-free energy everywhere, at all times by 2030. To achieve this, it will rely on a combination of renewable energy sources, such as wind and solar, and carbon-free technologies, such as battery storage.

Ask the chatbot a question that is out of context of the uploaded document. For example, you could ask the following:

You: What are Google plans to colonize Mars?

An example output from the chatbot is similar to the following:

Bot: I don't know. The provided context does not mention anything about Google's plans to colonize Mars.
About the application code
This section explains how the application code works. There are three scripts inside the Docker images:
- endpoint.py: receives Eventarc events on each document upload and starts the Kubernetes Jobs to process them.
- embedding-job.py: downloads documents from the bucket, creates embeddings, and inserts the embeddings into the vector database.
- chat.py: runs queries over the content of stored documents.
The diagram shows the process of generating answers using the documents data:
In the diagram, the application loads a PDF file, splits the file into chunks, then vectors, then sends the vectors to a vector database. Later, a user asks a question to the chatbot. The RAG chain uses semantic search to search the vector database, then returns the context along with the question to the LLM. The LLM answers the question, and stores the question into chat history.
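The "splits the file into chunks" step can be illustrated with a minimal fixed-size splitter. This is a simplified stand-in for the splitter the sample application uses; the chunk_size and overlap values are illustrative, and the overlap keeps sentences cut at a boundary visible in the neighboring chunk:

```python
def split_text(text, chunk_size=100, overlap=20):
    # Split text into fixed-size chunks; consecutive chunks share
    # `overlap` characters so context at a boundary is not lost.
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step) if text[i:i + chunk_size]]

# A 250-character document yields chunks starting at 0, 80, 160, and 240.
chunks = split_text("a" * 250, chunk_size=100, overlap=20)
```

Each chunk is then embedded separately, so semantic search can return just the relevant passages rather than the whole document.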
About endpoint.py
This file processes messages from Eventarc, creates a Kubernetes Job to embed the document, and accepts requests from anywhere on port 5001.
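Before reading the full listings, it helps to see the shape of the request the Service handles. The dictionary below is an illustrative sample payload (the field names bucket, name, and generation come from the handler code; the values are hypothetical), and the job-name logic mirrors how the handler derives a unique Job name from the object generation:

```python
# Illustrative sample of the request body that Eventarc's event
# forwarder POSTs to the embed-docs Service.
event = {
    "bucket": "my-project-qdrant-training-docs",  # hypothetical bucket name
    "name": "carbon-free-energy.pdf",
    "generation": "1716570453361446",
}

def job_name_for(event):
    # The handler names each Kubernetes Job after the object generation,
    # so re-uploading the same file produces a distinct Job.
    return "docs-embedder" + event["generation"]

name = job_name_for(event)
```

This is why the Job listed later in the tutorial appears as docs-embedder followed by a long numeric suffix.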
Qdrant
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, jsonify
from flask import request
import logging
import sys, os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException

app = Flask(__name__)

@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})

@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation']
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()
    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"),
        client.V1EnvVar(name="FILE_NAME", value=f_name),
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))),
    ]
    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1 = client.BatchV1Api()
    try:
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
Elasticsearch
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, jsonify
from flask import request
import logging
import sys, os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException

app = Flask(__name__)

@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})

@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation']
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="elastic", container_name="jobcontainer", env_vars={}):
    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()
    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="ES_URL", value=os.getenv("ES_URL")),
        client.V1EnvVar(name="INDEX_NAME", value="training-docs"),
        client.V1EnvVar(name="FILE_NAME", value=f_name),
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="elastic", name="elasticsearch-ha-es-elastic-user"))),
    ]
    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1 = client.BatchV1Api()
    try:
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
PGVector
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, jsonify
from flask import request
import logging
import sys, os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException

app = Flask(__name__)

@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})

@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation']
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="pg-ns", container_name="jobcontainer", env_vars={}):
    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()
    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="POSTGRES_HOST", value=os.getenv("POSTGRES_HOST")),
        client.V1EnvVar(name="DATABASE_NAME", value="app"),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"),
        client.V1EnvVar(name="FILE_NAME", value=f_name),
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="PASSWORD", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="password", name="gke-pg-cluster-app"))),
        client.V1EnvVar(name="USERNAME", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="username", name="gke-pg-cluster-app"))),
    ]
    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name)
    v1 = client.BatchV1Api()
    try:
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
Weaviate
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, jsonify
from flask import request
import logging
import sys, os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException

app = Flask(__name__)

@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})

@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation']
    kube_create_job(bckt, f_name, id)
    return "ok"

# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# Setup K8 configs
config.load_incluster_config()

def kube_create_job_object(name, container_image, bucket_name, f_name, namespace, container_name="jobcontainer", env_vars={}):
    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()
    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="WEAVIATE_ENDPOINT", value=os.getenv("WEAVIATE_ENDPOINT")),
        client.V1EnvVar(name="WEAVIATE_GRPC_ENDPOINT", value=os.getenv("WEAVIATE_GRPC_ENDPOINT")),
        client.V1EnvVar(name="FILE_NAME", value=f_name),
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="AUTHENTICATION_APIKEY_ALLOWED_KEYS", name="apikeys"))),
    ]
    container = client.V1Container(name=container_name, image=container_image, image_pull_policy='Always', env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body

def kube_test_credentials():
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
    v1 = client.BatchV1Api()
    try:
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
About chat.py
This file runs queries over the content of the stored documents and generates answers by using the retrieved context.
Qdrant
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
fromlangchain_google_vertexaiimport ChatVertexAI
fromlangchain.promptsimport ChatPromptTemplate
fromlangchain_google_vertexaiimport VertexAIEmbeddings
fromlangchain.memoryimport ConversationBufferWindowMemory
fromlangchain_community.vectorstoresimport Qdrant
fromqdrant_clientimport QdrantClient
importstreamlitasst
importos
vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or the context is not relevant, respond with "I don't know".
        text_context:
        {context}
        conversation_history:
        {history}
        query:
        {query}
        """),
    ]
)
embedding_model = VertexAIEmbeddings("text-embedding-005")
client = QdrantClient(
    url=os.getenv("QDRANT_URL"),
    api_key=os.getenv("APIKEY"),
)
collection_name = os.getenv("COLLECTION_NAME")
vector_search = Qdrant(client, collection_name, embeddings=embedding_model)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])
st.title("🤖 Chatbot")
if "messages" not in st.session_state:
st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
st.session_state["memory"] = ConversationBufferWindowMemory(
memory_key="history",
ai_prefix="Bob",
human_prefix="User",
k=3,
)
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if chat_input := st.chat_input():
with st.chat_message("human"):
st.write(chat_input)
st.session_state.messages.append({"role": "human", "content": chat_input})
found_docs = vector_search.similarity_search(chat_input)
context = format_docs(found_docs)
prompt_value = prompt_template.format_messages(name="Bob", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
with st.chat_message("ai"):
with st.spinner("Typing..."):
content = ""
with st.empty():
for chunk in vertexAI.stream(prompt_value):
content += chunk.content
st.write(content)
st.session_state.messages.append({"role": "ai", "content": content})
st.session_state.memory.save_context({"input": chat_input}, {"output": content})
Elasticsearch
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")
storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)
loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
embeddings = VertexAIEmbeddings("text-embedding-005")
client = Elasticsearch(
    [os.getenv("ES_URL")],
    verify_certs=False,
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD")),
)
db = ElasticsearchStore.from_documents(
    documents,
    embeddings,
    es_connection=client,
    index_name=os.getenv("INDEX_NAME"),
)
db.client.indices.refresh(index=os.getenv("INDEX_NAME"))
print(filename + " was successfully embedded")
print(f"# of vectors = {len(documents)}")
PGVector
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores.pgvector import PGVector
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")
storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)
loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
for document in documents:
    document.page_content = document.page_content.replace('\x00', '')
embeddings = VertexAIEmbeddings("text-embedding-005")
CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
db = PGVector.from_documents(
    embedding=embeddings,
    documents=documents,
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    use_jsonb=True,
)
print(filename + " was successfully embedded")
print(f"# of vectors = {len(documents)}")
Weaviate
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
from google.cloud import storage
import os
bucketname = os.getenv("BUCKET_NAME")
filename = os.getenv("FILE_NAME")
storage_client = storage.Client()
bucket = storage_client.bucket(bucketname)
blob = bucket.blob(filename)
blob.download_to_filename("/documents/" + filename)
loader = PyPDFLoader("/documents/" + filename)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
documents = loader.load_and_split(text_splitter)
embeddings = VertexAIEmbeddings("text-embedding-005")
auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config,
)
client.connect()
if not client.collections.exists("trainingdocs"):
    collection = client.collections.create(name="trainingdocs")
db = WeaviateVectorStore.from_documents(documents, embeddings, client=client, index_name="trainingdocs")
print(filename + " was successfully embedded")
print(f"# of vectors = {len(documents)}")
About chat.py
This file configures the model to answer questions using only the provided
context and previous answers. If the context or conversation history does not
match any relevant data, the model responds with "I don't know".
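Each chat service assembles its prompt from three pieces: the retrieved chunks (context), a sliding window of the last k=3 exchanges (history, via ConversationBufferWindowMemory), and the new query. A minimal sketch of that assembly, using plain strings in place of LangChain documents and memory (format_docs mirrors the helper in the listings; window_history is a hypothetical approximation of the k=3 memory window):

```python
def format_docs(docs):
    # Same joining the chat services use: chunks separated by blank lines.
    return "\n\n".join(docs)

def window_history(turns, k=3):
    # Keep only the last k (human, ai) exchanges, roughly what
    # ConversationBufferWindowMemory(k=3) retains.
    recent = turns[-k:]
    return "\n".join(f"User: {h}\nBot: {a}" for h, a in recent)

docs = ["GKE is a managed Kubernetes service.", "Eventarc routes events."]
turns = [
    ("hi", "How can I help you?"),
    ("what is GKE?", "A managed Kubernetes service."),
    ("and Eventarc?", "An event router."),
    ("thanks", "You're welcome."),
]

context = format_docs(docs)
history = window_history(turns, k=3)
prompt = (
    f"text_context:\n{context}\n\n"
    f"conversation_history:\n{history}\n\n"
    f"query:\nWhat does Eventarc do?"
)
```

Because only the last k exchanges are kept, older turns silently drop out of the prompt, which bounds prompt size across long conversations.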
Qdrant
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, jsonify
from flask import request
import logging
import sys, os, time
from kubernetes import client, config, utils
import kubernetes.client
from kubernetes.client.rest import ApiException
app = Flask(__name__)
@app.route('/check')
def message():
    return jsonify({"Message": "Hi there"})

@app.route('/', methods=['POST'])
def bucket():
    request_data = request.get_json()
    print(request_data)
    bckt = request_data['bucket']
    f_name = request_data['name']
    id = request_data['generation']
    kube_create_job(bckt, f_name, id)
    return "ok"
# Set logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Setup K8 configs
config.load_incluster_config()
def kube_create_job_object(name, container_image, bucket_name, f_name, namespace="qdrant", container_name="jobcontainer", env_vars={}):
    body = client.V1Job(api_version="batch/v1", kind="Job")
    body.metadata = client.V1ObjectMeta(namespace=namespace, name=name)
    body.status = client.V1JobStatus()
    template = client.V1PodTemplate()
    template.template = client.V1PodTemplateSpec()
    env_list = [
        client.V1EnvVar(name="QDRANT_URL", value=os.getenv("QDRANT_URL")),
        client.V1EnvVar(name="COLLECTION_NAME", value="training-docs"),
        client.V1EnvVar(name="FILE_NAME", value=f_name),
        client.V1EnvVar(name="BUCKET_NAME", value=bucket_name),
        client.V1EnvVar(name="APIKEY", value_from=client.V1EnvVarSource(secret_key_ref=client.V1SecretKeySelector(key="api-key", name="qdrant-database-apikey"))),
    ]
    container = client.V1Container(name=container_name, image=container_image, env=env_list)
    template.template.spec = client.V1PodSpec(containers=[container], restart_policy='Never', service_account='embed-docs-sa')
    body.spec = client.V1JobSpec(backoff_limit=3, ttl_seconds_after_finished=60, template=template.template)
    return body
def kube_test_credentials():
    try:
        api_response = api_instance.get_api_resources()
        logging.info(api_response)
    except ApiException as e:
        print("Exception when calling API: %s\n" % e)

def kube_create_job(bckt, f_name, id):
    container_image = os.getenv("JOB_IMAGE")
    namespace = os.getenv("JOB_NAMESPACE")
    name = "docs-embedder" + id
    body = kube_create_job_object(name, container_image, bckt, f_name, namespace)
    v1 = client.BatchV1Api()
    try:
        v1.create_namespaced_job(namespace, body, pretty=True)
    except ApiException as e:
        print("Exception when calling BatchV1Api->create_namespaced_job: %s\n" % e)
    return

if __name__ == '__main__':
    app.run('0.0.0.0', port=5001, debug=True)
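The root handler above reads three fields from the JSON body that Eventarc forwards for Cloud Storage events: the bucket, the object name, and the object generation (which makes each Job name unique). A small sketch of that extraction, with an illustrative payload (field values are hypothetical examples, not a real event):

```python
def parse_event(request_data):
    """Pull the fields the embed-docs handler reads from the event body."""
    bckt = request_data['bucket']
    f_name = request_data['name']
    job_id = str(request_data['generation'])
    return bckt, f_name, job_id

def job_name(job_id):
    # Mirrors the listing: each upload generation yields a unique Job name.
    return "docs-embedder" + job_id

# Illustrative payload shaped like a Cloud Storage object notification.
event = {"bucket": "my-docs-bucket", "name": "report.pdf", "generation": "1712345678901234"}
bckt, f_name, job_id = parse_event(event)
```

Re-uploading the same file produces a new generation, so a fresh Job runs and the document is re-embedded.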
Elasticsearch
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from elasticsearch import Elasticsearch
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
import streamlit as st
import os
vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or the context is not relevant, respond with "I don't know".
        text_context:
        {context}
        conversation_history:
        {history}
        query:
        {query}
        """),
    ]
)
embedding_model = VertexAIEmbeddings("text-embedding-005")
client = Elasticsearch(
    [os.getenv("ES_URL")],
    verify_certs=False,
    ssl_show_warn=False,
    basic_auth=("elastic", os.getenv("PASSWORD")),
)
vector_search = ElasticsearchStore(
    index_name=os.getenv("INDEX_NAME"),
    es_connection=client,
    embedding=embedding_model,
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])
st.title("🤖 Chatbot")
if "messages" not in st.session_state:
st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
st.session_state["memory"] = ConversationBufferWindowMemory(
memory_key="history",
ai_prefix="Bot",
human_prefix="User",
k=3,
)
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if chat_input := st.chat_input():
with st.chat_message("human"):
st.write(chat_input)
st.session_state.messages.append({"role": "human", "content": chat_input})
found_docs = vector_search.similarity_search(chat_input)
context = format_docs(found_docs)
prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
with st.chat_message("ai"):
with st.spinner("Typing..."):
content = ""
with st.empty():
for chunk in vertexAI.stream(prompt_value):
content += chunk.content
st.write(content)
st.session_state.messages.append({"role": "ai", "content": content})
st.session_state.memory.save_context({"input": chat_input}, {"output": content})
PGVector
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.vectorstores.pgvector import PGVector
import streamlit as st
import os
vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or the context is not relevant, respond with "I don't know".
        text_context:
        {context}
        conversation_history:
        {history}
        query:
        {query}
        """),
    ]
)
embedding_model = VertexAIEmbeddings("text-embedding-005")
CONNECTION_STRING = PGVector.connection_string_from_db_params(
    driver="psycopg2",
    host=os.environ.get("POSTGRES_HOST"),
    port=5432,
    database=os.environ.get("DATABASE_NAME"),
    user=os.environ.get("USERNAME"),
    password=os.environ.get("PASSWORD"),
)
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")
vector_search = PGVector(
    collection_name=COLLECTION_NAME,
    connection_string=CONNECTION_STRING,
    embedding_function=embedding_model,
)

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])
st.title("🤖 Chatbot")
if "messages" not in st.session_state:
st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
st.session_state["memory"] = ConversationBufferWindowMemory(
memory_key="history",
ai_prefix="Bot",
human_prefix="User",
k=3,
)
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if chat_input := st.chat_input():
with st.chat_message("human"):
st.write(chat_input)
st.session_state.messages.append({"role": "human", "content": chat_input})
found_docs = vector_search.similarity_search(chat_input)
context = format_docs(found_docs)
prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
with st.chat_message("ai"):
with st.spinner("Typing..."):
content = ""
with st.empty():
for chunk in vertexAI.stream(prompt_value):
content += chunk.content
st.write(content)
st.session_state.messages.append({"role": "ai", "content": content})
st.session_state.memory.save_context({"input": chat_input}, {"output": content})
Weaviate
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from langchain_google_vertexai import ChatVertexAI
from langchain.prompts import ChatPromptTemplate
from langchain_google_vertexai import VertexAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
import weaviate
from weaviate.connect import ConnectionParams
from langchain_weaviate.vectorstores import WeaviateVectorStore
import streamlit as st
import os
vertexAI = ChatVertexAI(model_name=os.getenv("VERTEX_AI_MODEL_NAME", "gemini-2.5-flash-preview-04-17"), streaming=True, convert_system_message_to_human=True)
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant who helps in finding answers to questions using the provided context."),
        ("human", """
        The answer should be based on the text context given in "text_context" and the conversation history given in "conversation_history" along with its Caption: \n
        Base your response on the provided text context and the current conversation history to answer the query.
        Select the most relevant information from the context.
        Generate a draft response using the selected information. Remove duplicate content from the draft response.
        Generate your final response after adjusting it to increase accuracy and relevance.
        Now only show your final response!
        If you do not know the answer or the context is not relevant, respond with "I don't know".
        text_context:
        {context}
        conversation_history:
        {history}
        query:
        {query}
        """),
    ]
)
embedding_model = VertexAIEmbeddings("text-embedding-005")
auth_config = weaviate.auth.AuthApiKey(api_key=os.getenv("APIKEY"))
client = weaviate.WeaviateClient(
    connection_params=ConnectionParams.from_params(
        http_host=os.getenv("WEAVIATE_ENDPOINT"),
        http_port="80",
        http_secure=False,
        grpc_host=os.getenv("WEAVIATE_GRPC_ENDPOINT"),
        grpc_port="50051",
        grpc_secure=False,
    ),
    auth_client_secret=auth_config,
)
client.connect()
vector_search = WeaviateVectorStore.from_documents([], embedding_model, client=client, index_name="trainingdocs")

def format_docs(docs):
    return "\n\n".join([d.page_content for d in docs])
st.title("🤖 Chatbot")
if "messages" not in st.session_state:
st.session_state["messages"] = [{"role": "ai", "content": "How can I help you?"}]
if "memory" not in st.session_state:
st.session_state["memory"] = ConversationBufferWindowMemory(
memory_key="history",
ai_prefix="Bot",
human_prefix="User",
k=3,
)
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.write(message["content"])
if chat_input := st.chat_input():
with st.chat_message("human"):
st.write(chat_input)
st.session_state.messages.append({"role": "human", "content": chat_input})
found_docs = vector_search.similarity_search(chat_input)
context = format_docs(found_docs)
prompt_value = prompt_template.format_messages(name="Bot", query=chat_input, context=context, history=st.session_state.memory.load_memory_variables({}))
with st.chat_message("ai"):
with st.spinner("Typing..."):
content = ""
with st.empty():
for chunk in vertexAI.stream(prompt_value):
content += chunk.content
st.write(content)
st.session_state.messages.append({"role": "ai", "content": content})
st.session_state.memory.save_context({"input": chat_input}, {"output": content})
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
The easiest way to avoid billing is to delete the project you created for this tutorial.
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
If you deleted the project, your clean up is complete. If you didn't delete the project, proceed to delete the individual resources.
Delete individual resources
Delete the Artifact Registry repository:
gcloud artifacts repositories delete ${KUBERNETES_CLUSTER_PREFIX}-images \
    --location=${REGION} \
    --async
When prompted, type y.
Delete the Cloud Storage bucket and the Eventarc trigger:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=vector-database/terraform/cloud-storage destroy \
    -var project_id=${PROJECT_ID} \
    -var region=${REGION} \
    -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX} \
    -var db_namespace=${DB_NAMESPACE}
When prompted, type yes. Eventarc requires that you have a valid endpoint target both during creation and during deletion.
What's next
- Learn about the best practices for deploying databases on GKE.
- Discover solutions for running data-intensive workloads with GKE.