Mysql

数据库操作

批量 kill 掉查询

有时候需要批量 kill 掉查询进程,有几种方式,比如生成 sql 文件执行:

# Write one KILL statement per matching connection to a file, then execute that file.
# Requires the FILE privilege and a writable /tmp on the database server.
mysql> select concat('KILL ',id,';') from information_schema.processlist
where user='root' and time > 200 into outfile '/tmp/a.txt';
mysql> source /tmp/a.txt;

有时候没法生成文件(比如权限原因),可以直接查询出 kill 语句,复制后粘贴到命令行执行;也可以把结果连接成一行,方便复制:

mysql> select concat('KILL ',id,';') from information_schema.processlist where db='dbname';
mysql> select GROUP_CONCAT(stat SEPARATOR ' ') from (select concat('KILL ',id,';') as stat from information_schema.processlist where db='dbname') as stats;
# Group connections by client IP to see which client holds the most connections
select client_ip, count(client_ip) as client_num from (select substring_index(host,':',1) as client_ip from information_schema.processlist) as connect_info group by client_ip order by client_num desc;
# List the threads that are currently executing, longest-running first, to spot unusually slow ones
select * from information_schema.processlist where Command != 'Sleep' order by Time desc;
# Build kill statements for every thread that has been running for more than 5 minutes
select concat('kill ',id,';') from information_schema.processlist where Command != 'Sleep' and Time > 300 order by Time desc;

也可以通过 python 脚本来完成,原理也是查询进程 id 然后删除:

# https://stackoverflow.com/questions/1903838/how-do-i-kill-all-the-processes-in-mysql-show-processlist
import pymysql  # pip install pymysql

# Fill in the connection parameters for the target server before running.
connection = pymysql.connect(host='',
                             user='',
                             db='',
                             password='',
                             cursorclass=pymysql.cursors.DictCursor)
try:
    # Never kill the session this script is running on, otherwise the
    # remaining KILL statements would fail on a dead connection.
    own_id = connection.thread_id()
    with connection.cursor() as cursor:
        cursor.execute('SHOW PROCESSLIST')
        for item in cursor.fetchall():
            if item.get('db') != 'dbname':  # filter condition
                continue
            _id = item.get('Id')
            if _id == own_id:
                continue
            # Print the whole processlist row so the operator can see what is killed.
            print('kill %s' % item)
            cursor.execute('kill %s', _id)
finally:
    # Release the connection even when one of the statements raises.
    connection.close()

删除大表(借助一个临时表)

# https://stackoverflow.com/questions/879327/quickest-way-to-delete-enormous-mysql-table
# Swap in an empty copy of the table, then drop the old (large) one.
CREATE TABLE new_foo LIKE foo;
RENAME TABLE foo TO old_foo, new_foo TO foo;
DROP TABLE old_foo;
# Note: a plain `truncate table tablename;` also appears to be quick, even on tables with tens of millions of rows

统计表的大小并排序

# https://stackoverflow.com/questions/9620198/how-to-get-the-sizes-of-the-tables-of-a-mysql-database
# Size (data + indexes) of every table, largest first.
SELECT
    table_schema AS `Database`,
    table_name AS `Table`,
    round(((data_length + index_length) / 1024 / 1024), 2) `Size in MB`
FROM information_schema.TABLES
ORDER BY (data_length + index_length) DESC;

统计数据库大小

# https://stackoverflow.com/questions/1733507/how-to-get-size-of-mysql-database
# Total size (data + indexes) of each database.
SELECT table_schema "DB Name",
    ROUND(SUM(data_length + index_length) / 1024 / 1024, 1) "DB Size in MB"
FROM information_schema.tables
GROUP BY table_schema;
# Show the ten largest tables, excluding the system schemas
select table_schema as database_name,
    table_name,
    round((data_length + index_length) / 1024 / 1024, 2) as total_size,
    round((data_length) / 1024 / 1024, 2) as data_size,
    round((index_length) / 1024 / 1024, 2) as index_size
from information_schema.tables
where table_schema not in ('information_schema', 'mysql',
                           'performance_schema', 'sys')
    and table_type = 'BASE TABLE'
    -- and table_schema = 'your database name'
order by total_size desc
limit 10;

查看表信息

mysql> show table status;
mysql> show table status where Rows > 100000;

纵向显示

有时候表字段比较多,查询结果显示会很乱,可以使用纵向(每个字段占一行)的显示方式:把语句结尾的分号换成 \G

mysql> select * from user limit 10\G

导出和导入表的数据

shell> mysqldump -u user -h host -ppass db_name table_name > out.sql
mysql> source /path/to/out.sql

重命名数据库

# https://stackoverflow.com/questions/67093/how-do-i-quickly-rename-a-mysql-database-change-schema-name
# Dump the old database, create the new one, then load the dump into it.
mysqldump -u username -p -v olddatabase > olddbdump.sql
mysqladmin -u username -p create newdatabase
mysql -u username -p newdatabase < olddbdump.sql

Python Mysql 操作

Sqlalchemy 示例

