BigQuery Callback Handler
Google BigQuery is a serverless and cost-effective enterprise data warehouse that works across clouds and scales with your data.
The BigQueryCallbackHandler allows you to log events from LangChain to Google BigQuery. This is useful for monitoring, auditing, and analyzing the performance of your LLM applications.
!!! example "Preview release"
    This feature is in Preview. APIs and functionality are subject to change.
!!! warning "BigQuery Storage Write API"
    This feature uses the BigQuery Storage Write API, which is a paid service. For information on costs, see the BigQuery documentation.
Use cases
- Agent workflow debugging and analysis: Capture a wide range of LangChain/LangGraph lifecycle events (LLM calls, tool usage) and agent-yielded events (user input, model responses) in a well-defined schema.
- High-volume analysis and debugging: Logging operations are performed asynchronously using the Storage Write API to allow high throughput and low latency.
- Multimodal Analysis: Log and analyze text, images, and other modalities. Large files are offloaded to GCS, making them accessible to BigQuery ML via Object Tables.
- Distributed Tracing: Uses LangChain’s native tracing identifiers (run_id, parent_run_id) to visualize agent execution flows.
The agent event data recorded varies based on the LangChain/LangGraph event type. For more information, see Event types and payloads.
Installation
Install langchain-google-community with the bigquery extra. This example also requires langchain-google-genai and langgraph.
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph
Prerequisites
- Google Cloud Project with the BigQuery API enabled.
- BigQuery Dataset: Create a dataset to store logging tables before using the callback handler. The callback handler automatically creates the necessary events table within the dataset if the table does not exist.
- Google Cloud Storage Bucket (Optional): If you plan to log multimodal content (images, audio, etc.), creating a GCS bucket is recommended for offloading large files.
- Authentication:
  - Local: Run gcloud auth application-default login.
  - Cloud: Ensure your service account has the required permissions.
IAM Permissions
For the callback handler to work properly, the principal (e.g., service account, user account) under which the application is running needs these Google Cloud roles:
- roles/bigquery.jobUser at the project level to run BigQuery queries.
- roles/bigquery.dataEditor at the table level to write log/event data.
- If using GCS offloading: roles/storage.objectCreator and roles/storage.objectViewer on the target bucket.
Use with LangChain Agent
To use the AsyncBigQueryCallbackHandler, you need to instantiate it with your Google Cloud project ID, dataset ID, and table ID.
If you want to log session_id, user_id, and agent fields to BigQuery, you must pass them via the metadata dictionary in the config object when invoking the chain or agent.
import asyncio
import os
from datetime import datetime

from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import AsyncBigQueryCallbackHandler
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.prebuilt import create_react_agent


# 1. Define some tools for the agent
@tool
def get_current_time():
    """Returns the current local time."""
    return datetime.now().strftime("%H:%M:%S")


@tool
def get_weather(city: str):
    """Returns the current weather for a specific city."""
    return f"The weather in {city} is sunny and 25°C."


async def run_example_async_agent(bq_project_id: str):
    """Runs an asynchronous Agent with BigQuery logging."""
    # Setup BigQuery logging details
    dataset_id = "your_dataset_id"
    table_id = "your_table_id"
    print("--- Starting Async Agent Example ---")
    print(f"Logging to: {bq_project_id}.{dataset_id}.{table_id}")

    # Initialize the async callback handler
    bigquery_handler = AsyncBigQueryCallbackHandler(
        project_id=bq_project_id, dataset_id=dataset_id, table_id=table_id
    )
    try:
        # Setup LLM and Tools
        llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro")
        tools = [get_current_time, get_weather]

        # Create the Agent (LangGraph ReAct implementation)
        agent_executor = create_react_agent(llm, tools)
        query = "What time is it now, and what is the weather like in New York?"
        print(f"User Query: {query}")

        # Run the agent asynchronously
        # We use astream to process chunks, but you can also use ainvoke
        async for chunk in agent_executor.astream(
            {"messages": [("user", query)]},
            config={
                "callbacks": [bigquery_handler],  # Pass the handler here
                "metadata": {
                    "session_id": "agent-session-001",
                    "user_id": "user-123",
                    "agent": "weather-agent",
                },
            },
        ):
            # Print agent thoughts/actions as they happen
            if "agent" in chunk:
                print(f"🤖 Agent: {chunk['agent']['messages'][0].content}")
            elif "tools" in chunk:
                print(f"🛠️ Tool Output: {chunk['tools']['messages'][0].content}")
        print("✅ Agent finished. Logs are being written in the background...")
    finally:
        # Ensure resources are cleaned up
        await bigquery_handler.close()


if __name__ == "__main__":
    # Ensure GOOGLE_API_KEY is set in your environment
    if "GOOGLE_API_KEY" not in os.environ:
        raise ValueError("Please set the GOOGLE_API_KEY environment variable.")
    project_id = "your-project-id"
    asyncio.run(run_example_async_agent(project_id))
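When several invocations need to carry the same metadata keys, the config dictionary can be assembled in one place. The following is a minimal sketch: build_run_config is a hypothetical helper name (it is not part of LangChain or langchain-google-community), and a plain object stands in for the real handler so the snippet runs without Google Cloud credentials.

```python
from typing import Any, Dict, List


def build_run_config(
    callbacks: List[Any], session_id: str, user_id: str, agent: str
) -> Dict[str, Any]:
    """Hypothetical helper: return a config dict with the metadata keys
    (session_id, user_id, agent) described in this section."""
    return {
        "callbacks": callbacks,
        "metadata": {
            "session_id": session_id,
            "user_id": user_id,
            "agent": agent,
        },
    }


# A plain object stands in for the real callback handler here.
placeholder_handler = object()
run_config = build_run_config(
    [placeholder_handler], "agent-session-001", "user-123", "weather-agent"
)
print(run_config["metadata"])
```

The resulting dictionary has the same shape as the config argument passed to astream in the agent example.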
Configuration options
You can customize the callback handler using BigQueryLoggerConfig.
enabled
Set this parameter to False to stop the handler from logging data to the BigQuery table.
clustering_fields
List[str]
default:"[\"event_type\", \"agent\", \"user_id\"]"
The fields used to cluster the BigQuery table when it is automatically created.
gcs_bucket_name
The name of the GCS bucket to offload large content (images, blobs, large text) to. If not provided, large content may be truncated or replaced with placeholders.
connection_id
The BigQuery connection ID (e.g., us.my-connection) to use as the authorizer for ObjectRef columns. Required for using ObjectRef with BigQuery ML.
max_content_length
The maximum length (in characters) of text content to store inline in BigQuery before offloading to GCS (if configured) or truncating. Defaults to 500 KB.
The number of events to batch before writing to BigQuery.
The maximum time (in seconds) to wait before flushing a partial batch.
shutdown_timeout
Seconds to wait for logs to flush during shutdown.
event_allowlist
A list of event types to log. If None, all events are logged except those in event_denylist.
event_denylist
A list of event types to skip logging.
Whether to log detailed content parts (including GCS references).
table_id
str
default:"agent_events_v2"
The default table ID to use if not explicitly provided to the callback handler constructor.
retry_config
RetryConfig
default:"RetryConfig()"
Configuration for retry logic (max retries, delay, multiplier) when writing to BigQuery fails.
The maximum number of events to hold in the internal buffer queue before dropping new events.
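The batching and buffering options above interact: events accumulate until a batch is full or the flush interval elapses, and new events are dropped once the internal queue is full. The sketch below illustrates that behavior only. BatchBuffer and its parameter names are hypothetical and are not the handler's actual implementation; a fake clock is injected so the example is deterministic.

```python
import time
from collections import deque
from typing import Callable, Deque, List, Optional


class BatchBuffer:
    """Toy buffer (hypothetical): flush when a batch is full or the oldest
    pending event has waited at least flush_interval seconds."""

    def __init__(
        self,
        batch_size: int,
        flush_interval: float,
        max_queue_size: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.max_queue_size = max_queue_size
        self.clock = clock
        self.pending: Deque[dict] = deque()
        self.first_event_at: Optional[float] = None
        self.dropped = 0

    def add(self, event: dict) -> bool:
        """Queue an event; return False if the queue is full and it is dropped."""
        if len(self.pending) >= self.max_queue_size:
            self.dropped += 1
            return False
        if not self.pending:
            self.first_event_at = self.clock()
        self.pending.append(event)
        return True

    def take_batch_if_ready(self) -> List[dict]:
        """Return a batch if the size or time threshold is met, else []."""
        if not self.pending:
            return []
        waited = self.clock() - self.first_event_at
        if len(self.pending) < self.batch_size and waited < self.flush_interval:
            return []
        count = min(self.batch_size, len(self.pending))
        batch = [self.pending.popleft() for _ in range(count)]
        self.first_event_at = self.clock() if self.pending else None
        return batch


fake_now = [0.0]  # mutable fake clock value
buf = BatchBuffer(batch_size=2, flush_interval=1.0, max_queue_size=3,
                  clock=lambda: fake_now[0])
accepted = [buf.add({"n": i}) for i in range(4)]  # the fourth event is dropped
first = buf.take_batch_if_ready()   # full batch of two events
early = buf.take_batch_if_ready()   # one event pending, not old enough yet
fake_now[0] = 1.5
late = buf.take_batch_if_ready()    # partial batch flushed by elapsed time
```

A larger batch size trades latency for fewer write calls, while the queue limit bounds memory use at the cost of dropped events under sustained overload.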
The following code sample shows how to define a configuration for the BigQuery callback handler, including a custom content formatter:
import json
import re
from typing import Any

from langchain_google_community.callbacks.bigquery_callback import (
    BigQueryCallbackHandler,
    BigQueryLoggerConfig,
)


def redact_dollar_amounts(event_content: Any) -> str:
    """
    Custom formatter to redact dollar amounts (e.g., $600, $12.50)
    and ensure JSON output if the input is a dict.
    """
    # If the content is a dictionary, convert it to a JSON string first.
    if isinstance(event_content, dict):
        text_content = json.dumps(event_content)
    else:
        text_content = str(event_content)

    # Regex to find dollar amounts: $ followed by digits, optionally with commas or decimals.
    # Examples: $600, $1,200.50, $0.99
    redacted_content = re.sub(r'\$\d+(?:,\d{3})*(?:\.\d+)?', 'xxx', text_content)
    return redacted_content


# 1. Configure BigQueryLoggerConfig
config = BigQueryLoggerConfig(
    enabled=True,
    event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"],  # Only log these specific events
    shutdown_timeout=10.0,  # Wait up to 10s for logs to flush on exit
    max_content_length=500,  # Truncate content to 500 characters
    content_formatter=redact_dollar_amounts,  # Set the custom formatting function
)

# 2. Initialize the Callback Handler
handler = BigQueryCallbackHandler(
    project_id="your-project-id",
    dataset_id="your_dataset",
    table_id="your_table",
    config=config,
)
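Before attaching a formatter such as redact_dollar_amounts to the handler, it can be called directly on sample payloads to confirm what would be stored. The sketch below repeats the function in condensed form so that it runs standalone; the sample inputs are made up for illustration.

```python
import json
import re
from typing import Any


def redact_dollar_amounts(event_content: Any) -> str:
    """Condensed copy of the formatter above: serialize dicts, then redact."""
    if isinstance(event_content, dict):
        text = json.dumps(event_content)
    else:
        text = str(event_content)
    return re.sub(r"\$\d+(?:,\d{3})*(?:\.\d+)?", "xxx", text)


# Illustrative inputs only; real event content depends on the event type.
plain = redact_dollar_amounts("Refund of $1,200.50 approved")
nested = redact_dollar_amounts({"price": "$0.99", "note": "no amount"})
print(plain)   # Refund of xxx approved
print(nested)  # {"price": "xxx", "note": "no amount"}
```

Because the formatter returns a string, dictionary payloads are stored as serialized JSON text after redaction.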
Schema and production setup
The callback handler automatically creates the table if it does not exist. However, for production, we recommend creating the table manually using the following DDL, which uses the JSON type for flexibility and REPEATED RECORDs for multimodal content.
Recommended DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
  timestamp TIMESTAMP NOT NULL OPTIONS(description="The UTC timestamp when the event occurred."),
  event_type STRING OPTIONS(description="The category of the event."),
  agent STRING OPTIONS(description="The name of the agent."),
  session_id STRING OPTIONS(description="A unique identifier for the conversation session."),
  invocation_id STRING OPTIONS(description="A unique identifier for a single turn."),
  user_id STRING OPTIONS(description="The identifier of the end-user."),
  trace_id STRING OPTIONS(description="OpenTelemetry trace ID."),
  span_id STRING OPTIONS(description="OpenTelemetry span ID."),
  parent_span_id STRING OPTIONS(description="OpenTelemetry parent span ID."),
  content JSON OPTIONS(description="The primary payload of the event."),
  content_parts ARRAY<STRUCT<
    mime_type STRING,
    uri STRING,
    object_ref STRUCT<
      uri STRING,
      version STRING,
      authorizer STRING,
      details JSON
    >,
    text STRING,
    part_index INT64,
    part_attributes STRING,
    storage_mode STRING
  >> OPTIONS(description="For multi-modal events, contains a list of content parts."),
  attributes JSON OPTIONS(description="Arbitrary key-value pairs."),
  latency_ms JSON OPTIONS(description="Latency measurements."),
  status STRING OPTIONS(description="The outcome of the event."),
  error_message STRING OPTIONS(description="Detailed error message."),
  is_truncated BOOLEAN OPTIONS(description="Flag indicating if content was truncated.")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
Event types and payloads
The content column contains a JSON object specific to the event_type.
The content_parts column provides a structured view of the content, especially useful for images or offloaded data.
!!! note "Content Truncation"
    - Variable content fields are truncated to max_content_length (configured in BigQueryLoggerConfig, default 500 KB).
    - If gcs_bucket_name is configured, large content is offloaded to GCS instead of being truncated, and a reference is stored in content_parts.object_ref.
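The truncation and offloading rules above amount to a three-way decision. The sketch below is an illustration under stated assumptions, not the handler's code: plan_storage is a hypothetical function, the "INLINE" label is an assumed name for inline storage, and only "GCS_REFERENCE" appears in this document as a storage_mode value.

```python
from typing import Optional, Tuple


def plan_storage(
    text: str, max_content_length: int, gcs_bucket_name: Optional[str]
) -> Tuple[str, str, bool]:
    """Hypothetical helper: return (storage_mode, inline_text, is_truncated)."""
    if len(text) <= max_content_length:
        # Small enough to keep inline unchanged.
        return ("INLINE", text, False)
    if gcs_bucket_name:
        # Too large, but a bucket is configured: offload and store a reference.
        return ("GCS_REFERENCE", "", False)
    # Too large and no bucket: keep a truncated prefix and flag it.
    return ("INLINE", text[:max_content_length], True)


small = plan_storage("hello", 10, None)
offloaded = plan_storage("x" * 20, 10, "my-bucket")
truncated = plan_storage("x" * 20, 10, None)
print(small, offloaded, truncated)
```

In the table schema, the third element of the result corresponds to the is_truncated column.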
LLM interactions
These events track the raw requests sent to and responses received from the LLM.
| Event Type | Content (JSON) Structure | Attributes (JSON) | Example Content (Simplified) |
|---|---|---|---|
| LLM_REQUEST | {"messages": [{"content": "…"}]} or {"prompts": ["…"]} | {"tags": ["tag1"], "model": "gemini-1.5-pro"} | {"messages": [{"content": "What is the weather?"}]} |
| LLM_RESPONSE | "The weather is sunny." (stored as a JSON string) | {"usage": {"total_tokens": 20}} | "The weather is sunny." |
| LLM_ERROR | null | {} | null (see error_message column) |
Tool usage
These events track the execution of tools by the agent.
| Event Type | Content (JSON) Structure | Attributes (JSON) |
|---|---|---|
| TOOL_STARTING | "city='Paris'" | {} |
| TOOL_COMPLETED | "25°C, Sunny" | {} |
| TOOL_ERROR | "Error: Connection timeout" | {} |
Chain Execution
These events track the start and end of high-level chains/graphs.
| Event Type | Content (JSON) Structure |
|---|---|
| CHAIN_START | {"messages": […]} |
| CHAIN_END | {"output": "…"} |
| CHAIN_ERROR | null (see error_message column) |
Retriever usage
These events track the execution of retrievers.
| Event Type | Content (JSON) Structure |
|---|---|
| RETRIEVER_START | "What is the capital of France?" |
| RETRIEVER_END | [{"page_content": "Paris is the capital…", "metadata": {"source": "wiki"}}] |
| RETRIEVER_ERROR | null (see error_message column) |
Agent Actions
These events track specific actions taken by the agent.
| Event Type | Content (JSON) Structure |
|---|---|
| AGENT_ACTION | {"tool": "Calculator", "input": "2 + 2"} |
| AGENT_FINISH | {"output": "The answer is 4"} |
Other Events
| Event Type | Content (JSON) Structure |
|---|---|
| TEXT | "Some logging text…" |
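Because the shape of the content column depends on event_type, client code that reads exported rows usually needs to branch on the event type. The sketch below is one possible way to do that with the payload shapes listed in the tables above; summarize_content is a hypothetical helper, and it assumes the content value has already been read as a JSON string.

```python
import json
from typing import Any


def summarize_content(event_type: str, content_json: str) -> str:
    """Hypothetical helper: one-line summary of a content payload."""
    content: Any = json.loads(content_json)
    if content is None:
        # Error events carry null content; details live in error_message.
        return "(no content; see error_message)"
    if event_type == "LLM_REQUEST" and isinstance(content, dict):
        if content.get("messages"):
            return str(content["messages"][-1].get("content", ""))
        return " ".join(content.get("prompts", []))
    if event_type == "AGENT_ACTION" and isinstance(content, dict):
        return f"{content.get('tool')}({content.get('input')})"
    if event_type in ("CHAIN_END", "AGENT_FINISH") and isinstance(content, dict):
        return str(content.get("output", ""))
    if event_type == "RETRIEVER_END" and isinstance(content, list):
        return f"{len(content)} document(s)"
    # Plain JSON strings (tool, retriever, and text events) pass through.
    return content if isinstance(content, str) else json.dumps(content)


print(summarize_content("LLM_REQUEST", '{"messages": [{"content": "What is the weather?"}]}'))
print(summarize_content("AGENT_ACTION", '{"tool": "Calculator", "input": "2 + 2"}'))
print(summarize_content("LLM_ERROR", "null"))
```

Unknown event types fall through to the final line, so the helper does not fail if additional event types appear in the table.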
Advanced analysis queries
Once your agent is running and logging events, you can run powerful analyses on the agent_events_v2 table.
1. Reconstruct a Trace (Conversation Turn)
Use the trace_id to group all events (Chain, LLM, Tool) belonging to a single execution flow.
SELECT
  timestamp,
  event_type,
  span_id,
  parent_span_id,
  -- Extract summary or specific content based on event type
  COALESCE(
    JSON_VALUE(content, '$.messages[0].content'),
    JSON_VALUE(content, '$.summary'),
    JSON_VALUE(content)
  ) AS summary,
  JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
  -- Replace with a specific trace_id from your logs
  trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
  timestamp ASC;
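Once the rows for a trace have been fetched, the span_id and parent_span_id columns can be used to arrange the events into an execution tree. The following is a minimal sketch over plain dictionaries; render_trace and the sample rows are hypothetical, and whether start and end events share a span_id is an assumption (the code groups events per span, so it works either way).

```python
from collections import defaultdict
from typing import Dict, List, Optional


def render_trace(rows: List[dict]) -> List[str]:
    """Hypothetical helper: indent each span under its parent span."""
    spans: Dict[str, List[str]] = {}          # span_id -> event types, in row order
    parent_of: Dict[str, Optional[str]] = {}  # span_id -> parent_span_id
    for row in rows:
        spans.setdefault(row["span_id"], []).append(row["event_type"])
        parent_of.setdefault(row["span_id"], row.get("parent_span_id"))

    children: Dict[Optional[str], List[str]] = defaultdict(list)
    for span_id, parent in parent_of.items():
        # Spans whose parent is not in the result set are treated as roots.
        children[parent if parent in spans else None].append(span_id)

    lines: List[str] = []

    def walk(parent: Optional[str], depth: int) -> None:
        for span_id in children.get(parent, []):
            lines.append("  " * depth + f"{span_id}: {', '.join(spans[span_id])}")
            walk(span_id, depth + 1)

    walk(None, 0)
    return lines


# Made-up rows, ordered by timestamp as in the query above.
sample_rows = [
    {"event_type": "CHAIN_START", "span_id": "a", "parent_span_id": None},
    {"event_type": "LLM_REQUEST", "span_id": "b", "parent_span_id": "a"},
    {"event_type": "LLM_RESPONSE", "span_id": "b", "parent_span_id": "a"},
    {"event_type": "TOOL_STARTING", "span_id": "c", "parent_span_id": "a"},
    {"event_type": "CHAIN_END", "span_id": "a", "parent_span_id": None},
]
tree = render_trace(sample_rows)
print("\n".join(tree))
```

Keeping the rows ordered by timestamp preserves the order in which sibling spans started.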
2. Analyze LLM Latency & Token Usage
Calculate the average latency and total token usage for your LLM calls.
SELECT
  JSON_VALUE(attributes, '$.model') AS model,
  COUNT(*) AS total_calls,
  AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
  SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
  event_type = 'LLM_RESPONSE'
GROUP BY
  1;
3. Analyze Multimodal Content with BigQuery Remote Model (Gemini)
If you are offloading images to GCS, you can use BigQuery ML to analyze them directly.
SELECT
  logs.session_id,
  -- Get a signed URL for the image (optional, for viewing)
  STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) AS signed_url,
  -- Analyze the image using a remote model (e.g., gemini-1.5-pro)
  AI.GENERATE(
    ('Describe this image briefly. What company logo?', parts.object_ref)
  ) AS generated_result
FROM
  `your-gcp-project-id.adk_agent_logs.agent_events_v2` logs,
  UNNEST(logs.content_parts) AS parts
WHERE
  parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
4. Querying Offloaded Content (Get Signed URLs)
SELECT
  timestamp,
  event_type,
  part.mime_type,
  part.storage_mode,
  part.object_ref.uri AS gcs_uri,
  -- Generate a signed URL to read the content directly (requires connection_id configuration)
  STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
  UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;