GridFS的安装和使用
1.安装MongoDB。
打开终端,并执行以下命令安装 MongoDB:
1
2
|
arch -arm64 brew tap mongodb/brew
arch -arm64 brew install mongodb-community
|
2.安装 pymongo 和 gridfs 包
1
|
arch -arm64 pip3 install pymongo
|
3.启动MongoDB 服务
1
|
brew services start mongodb-community
|
4.MongoDB可视化工具
https://www.mongodb.com/try/download/shell选择合适的版本下载安装
5.GridFS的使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
from bson import ObjectId
from pymongo import MongoClient
import gridfs
from typing import Any
g_gridfs = None
class XGridFS(object):
def __init__(self):
self.fs = None
def init_gridfs(self):
# 连接到 MongoDB 数据库
client = MongoClient('mongodb://localhost:27017/')
db = client['my_database']
self.fs = gridfs.GridFS(db, 'fs')
def put_file(self, file_content: Any, file_name: str):
file_id = self.fs.put(file_content, filename=file_name)
return file_id
def find_file(self, file_id: str):
return self.fs.get(ObjectId(file_id))
def find_one(self, d: dict):
file = self.fs.find_one(d)
if file:
return file
return None
def gridfs(self):
return self.fs
@staticmethod
def shared_gridfs():
global g_gridfs
if not g_gridfs:
g_gridfs = XGridFS()
g_gridfs.init_gridfs()
return g_gridfs
|
6.FastAPI上传单个文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
router.post("/uploadFile", response_model=ResponseBaseModel)
async def upload_file(file: UploadFile = None):
if not file:
return ResponseBaseModel(code=http.HTTPStatus.NOT_ACCEPTABLE, message=f"文件为空,上传失败", data={
"file_id": 0,
})
f = XGridFS.shared_gridfs().find_one({'filename': file.filename})
# 如果文件存在,则返回文件id;如果不存在,则写入GridFS
file_content = None
if f:
gridfs_file_content = XGridFS.shared_gridfs().gridfs().get(f._id).read()
file_content = await file.read()
# 比较文件是否相同
if md5_by_data(file_content) == md5_by_data(gridfs_file_content):
return ResponseBaseModel(message=f"文件{file.filename}已经存在", data={
"file_id": str(f._id),
})
if not file_content:
file_content = await file.read()
# 将文件写入 GridFS 中
file_id = XGridFS.shared_gridfs().put_file(file_content, file.filename)
if not file_id:
return ResponseBaseModel(message=f"文件{file.filename}上传失败", data={
"file_id": 0,
})
return ResponseBaseModel(message=f"文件{file.filename}上传成功", data={
"file_id": str(file_id),
})
|
7.执行效果
文章作者
梵梵爸
上次更新
2023-11-21
许可协议
原创文章,如需转载请注明文章作者和出处。谢谢