Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复mongo上线问题 #2803

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 45 additions & 22 deletions sql/engines/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def test_connection(self):
def exec_cmd(self, sql, db_name=None, slave_ok=""):
"""审核时执行的语句"""

if self.user and self.password and self.port and self.host:
if self.port and self.host:
msg = ""
auth_db = self.instance.db_name or "admin"
sql_len = len(sql)
Expand All @@ -303,7 +303,8 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""):
)
fp.write(sql.encode("utf-8"))
fp.seek(0) # 把文件指针指向开始,这样写的sql内容才能落到磁盘文件上
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format(
if self.user and self.password:
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format(
mongo=mongo,
uname=self.user,
password=self.password,
Expand All @@ -315,11 +316,23 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""):
slave_ok=slave_ok,
tempfile_=fp.name,
)
else:
cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}load('{tempfile_}')\nEOF".format(
mongo=mongo,
host=self.host,
port=self.port,
db_name=db_name,
sql=sql,
auth_db=auth_db,
slave_ok=slave_ok,
tempfile_=fp.name,
)
is_load = True # 标记使用了load方法,用来在finally里面判断是否需要强制删除临时文件
elif (
not sql.startswith("var host=") and sql_len < 4000
): # 在master节点执行的情况, 如果sql长度小于4000,就直接用mongo shell执行,减少磁盘交换,节省性能
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format(
if self.user and self.password:
cmd = "{mongo} --quiet -u {uname} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format(
mongo=mongo,
uname=self.user,
password=self.password,
Expand All @@ -330,8 +343,19 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""):
auth_db=auth_db,
slave_ok=slave_ok,
)
else:
cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\ndb=db.getSiblingDB(\"{db_name}\");{slave_ok}printjson({sql})\nEOF".format(
mongo=mongo,
host=self.host,
port=self.port,
db_name=db_name,
sql=sql,
auth_db=auth_db,
slave_ok=slave_ok,
)
else:
cmd = "{mongo} --quiet -u {user} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format(
if self.user and self.password:
cmd = "{mongo} --quiet -u {user} -p '{password}' {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format(
mongo=mongo,
user=self.user,
password=self.password,
Expand All @@ -341,6 +365,15 @@ def exec_cmd(self, sql, db_name=None, slave_ok=""):
sql=sql,
auth_db=auth_db,
)
else:
cmd = "{mongo} --quiet {host}:{port}/{auth_db} <<\\EOF\nrs.slaveOk();{sql}\nEOF".format(
mongo=mongo,
host=self.host,
port=self.port,
db_name=db_name,
sql=sql,
auth_db=auth_db,
)
p = subprocess.Popen(
cmd,
shell=True,
Expand Down Expand Up @@ -795,25 +828,15 @@ def execute_check(self, db_name=None, sql=""):
def get_connection(self, db_name=None):
self.db_name = db_name or self.instance.db_name or "admin"
auth_db = self.instance.db_name or "admin"

self.conn = pymongo.MongoClient(
self.host,
self.port,
authSource=auth_db,
connect=True,
connectTimeoutMS=10000,
)
if self.user and self.password:
self.conn = pymongo.MongoClient(
self.host,
self.port,
username=self.user,
password=self.password,
authSource=auth_db,
connect=True,
connectTimeoutMS=10000,
)
else:
self.conn = pymongo.MongoClient(
self.host,
self.port,
authSource=auth_db,
connect=True,
connectTimeoutMS=10000,
)
self.conn[self.db_name].authenticate(self.user, self.password, auth_db)
return self.conn

def close(self):
Expand Down
Loading