main
HuangHai 4 months ago
parent 62af315e3b
commit fc8125f3f0

@ -1,8 +1,7 @@
import asyncio
import base64
import base64
import datetime
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager
@ -12,7 +11,7 @@ from typing import Optional
from alibabacloud_sts20150401 import models as sts_20150401_models
from alibabacloud_sts20150401.client import Client as Sts20150401Client
from alibabacloud_tea_openapi.models import Config
from fastapi import Query, Depends, HTTPException, status, Form, FastAPI
from fastapi import Query, Depends, status, Form, FastAPI
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from openai import AsyncOpenAI
@ -26,6 +25,7 @@ from WxMini.Utils.ImageUtil import *
from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session, get_user_by_login_name, \
get_chat_logs_by_risk_flag, get_chat_logs_summary, save_chat_to_mysql
from WxMini.Utils.MySQLUtil import update_risk, get_last_chat_log_id
from WxMini.Utils.NewsUtil import *
from WxMini.Utils.OssUtil import upload_mp3_to_oss_from_memory, hmacsha256
from WxMini.Utils.TianQiUtil import get_weather
from WxMini.Utils.TtsUtil import TTS
@ -319,6 +319,12 @@ async def reply(person_id: str = Form(...),
logger.info(f"历史交互提示词: {history_prompt}")
# NBA与CBA
result = await get_news(client, prompt)
if result is not None:
history_prompt += result
print("新闻返回了下面的内容:" + result)
# 调用大模型,将历史交互作为提示词
try:
response = await asyncio.wait_for(

@ -1,5 +1,5 @@
from openai import OpenAI
from TtsConfig import *
from WxMini.Milvus.Config.MulvusConfig import *
# 初始化 OpenAI 客户端

@ -0,0 +1,11 @@
from WxMini.Utils.NewsUtil import *
# 示例调用
if __name__ == '__main__':
    # Sample query (roughly: "How have the recent CBA games turned out?").
    user_input = "最近CBA的比赛结果怎么样"
    # NOTE(review): get_news in WxMini.Utils.NewsUtil appears to be declared as
    # `async def get_news(client, user_input)`. Called like this it receives a
    # single argument and is never awaited, so this demo presumably fails with
    # a TypeError - TODO confirm, then pass a client and run it on an event loop.
    result=get_news(user_input)
    # Print whatever came back, or a fallback message when there is nothing.
    if result is not None:
        print(result)
    else:
        print("No results found.")

@ -0,0 +1,90 @@
import requests
from bs4 import BeautifulSoup
def search_sina_news(keyword='新闻', time='d', page=1):
    """
    Search Sina News and scrape one page of results.

    :param keyword: search keyword, defaults to '新闻'
    :param time: time range: 'h' last hour, 'd' last day, 'w' last week,
        'm' last month, or a year number to restrict results to that year
    :param page: page number, 1 for the first page
    :return: list of result dicts with the keys 'order', 'title', 'link',
        'content', 'source' and 'time'; an empty list when the request fails
        or the response status is not 200
    """
    # Query parameters:
    #   c=news   search the news channel
    #   adv=1    1 = advanced search mode, 0 = normal search mode
    #   q        keyword
    #   time     time range (see the docstring)
    #   size=20  results per page, range 10-20
    #   page     page number to fetch
    # Passing them through `params` lets requests percent-encode the keyword,
    # so characters such as '&', '#' or spaces cannot corrupt the query string.
    params = {
        'c': 'news',
        'adv': 1,
        'q': keyword,
        'time': time,
        'size': 20,
        'page': page,
    }
    try:
        # A timeout keeps a stalled connection from hanging the caller forever.
        response = requests.get('https://search.sina.com.cn/news', params=params, timeout=10)
    except requests.RequestException as e:
        print(f'请求新浪新闻时出错: {e}')
        return []
    response.encoding = 'utf-8'

    if response.status_code != 200:
        print('status_code!=200, 不能解析内容')
        return []

    soup = BeautifulSoup(response.text, 'html.parser')

    if page == 1:
        # The summary of the total hit count is only reported for the first page.
        try:
            news_number = soup.find('div', 'l_v2').text
            print(news_number)
        except AttributeError as e:
            # find() returned None: the summary element is missing.
            print(e)

    result_blocks = soup.find_all('div', class_='box-result clearfix')
    print(f'{page}页抓取到的搜索结果数量为{len(result_blocks)}')

    results = []
    for block in result_blocks:
        try:
            anchor = block.find('a')
            title = anchor.text
            link = anchor['href']
            # Block holding the summary, the source and the publication time.
            infos = block.find('div', class_='r-info')
            content = infos.find('p', class_='content').text
            st_list = infos.find('span').text.split()
            source = st_list[0]
            if len(st_list) > 2:
                # Absolute timestamp: date and clock time are separate tokens.
                published = st_list[1] + ' ' + st_list[2]
            else:
                # Relative timestamp such as "N hours ago".
                published = st_list[1]
        except (AttributeError, KeyError, IndexError, TypeError) as e:
            # A malformed result block is skipped instead of aborting the scrape.
            print(f'解析新闻块时出错: {e}')
            continue

        # Number only the results that were actually extracted.
        order = len(results) + 1
        results.append({
            'order': order,
            'title': title,
            'link': link,
            'content': content,
            'source': source,
            'time': published
        })
        # Echo the scraped entry to the console.
        print(order, ". ", title)
        print(link)
        print(content)
        print(source, ' ', published)
        print(" ")

    print(f'本次共抓取到的搜索结果共{len(results)}')
    return results
# 示例调用
if __name__ == '__main__':
    # Fetch the first page of today's results for the sample keyword.
    # Change 'page' (for example to 2) to fetch a different page.
    demo_query = {'keyword': 'NBA', 'time': 'd', 'page': 1}
    results = search_sina_news(**demo_query)

@ -90,7 +90,7 @@ async def get_chat_log_by_session(mysql_pool, person_id, page=1, page_size=10):
records = await cur.fetchall()
# 将查询结果反转,确保最新消息显示在最后
if records:
if page==1 and records:
records.reverse()
# 将查询结果转换为字典列表

@ -0,0 +1,90 @@
import asyncio
import logging
import jieba
import requests
from bs4 import BeautifulSoup
from fastapi import HTTPException
from WxMini.Milvus.Config.MulvusConfig import *
# 配置日志
# Module-level logger; the root logger is configured to emit INFO and above
# with a timestamp, the level name and the message.
logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# 分词与关键词提取
def extract_keywords(user_input):
    """
    Segment the user's input with jieba and keep the meaningful tokens.

    :param user_input: raw text entered by the user
    :return: list of tokens in their original order, without stop words and
        without tokens that contain no letter or digit (whitespace,
        punctuation); an empty list for empty input
    """
    if not user_input:
        return []

    # A set gives constant-time membership tests and removes duplicate entries.
    # NOTE(review): the previous list consisted mostly of repeated empty
    # strings, which look like single-character stop words lost at some
    # point - TODO confirm and restore them here if so.
    stop_words = {'', '怎么', '怎么样'}

    # The first surviving token becomes the search term downstream, so tokens
    # made only of punctuation or whitespace must not survive the filter.
    return [
        word
        for word in jieba.lcut(user_input)
        if word not in stop_words and any(ch.isalnum() for ch in word)
    ]
# 搜索新浪新闻
def search_sina_news(keyword='新闻', time='d', page=1):
    """
    Search Sina News and return the parsed results of one page.

    :param keyword: search keyword, defaults to '新闻'
    :param time: time-range code forwarded to the search endpoint, defaults to 'd'
    :param page: page number, defaults to 1
    :return: list of dicts with the keys 'title', 'link', 'content', 'source'
        and 'time'; an empty list when the request fails, times out or the
        response status is not 200
    """
    results = []
    # Passing the query through `params` lets requests percent-encode the
    # keyword, so characters such as '&' or '#' cannot corrupt the query string.
    params = {
        'c': 'news',
        'adv': 1,
        'q': keyword,
        'time': time,
        'size': 20,
        'page': page,
    }
    try:
        response = requests.get('https://search.sina.com.cn/news', params=params, timeout=10)
        response.encoding = 'utf-8'
        if response.status_code != 200:
            # Make a rejected request visible instead of silently returning nothing.
            logger.error(f"新浪新闻返回非200状态码: {response.status_code}")
            return results

        soup = BeautifulSoup(response.text, 'html.parser')
        for block in soup.find_all('div', class_='box-result clearfix'):
            try:
                anchor = block.find('a')
                title = anchor.text
                link = anchor['href']
                infos = block.find('div', class_='r-info')
                content = infos.find('p', class_='content').text
                st_list = infos.find('span').text.split()
                source = st_list[0]
                # Three tokens: source plus a two-part timestamp;
                # two tokens: source plus a single-token timestamp.
                published = st_list[1] + ' ' + st_list[2] if len(st_list) > 2 else st_list[1]
            except (AttributeError, KeyError, IndexError, TypeError) as e:
                # Any malformed block is skipped on its own; previously only
                # AttributeError was handled here, so a short metadata line or
                # a missing href ended the whole loop via the outer handler.
                logger.error(f"解析新闻块时出错: {e}")
                continue
            results.append({'title': title, 'link': link, 'content': content, 'source': source, 'time': published})
    except requests.Timeout:
        logger.error("请求新浪新闻超时")
    except Exception as e:
        # Best-effort boundary: a failed lookup must not break the caller.
        logger.error(f"请求新浪新闻时出错: {e}")
    return results
# 根据关键词调用搜索函数
def search_based_on_keywords(keywords):
    """
    Run a Sina News search for the first keyword.

    :param keywords: sequence of keywords; only the first one is used
    :return: the results of search_sina_news for the last day, first page,
        or an empty list when no keywords are given
    """
    # Only the leading keyword drives the search; nothing to search for
    # short-circuits to an empty result.
    return search_sina_news(keyword=keywords[0], time='d', page=1) if keywords else []
# 使用AI整理搜索结果
async def format_results_with_ai(client, results):
    """
    Ask the chat model to condense the search results into one paragraph.

    :param client: chat client exposing chat.completions.create as an awaitable
    :param results: list of result dicts; the 'title' and 'time' keys are read
    :return: the stripped text of the model's first choice, or a fixed
        "nothing found" message when results is empty
    :raises HTTPException: status 500 when the model call exceeds 60 seconds
    """
    if not results:
        return "未找到相关新闻。"

    # One "title (time)" line per result.
    headlines = (f"{item['title']} ({item['time']})" for item in results)
    search_text = "\n".join(headlines)
    messages = [
        {"role": "system", "content": "你是一个助手,负责将搜索结果整理成用户友好的格式。"},
        {"role": "user", "content": f"请将以下搜索结果整理成一段话:\n{search_text}"},
    ]

    try:
        # Cap the model call at 60 seconds.
        completion = await asyncio.wait_for(
            client.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                max_tokens=4000,
            ),
            timeout=60,
        )
    except asyncio.TimeoutError:
        logger.error("大模型调用超时")
        raise HTTPException(status_code=500, detail="大模型调用超时")

    return completion.choices[0].message.content.strip()
# 主函数
async def get_news(client, user_input):
    """
    Look up news related to the user's input and summarise it with the model.

    :param client: chat client handed on to format_results_with_ai
    :param user_input: raw text entered by the user
    :return: the model-formatted summary, or a fixed "nothing found" message
        when the search yields no results
    """
    keywords = extract_keywords(user_input)

    # The search performs a blocking HTTP request (up to its 10 s timeout).
    # Running it in the default executor keeps the event loop free to serve
    # other coroutines in the meantime.
    loop = asyncio.get_running_loop()
    results = await loop.run_in_executor(None, search_based_on_keywords, keywords)

    if not results:
        # NOTE(review): callers that test `result is not None` will still
        # receive this message, since None is never returned - TODO confirm
        # that this is the intended contract.
        return "未找到相关新闻。"

    return await format_results_with_ai(client, results)
Loading…
Cancel
Save