
In any microservices architecture, logs are your lifeline. But as your system grows, sifting through mountains of plain text from dozens of services becomes a nightmare. How do you trace a single user's request as it hops from a public-facing API, to a user service, into a message queue, and finally through a background worker?
The answer is structured, correlated logging.
This article provides a complete guide to building a production-grade logging utility in Python. This pattern will transform your logs from a chaotic stream of text into a rich, queryable, and cost-effective dataset in Google Cloud Logging.
The Goal: A Perfect Log Entry
Our goal is to enrich every log message with a consistent structure that Google Cloud understands, which automatically unlocks powerful features like request tracing and source-code linking.
A perfect log entry has a clear message, a proper severity level, a link to the code that generated it, and, most importantly, a correlation_id that ties it to a single, end-to-end transaction.
{
  "message": "User profile updated",
  "correlation_id": "c38047e6-acb3-4593-92c6-734863328805",
  "severity": "INFO",
  "logging.googleapis.com/sourceLocation": {
    "file": "/app/services/user_service.py",
    "line": "82",
    "function": "update_user"
  },
  "logging.googleapis.com/trace": "projects/your-project/traces/..."
}
The Tools
We’ll use two key Python libraries:
- python-json-logger: to easily format our logs as JSON.
- contextvars: to safely carry our correlation_id through asynchronous code without having to pass it as a parameter to every function.
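If contextvars is unfamiliar, here is a minimal, self-contained sketch (separate from the utility we build next) of why it is safe under concurrency: asyncio runs each task in its own copy of the context, so a value set in one request's task never leaks into another's.

# Minimal contextvars demo: each asyncio task sees only its own value.
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unset")

async def handle(name: str):
    token = request_id.set(name)           # scoped to this task's context
    await asyncio.sleep(0)                 # yield so the two tasks interleave
    print(name, "sees", request_id.get())  # each task prints its own name
    request_id.reset(token)

async def main():
    await asyncio.gather(handle("req-a"), handle("req-b"))

asyncio.run(main())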
Step 1: Create the Reusable Logging Utility
The heart of our system is a reusable utils/logger.py file that can be dropped into any microservice. It contains a custom formatter to structure the JSON and a context filter to automatically inject shared data.
Here is the complete utility:
# in utils/logger.py
import logging
import os
import sys
from contextvars import ContextVar
from typing import Union

from pythonjsonlogger import jsonlogger

PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Context variables hold request-specific data for automatic injection.
trace_context_var: ContextVar[Union[str, None]] = ContextVar("trace_context", default=None)
correlation_id_var: ContextVar[Union[str, None]] = ContextVar("correlation_id", default=None)


class GoogleCloudFormatter(jsonlogger.JsonFormatter):
    """A custom formatter to structure logs for Google Cloud."""

    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        log_record['severity'] = record.levelname
        log_record['logging.googleapis.com/sourceLocation'] = {
            "file": record.pathname,
            "line": record.lineno,
            "function": record.funcName,
        }
        if hasattr(record, 'trace') and record.trace:
            log_record['logging.googleapis.com/trace'] = record.trace
            del log_record['trace']
        if hasattr(record, 'correlation_id'):
            log_record['correlation_id'] = record.correlation_id


class ContextFilter(logging.Filter):
    """A filter that injects context from context variables into the log record."""

    def filter(self, record):
        trace_context = trace_context_var.get()
        if trace_context and PROJECT_ID:
            try:
                # The x-cloud-trace-context header arrives as "TRACE_ID/SPAN_ID;o=OPTIONS"
                trace_id = trace_context.split('/')[0]
                record.trace = f"projects/{PROJECT_ID}/traces/{trace_id}"
            except (IndexError, TypeError):
                record.trace = None
        record.correlation_id = correlation_id_var.get()
        return True


def setup_logger(name="your-service-name"):
    """Configures a JSON logger optimized for Google Cloud."""
    logger = logging.getLogger(name)
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    logger.propagate = False  # Prevents duplicate logs with Gunicorn/Uvicorn
    if logger.hasHandlers():
        logger.handlers.clear()
    handler = logging.StreamHandler(sys.stdout)
    handler.addFilter(ContextFilter())
    formatter = GoogleCloudFormatter('%(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
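Before wiring this into a service, you can smoke-test it in isolation by seeding the context variables by hand. The trace value below is a made-up example, and the trace field only appears when GOOGLE_CLOUD_PROJECT is set:

# Local smoke test for utils/logger.py
from utils.logger import setup_logger, trace_context_var, correlation_id_var

logger = setup_logger("demo-service")
correlation_id_var.set("c38047e6-acb3-4593-92c6-734863328805")
trace_context_var.set("0af7651916cd43dd8448eb211c80319c/1;o=1")

logger.info("User profile updated")
# Emits one JSON line with message, severity, sourceLocation, correlation_id,
# and logging.googleapis.com/trace.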
Step 2: Integrate with Your Web Service
In your main application file (e.g., main.py), call setup_logger and create a middleware to populate the context variables for each request. This middleware is our "context manager": it establishes the correlation_id at the start of every request and resets it at the end.
Here’s an example for a FastAPI application:
# in main.py
import uuid

from fastapi import FastAPI, Request

from app.utils.logger import setup_logger, trace_context_var, correlation_id_var

logger = setup_logger("my-api-service")
app = FastAPI()


@app.middleware("http")
async def context_middleware(request: Request, call_next):
    # Get correlation ID from header or generate a new one
    correlation_id = request.headers.get("x-correlation-id") or str(uuid.uuid4())
    correlation_token = correlation_id_var.set(correlation_id)

    trace_header = request.headers.get("x-cloud-trace-context")
    trace_token = trace_context_var.set(trace_header)

    logger.info("Request received")
    try:
        response = await call_next(request)
        logger.info("Request completed")
        return response
    finally:
        # Reset context to prevent data from leaking between requests
        trace_context_var.reset(trace_token)
        correlation_id_var.reset(correlation_token)
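You can exercise the middleware locally with FastAPI's test client; the /health route here is a hypothetical addition for the demo, not part of the pattern itself:

# Appended to main.py for a quick local check (hypothetical /health route).
from fastapi.testclient import TestClient

@app.get("/health")
async def health():
    logger.info("Health check handled")  # inherits the request's context
    return {"status": "ok"}

client = TestClient(app)
client.get("/health", headers={"x-correlation-id": "test-123"})
# All three log lines for this request share correlation_id "test-123";
# a request without the header gets a freshly generated UUID instead.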
Step 3: Propagate the correlation_id
The final step is to pass the correlation_id between your services. Think of it as a baton in a relay race.
API-to-API Calls
When one service calls another, read the ID from the context and pass it in a header.
# In an upstream service
import requests

from app.utils.logger import correlation_id_var


def call_downstream_service():
    correlation_id = correlation_id_var.get()
    headers = {"x-correlation-id": correlation_id}
    response = requests.get("http://downstream-service/api/data", headers=headers)
    # ...
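One caveat: correlation_id_var.get() returns None when this code runs outside a request (a startup task, a scheduled job), so it can be worth guarding the header explicitly:

# Only send the header when we actually have an ID.
headers = {}
correlation_id = correlation_id_var.get()
if correlation_id:
    headers["x-correlation-id"] = correlation_id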
API-to-Background Worker (via Message Queue)
When publishing a message to a queue like Pub/Sub or RabbitMQ, add the correlation_id to the message payload.
# In your API service
from app.utils.logger import correlation_id_var


def publish_task(task_data: dict):
    correlation_id = correlation_id_var.get()

    message_payload = {
        "correlation_id": correlation_id,
        "task_data": task_data,
    }
    # publisher.publish(json.dumps(message_payload))
Your background worker will then read this ID from the message, set it in its own contextvar, and all its logs will be automatically correlated.
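That worker side might look like the following sketch; handle_message and process are hypothetical names for your queue callback and task handler:

# In your background worker (a sketch; adapt to your queue client's callback API)
import json

from app.utils.logger import setup_logger, correlation_id_var

logger = setup_logger("my-worker-service")


def handle_message(raw_message: bytes):
    payload = json.loads(raw_message)
    # Seed the contextvar so every log in this task is correlated.
    token = correlation_id_var.set(payload.get("correlation_id"))
    try:
        logger.info("Task started")
        process(payload["task_data"])  # your actual task handler
        logger.info("Task completed")
    finally:
        correlation_id_var.reset(token)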
Final Thoughts: Managing Log Volume
This detailed logging is perfect for debugging, but it can be too noisy and expensive for production. A good practice is to use log levels effectively.
- Change granular, step-by-step logs from logger.info to logger.debug.
- Run your service in production with the LOG_LEVEL environment variable set to INFO.
- If you need to debug an issue, change the log level to DEBUG to temporarily get the full, detailed trace without redeploying your code (see the short example after this list).
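Concretely, the split might look like this; the extra keyword is standard logging, and with python-json-logger its fields end up as top-level JSON keys:

# With LOG_LEVEL=INFO (the production default), the debug line below is
# dropped at the logger and never reaches Cloud Logging; with LOG_LEVEL=DEBUG
# it appears with full detail.
logger.debug("Fetched user row from database", extra={"user_id": 42})
logger.info("User profile updated")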
By following these steps, you can create a powerful, scalable, and cost-effective logging system that will be the cornerstone of your microservices observability strategy.