I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.
(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
File "/app/main.py", line 41, in setup_pubsub_logsink
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
return retry_target(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
_retry_error_helper(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
raise final_exc from source_exc
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
result = target()
File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
return callable_(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.PERMISSION_DENIED
details = "User not authorized to perform this action."
debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025年02月15日T05:19:05.783737357+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/main.py", line 98, in <module>
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
File "/app/main.py", line 68, in setup_pubsub_logsink
policy = publisher.get_iam_policy(request={"resource": topic_path})
File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT %
Below is the code I have for above.
main.py
from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging
def setup_pubsub_logsink(project_id, topic_name, sink_name):
try:
# Initialize credentials
credentials = service_account.Credentials.from_service_account_file(
'/app/key.json'
)
# Initialize clients with explicit credentials
publisher = pubsub_v1.PublisherClient(credentials=credentials)
logging_client = logging_v2.Client(credentials=credentials)
config_client = ConfigServiceV2Client(credentials=credentials)
# Create topic path
topic_path = publisher.topic_path(project_id, topic_name)
# Create topic
logging.info(f"Creating topic: {topic_path}")
try:
topic = publisher.create_topic(request={"name": topic_path})
logging.info(f"Topic created: {topic.name}")
except Exception as e:
if "AlreadyExists" in str(e):
logging.info(f"Topic already exists: {topic_path}")
else:
logging.info(f"Topic already exists: {topic_path}")
# raise
# Create sink
sink_path = f"projects/{project_id}/sinks/{sink_name}"
writer_identity = None
# Check if sink exists
try:
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
writer_identity = existing_sink.writer_identity
logging.info(f"Sink already exists with writer identity: {writer_identity}")
except Exception as e:
if "NotFound" not in str(e):
# raise
logging.exception(f"Sink not found: {sink_path}")
# Create new sink if it doesn't exist
logging.info(f"Creating sink: {sink_name}")
sink = {
"name": sink_name,
"destination": f"pubsub.googleapis.com/{topic_path}",
"filter": "severity >= WARNING" # Adjust filter as needed
}
new_sink = config_client.create_sink(
request={
"parent": f"projects/{project_id}",
"sink": sink,
}
)
writer_identity = new_sink.writer_identity
logging.info(f"Created new sink with writer identity: {writer_identity}")
logging.info(f"getting the iam policy for the sink: {sink_path}")
# Grant publisher permissions to the sink's writer identity
policy = publisher.get_iam_policy(request={"resource": topic_path})
policy.bindings.append({
"role": "roles/pubsub.publisher",
"members": [writer_identity]
})
logging.info(f"iam policy for the sink: {policy}")
publisher.set_iam_policy(request={
"resource": topic_path,
"policy": policy
})
logging.info("Successfully set up topic and sink with proper permissions")
except Exception as e:
logging.error(f"Error setting up pubsub and logsink: {str(e)}")
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
PROJECT_ID = "<your-project-id>"
TOPIC_NAME = "kt-test-local-15"
SINK_NAME = "kt-test-local-15"
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
Ensure you replace the project ID with the correct ID in above code snippet.
Dockerfile
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image
CMD ["python", "main.py"]
requirements.txt
google-cloud-pubsub
google-cloud-logging
Additional Information:
- The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
- The code works perfectly when run locally but fails when run inside the Docker container.
(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:
- Build the image:
podman build -f Dockerfile -t rtsink:latest . - Run the container:
podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
2 Answers 2
I'm not convinced that the code works outside of a container.
I repro'd your issue and received an error:
Expected a message object, but got {'role': 'roles/pubsub.publisher', 'members': ['serviceAccount:[email protected]']}
Using the IAM library's protobuf classes (iam_policy_pb2.SetIamPolicyRequest,policy_pb2.Binding) appears (!?) to work correctly:
from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
from google.iam.v1 import iam_policy_pb2,policy_pb2
import logging
import os
def setup_pubsub_logsink(project_id, topic_name, sink_name):
try:
# Initialize clients with explicit credentials
publisher = pubsub_v1.PublisherClient()
logging_client = logging_v2.Client()
config_client = ConfigServiceV2Client()
# Create topic path
topic_path = publisher.topic_path(project_id, topic_name)
# Create topic
logging.info(f"Creating topic: {topic_path}")
try:
topic = publisher.create_topic(request={"name": topic_path})
logging.info(f"Topic created: {topic.name}")
except Exception as e:
if "AlreadyExists" in str(e):
logging.info(f"Topic already exists: {topic_path}")
else:
logging.info(f"Topic already exists: {topic_path}")
# raise
# Create sink
sink_path = f"projects/{project_id}/sinks/{sink_name}"
writer_identity = None
# Check if sink exists
try:
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
writer_identity = existing_sink.writer_identity
logging.info(f"Sink already exists with writer identity: {writer_identity}")
except Exception as e:
if "NotFound" not in str(e):
# raise
logging.exception(f"Sink not found: {sink_path}")
# Create new sink if it doesn't exist
logging.info(f"Creating sink: {sink_name}")
sink = {
"name": sink_name,
"destination": f"pubsub.googleapis.com/{topic_path}",
"filter": "severity >= WARNING" # Adjust filter as needed
}
new_sink = config_client.create_sink(
request={
"parent": f"projects/{project_id}",
"sink": sink,
}
)
writer_identity = new_sink.writer_identity
logging.info(f"Created new sink with writer identity: {writer_identity}")
logging.info(f"getting the iam policy for the sink: {sink_path}")
# Grant publisher permissions to the sink's writer identity
policy = publisher.get_iam_policy(request={
"resource": topic_path,
})
binding = policy_pb2.Binding(
role="roles/pubsub.publisher",
members=[writer_identity]
)
policy.bindings.append(binding)
logging.info(f"iam policy for the sink: {policy}")
request = iam_policy_pb2.SetIamPolicyRequest(
resource=topic_path,
policy=policy,
)
publisher.set_iam_policy(request=request)
logging.info("Successfully set up topic and sink with proper permissions")
except Exception as e:
logging.error(f"Error setting up pubsub and logsink: {str(e)}")
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
PROJECT_ID = os.getenv("PROJECT")
TOPIC_NAME = os.getenv("TOPIC")
SINK_NAME = os.getenv("SINK")
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
Comments
Based on the error the identity used to run this code does not have permissions to grant access to the sink. Review how do you provide credentials inside your container and make sure that the identity you use have proper permissions. If you provide the same permissions when the code run outside and inside the container, I suggest to print out the identity to make sure that your assumption is true.
Comments
Explore related questions
See similar questions with these tags.
PermissionDenied) suggests that you're doing this incorrectly.--env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json