# -*- coding: utf-8 -*-
"""
Quick example of reading and writing MySQL data with SQLAlchemy.

# pip install SQLAlchemy -i https://pypi.doubanio.com/simple --user
pip install SQLAlchemy==1.3.20 -i https://pypi.doubanio.com/simple --user
pip install pymysql -i https://pypi.doubanio.com/simple --user
"""
import sqlalchemy as db
from sqlalchemy import text

"""
# Create a test table in the local MySQL instance
CREATE TABLE `area_code` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `code` bigint(12) NOT NULL DEFAULT '0' COMMENT '行政区划代码',
  `name` varchar(32) NOT NULL DEFAULT '' COMMENT '名称',
  PRIMARY KEY (`id`),
  KEY `idx_code` (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""


def sqlalchemy_demo():
    """Insert, select, update and delete rows of `area_code` via the expression API.

    Uses the SQLAlchemy 1.3 style API (``autoload=True``, ``select([table])``)
    to match the version pinned in the module docstring.
    """
    # https://towardsdatascience.com/sqlalchemy-python-tutorial-79a577141a91
    url = "mysql+pymysql://root:wnnwnn@127.0.0.1:3306/testdb"  # local test address
    engine = db.create_engine(url)
    # The context manager returns the connection to the pool even if a
    # statement raises.
    with engine.connect() as connection:
        metadata = db.MetaData()
        # Reflect the column definitions from the existing table.
        table = db.Table('area_code', metadata, autoload=True, autoload_with=engine)

        # Insert a single row
        query = db.insert(table).values(code=10010, name="北京")
        connection.execute(query)

        # Insert several rows in one call
        query = db.insert(table)
        values = [
            {'code': 10020, 'name': '上海'},
            {'code': 10030, 'name': '杭州'},
        ]
        connection.execute(query, values)

        # Select the ten most recently inserted rows
        query = db.select([table]).order_by(db.desc(table.columns.id)).limit(10)
        rows = connection.execute(query).fetchall()
        for row in rows:
            print(row.id, row.code, row.name)

        # Update
        query = db.update(table).values(name="帝都").where(table.columns.code == 10010)
        connection.execute(query)

        # Delete rows
        query = db.delete(table).where(table.columns.code == 10010)
        connection.execute(query)


def sqlalchemy_text_demo():
    """Execute a raw SQL statement directly and print each result row."""
    url = "mysql+pymysql://root:wnnwnn@127.0.0.1:3306/testdb"  # local test address
    engine = db.create_engine(url)
    with engine.connect() as connection:
        sql = text("show tables;")
        res = connection.execute(sql)
        for i in res:
            print(i)


if __name__ == "__main__":
    sqlalchemy_demo()

Go Mysql 操作

Go 可以使用 gorm 或者标准库 database/sql,下面的示例使用 database/sql 搭配 go-sql-driver/mysql 驱动:

packagemain
import(
"database/sql"
"fmt"
"log"
"time"
_"github.com/go-sql-driver/mysql"
)
funcmain(){
db,err:=sql.Open("mysql","root:root@(127.0.0.1:3306)/root?parseTime=true")
iferr!=nil{
log.Fatal(err)
}
iferr:=db.Ping();err!=nil{
log.Fatal(err)
}
{// Create a new table
query:=`
 CREATE TABLE users (
 id INT AUTO_INCREMENT,
 username TEXT NOT NULL,
 password TEXT NOT NULL,
 created_at DATETIME,
 PRIMARY KEY (id)
 );`
if_,err:=db.Exec(query);err!=nil{
log.Fatal(err)
}
}
{// Insert a new user
username:="johndoe"
password:="secret"
createdAt:=time.Now()
result,err:=db.Exec(`INSERT INTO users (username, password, created_at) VALUES (?, ?, ?)`,username,password,createdAt)
iferr!=nil{
log.Fatal(err)
}
id,err:=result.LastInsertId()
fmt.Println(id)
}
{// Query a single user
var(
idint
usernamestring
passwordstring
createdAttime.Time
)
query:="SELECT id, username, password, created_at FROM users WHERE id = ?"
iferr:=db.QueryRow(query,1).Scan(&id,&username,&password,&createdAt);err!=nil{
log.Fatal(err)
}
fmt.Println(id,username,password,createdAt)
}
{// Query all users
typeuserstruct{
idint
usernamestring
passwordstring
createdAttime.Time
}
rows,err:=db.Query(`SELECT id, username, password, created_at FROM users`)
iferr!=nil{
log.Fatal(err)
}
deferrows.Close()
varusers[]user
forrows.Next(){
varuuser
err:=rows.Scan(&u.id,&u.username,&u.password,&u.createdAt)
iferr!=nil{
log.Fatal(err)
}
users=append(users,u)
}
iferr:=rows.Err();err!=nil{
log.Fatal(err)
}
fmt.Printf("%#v",users)
}
{
_,err:=db.Exec(`DELETE FROM users WHERE id = ?`,1)
iferr!=nil{
log.Fatal(err)
}
}
}