From Chaos to Clarity: A Guide to Structured Logging in Python on Google Cloud

Nathan Barrett

In any microservices architecture, logs are your lifeline. But as your system grows, sifting through mountains of plain text from dozens of services becomes a nightmare. How do you trace a single user's request as it hops from a public-facing API, to a user service, into a message queue, and finally through a background worker?

The answer is structured, correlated logging.

This article provides a complete guide to building a production-grade logging utility in Python. This pattern will transform your logs from a chaotic stream of text into a rich, queryable, and cost-effective dataset in Google Cloud Logging.

The Goal: A Perfect Log Entry

Our goal is to enrich every log message with a consistent structure that Google Cloud understands. This automatically unlocks powerful features like request tracing and source-code linking.

A perfect log entry has a clear message, a proper severity level, a link to the code that generated it, and, most importantly, a correlation_id that ties it to a single, end-to-end transaction.

{
  "message": "User profile updated",
  "correlation_id": "c38047e6-acb3-4593-92c6-734863328805",
  "severity": "INFO",
  "logging.googleapis.com/sourceLocation": {
    "file": "/app/services/user_service.py",
    "line": "82",
    "function": "update_user"
  },
  "logging.googleapis.com/trace": "projects/your-project/traces/..."
}
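
Once entries arrive in this shape, every field is queryable. Assuming your service writes this JSON to stdout and Cloud Logging ingests it into jsonPayload (the default on Cloud Run and GKE), you can pull up an entire transaction in Logs Explorer with a single filter:

jsonPayload.correlation_id="c38047e6-acb3-4593-92c6-734863328805"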

The Tools

We’ll use two key Python libraries:

  • python-json-logger: To easily format our logs as JSON.
  • contextvars: To safely carry our correlation_id through asynchronous code without passing it as a parameter to every function (see the short sketch after this list).
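
To see why contextvars is the right tool, here is a minimal, standalone sketch (the names request_id and handle are purely illustrative): each asyncio task runs in its own copy of the context, so concurrent requests never see each other's values.

# Minimal sketch: ContextVar values are isolated per asyncio task.
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="none")

async def handle(name: str):
    token = request_id.set(name)   # visible only inside this task
    await asyncio.sleep(0)         # yield so the two tasks interleave
    print(name, "sees", request_id.get())
    request_id.reset(token)

async def main():
    await asyncio.gather(handle("req-1"), handle("req-2"))

asyncio.run(main())  # prints: req-1 sees req-1, then req-2 sees req-2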

Step 1: Create the Reusable Logging Utility

The heart of our system is a reusable utils/logger.py file that can be dropped into any microservice. It contains a custom formatter to structure the JSON and a context filter to automatically inject shared data.

Here is the complete utility:

# in utils/logger.py
import logging
import os
import sys
from contextvars import ContextVar
from typing import Union

from pythonjsonlogger import jsonlogger

PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Context variables hold request-specific data for automatic injection.
trace_context_var: ContextVar[Union[str, None]] = ContextVar("trace_context", default=None)
correlation_id_var: ContextVar[Union[str, None]] = ContextVar("correlation_id", default=None)


class GoogleCloudFormatter(jsonlogger.JsonFormatter):
    """A custom formatter that structures logs for Google Cloud."""

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['severity'] = record.levelname
        log_record['logging.googleapis.com/sourceLocation'] = {
            "file": record.pathname,
            "line": record.lineno,
            "function": record.funcName,
        }
        # Promote the trace to the special key Cloud Logging recognizes,
        # then drop the raw attribute so it doesn't appear twice (or as null).
        if getattr(record, 'trace', None):
            log_record['logging.googleapis.com/trace'] = record.trace
        log_record.pop('trace', None)
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id


class ContextFilter(logging.Filter):
    """A filter that injects context variables into each log record."""

    def filter(self, record):
        trace_context = trace_context_var.get()
        if trace_context and PROJECT_ID:
            try:
                # The x-cloud-trace-context header looks like "TRACE_ID/SPAN_ID;o=1".
                trace_id = trace_context.split('/')[0]
                record.trace = f"projects/{PROJECT_ID}/traces/{trace_id}"
            except (IndexError, TypeError):
                record.trace = None
        record.correlation_id = correlation_id_var.get()
        return True


def setup_logger(name="your-service-name"):
    """Configures a JSON logger optimized for Google Cloud."""
    logger = logging.getLogger(name)
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    logger.propagate = False  # Prevents duplicate logs with Gunicorn/Uvicorn
    if logger.hasHandlers():
        logger.handlers.clear()

    handler = logging.StreamHandler(sys.stdout)
    handler.addFilter(ContextFilter())
    formatter = GoogleCloudFormatter('%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
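
Before integrating it anywhere, you can sanity-check the utility on its own. A minimal sketch (the import path, service name, and correlation ID are placeholders that depend on your project layout):

# Standalone smoke test for the logger; names and values are placeholders.
from utils.logger import setup_logger, correlation_id_var

logger = setup_logger("demo-service")
correlation_id_var.set("local-test-123")
logger.info("User profile updated")
# Emits a single JSON line on stdout containing message, severity,
# sourceLocation, and correlation_id, like the example at the top.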

Step 2: Integrate with Your Web Service

In your main application file (e.g., main.py), call setup_logger and create a middleware that populates the context variables for each request. This middleware acts as our "context manager": it runs at the start of every request to establish the correlation_id and resets the context at the end.

Here’s an example for a FastAPI application:

# in main.py
import uuid

from fastapi import FastAPI, Request

from app.utils.logger import setup_logger, trace_context_var, correlation_id_var

logger = setup_logger("my-api-service")
app = FastAPI()


@app.middleware("http")
async def context_middleware(request: Request, call_next):
    # Get the correlation ID from the incoming header, or generate a new one.
    correlation_id = request.headers.get("x-correlation-id") or str(uuid.uuid4())
    correlation_token = correlation_id_var.set(correlation_id)

    trace_header = request.headers.get("x-cloud-trace-context")
    trace_token = trace_context_var.set(trace_header)

    logger.info("Request received")
    try:
        response = await call_next(request)
        logger.info("Request completed")
        return response
    finally:
        # Reset context to prevent data from leaking between requests.
        trace_context_var.reset(trace_token)
        correlation_id_var.reset(correlation_token)

Step 3: Propagate the correlation_id

The final step is to pass the correlation_id between your services. Think of it as a baton in a relay race.

API-to-API Calls

When one service calls another, read the ID from the context and pass it in a header.

# In an upstream service
import requests

from app.utils.logger import correlation_id_var

def call_downstream_service():
    correlation_id = correlation_id_var.get()
    # Only send the header if a correlation ID is actually set.
    headers = {"x-correlation-id": correlation_id} if correlation_id else {}
    response = requests.get("http://downstream-service/api/data", headers=headers)
    # ...

API-to-Background Worker (via Message Queue)

When publishing a message to a queue like Pub/Sub or RabbitMQ, add the correlation_id to the message payload.

# In your API service
import json

from app.utils.logger import correlation_id_var

def publish_task(task_data: dict):
    correlation_id = correlation_id_var.get()

    # Attach the correlation ID to the payload so the worker can pick it up.
    message_payload = {
        "correlation_id": correlation_id,
        "task_data": task_data,
    }
    # publisher.publish(json.dumps(message_payload))

Your background worker will then read this ID from the message, set it in its own contextvar, and all its logs will be automatically correlated.
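
A sketch of that worker side, assuming a Pub/Sub-style callback and the payload shape from publish_task above (the process function is hypothetical):

# Hypothetical worker callback; payload shape matches publish_task above.
import json

from app.utils.logger import setup_logger, correlation_id_var

logger = setup_logger("my-worker-service")

def on_message(message):
    payload = json.loads(message.data)
    # Pick up the baton: restore the correlation ID for this task's logs.
    token = correlation_id_var.set(payload.get("correlation_id"))
    try:
        logger.info("Task received")      # automatically correlated
        process(payload["task_data"])     # your actual work (hypothetical)
        message.ack()
    finally:
        correlation_id_var.reset(token)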

Final Thoughts: Managing Log Volume

This detailed logging is perfect for debugging, but it can be too noisy and expensive for production. A good practice is to use log levels effectively.

  • Change granular, step-by-step logs from logger.info to logger.debug (see the sketch after this list).
  • Run your service in production with the LOG_LEVEL environment variable set to INFO.
  • If you need to debug an issue, you can change the log level to DEBUG to temporarily get the full, detailed trace without redeploying your code.
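
For example, the same code path before and after applying the first point (the messages are illustrative):

# Step-by-step detail: emitted only when LOG_LEVEL=DEBUG
logger.debug("Fetched user row from database")
# Business milestone: always emitted at the default INFO level
logger.info("User profile updated")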

By following these steps, you can create a powerful, scalable, and cost-effective logging system that will be the cornerstone of your microservices observability strategy.
