**Abid Ali Awan** · commit `dc31452`

Enhance regulatory query handling by improving detection of new regulatory questions versus follow-ups. Update parameter extraction to include report type, refining response generation based on user intent. Revise the README to reflect these new features and clarify functionality. Update UIHandler to incorporate conversation context in general chat responses for a better user experience.
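The improved new-question-versus-follow-up detection lives in `RegRadarAgent.is_regulatory_query`, which is not part of this file. As a rough, hypothetical illustration only (the keyword set, cue list, and length threshold below are invented for the sketch, not the agent's actual logic), the distinction might be drawn like this:

```python
# Hypothetical sketch: invented keywords and thresholds, not RegRadarAgent code.
REG_KEYWORDS = {"regulation", "compliance", "sec", "gdpr", "hipaa", "finra"}


def looks_like_new_regulatory_query(message: str, last_was_regulatory: bool) -> bool:
    tokens = message.lower().split()
    if any(keyword in tokens for keyword in REG_KEYWORDS):
        return True  # explicit regulatory vocabulary: treat as a new question
    # Short messages that lean on pronouns/references are treated as follow-ups
    # that inherit the regulatory intent of the previous turn.
    is_follow_up = len(tokens) <= 6 and any(
        cue in tokens for cue in ("it", "that", "those", "more")
    )
    return last_was_regulatory and is_follow_up
```

The `UIHandler` implementation from the commit: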
```python
import threading
import time
import uuid

import gradio as gr
from gradio import ChatMessage

from agents.reg_radar import RegRadarAgent
from tools.llm import stream_llm

class UIHandler:
    def __init__(self):
        self.agent = RegRadarAgent()

    def streaming_chatbot(self, message, history, user_id_state):
        """Process messages with tool visibility and lock input during response generation."""
        # Initialize user_id if not set
        if not user_id_state:
            user_id_state = f"user-{uuid.uuid4().hex[:4]}"
        user_id = user_id_state

        if not message.strip():
            # Yield rather than `return value`: in a generator function, a bare
            # `return x` never reaches Gradio, so the UI would not update.
            yield history, "", gr.update(interactive=True), user_id_state
            return
        # Add user message
        history.append(ChatMessage(role="user", content=message))

        # Start timer
        start_time = time.time()

        # Disable input box at the start
        yield history, "", gr.update(interactive=False), user_id_state

        # Detect if this is a regulatory query
        is_regulatory = self.agent.is_regulatory_query(message)
        if not is_regulatory:
            yield from self._handle_general_chat(message, history, user_id_state)
            return

        yield from self._handle_regulatory_chat(
            message, history, user_id_state, user_id, start_time
        )

    def _handle_general_chat(self, message, history, user_id_state):
        """Handle general (non-regulatory) chat flow with context from conversation history."""
        history.append(
            ChatMessage(role="assistant", content="💬 Processing general query...")
        )
        yield history, "", gr.update(interactive=False), user_id_state

        # Clear processing message and stream response
        history.pop()
        streaming_content = ""
        history.append(ChatMessage(role="assistant", content=""))
        # Gather up to 5 recent user/assistant messages as context, excluding
        # the just-appended user message and the empty assistant placeholder
        context_msgs = []
        for msg in history[-12:-2]:
            if isinstance(msg, dict):
                role = msg.get("role")
                content = msg.get("content")
            else:
                role = getattr(msg, "role", None)
                content = getattr(msg, "content", None)
            if role in ("user", "assistant") and content:
                context_msgs.append(f"{role.capitalize()}: {content}")
        context_str = "\n".join(context_msgs[-5:])

        # Compose prompt with context
        if context_str:
            prompt = f"""
You are an expert assistant. Here is the recent conversation history:
{context_str}
Now answer the user's latest message:
{message}
"""
        else:
            prompt = message
        for chunk in stream_llm(prompt):
            streaming_content += chunk
            history[-1] = ChatMessage(role="assistant", content=streaming_content)
            yield history, "", gr.update(interactive=False), user_id_state

        # Re-enable input box at the end
        yield history, "", gr.update(interactive=True), user_id_state

    def _handle_regulatory_chat(
        self, message, history, user_id_state, user_id, start_time
    ):
        """Handle regulatory chat flow."""
        # Show tool detection
        tool_key, tool_name = self.agent.determine_intended_tool(message)

        # Initial processing message with tool info (collapsible)
        status_msg = (
            f"Using **{tool_name}** to analyze your query (estimated 10-20 seconds)..."
        )
        history.append(
            ChatMessage(
                role="assistant",
                content=status_msg,
                metadata={"title": f"🛠️ Tool Selected: {tool_name}"},
            )
        )
        yield history, "", gr.update(interactive=False), user_id_state

        # Extract parameters and process query
        params = self.agent.extract_parameters(message)

        # Clear status and show parameter extraction (collapsible)
        history.pop()
        param_msg = self.agent.format_parameter_extraction(params)
        history.append(
            ChatMessage(
                role="assistant",
                content=param_msg,
                metadata={"title": "📍 Parameter Extraction"},
            )
        )
        yield history, "", gr.update(interactive=False), user_id_state

        # Show tool execution steps (collapsible)
        tool_status = f"""
**Executing {tool_name}...**
⏳ _This process may take 40-90 seconds depending on the number of webpages being crawled._
"""
        history.append(
            ChatMessage(
                role="assistant",
                content=tool_status,
                metadata={"title": "📢 Tool Execution Status"},
            )
        )
        yield history, "", gr.update(interactive=False), user_id_state

        # Process the regulatory query
        results = self.agent.process_regulatory_query(message, params, user_id=user_id)
        crawl_results = results["crawl_results"]
        memory_results = results["memory_results"]

        # Show collapsible raw results
        if crawl_results["results"]:
            collapsible_results = self._format_crawl_results(crawl_results["results"])
            history.append(
                ChatMessage(
                    role="assistant",
                    content=collapsible_results,
                    metadata={"title": "🌐 Raw Regulatory Data", "status": "done"},
                )
            )
            yield history, "", gr.update(interactive=False), user_id_state

        # Display memory results if available
        if memory_results:
            memory_msg = self._format_memory_results(memory_results)
            history.append(
                ChatMessage(
                    role="assistant",
                    content=memory_msg,
                    metadata={"title": "💾 Past Memories", "status": "done"},
                )
            )
            yield history, "", gr.update(interactive=False), user_id_state

        # Generate final analysis (no metadata, standard message)
        history.append(
            ChatMessage(
                role="assistant", content="📝 **Generating Compliance Report...**"
            )
        )
        yield history, "", gr.update(interactive=False), user_id_state

        # Clear generating message and stream final report
        history.pop()
        streaming_content = ""
        history.append(ChatMessage(role="assistant", content=""))
        for chunk in self.agent.generate_report(params, crawl_results, memory_results):
            streaming_content += chunk
            history[-1] = ChatMessage(role="assistant", content=streaming_content)
            yield history, "", gr.update(interactive=False), user_id_state

        # Show completion time appended to the final report (no metadata)
        elapsed = time.time() - start_time
        history[-1] = ChatMessage(
            role="assistant",
            content=streaming_content + f"\n\n✨ Analysis complete ({elapsed:.1f}s).",
        )

        # Re-enable input box at the end
        yield history, "", gr.update(interactive=True), user_id_state

        # Save to memory in the background
        threading.Thread(
            target=self.agent.memory_tools.save_to_memory,
            args=(user_id, message, streaming_content),
            daemon=True,
        ).start()
    def _format_crawl_results(self, results):
        """Format crawl results for display, removing duplicates by URL."""
        seen_urls = set()
        results_display = []
        count = 0
        for result in results:
            url = result["url"]
            if url in seen_urls:
                continue
            seen_urls.add(url)
            # Truncate long titles; only add an ellipsis when text was cut
            title = result["title"] or "No Title"
            if len(title) > 100:
                title = title[:100] + "..."
            count += 1
            results_display.append(f"""
**{count}. {result["source"]}**
- Title: {title}
- URL: {url}
""")
        if results_display:
            # Only return the content; Gradio's metadata title handles the dropdown
            collapsible_results = "\n".join(results_display)
        else:
            collapsible_results = "No unique regulatory updates found."
        return collapsible_results

    def _format_memory_results(self, memory_results):
        """Format memory results for display."""
        top_memories = memory_results[:3]
        memory_details = ""
        for i, mem in enumerate(top_memories, 1):
            memory_text = mem.get("memory", "N/A")
            memory_details += f"\n**{i}. Memory:** {memory_text[:300]}...\n"
        # Report the actual number shown (may be fewer than 3)
        memory_msg = (
            f"Found {len(memory_results)} similar past queries in memory.\n"
            f"Top {len(top_memories)} shown below:\n{memory_details}"
        )
        return memory_msg
    def delayed_clear(self, user_id_state):
        time.sleep(0.1)  # 100ms delay to allow generator cancellation
        return [], "", gr.update(interactive=True), user_id_state
```
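To see how these handlers connect to a UI, here is a minimal wiring sketch, assuming component names and layout that are not part of this commit (`chatbot`, `msg`, `send_btn`, `clear_btn`, `user_id`, and the `ui_handler` module path are all illustrative). Each generator yields four values, mapped here onto four output components; the original comments lock the "input box" during generation, but this sketch routes the `interactive` update to the send button so that no component appears twice in `outputs`.

```python
# Hypothetical wiring, not from the commit: names and layout are assumptions.
import gradio as gr

from ui_handler import UIHandler  # assumed module path

handler = UIHandler()

with gr.Blocks() as demo:
    chatbot = gr.Chatbot(type="messages")  # renders role/content messages
    msg = gr.Textbox(placeholder="Ask about regulatory updates...")
    send_btn = gr.Button("Send")
    clear_btn = gr.Button("Clear")
    user_id = gr.State(None)  # initialized lazily by streaming_chatbot

    # Each yield of (history, "", gr.update(...), user_id_state) maps to these
    # four components: chat history, cleared textbox, button lock, session id.
    outputs = [chatbot, msg, send_btn, user_id]
    msg.submit(handler.streaming_chatbot, [msg, chatbot, user_id], outputs)
    send_btn.click(handler.streaming_chatbot, [msg, chatbot, user_id], outputs)
    clear_btn.click(handler.delayed_clear, [user_id], outputs)

if __name__ == "__main__":
    demo.launch()
```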