main
HuangHai 4 months ago
parent f2c527ee2c
commit 7703bbf66f

@ -10,3 +10,5 @@ alibabacloud_sts20150401==1.1.4
alibabacloud_credentials==2.2.1
python-jose[cryptography]==2.21
passlib[bcrypt]==0.6.1
alibabacloud_iqs20241111==1.1.5

@ -21,6 +21,7 @@ from starlette.responses import StreamingResponse
from WxMini.Milvus.Config.MulvusConfig import *
from WxMini.Milvus.Utils.MilvusCollectionManager import MilvusCollectionManager
from WxMini.Milvus.Utils.MilvusConnectionPool import *
from WxMini.Milvus.X3_insert_data import person_id
from WxMini.Utils.EmbeddingUtil import text_to_embedding
from WxMini.Utils.ImageUtil import *
from WxMini.Utils.MySQLUtil import init_mysql_pool, get_chat_log_by_session, get_user_by_login_name, \
@ -598,10 +599,20 @@ async def generate_upload_params(current_user: dict = Depends(get_current_user))
@app.get("/aichat/recognize_content")
async def web_recognize_content(image_url: str, current_user: dict = Depends(get_current_user)):
logger.info(f"current_user:{current_user['login_name']}")
person_id = current_user['person_id']
try:
async def generate_stream():
# 假设 recognize_content 是一个异步生成器,逐条返回识别结果
async for result in recognize_content(client, app.state.mysql_pool, person_id, image_url):
yield f"{str(result)}" # 使用SSE格式
# 控制输出速度间隔0.01秒
await asyncio.sleep(0.01)
return StreamingResponse(
recognize_content(client, app.state.mysql_pool, current_user['person_id'], image_url),
media_type="text/plain")
generate_stream(),
media_type="text/event-stream", # 使用SSE的media_type
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} # 禁用缓存,保持连接
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@ -609,10 +620,20 @@ async def web_recognize_content(image_url: str, current_user: dict = Depends(get_current_user)):
@app.get("/aichat/recognize_text")
async def web_recognize_text(image_url: str, current_user: dict = Depends(get_current_user)):
logger.info(f"current_user:{current_user['login_name']}")
person_id = current_user['person_id']
try:
async def generate_stream():
# 假设 recognize_content 是一个异步生成器,逐条返回识别结果
async for result in recognize_text(client, app.state.mysql_pool, person_id, image_url):
yield f"{str(result)}" # 使用SSE格式
# 控制输出速度间隔0.01秒
await asyncio.sleep(0.01)
return StreamingResponse(
recognize_text(client, app.state.mysql_pool, current_user['person_id'], image_url),
media_type="text/plain")
generate_stream(),
media_type="text/event-stream", # 使用SSE的media_type
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} # 禁用缓存,保持连接
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@ -620,10 +641,20 @@ async def web_recognize_text(image_url: str, current_user: dict = Depends(get_current_user)):
@app.get("/aichat/recognize_math")
async def web_recognize_math(image_url: str, current_user: dict = Depends(get_current_user)):
logger.info(f"current_user:{current_user['login_name']}")
person_id = current_user['person_id']
try:
async def generate_stream():
# 假设 recognize_content 是一个异步生成器,逐条返回识别结果
async for result in recognize_math(client, app.state.mysql_pool, person_id, image_url):
yield f"{str(result)}" # 使用SSE格式
# 控制输出速度间隔0.01秒
await asyncio.sleep(0.01)
return StreamingResponse(
recognize_math(app.state.mysql_pool, current_user['person_id'], image_url),
media_type="text/plain")
generate_stream(),
media_type="text/event-stream", # 使用SSE的media_type
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"} # 禁用缓存,保持连接
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
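
All three endpoints above now stream their output as text/event-stream instead of returning a single text/plain body, so callers must read the response incrementally. A minimal consumption sketch, assuming a locally running server and bearer-token auth from the login endpoint (the base URL, token value, and sample image URL are placeholders, not part of this commit):

import requests

BASE_URL = "http://127.0.0.1:8000"   # placeholder, adjust to the deployed host
TOKEN = "<jwt-from-login>"           # placeholder bearer token

# Read the streamed recognition result chunk by chunk as the server yields it.
with requests.get(
        f"{BASE_URL}/aichat/recognize_content",
        params={"image_url": "https://example.com/sample.jpg"},  # placeholder image URL
        headers={"Authorization": f"Bearer {TOKEN}"},
        stream=True,
        timeout=60,
) as resp:
    resp.raise_for_status()
    for chunk in resp.iter_content(chunk_size=None):
        if chunk:
            # Simplification: a chunk may split a multi-byte character mid-way.
            print(chunk.decode("utf-8", errors="ignore"), end="", flush=True)

Because the server throttles each yielded chunk with asyncio.sleep(0.01), the client sees the text arrive progressively rather than in one burst.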

@ -0,0 +1,26 @@
from http import HTTPStatus
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath

import requests
from dashscope import ImageSynthesis

from WxMini.Milvus.Config.MulvusConfig import *

prompt = "高山,直上云端。少年,御剑飞行。怪物,紧追不放,大战,一触即发。"

print('----sync call, please wait a moment----')
rsp = ImageSynthesis.call(api_key=MODEL_API_KEY,
                          model="wanx2.1-t2i-turbo",
                          prompt=prompt,
                          n=1,
                          size='1024*1024')
print('response: %s' % rsp)
if rsp.status_code == HTTPStatus.OK:
    # Save the generated images in the current directory
    for result in rsp.output.results:
        file_name = PurePosixPath(unquote(urlparse(result.url).path)).parts[-1]
        # with open('./%s' % file_name, 'wb+') as f:
        #     f.write(requests.get(result.url).content)
else:
    print('sync_call Failed, status_code: %s, code: %s, message: %s' %
          (rsp.status_code, rsp.code, rsp.message))
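
The save-to-disk step is left commented out above; if it is re-enabled, a small helper along these lines keeps the download logic in one place (a sketch only; the helper name and timeout are illustrative, the URL handling mirrors the commented code):

import requests
from urllib.parse import urlparse, unquote
from pathlib import PurePosixPath

def save_result(url: str, target_dir: str = ".") -> str:
    """Download one generated image and save it under its original file name."""
    file_name = PurePosixPath(unquote(urlparse(url).path)).parts[-1]
    path = f"{target_dir}/{file_name}"
    with open(path, "wb") as f:
        f.write(requests.get(url, timeout=60).content)
    return path

# e.g. for result in rsp.output.results: save_result(result.url)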

@ -0,0 +1,96 @@
import requests
from bs4 import BeautifulSoup

# List of regions to crawl
area = ["db"]
city_name = '长春'

for page in area:
    # Build the URL
    url = f"https://www.weather.com.cn/textFC/{page}.shtml"
    headers = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0"
    }
    # Send the HTTP request and fetch the page
    res = requests.get(url=url, headers=headers)
    res.encoding = 'utf-8'  # Set the encoding to UTF-8
    # Parse the page with BeautifulSoup
    soup = BeautifulSoup(res.text, 'lxml')
    # Track the city names that have already been processed
    processed_cities = set()
    # Iterate over every div with class conMidtab2
    for div in soup.select('div.conMidtab2'):
        # Iterate over every tr (table row) inside the div
        for tr in div.select('tr'):
            # Check whether the row contains a td with width 83, which may hold the city info
            if tr.find('td', width='83'):
                # Check whether that td contains an <a> tag; the <a> tag usually holds the city name
                if tr.find('td', width='83').a:
                    # Extract the city name
                    city = tr.find('td', width='83').a.string
                    # Skip the city if it has already been processed
                    if city in processed_cities:
                        continue
                    # Extract the city name
                    city = tr.find('td', width='83').a.string
                    # Skip the row if the city is not city_name
                    if city != city_name:
                        continue
                    # Skip the city if it has already been processed
                    if city in processed_cities:
                        continue
                    # Otherwise, add the city to the processed set
                    processed_cities.add(city)
                    # Print the city name
                    print(f"城市:{city}")
                    # Extract the morning weather
                    morning_weather_td = tr.find('td', width='89')
                    if morning_weather_td:
                        morning_weather = morning_weather_td.string
                        print(f"上午天气:{morning_weather}")
                    # Extract the morning wind force and direction
                    morning_wind_td = tr.find('td', width='162')
                    if morning_wind_td:
                        spans = morning_wind_td.find_all('span')
                        if len(spans) >= 2:
                            morning_wind_1 = spans[0].string
                            morning_wind_2 = spans[1].string
                            print(f"上午风力风向:{morning_wind_1} {morning_wind_2}")
                    # Extract the morning high temperature
                    morning_max_temp_td = tr.find('td', width='92')
                    if morning_max_temp_td:
                        morning_max_temp = morning_max_temp_td.string
                        print(f"上午最高温度:{morning_max_temp}摄氏度")
                    # Extract the evening weather
                    night_weather_td = tr.find('td', width='98')
                    if night_weather_td:
                        night_weather = night_weather_td.string
                        print(f"晚上天气:{night_weather}")
                    # Extract the evening wind force and direction
                    night_wind_td = tr.find('td', width='177')
                    if night_wind_td:
                        spans = night_wind_td.find_all('span')
                        if len(spans) >= 2:
                            night_wind_1 = spans[0].string
                            night_wind_2 = spans[1].string
                            print(f"晚上风力风向:{night_wind_1} {night_wind_2}")
                    # Extract the evening low temperature
                    night_min_temp_td = tr.find('td', width='86')
                    if night_min_temp_td:
                        night_min_temp = night_min_temp_td.string
                        print(f"晚上最低温度:{night_min_temp}摄氏度")
            else:
                # Skip rows without a width-83 td
                continue

@ -0,0 +1,74 @@
import requests
from bs4 import BeautifulSoup


def get_weather():
    url = "https://www.weather.com.cn/textFC/db.shtml"
    response = requests.get(url)
    html_content = response.content
    soup = BeautifulSoup(html_content, 'html.parser')

    # Get the dates
    day_tabs = soup.find('ul', class_='day_tabs')
    days = [day.text.strip() for day in day_tabs.find_all('li')][:3]  # Only keep the first three days

    # Collect the weather information
    weather_data = []
    tables = soup.find_all('table', width="100%")  # Find all weather tables
    for table in tables:
        rows = table.find_all('tr')
        for row in rows:
            cells = row.find_all('td')
            if len(cells) >= 8:  # Make sure this is a weather data row
                city = cells[1].text.strip()
                weather_info = {
                    'city': city,
                    'today': {
                        'day': {
                            'weather': cells[2].text.strip(),
                            'wind': cells[3].text.strip(),
                            'temp': cells[4].text.strip()
                        },
                        'night': {
                            'weather': cells[5].text.strip(),
                            'wind': cells[6].text.strip(),
                            'temp': cells[7].text.strip()
                        }
                    }
                }
                # Check whether data for tomorrow and the day after tomorrow is present
                if len(cells) >= 15:
                    weather_info['tomorrow'] = {
                        'day': {
                            'weather': cells[9].text.strip(),
                            'wind': cells[10].text.strip(),
                            'temp': cells[11].text.strip()
                        },
                        'night': {
                            'weather': cells[12].text.strip(),
                            'wind': cells[13].text.strip(),
                            'temp': cells[14].text.strip()
                        }
                    }
                if len(cells) >= 22:
                    weather_info['day_after_tomorrow'] = {
                        'day': {
                            'weather': cells[16].text.strip(),
                            'wind': cells[17].text.strip(),
                            'temp': cells[18].text.strip()
                        },
                        'night': {
                            'weather': cells[19].text.strip(),
                            'wind': cells[20].text.strip(),
                            'temp': cells[21].text.strip()
                        }
                    }
                weather_data.append(weather_info)
    return days, weather_data


if __name__ == "__main__":
    days, weather_data = get_weather()
    print(weather_data)
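
For reference, the structure returned by get_weather() could be walked like this (a sketch; the city value in the filter is just an example, and the 'tomorrow' / 'day_after_tomorrow' keys are only present when the source table carries enough columns):

days, weather_data = get_weather()
print("Dates:", days)
for info in weather_data:
    if info['city'] != '长春':  # example filter; any scraped city name works
        continue
    for label in ('today', 'tomorrow', 'day_after_tomorrow'):
        block = info.get(label)
        if not block:
            continue  # rows with fewer columns lack the extra days
        print(label,
              "| day:", block['day']['weather'], block['day']['wind'], block['day']['temp'],
              "| night:", block['night']['weather'], block['night']['wind'], block['night']['temp'])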

@ -70,7 +70,7 @@ async def recognize_content(client, pool, person_id, image_url):
            if char != ' ':
                yield char  # Stream the character out
                full_text += char  # Append the character to the full text
-                print(char, end='')
+                # print(char, end='')
                time.sleep(0.1)  # Control the output rate
    # Record to the database
