Data Analysis Agent
The narrative documentation for the data analysis package — covering the Data Analysis Agent and the two analysis algorithms (anomaly detection and Adtributor drill-down) — lives in the package README, included below:
Data Analysis Package (openchatbi.analysis)
This package provides OpenChatBI’s advanced data analysis capabilities: a specialized Data Analysis Agent that orchestrates analysis tools into multi-step workflows, plus the two analysis algorithms it relies on — time series anomaly detection and multi-dimensional anomaly drill-down (Adtributor).
Layout
openchatbi/analysis/
├── agent.py # Data Analysis Agent + the `data_analysis` delegation tool
├── anomaly_detection.py # Anomaly detection scoring algorithm (core)
├── adtributor.py # Adtributor root-cause / drill-down algorithm (core)
└── models.py # Pydantic output models for the Adtributor algorithm
openchatbi/tool/
├── anomaly_detection.py # `anomaly_detection` tool wrapper (LLM-facing)
└── adtributor_tool.py # `adtributor_drilldown` tool wrapper (LLM-facing)
openchatbi/prompts/
└── data_analysis_prompt.md # System prompt with the 5 workflow playbooks
The convention is: algorithm core lives in analysis/, the LLM-facing tool
wrapper lives in tool/, and the agent that orchestrates the tools lives in
analysis/agent.py.
1. Data Analysis Agent (agent.py)
The Data Analysis Agent is a specialized sub-agent, built on the
deepagents framework, to which the main agent delegates complex analysis
via the data_analysis tool. Keeping the orchestration inside a sub-agent
isolates the analysis context and prevents the main agent’s context from
ballooning with intermediate tool output.
Public API
build_data_analysis_agent(sql_graph, sync_mode=False, llm_provider=None, checkpointer=None, memory_store=None): assembles the tool set and compiles the agent graph.
get_data_analysis_tool(...): wraps the agent in a data_analysis StructuredTool (sync or async) that the main agent registers.
Tool set
| Tool | Source | Notes |
|---|---|---|
|  | main SQL generation subgraph | reused |
|  |  | requires forecast service |
|  |  | requires forecast service |
|  |  | no external service |
|  |  | no external service |
timeseries_forecast and anomaly_detection share the forecasting service
health check and are registered together only when the service is healthy.
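The all-or-nothing registration described above can be sketched as follows. This is an illustration only: `select_tool_names` is a hypothetical helper, not part of the package, the tool list is not claimed to be complete, and it assumes that `text2sql` and `adtributor_drilldown` do not depend on the forecasting service.

```python
def select_tool_names(forecast_service_healthy: bool) -> list[str]:
    """Return the tool names to register (hypothetical helper)."""
    # Tools assumed to work without the forecasting service.
    names = ["text2sql", "adtributor_drilldown"]
    # The two forecast-backed tools are added together, or not at all.
    if forecast_service_healthy:
        names += ["timeseries_forecast", "anomaly_detection"]
    return names


print(select_tool_names(True))
print(select_tool_names(False))
```

Gating both tools on one health check avoids a state where the agent can detect anomalies but cannot forecast, or the reverse.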
Supported scenarios (prompt-driven workflows)
| Scenario | Typical flow |
|---|---|
| Single-metric trend forecasting |  |
| Single-metric anomaly detection |  |
| Single-metric anomaly drill-down |  |
| Multi-metric correlation |  |
| Business combination analysis |  |
LLM selection
The agent uses the optional analysis_llm configuration when present, otherwise
it falls back to default_llm (see openchatbi.llm.llm.get_analysis_llm). This
lets you point analysis at a stronger reasoning model without changing the main
agent’s model.
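The fallback rule can be illustrated with a small sketch. It is not the implementation of get_analysis_llm: `pick_analysis_llm` and the plain-dict configuration are assumptions made for the example.

```python
from typing import Any


def pick_analysis_llm(config: dict[str, Any]) -> Any:
    """Prefer analysis_llm when configured, else fall back to default_llm."""
    llm = config.get("analysis_llm")
    # Only a missing (or None) analysis_llm triggers the fallback.
    return llm if llm is not None else config["default_llm"]


print(pick_analysis_llm({"default_llm": "general-model"}))
print(pick_analysis_llm({"default_llm": "general-model", "analysis_llm": "reasoning-model"}))
```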
Sub-agent isolation
The agent is a separately compiled graph that may share the main agent’s
checkpointer. The data_analysis tool therefore derives an isolated child
config (_build_sub_agent_config):
a deterministic child thread_id ("{parent_thread_id}:data_analysis"), so it satisfies LangGraph’s checkpointer requirement, keeps interrupt/resume stable, and never clobbers the main agent’s checkpoint thread;
inherited checkpoint_ns / checkpoint_id are cleared so the sub-agent starts from a clean namespace;
other config (callbacks, tags, metadata) is propagated unchanged.
GraphInterrupt raised inside the sub-agent is re-raised (not swallowed) so
human-in-the-loop interrupts can bubble up to the main graph. Final output is
normalized to a string via _extract_final_content, including the case where
the model returns multimodal content blocks.
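The child-config derivation described in this section might look roughly like the sketch below. It is not the body of _build_sub_agent_config: `build_child_config` is a hypothetical name, the config is modelled as a plain dict with a "configurable" section, "cleared" is interpreted as removing the keys, and the "default" fallback for a missing parent thread id is an assumption.

```python
from typing import Any


def build_child_config(parent: dict[str, Any]) -> dict[str, Any]:
    """Derive an isolated sub-agent config from the parent config (sketch)."""
    configurable = dict(parent.get("configurable", {}))
    # Deterministic child thread id derived from the parent's thread id.
    parent_thread_id = configurable.get("thread_id", "default")  # fallback is assumed
    configurable["thread_id"] = f"{parent_thread_id}:data_analysis"
    # Drop the inherited checkpoint namespace/id so the child starts clean.
    configurable.pop("checkpoint_ns", None)
    configurable.pop("checkpoint_id", None)
    # Everything else (callbacks, tags, metadata) is passed through unchanged.
    child = {key: value for key, value in parent.items() if key != "configurable"}
    child["configurable"] = configurable
    return child


parent = {
    "configurable": {"thread_id": "t1", "checkpoint_ns": "main", "checkpoint_id": "c42"},
    "tags": ["chat"],
}
print(build_child_config(parent))
```

Because the child thread id is a pure function of the parent's, resuming after an interrupt lands on the same sub-agent checkpoint thread.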
2. Anomaly Detection (anomaly_detection.py)
Scores how anomalous the most recent points of a single-metric time series are, by comparing them against a forecasting baseline. The forecast comes from the time series forecasting service; if it is unavailable, detection fails fast with an error in the details dict.
Output
evaluate_anomalies(...) returns (score, details) where score is in [0, 1]
(closer to 1 = more significant / higher-impact anomaly). format_anomaly_report
turns it into a human-readable report (used by the tool wrapper). Severity bands:
>0.8 Critical, >0.6 High, >0.4 Medium, else Low.
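As a quick illustration of the severity bands, the mapping can be written as a small function. `severity_band` is a hypothetical name; only the thresholds come from the text above.

```python
def severity_band(score: float) -> str:
    """Map an anomaly score in [0, 1] to a severity label."""
    # The comparisons are strict, so a score of exactly 0.8 is "High".
    if score > 0.8:
        return "Critical"
    if score > 0.6:
        return "High"
    if score > 0.4:
        return "Medium"
    return "Low"


for s in (0.95, 0.7, 0.5, 0.1):
    print(s, severity_band(s))
```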
Scoring strategy
The score combines a set of intentionally orthogonal factors so the same evidence is not double-counted:
Deviation significance: how far the actual value is from the forecast in robust noise-scale (sigma) units. Significance ramps to 0.5 at 3σ and to 1.0 at 6σ. The noise scale is estimated from first differences with a MAD-based estimator (robust to historical spikes; not inflated by seasonality).
Direction weighting: drops vs rises can be weighted differently (drop_weight / rise_weight), since an unexpected drop is often more severe.
Volume modulation: anomalies on high-traffic moments matter more. A multiplier in [0.6, 1.0] driven by the expected (predicted) level, so a drop-to-zero is never penalized by it.
Historical anomaly frequency: intrinsically noisy/jumpy metrics are dampened (fewer false positives).
Duration: a run of consecutive anomalous points near the end of the window boosts the score.
Recent points are weighted more heavily (linear weighting). Per-point sigmas from
the forecast service (prediction_std) are used when available, otherwise the
historical noise-scale estimate is used.
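To make a few of these factors concrete, here is a minimal sketch of the noise-scale estimate, the significance ramp, direction weighting, and linear recency weighting. It is not the package's code. The function names are hypothetical, the ramp is assumed to be linear (which is one shape that passes through 0.5 at 3σ and 1.0 at 6σ), the 1.4826 and √2 constants are the usual Gaussian-consistency choices for a MAD on first differences, and the volume, frequency, and duration factors are left out.

```python
import math
import statistics


def robust_noise_scale(history: list[float]) -> float:
    """MAD-based sigma estimate from first differences (assumed constants)."""
    diffs = [b - a for a, b in zip(history, history[1:])]
    center = statistics.median(diffs)
    mad = statistics.median([abs(d - center) for d in diffs])
    # 1.4826 scales MAD to sigma; sqrt(2) undoes the variance doubling of differencing.
    return 1.4826 * mad / math.sqrt(2)


def deviation_significance(actual: float, forecast: float, sigma: float) -> float:
    """Linear ramp (assumed shape): 0.5 at 3 sigma, capped at 1.0 from 6 sigma."""
    z = abs(actual - forecast) / max(sigma, 1e-9)
    return min(z / 6.0, 1.0)


def linear_recency_weights(n: int) -> list[float]:
    """Weights 1..n normalised to sum to 1, so later points count more."""
    total = n * (n + 1) / 2
    return [(i + 1) / total for i in range(n)]


def window_score(actuals, forecasts, sigma, drop_weight=1.0, rise_weight=1.0) -> float:
    """Recency-weighted, direction-weighted significance over the window."""
    weights = linear_recency_weights(len(actuals))
    score = 0.0
    for w, a, f in zip(weights, actuals, forecasts):
        direction = drop_weight if a < f else rise_weight
        score += w * min(deviation_significance(a, f, sigma) * direction, 1.0)
    return score


print(window_score([100, 100, 40], [100, 100, 100], sigma=10))
```

In the usage line only the most recent point deviates (by 6σ), so the score equals that point's recency weight.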
Input contract
input_data is a list of numbers or dicts; the last evaluation_window
points are the points to evaluate, the preceding points are the historical
context (used for forecast input and for the noise/volume/frequency factors).
input_data length must be greater than evaluation_window.
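The input contract can be illustrated with a small splitting helper. This is a sketch with a hypothetical name (`split_series`), and it raises on invalid input for brevity, whereas evaluate_anomalies is documented to return a score of 0.0 with an "error" key in the details instead.

```python
from typing import Any


def split_series(
    input_data: list[Any], evaluation_window: int = 3, target_column: str = "value"
) -> tuple[list[float], list[float]]:
    """Split a series into (history, evaluation points) per the input contract."""
    # Accept plain numbers or dicts carrying the value under target_column.
    values = [
        float(point[target_column]) if isinstance(point, dict) else float(point)
        for point in input_data
    ]
    if evaluation_window <= 0:
        raise ValueError("evaluation_window must be positive")
    if len(values) <= evaluation_window:
        raise ValueError("input_data must be longer than evaluation_window")
    return values[:-evaluation_window], values[-evaluation_window:]


history, recent = split_series([10, 11, {"value": 12}, 13, 9], evaluation_window=2)
print(history, recent)
```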
3. Anomaly Drill-Down / Adtributor (adtributor.py, models.py)
Multi-dimensional root-cause analysis based on Microsoft’s Adtributor algorithm. Given baseline (predict) vs actual (real) values broken down by dimension elements, it finds the dimensions and elements that best explain an anomaly.
Core concepts
Surprise: a Jensen-Shannon-divergence-style measure of how much an element’s distribution shifted between predict and real.
Explanatory Power (EP): the share of the overall metric change attributable to an element. For derived (ratio) metrics, EP is computed with the ratio decomposition and normalized to sum to 1.
Thresholds: tep (cumulative EP threshold to accept a dimension’s top elements as root cause) and teep (per-element EP threshold to be considered), plus k (number of top candidate dimensions to return).
additional_check: guards against degenerate explanations (e.g. when the candidate elements already account for ~100% of the value).
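For absolute metrics, surprise and explanatory power can be sketched as below. This follows the formulas from the published Adtributor description rather than the code in adtributor.py, so treat it as an assumption about the details. The function names and the sample numbers are made up for illustration.

```python
import math


def surprise(p: float, q: float) -> float:
    """Per-element Jensen-Shannon term for predicted share p and real share q."""
    m = (p + q) / 2

    def term(x: float) -> float:
        # A zero share contributes nothing (limit of x * log(x / m) as x -> 0).
        return x * math.log(x / m) if x > 0 else 0.0

    return 0.5 * (term(p) + term(q))


def explanatory_power(predict: dict[str, float], real: dict[str, float]) -> dict[str, float]:
    """Share of the overall change attributable to each element."""
    total_change = sum(real.values()) - sum(predict.values())
    if total_change == 0:
        raise ValueError("no overall change to explain")
    return {key: (real[key] - predict[key]) / total_change for key in predict}


# Made-up example: only the "US" element drops.
predict = {"US": 100.0, "EU": 100.0, "APAC": 100.0}
real = {"US": 40.0, "EU": 100.0, "APAC": 100.0}
ep = explanatory_power(predict, real)
shares_p = {k: v / sum(predict.values()) for k, v in predict.items()}
shares_q = {k: v / sum(real.values()) for k, v in real.items()}
for key in predict:
    print(key, ep[key], surprise(shares_p[key], shares_q[key]))
```

In this example the whole drop comes from one element, so that element carries all of the explanatory power, while every element shows some surprise because all shares shift.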
Absolute vs derived metrics
Absolute metrics use predict / real.
Derived (ratio) metrics use predict_numerator / predict_denominator / real_numerator / real_denominator (set derived=True).
Output (AdtributorOutput)
root_causes: {dimension: [elements]} for the top-k explaining dimensions.
ranked_dimensions: all analyzed dimensions sorted by total surprise.
dimension_details: per-dimension DimensionResult (EP, surprise, elements, reason).
status: one of:
  success: root cause elements identified;
  no_root_cause: anomaly is systemic / evenly distributed, no element passed the thresholds;
  no_anomaly_direction: no element matched the requested issue_type (drop/rise).
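A caller would typically branch on status before reading root_causes. The sketch below uses a plain dict in place of AdtributorOutput so that it runs on its own. `describe_result` and the message wording are hypothetical; only the field names and status values come from the description above.

```python
from typing import Any


def describe_result(result: dict[str, Any]) -> str:
    """Turn an Adtributor-style result into a one-line summary (sketch)."""
    status = result["status"]
    if status == "success":
        parts = [
            f"{dimension}: {', '.join(str(e) for e in elements)}"
            for dimension, elements in result["root_causes"].items()
        ]
        return "Root causes: " + "; ".join(parts)
    if status == "no_root_cause":
        return "The change is spread evenly; no element passed the thresholds."
    if status == "no_anomaly_direction":
        return "No element moved in the requested direction."
    raise ValueError(f"unknown status: {status}")


print(describe_result({"status": "success", "root_causes": {"country": ["US", "CA"]}}))
```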
Input contract (tool layer)
The adtributor_drilldown tool accepts a 1D melted table (list[dict]) with
keys dimension_name, element_value, and the relevant predict/real fields. The
tool reshapes it into the per-dimension dict[str, DataFrame] the core algorithm
expects, runs the algorithm, and attaches a business-friendly narrative to the
result. The prompt instructs the agent to prepare this melted table via
text2sql before drilling down.
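The reshaping step can be sketched with the standard library alone. The real tool builds a DataFrame per dimension; this example groups into plain lists of records to stay dependency-free, and `group_by_dimension` plus the sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Any


def group_by_dimension(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group a melted table by dimension_name, keeping the other fields per row."""
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        record = {key: value for key, value in row.items() if key != "dimension_name"}
        grouped[row["dimension_name"]].append(record)
    return dict(grouped)


# Made-up melted table for an absolute metric (predict / real).
melted = [
    {"dimension_name": "country", "element_value": "US", "predict": 100, "real": 40},
    {"dimension_name": "country", "element_value": "EU", "predict": 100, "real": 100},
    {"dimension_name": "device", "element_value": "mobile", "predict": 120, "real": 80},
    {"dimension_name": "device", "element_value": "desktop", "predict": 80, "real": 60},
]
for dimension, records in group_by_dimension(melted).items():
    print(dimension, records)
```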
See also
API reference: the docs Tools and Utilities page documents agent.py and the two tool wrappers.
Prompt / workflows: openchatbi/prompts/data_analysis_prompt.md.
Forecasting service: timeseries_forecasting/README.md.
API Reference
Data Analysis Agent implementation.
- openchatbi.analysis.agent.build_data_analysis_agent(sql_graph: CompiledStateGraph, sync_mode: bool = False, llm_provider: str | None = None, checkpointer: BaseCheckpointSaver | None = None, memory_store: BaseStore | None = None) -> CompiledStateGraph
Build the data analysis agent.
- Parameters:
sql_graph – Compiled SQL generation graph to use for text2sql tool.
sync_mode – Whether to use synchronous mode.
llm_provider – LLM provider to use.
checkpointer – Checkpointer for state persistence.
memory_store – Store for long-term memory.
- Returns:
The compiled data analysis agent.
- Return type:
CompiledStateGraph
- class openchatbi.analysis.agent.DataAnalysisInput(*, reasoning: str, task: str)
Bases: BaseModel
Input schema for data analysis tool.
- reasoning: str
- task: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- openchatbi.analysis.agent.get_data_analysis_tool(sql_graph: CompiledStateGraph, sync_mode: bool = False, llm_provider: str | None = None, checkpointer: BaseCheckpointSaver | None = None, memory_store: BaseStore | None = None) -> StructuredTool
Create the data analysis tool that delegates to the data analysis agent.
- Parameters:
sql_graph – Compiled SQL generation graph.
sync_mode – Whether to use synchronous mode.
llm_provider – LLM provider to use.
checkpointer – Checkpointer for state persistence.
memory_store – Store for long-term memory.
- Returns:
The data analysis tool.
- Return type:
StructuredTool
Core algorithms for time series anomaly detection.
The scoring strategy follows a rule/strategy-based design where each factor is intentionally orthogonal so that they do not double-count the same evidence:
Deviation significance: how statistically far the actual value is from the forecast, expressed in robust noise-scale (sigma) units. This is the single “how abnormal” signal and replaces the previously redundant reconstruction-error + bound-violation pair.
Direction: drops vs rises can be weighted differently because, for most business metrics, an unexpected drop is more severe than a rise.
Volume modulation: anomalies on high-traffic moments matter more than the same relative deviation on low-traffic moments. This is a multiplier driven by the expected (predicted) level, NOT by the anomalous actual value, so a drop-to-zero is never penalised by it.
Historical anomaly frequency: noisy/jumpy metrics that violate their own bounds frequently are dampened, reducing false positives.
Duration: a run of consecutive anomalous points near the end of the window boosts the score.
The final score is in the range [0, 1]; values closer to 1 indicate a more significant / higher-impact anomaly.
- openchatbi.analysis.anomaly_detection.evaluate_anomalies(input_data: list[float | int | dict[str, Any]], evaluation_window: int = 3, frequency: str = 'hourly', target_column: str = 'value', input_length: int | None = None, drop_weight: float = 1.0, rise_weight: float = 1.0, smoothing: float = 0.5) -> tuple[float, dict[str, Any]]
Forecast the evaluation window and score it for anomalies.
The last evaluation_window points of input_data are treated as the points to evaluate; the preceding points are used both as the forecast input and as the historical context for the noise scale, volume and frequency factors.
- Parameters:
input_data – Time series as numbers or dicts; evaluation points are at the end.
evaluation_window – Number of trailing points to evaluate (must be < len(input_data)).
frequency – Time series frequency (e.g. ‘hourly’, ‘daily’).
target_column – Column to read from structured (dict) data.
input_length – Target historical input length for the service; if the supplied history is shorter, the service left-pads the earliest points with zeros to reach this length.
drop_weight – Severity multiplier applied to downward deviations.
rise_weight – Severity multiplier applied to upward deviations.
- Returns:
Tuple of (anomaly score in [0, 1], details dict). On any failure the score is 0.0 and details contains an "error" key.
- openchatbi.analysis.anomaly_detection.detect_anomaly_range(input_data: list[float | int | dict[str, Any]], detection_range: int, frequency: str = 'hourly', target_column: str = 'value', input_length: int | None = None, stride: int | None = None, evaluation_window: int | None = None, drop_weight: float = 1.0, rise_weight: float = 1.0, smoothing: float = 0.5, t_high: float = 0.6, t_low: float = 0.4, merge_gap: int = 1, area_min: float = 1.0, peak_high: float = 0.8) -> tuple[float, dict[str, Any]]
Scan a long range for anomalies and summarise it into anomalous intervals.
The trailing detection_range points are scanned; everything before them is used as forecast/context history. The range is forecast in stride blocks (block forecasting), the lower window slides over it to produce a per-point severity curve, and the curve is segmented (hysteresis) and filtered (area / peak) into zero or more anomalous intervals.
- Parameters:
input_data – Continuous (gap-free) time series; the scan range is at the end.
detection_range – Number of trailing points to scan (must be < len(input_data)).
frequency – Time series frequency; drives the evaluation_window and stride defaults.
target_column – Column to read from structured (dict) data.
input_length – Target historical input length for the forecast service per block.
stride – Block-forecast horizon; defaults from frequency when None.
evaluation_window – Lower-window size; defaults from frequency when None.
drop_weight – Severity multiplier applied to downward deviations.
rise_weight – Severity multiplier applied to upward deviations.
smoothing – Lower-window smoothing strength in [0, 1].
t_high – Hysteresis upper threshold to open an interval.
t_low – Hysteresis lower threshold to close an interval.
merge_gap – Merge intervals separated by <= this many points.
area_min – Minimum interval area (sum of severity) to keep it.
peak_high – Peak severity that keeps an interval regardless of its area.
- Returns:
Tuple of (overall score in [0, 1] = max interval peak, details dict). On any failure the score is 0.0 and details contains an "error" key.
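The segmentation and filtering stage of detect_anomaly_range can be illustrated with a self-contained sketch. It is one reading of the parameter descriptions, not the package's code. `segment_intervals` is a hypothetical name, the inclusive/exclusive threshold comparisons are assumptions, and the severity curve is made up.

```python
def segment_intervals(
    severity: list[float],
    t_high: float = 0.6,
    t_low: float = 0.4,
    merge_gap: int = 1,
    area_min: float = 1.0,
    peak_high: float = 0.8,
) -> list[tuple[int, int]]:
    """Hysteresis segmentation, gap merging, and area/peak filtering (sketch)."""
    # 1) Hysteresis: open at >= t_high, stay open until the curve falls below t_low.
    intervals: list[tuple[int, int]] = []
    start = None
    for i, s in enumerate(severity):
        if start is None:
            if s >= t_high:
                start = i
        elif s < t_low:
            intervals.append((start, i - 1))
            start = None
    if start is not None:
        intervals.append((start, len(severity) - 1))

    # 2) Merge intervals separated by at most merge_gap points.
    merged: list[tuple[int, int]] = []
    for lo, hi in intervals:
        if merged and lo - merged[-1][1] - 1 <= merge_gap:
            merged[-1] = (merged[-1][0], hi)
        else:
            merged.append((lo, hi))

    # 3) Keep an interval if its area is large enough or its peak is high enough.
    kept = []
    for lo, hi in merged:
        chunk = severity[lo : hi + 1]
        if sum(chunk) >= area_min or max(chunk) >= peak_high:
            kept.append((lo, hi))
    return kept


curve = [0.1, 0.7, 0.5, 0.3, 0.1, 0.1, 0.1, 0.65, 0.2, 0.1, 0.9, 0.1]
print(segment_intervals(curve))
```

Using two thresholds keeps an interval from flickering open and closed when the curve hovers around a single cut-off, and the peak rule preserves short but severe spikes that the area rule alone would discard.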
- openchatbi.analysis.anomaly_detection.format_anomaly_report(score: float, details: dict[str, Any]) -> str
Format the anomaly detection result into a human-readable report.
- openchatbi.analysis.anomaly_detection.format_anomaly_range_report(score: float, details: dict[str, Any]) -> str
Format a range (sliding) anomaly detection result into a readable report.
- openchatbi.analysis.adtributor.additional_check(dimension: str, df: DataFrame, attr_list: list[Any]) -> tuple[bool, str]
If the real-value proportion of the root-cause (rc) candidates is ~100%, skip this dimension.
If all attributes drop evenly, skip this check for now.
- openchatbi.analysis.adtributor.add_surprise(df: DataFrame, derived: bool, merged_divide: int = 1) -> DataFrame
Computes the surprise for all elements in the dataframe.
- openchatbi.analysis.adtributor.add_explanatory_power(df: DataFrame, derived: bool, issue_type: str = 'drop') -> DataFrame
Computes the explanatory power for all elements in the dataframe.
- openchatbi.analysis.adtributor.adtributor(derived: bool, df_dict: dict[str, DataFrame], dimension_weights: dict[str, float] | None = None, tep: float = 0.7, teep: float = 0.02, k: int = 1, issue_type: str = 'drop') -> AdtributorOutput
Analyzes the input data and identifies candidate dimensions for drill-down analysis.
- class openchatbi.analysis.models.DimensionResult(*, explanatory_power: float | None = None, total_surprise: float, elements: list[Any] | None = None, surprise: float | None = None, reason: str)
Bases: BaseModel
- explanatory_power: float | None
- total_surprise: float
- elements: list[Any] | None
- surprise: float | None
- reason: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class openchatbi.analysis.models.AdtributorOutput(*, root_causes: dict[str, list[Any]], ranked_dimensions: list[str], dimension_details: dict[str, DimensionResult], status: str, reason_flag: str = '')
Bases: BaseModel
- root_causes: dict[str, list[Any]]
- ranked_dimensions: list[str]
- dimension_details: dict[str, DimensionResult]
- status: str
- reason_flag: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].