-2

I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.

(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
 File "/app/main.py", line 41, in setup_pubsub_logsink
 existing_sink = config_client.get_sink(request={"sink_name": sink_path})
 File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
 response = rpc(
 File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
 return wrapped_func(*args, **kwargs)
 File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
 return retry_target(
 File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
 _retry_error_helper(
 File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
 raise final_exc from source_exc
 File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
 result = target()
 File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
 return func(*args, **kwargs)
 File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
 raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
 File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
 return callable_(*args, **kwargs)
 File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
 return _end_unary_response_blocking(state, call, False, None)
 File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
 raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
 status = StatusCode.PERMISSION_DENIED
 details = "User not authorized to perform this action."
 debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025年02月15日T05:19:05.783737357+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
 File "/app/main.py", line 98, in <module>
 setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
 File "/app/main.py", line 68, in setup_pubsub_logsink
 policy = publisher.get_iam_policy(request={"resource": topic_path})
 File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
 response = rpc(
 File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
 return wrapped_func(*args, **kwargs)
 File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
 raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT % 

Below is the code I have for above.

main.py

from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging
def setup_pubsub_logsink(project_id, topic_name, sink_name):
 try:
 # Initialize credentials
 credentials = service_account.Credentials.from_service_account_file(
 '/app/key.json'
 )
 
 # Initialize clients with explicit credentials
 publisher = pubsub_v1.PublisherClient(credentials=credentials)
 logging_client = logging_v2.Client(credentials=credentials)
 config_client = ConfigServiceV2Client(credentials=credentials)
 
 # Create topic path
 topic_path = publisher.topic_path(project_id, topic_name)
 
 # Create topic
 logging.info(f"Creating topic: {topic_path}")
 try:
 topic = publisher.create_topic(request={"name": topic_path})
 logging.info(f"Topic created: {topic.name}")
 except Exception as e:
 if "AlreadyExists" in str(e):
 logging.info(f"Topic already exists: {topic_path}")
 else:
 logging.info(f"Topic already exists: {topic_path}")
 # raise
 
 # Create sink
 sink_path = f"projects/{project_id}/sinks/{sink_name}"
 writer_identity = None
 
 # Check if sink exists
 try:
 existing_sink = config_client.get_sink(request={"sink_name": sink_path})
 writer_identity = existing_sink.writer_identity
 logging.info(f"Sink already exists with writer identity: {writer_identity}")
 except Exception as e:
 if "NotFound" not in str(e):
 # raise
 logging.exception(f"Sink not found: {sink_path}")
 
 # Create new sink if it doesn't exist
 logging.info(f"Creating sink: {sink_name}")
 sink = {
 "name": sink_name,
 "destination": f"pubsub.googleapis.com/{topic_path}",
 "filter": "severity >= WARNING" # Adjust filter as needed
 }
 
 new_sink = config_client.create_sink(
 request={
 "parent": f"projects/{project_id}",
 "sink": sink,
 }
 )
 writer_identity = new_sink.writer_identity
 logging.info(f"Created new sink with writer identity: {writer_identity}")
 
 logging.info(f"getting the iam policy for the sink: {sink_path}")
 # Grant publisher permissions to the sink's writer identity
 policy = publisher.get_iam_policy(request={"resource": topic_path})
 policy.bindings.append({
 "role": "roles/pubsub.publisher",
 "members": [writer_identity]
 })
 logging.info(f"iam policy for the sink: {policy}")
 
 publisher.set_iam_policy(request={
 "resource": topic_path,
 "policy": policy
 })
 
 logging.info("Successfully set up topic and sink with proper permissions")
 
 except Exception as e:
 logging.error(f"Error setting up pubsub and logsink: {str(e)}")
 raise
if __name__ == "__main__":
 logging.basicConfig(level=logging.INFO)
 
 PROJECT_ID = "<your-project-id>"
 TOPIC_NAME = "kt-test-local-15"
 SINK_NAME = "kt-test-local-15"
 
 setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)

Ensure you replace the project ID with the correct ID in above code snippet.

Dockerfile

# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image
CMD ["python", "main.py"]

requirements.txt

google-cloud-pubsub
google-cloud-logging

Additional Information:

  • The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
  • The code works perfectly when run locally but fails when run inside the Docker container.

(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:

  1. Build the image: podman build -f Dockerfile -t rtsink:latest .
  2. Run the container: podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
asked Feb 14, 2025 at 14:39
7
  • Please include details as to how you're mounting the Service Account key at runtime because the evidence (PermissionDenied) suggests that you're doing this incorrectly. Commented Feb 14, 2025 at 16:43
  • Added the details in the query description on how I am mounting the Service Account key at runtime and how we can reproduce using Docker/podman. Commented Feb 14, 2025 at 19:05
  • 1
    Not only do you want to mount the Service Account key but you must also add it the environment variable used by ADC --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json Commented Feb 14, 2025 at 20:02
  • credentials = service_account.Credentials.from_service_account_file( '/app/key.json' ) This line is from the main.py where we are using the credentials explicitly and not implicitly (via env=GOOGLE_APPLICATION_CREDENTIALS) hence adding/mounting this env is not needed. Commented Feb 15, 2025 at 9:13
  • I've updated my query description with the full error logs. The logs indicate that the topics and sink are being created and, when we try to retrieve the IAM permissions (using the function get_iam_policy), we encounter a 403 error. Some of the cloud services being created indicate that there is no problem accessing the SA Account secret by the code/pod at runtime but we are seeing GRPC-related errors while retrieving the IAM policy for the Pub/Sub topic Commented Feb 15, 2025 at 9:14

2 Answers 2

1

I'm not convinced that the code works outside of a container.

I repro'd your issue and received an error:

Expected a message object, but got {'role': 'roles/pubsub.publisher', 'members': ['serviceAccount:[email protected]']}

Using the IAM library's protobuf classes (iam_policy_pb2.SetIamPolicyRequest,policy_pb2.Binding) appears (!?) to work correctly:

from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
from google.iam.v1 import iam_policy_pb2,policy_pb2
import logging
import os
def setup_pubsub_logsink(project_id, topic_name, sink_name):
 try: 
 # Initialize clients with explicit credentials
 publisher = pubsub_v1.PublisherClient()
 logging_client = logging_v2.Client()
 config_client = ConfigServiceV2Client()
 
 # Create topic path
 topic_path = publisher.topic_path(project_id, topic_name)
 
 # Create topic
 logging.info(f"Creating topic: {topic_path}")
 try:
 topic = publisher.create_topic(request={"name": topic_path})
 logging.info(f"Topic created: {topic.name}")
 except Exception as e:
 if "AlreadyExists" in str(e):
 logging.info(f"Topic already exists: {topic_path}")
 else:
 logging.info(f"Topic already exists: {topic_path}")
 # raise
 
 # Create sink
 sink_path = f"projects/{project_id}/sinks/{sink_name}"
 writer_identity = None
 
 # Check if sink exists
 try:
 existing_sink = config_client.get_sink(request={"sink_name": sink_path})
 writer_identity = existing_sink.writer_identity
 logging.info(f"Sink already exists with writer identity: {writer_identity}")
 except Exception as e:
 if "NotFound" not in str(e):
 # raise
 logging.exception(f"Sink not found: {sink_path}")
 
 # Create new sink if it doesn't exist
 logging.info(f"Creating sink: {sink_name}")
 sink = {
 "name": sink_name,
 "destination": f"pubsub.googleapis.com/{topic_path}",
 "filter": "severity >= WARNING" # Adjust filter as needed
 }
 
 new_sink = config_client.create_sink(
 request={
 "parent": f"projects/{project_id}",
 "sink": sink,
 }
 )
 writer_identity = new_sink.writer_identity
 logging.info(f"Created new sink with writer identity: {writer_identity}")
 
 logging.info(f"getting the iam policy for the sink: {sink_path}")
 # Grant publisher permissions to the sink's writer identity
 policy = publisher.get_iam_policy(request={
 "resource": topic_path,
 })
 binding = policy_pb2.Binding(
 role="roles/pubsub.publisher",
 members=[writer_identity]
 )
 policy.bindings.append(binding)
 logging.info(f"iam policy for the sink: {policy}")
 
 request = iam_policy_pb2.SetIamPolicyRequest(
 resource=topic_path,
 policy=policy,
 )
 publisher.set_iam_policy(request=request)
 
 logging.info("Successfully set up topic and sink with proper permissions")
 
 except Exception as e:
 logging.error(f"Error setting up pubsub and logsink: {str(e)}")
 raise
if __name__ == "__main__":
 logging.basicConfig(level=logging.INFO)
 
 PROJECT_ID = os.getenv("PROJECT")
 TOPIC_NAME = os.getenv("TOPIC")
 SINK_NAME = os.getenv("SINK")
 
 setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
answered Feb 15, 2025 at 18:40
Sign up to request clarification or add additional context in comments.

Comments

-1

Based on the error the identity used to run this code does not have permissions to grant access to the sink. Review how do you provide credentials inside your container and make sure that the identity you use have proper permissions. If you provide the same permissions when the code run outside and inside the container, I suggest to print out the identity to make sure that your assumption is true.

answered Jun 17, 2025 at 22:12

Comments

Your Answer

Draft saved
Draft discarded

Sign up or log in

Sign up using Google
Sign up using Email and Password

Post as a guest

Required, but never shown

Post as a guest

Required, but never shown

By clicking "Post Your Answer", you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.