Core Module
Main Module
OpenChatBI core module initialization.
Agent Graph
Main agent graph construction and execution logic.
- openchatbi.agent_graph.ask_human(state: AgentState) dict[str, Any][source]
Node function to ask human for additional information or clarification.
- Parameters:
state (AgentState) – The current graph state containing messages and context.
- Returns:
Updated state with human feedback as a tool message and user input.
- Return type:
dict
- class openchatbi.agent_graph.CallSQLGraphInput(*, reasoning: str, context: str | dict[str, Any] | list[dict[str, Any]] | list[str])[source]
Bases:
BaseModel- reasoning: str
- context: str | dict[str, Any] | list[dict[str, Any]] | list[str]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- openchatbi.agent_graph.get_sql_tools(sql_graph: CompiledStateGraph, sync_mode: bool = False) Callable[source]
Create SQL generation tool from compiled SQL graph.
- Parameters:
sql_graph (CompiledStateGraph) – The compiled SQL generation subgraph.
sync_mode (bool) – Whether to create synchronous or asynchronous tools
- Returns:
Tool function for SQL generation.
- Return type:
function
- openchatbi.agent_graph.agent_llm_call(llm: BaseChatModel, tools: list, context_manager: ContextManager = None) Callable[source]
Create llm call function to generate reasoning and determine next node based on tool calls in LLM response.
- Parameters:
llm (BaseChatModel) – The LLM for agent decision-making.
tools – List of tools.
context_manager – Optional context manager for handling long conversations.
- Returns:
function that processes state and determines next node.
- Return type:
function
- openchatbi.agent_graph.build_agent_graph_sync(catalog: CatalogStore, checkpointer: None | bool | BaseCheckpointSaver = None, memory_store: BaseStore = None, enable_context_management: bool = True, llm_provider: str | None = None) CompiledStateGraph[source]
Build the main agent graph with all nodes and edges (sync version).
- Parameters:
catalog – Catalog store containing schema information.
checkpointer – The Checkpointer for state persistence (short memory). If None, no short memory.
memory_store – The BaseStore to use for long-term memory. If None, will auto assign according to sync_mode.
enable_context_management – Whether to enable context management for long conversations.
- Returns:
Compiled agent graph ready for execution.
- Return type:
CompiledStateGraph
- async openchatbi.agent_graph.build_agent_graph_async(catalog: CatalogStore, checkpointer: None | bool | BaseCheckpointSaver = None, memory_store: BaseStore = None, memory_tools: list[Callable] = None, enable_context_management: bool = True, llm_provider: str | None = None) CompiledStateGraph[source]
Build the main agent graph with all nodes and edges (async version).
This function is identical to build_agent_graph_sync but properly handles async MCP tool initialization when called from async contexts.
- Parameters:
catalog – Catalog store containing schema information.
checkpointer – The Checkpointer for state persistence (short memory). If None, no short memory.
memory_store – The BaseStore to use for long-term memory. If None, will auto assign according to sync_mode.
memory_tools – List of memory tools (manage_memory_tool, search_memory_tool). If None, creates async tools.
enable_context_management – Whether to enable context management for long conversations.
- Returns:
Compiled agent graph ready for execution.
- Return type:
CompiledStateGraph
State Management
State classes for OpenChatBI graph execution.
- class openchatbi.graph_state.AgentState[source]
Bases:
MessagesStateState for the main agent graph execution.
Extends MessagesState with additional fields for routing and responses.
- history_messages: AIMessage], <function add_history_messages at 0x7f1d6a5ff920>]
- agent_next_node: str
- sends: list[Send]
- sql: str
- final_answer: str
- messages: _add_messages at 0x7f1d64b2ccc0>]
- class openchatbi.graph_state.SQLGraphState[source]
Bases:
MessagesStateState for SQL generation subgraph.
Contains rewritten question, table selection, extracted entities, and generated SQL.
- rewrite_question: str
- tables: list[dict[str, Any]]
- info_entities: dict[str, Any]
- sql: str
- sql_retry_count: int
- sql_execution_result: str
- schema_info: dict[str, Any]
- data: str
- previous_sql_errors: list[dict[str, Any]]
- visualization_dsl: dict[str, Any]
- sql_confidence: float
- confidence_reasons: list[str]
- human_sql_decision: str
- recovery_strategy: str
- messages: _add_messages at 0x7f1d64b2ccc0>]
- class openchatbi.graph_state.InputState[source]
Bases:
MessagesStateInput state schema for the main graph.
- messages: _add_messages at 0x7f1d64b2ccc0>]
- class openchatbi.graph_state.OutputState[source]
Bases:
MessagesStateOutput state schema for the main graph.
- messages: _add_messages at 0x7f1d64b2ccc0>]
- class openchatbi.graph_state.SQLOutputState[source]
Bases:
MessagesStateOutput state schema for the SQL generation subgraph.
- rewrite_question: str
- tables: list[dict[str, Any]]
- sql: str
- schema_info: dict[str, Any]
- data: str
- visualization_dsl: dict[str, Any]
- sql_confidence: float
- confidence_reasons: list[str]
- human_sql_decision: str
- recovery_strategy: str
- messages: _add_messages at 0x7f1d64b2ccc0>]
Utilities
Utility functions for OpenChatBI.
- openchatbi.utils.get_text_from_content(content: str | list[str | dict]) str[source]
Extract text from various content formats.
- Parameters:
content – String, list of strings, or list of dicts with ‘text’ key.
- Returns:
Extracted text content.
- Return type:
str
- openchatbi.utils.get_text_from_message_chunk(chunk: AIMessageChunk) str[source]
Extract content from an AIMessageChunk.
- Parameters:
chunk (AIMessageChunk) – The message chunk to extract text from.
- Returns:
Extracted text content or empty string.
- Return type:
str
- openchatbi.utils.extract_json_from_answer(answer: str) dict[source]
Extract the first JSON object from a string answer.
- Parameters:
answer (str) – String that may contain JSON objects.
- Returns:
Parsed JSON object or empty dict if none found.
- Return type:
dict
- openchatbi.utils.get_report_download_response(filename: str) FileResponse[source]
Get FileResponse for downloading a report file.
- Parameters:
filename – The filename of the report to download
- Returns:
Response object for file download
- Return type:
FileResponse
- Raises:
HTTPException – Various HTTP errors for invalid requests
- openchatbi.utils.create_vector_db(texts: list[str], embedding=None, collection_name: str = 'langchain', metadatas=None, collection_metadata: dict = None, chroma_db_path: str = None) VectorStore[source]
Create or reuse a Chroma vector database.
- Parameters:
texts (List[str]) – Text documents to index.
embedding – Embedding function to use.
collection_name (str) – Name of the collection.
metadatas – Metadata for each document.
collection_metadata (dict) – Collection-level metadata.
chroma_db_path (str) – Path to chroma database file.
- Returns:
Vector database instance.
- Return type:
Chroma
- openchatbi.utils.recover_incomplete_tool_calls(state: AgentState) list[source]
Recover from incomplete tool calls by creating message operations to insert ToolMessages correctly.
When the graph execution is interrupted (e.g., by kill or app restart) during tool execution, the state can end up with AIMessage containing tool_calls but no corresponding ToolMessage responses. This function detects such cases and creates the necessary message operations to insert failure ToolMessages in the correct position (right after the AIMessage).
- Parameters:
state (AgentState) – The current graph state containing messages.
- Returns:
Message operations to insert recovery ToolMessages, or empty list if no recovery needed.
- Return type:
list
- class openchatbi.utils.SimpleStore(texts: list[str], metadatas: list[dict] | None = None, ids: list[str] | None = None)[source]
Bases:
VectorStoreSimple vector store using BM25 for text retrieval without embeddings.
- __init__(texts: list[str], metadatas: list[dict] | None = None, ids: list[str] | None = None)[source]
Initialize SimpleStore with texts.
- Parameters:
texts – List of text documents to store.
metadatas – Optional list of metadata dicts for each document.
ids – Optional list of IDs for each document.
- similarity_search(query: str, k: int = 4, **kwargs: Any) list[Document][source]
Search for documents similar to the query using BM25.
- Parameters:
query – Query text.
k – Number of documents to return.
**kwargs – Additional arguments (unused).
- Returns:
List of most similar Document objects.
- similarity_search_with_score(query: str, k: int = 4, **kwargs: Any) list[tuple[Document, float]][source]
Search for documents similar to the query with BM25 scores.
- Parameters:
query – Query text.
k – Number of documents to return.
**kwargs – Additional arguments (unused).
- Returns:
List of (Document, score) tuples.
- add_texts(texts: list[str], metadatas: list[dict] | None = None, *, ids: list[str] | None = None, **kwargs: Any) list[str][source]
Add texts to the store.
- Parameters:
texts – Texts to add.
metadatas – Optional metadata for each text.
ids – Optional IDs for each text.
**kwargs – Additional arguments (unused).
- Returns:
List of IDs of added texts.
- delete(ids: list[str] | None = None, **kwargs: Any) bool | None[source]
Delete documents by IDs.
- Parameters:
ids – List of document IDs to delete.
**kwargs – Additional arguments (unused).
- Returns:
True if deletion successful, False otherwise.
- get_by_ids(ids: list[str], /) list[Document][source]
Get documents by their IDs.
- Parameters:
ids – List of document IDs to retrieve.
- Returns:
List of Document objects.
- classmethod from_texts(texts: list[str], embedding: Any = None, metadatas: list[dict] | None = None, *, ids: list[str] | None = None, **kwargs: Any) SimpleStore[source]
Create SimpleStore from texts.
- Parameters:
texts – List of texts.
embedding – Unused (SimpleStore doesn’t use embeddings).
metadatas – Optional metadata for each text.
ids – Optional IDs for each text.
**kwargs – Additional arguments (unused).
- Returns:
SimpleStore instance.
- max_marginal_relevance_search(query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any) list[Document][source]
Return docs selected using the maximal marginal relevance.
Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents.
- Parameters:
query – Text to look up documents similar to.
k – Number of Document objects to return.
fetch_k – Number of Document objects to fetch to pass to MMR algorithm.
lambda_mult – Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity.
**kwargs – Arguments to pass to the search method.
- Returns:
List of Document objects selected by maximal marginal relevance.