""" Query functionality for RAGAnything Contains all query-related methods for both text and multimodal queries """ from typing import Dict, List, Any from pathlib import Path from lightrag import QueryParam from lightrag.utils import always_get_an_event_loop from raganything.prompt import PROMPTS from raganything.utils import get_processor_for_type class QueryMixin: """QueryMixin class containing query functionality for RAGAnything""" async def aquery(self, query: str, mode: str = "hybrid", **kwargs) -> str: """ Pure text query - directly calls LightRAG's query functionality Args: query: Query text mode: Query mode ("local", "global", "hybrid", "naive", "mix", "bypass") **kwargs: Other query parameters, will be passed to QueryParam Returns: str: Query result """ if self.lightrag is None: raise ValueError( "No LightRAG instance available. Please process documents first or provide a pre-initialized LightRAG instance." ) # Create query parameters query_param = QueryParam(mode=mode, **kwargs) self.logger.info(f"Executing text query: {query[:100]}...") self.logger.info(f"Query mode: {mode}") # Call LightRAG's query method result = await self.lightrag.aquery(query, param=query_param) self.logger.info("Text query completed") return result async def aquery_with_multimodal( self, query: str, multimodal_content: List[Dict[str, Any]] = None, mode: str = "hybrid", **kwargs, ) -> str: """ Multimodal query - combines text and multimodal content for querying Args: query: Base query text multimodal_content: List of multimodal content, each element contains: - type: Content type ("image", "table", "equation", etc.) - Other fields depend on type (e.g., img_path, table_data, latex, etc.) mode: Query mode ("local", "global", "hybrid", "naive", "mix", "bypass") **kwargs: Other query parameters, will be passed to QueryParam Returns: str: Query result Examples: # Pure text query result = await rag.query_with_multimodal("What is machine learning?") # Image query result = await rag.query_with_multimodal( "Analyze the content in this image", multimodal_content=[{ "type": "image", "img_path": "./image.jpg" }] ) # Table query result = await rag.query_with_multimodal( "Analyze the data trends in this table", multimodal_content=[{ "type": "table", "table_data": "Name,Age\nAlice,25\nBob,30" }] ) """ # Ensure LightRAG is initialized await self._ensure_lightrag_initialized() self.logger.info(f"Executing multimodal query: {query[:100]}...") self.logger.info(f"Query mode: {mode}") # If no multimodal content, fallback to pure text query if not multimodal_content: self.logger.info("No multimodal content provided, executing text query") return await self.aquery(query, mode=mode, **kwargs) # Process multimodal content to generate enhanced query text enhanced_query = await self._process_multimodal_query_content( query, multimodal_content ) # Create query parameters query_param = QueryParam(mode=mode, **kwargs) self.logger.info( f"Generated enhanced query length: {len(enhanced_query)} characters" ) # Execute enhanced query result = await self.lightrag.aquery(enhanced_query, param=query_param) self.logger.info("Multimodal query completed") return result async def _process_multimodal_query_content( self, base_query: str, multimodal_content: List[Dict[str, Any]] ) -> str: """ Process multimodal query content to generate enhanced query text Args: base_query: Base query text multimodal_content: List of multimodal content Returns: str: Enhanced query text """ self.logger.info("Starting multimodal query content processing...") enhanced_parts = [f"User query: {base_query}"] for i, content in enumerate(multimodal_content): content_type = content.get("type", "unknown") self.logger.info( f"Processing {i+1}/{len(multimodal_content)} multimodal content: {content_type}" ) try: # Get appropriate processor processor = get_processor_for_type(self.modal_processors, content_type) if processor: # Generate content description description = await self._generate_query_content_description( processor, content, content_type ) enhanced_parts.append( f"\nRelated {content_type} content: {description}" ) else: # If no appropriate processor, use basic description basic_desc = str(content)[:200] enhanced_parts.append( f"\nRelated {content_type} content: {basic_desc}" ) except Exception as e: self.logger.error(f"Error processing multimodal content: {str(e)}") # Continue processing other content continue enhanced_query = "\n".join(enhanced_parts) enhanced_query += PROMPTS["QUERY_ENHANCEMENT_SUFFIX"] self.logger.info("Multimodal query content processing completed") return enhanced_query async def _generate_query_content_description( self, processor, content: Dict[str, Any], content_type: str ) -> str: """ Generate content description for query Args: processor: Multimodal processor content: Content data content_type: Content type Returns: str: Content description """ try: if content_type == "image": return await self._describe_image_for_query(processor, content) elif content_type == "table": return await self._describe_table_for_query(processor, content) elif content_type == "equation": return await self._describe_equation_for_query(processor, content) else: return await self._describe_generic_for_query( processor, content, content_type ) except Exception as e: self.logger.error(f"Error generating {content_type} description: {str(e)}") return f"{content_type} content: {str(content)[:100]}" async def _describe_image_for_query( self, processor, content: Dict[str, Any] ) -> str: """Generate image description for query""" image_path = content.get("img_path") captions = content.get("img_caption", []) footnotes = content.get("img_footnote", []) if image_path and Path(image_path).exists(): # If image exists, use vision model to generate description image_base64 = processor._encode_image_to_base64(image_path) if image_base64: prompt = PROMPTS["QUERY_IMAGE_DESCRIPTION"] description = await processor.modal_caption_func( prompt, image_data=image_base64, system_prompt=PROMPTS["QUERY_IMAGE_ANALYST_SYSTEM"], ) return description # If image doesn't exist or processing failed, use existing information parts = [] if image_path: parts.append(f"Image path: {image_path}") if captions: parts.append(f"Image captions: {', '.join(captions)}") if footnotes: parts.append(f"Image footnotes: {', '.join(footnotes)}") return "; ".join(parts) if parts else "Image content information incomplete" async def _describe_table_for_query( self, processor, content: Dict[str, Any] ) -> str: """Generate table description for query""" table_data = content.get("table_data", "") table_caption = content.get("table_caption", "") prompt = PROMPTS["QUERY_TABLE_ANALYSIS"].format( table_data=table_data, table_caption=table_caption ) description = await processor.modal_caption_func( prompt, system_prompt=PROMPTS["QUERY_TABLE_ANALYST_SYSTEM"] ) return description async def _describe_equation_for_query( self, processor, content: Dict[str, Any] ) -> str: """Generate equation description for query""" latex = content.get("latex", "") equation_caption = content.get("equation_caption", "") prompt = PROMPTS["QUERY_EQUATION_ANALYSIS"].format( latex=latex, equation_caption=equation_caption ) description = await processor.modal_caption_func( prompt, system_prompt=PROMPTS["QUERY_EQUATION_ANALYST_SYSTEM"] ) return description async def _describe_generic_for_query( self, processor, content: Dict[str, Any], content_type: str ) -> str: """Generate generic content description for query""" content_str = str(content) prompt = PROMPTS["QUERY_GENERIC_ANALYSIS"].format( content_type=content_type, content_str=content_str ) description = await processor.modal_caption_func( prompt, system_prompt=PROMPTS["QUERY_GENERIC_ANALYST_SYSTEM"].format( content_type=content_type ), ) return description # Synchronous versions of query methods def query(self, query: str, mode: str = "hybrid", **kwargs) -> str: """ Synchronous version of pure text query Args: query: Query text mode: Query mode ("local", "global", "hybrid", "naive", "mix", "bypass") **kwargs: Other query parameters, will be passed to QueryParam Returns: str: Query result """ loop = always_get_an_event_loop() return loop.run_until_complete(self.aquery(query, mode=mode, **kwargs)) def query_with_multimodal( self, query: str, multimodal_content: List[Dict[str, Any]] = None, mode: str = "hybrid", **kwargs, ) -> str: """ Synchronous version of multimodal query Args: query: Base query text multimodal_content: List of multimodal content, each element contains: - type: Content type ("image", "table", "equation", etc.) - Other fields depend on type (e.g., img_path, table_data, latex, etc.) mode: Query mode ("local", "global", "hybrid", "naive", "mix", "bypass") **kwargs: Other query parameters, will be passed to QueryParam Returns: str: Query result """ loop = always_get_an_event_loop() return loop.run_until_complete( self.aquery_with_multimodal(query, multimodal_content, mode=mode, **kwargs) )