[Nhà tài trợ] Bắt đầu học Python với hướng dẫn Giới thiệu về Python miễn phí của DataCamp. Tìm hiểu Khoa học dữ liệu bằng cách hoàn thành các thử thách mã hóa tương tác và xem video của các chuyên gia hướng dẫn. Bắt đầu bây giờ
Cập nhật ngày 07 tháng 1 năm 2020
Câu lệnh Insert được sử dụng để chèn các bản ghi trong mysql
cú pháp.
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]3
ví dụ 1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]
Chương trình này chèn một thành phố mới vào bảng thành phố, chú ý sử dụng tới
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]4, phương pháp này lưu các thay đổi của bạn vào cơ sở dữ liệu
ví dụ 2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] name = "Some new city" country_code = 'PSE' district = 'Someyork' population = 10008 sql = "insert into city VALUES[null, '%s', '%s', '%s', %d]" % \ [name, country_code , district, population] number_of_rows = cursor.execute[sql] db.commit[] db.close[]
Lưu ý việc sử dụng ký tự dấu gạch chéo ngược [
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]5] trong dòng 18. Ký tự
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]5 dùng để chia câu lệnh python thành nhiều dòng
Chèn nhiều hàng
Để chèn nhiều hàng vào bảng, hãy sử dụng phương thức
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]0 của đối tượng con trỏ
cú pháp.
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]1
bản tường trình. chuỗi chứa truy vấn để thực thi
tranh luận. một chuỗi chứa các giá trị để sử dụng trong câu lệnh chèn
Hãy lấy một ví dụ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] name = "Some new city" country_code = 'SNC' district = 'Someyork' population = 10008 data = [ ['city 1', 'MAC', 'distrct 1', 16822], ['city 2', 'PSE', 'distrct 2', 15642], ['city 3', 'ZWE', 'distrct 3', 11642], ['city 4', 'USA', 'distrct 4', 14612], ['city 5', 'USA', 'distrct 5', 17672], ] sql = "insert into city[name, countrycode, district, population] VALUES[%s, %s, %s, %s]" number_of_rows = cursor.executemany[sql, data] db.commit[] db.close[]
Trong bài tiếp theo, chúng tôi thảo luận về cách xử lý lỗi
Hướng dẫn khác [Nhà tài trợ]
Trang web này được hỗ trợ rộng rãi bởi DataCamp. DataCamp cung cấp Hướng dẫn Python tương tác trực tuyến cho Khoa học dữ liệu. Tham gia cùng hơn một triệu người học khác và bắt đầu học Python cho khoa học dữ liệu ngay hôm nay
Trong bài đăng này, tôi sẽ hướng dẫn cách di chuyển nhanh một lượng lớn dữ liệu sang Cơ sở dữ liệu bằng Python
Bài đăng này xuất phát từ yêu cầu của dự án để đọc khoảng 20 triệu bản ghi JSON và tải nội dung của chúng vào Cơ sở dữ liệu máy chủ SQL
điều kiện tiên quyết
Tôi sẽ giả định một vài điều
- Đầu tiên, bạn hơi quen thuộc với Python và một số khái niệm như sử dụng mô-đun, nhập thư viện và cấu trúc dữ liệu phổ biến
- Bạn đã quen với việc sử dụng Python để di chuyển dữ liệu đến Cơ sở dữ liệu và sử dụng trình kết nối Cơ sở dữ liệu
- Cuối cùng, tôi sẽ giả sử Cơ sở dữ liệu đích là Cơ sở dữ liệu quan hệ cấp doanh nghiệp như Postgres hoặc SQL Server. Chúng có các tính năng hữu ích để tải dữ liệu nhanh chóng, chẳng hạn như đồng thời và nhiều kết nối
Bài đăng này có ý định hướng dẫn cách tiếp cận một vấn đề hơn là cung cấp một hướng dẫn
Những thách thức của rất nhiều dữ liệu
Có hai thách thức chính khi di chuyển khối lượng lớn dữ liệu sang Cơ sở dữ liệu
- Thời gian - Một thao tác, chẳng hạn như chèn bản ghi, có thể mất vài phần triệu giây nhưng có thể lên đến vài phút hoặc vài giờ khi được thực hiện hàng triệu lần
- Bộ nhớ - Nếu kích thước của tập dữ liệu lớn hơn RAM khả dụng của chúng tôi, chúng tôi sẽ không thể đưa tất cả vào danh sách và xử lý nó
Trong bài đăng này, chúng ta sẽ khám phá cách đối phó với những thách thức này
Đi nhanh hơn
Trước khi xem mã, hãy khám phá một số cách để tải dữ liệu vào Cơ sở dữ liệu nhanh hơn
chủ đề
Thực hiện lặp đi lặp lại cùng một hành động trên dữ liệu là một gợi ý tốt rằng tác vụ có thể hưởng lợi từ đồng thời, cụ thể là Chủ đề
Phân luồng cho phép chúng ta chia một nhiệm vụ thành nhiều phần công việc và thực hiện chúng đồng thời thay vì tuần tự. Chúng ta có thể làm nhiều việc hơn trong cùng một khoảng thời gian
Tránh tắc nghẽn
Việc chèn một hàng tại một thời điểm có thể chỉ mất một phần triệu giây nhưng sẽ tăng lên nếu được thực hiện hàng triệu lần
Để tối đa hóa tốc độ, chúng tôi sẽ cam kết các bản ghi tối đa có thể trong một giao dịch Cơ sở dữ liệu. Câu lệnh cụ thể của
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]7 được sử dụng được gọi là
SQL thô
Thông thường, nên sử dụng ORM [Object Relational Mapper] để tương tác với Cơ sở dữ liệu trong Python. Tuy nhiên, việc tạo ra hàng nghìn hoặc hàng triệu đối tượng ORM cụ thể sẽ làm tăng thêm chi phí mà chúng tôi có thể tránh được
Thay vào đó, SQL thô sẽ được xây dựng từ dữ liệu và được thực thi với API cơ sở dữ liệu
Chúng ta hãy xem mã
Đang cài đặt
Nếu bạn muốn làm theo, bạn sẽ cần một số dữ liệu nguồn. Nguồn trong hướng dẫn này là tệp CSV
Bài đăng này nhằm mục đích tải dữ liệu nhanh chóng, vì vậy tôi sẽ không trình bày chi tiết cách thức hoạt động của nó. Tuy nhiên, tập lệnh sử dụng thư viện Faker tuyệt vời để tạo dữ liệu cá nhân giả và gấu trúc để ghi ra CSV
data_gen. py
________số 8Thật không may, nhiệm vụ này không phải là nhanh chóng. Tập lệnh mất khoảng 5 phút để chạy trên máy của tôi. Thay đổi đối số n_records để tạo bộ dữ liệu lớn hơn
Đang tải dữ liệu
Để làm cho mã dễ quản lý hơn, tôi đã chia mã thành hai tệp [mô-đun]
0 - mô-đun quản lý tải nhanh dữ liệu nguồnfrom concurrent import futures import csv import queue import sqlactions MULTI_ROW_INSERT_LIMIT = 1000 WORKERS = 6 def read_csv[csv_file]: with open[csv_file, encoding="utf-8", newline=""] as in_file: reader = csv.reader[in_file, delimiter="|"] next[reader] # Header row for row in reader: yield row def process_row[row, batch, table_name, conn_params]: batch.put[row] if batch.full[]: sqlactions.multi_row_insert[batch, table_name, conn_params] return batch def load_csv[csv_file, table_def, conn_params]: # Optional, drops table if it exists before creating sqlactions.make_table[table_def, conn_params] batch = queue.Queue[MULTI_ROW_INSERT_LIMIT] with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor: todo = [] for row in read_csv[csv_file]: future = executor.submit[ process_row, row, batch, table_def["name"], conn_params ] todo.append[future] for future in futures.as_completed[todo]: result = future.result[] # Handle left overs if not result.empty[]: sqlactions.multi_row_insert[result, table_def["name"], conn_params] if __name__ == "__main__": table_def = { "name": "dummy_data", "columns": { "id": "INTEGER", "job": "VARCHAR[100]", "company": "VARCHAR[100]", "name": "VARCHAR[100]", "sex": "CHAR", "mail": "VARCHAR[100]", "birthdate": "DATE", }, } conn_params = { "server": "localhost", "database": "TutorialDB", "user": "yourUserName", "tds_version": "7.4", "password": "yourStrong[!]Password", "port": 1433, "driver": "FreeTDS", } load_csv["dummy_data.csv", table_def, conn_params]
1 - mô-đun chứa các chức năng thực hiện các hành động trên Cơ sở dữ liệu đíchfrom concurrent import futures import csv import queue import sqlactions MULTI_ROW_INSERT_LIMIT = 1000 WORKERS = 6 def read_csv[csv_file]: with open[csv_file, encoding="utf-8", newline=""] as in_file: reader = csv.reader[in_file, delimiter="|"] next[reader] # Header row for row in reader: yield row def process_row[row, batch, table_name, conn_params]: batch.put[row] if batch.full[]: sqlactions.multi_row_insert[batch, table_name, conn_params] return batch def load_csv[csv_file, table_def, conn_params]: # Optional, drops table if it exists before creating sqlactions.make_table[table_def, conn_params] batch = queue.Queue[MULTI_ROW_INSERT_LIMIT] with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor: todo = [] for row in read_csv[csv_file]: future = executor.submit[ process_row, row, batch, table_def["name"], conn_params ] todo.append[future] for future in futures.as_completed[todo]: result = future.result[] # Handle left overs if not result.empty[]: sqlactions.multi_row_insert[result, table_def["name"], conn_params] if __name__ == "__main__": table_def = { "name": "dummy_data", "columns": { "id": "INTEGER", "job": "VARCHAR[100]", "company": "VARCHAR[100]", "name": "VARCHAR[100]", "sex": "CHAR", "mail": "VARCHAR[100]", "birthdate": "DATE", }, } conn_params = { "server": "localhost", "database": "TutorialDB", "user": "yourUserName", "tds_version": "7.4", "password": "yourStrong[!]Password", "port": 1433, "driver": "FreeTDS", } load_csv["dummy_data.csv", table_def, conn_params]
Bạn sẽ tìm thấy mã đầy đủ trong kho lưu trữ kèm theo bài đăng này
Đầu tiên, hãy xem mô-đun
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
2tảicsv
tảicsv. py
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
thiết lập ban đầu
Mô-đun bắt đầu bằng cách nhập các thư viện cần thiết,
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
3, from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
4 và from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
5, tất cả từ thư viện chuẩn, tiếp theo là mô-đun from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
6Sau khi nhập, hằng số được xác định. Chúng bao gồm
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
7, xác định kích thước hàng đợi [xem bên dưới] và số lượng luồng hoặc from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
8Đọc dữ liệu
Tiếp theo là đọc dữ liệu từ tệp CSV
Đọc dữ liệu được thực hiện với một trình tạo. Trình tạo là một loại hàm cụ thể trả về từng giá trị một [
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
9] thay vì trả về tất cả chúng cùng một lúc [_______200]tảicsv. py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 240
Sử dụng trình tạo để tránh đưa tất cả dữ liệu vào danh sách và hết bộ nhớ
Thay vào đó, dữ liệu được đọc theo khối [trong trường hợp này là một dòng], loại bỏ từng đoạn khi hoàn thành nó. Hãy nhớ rằng một khi dữ liệu đã được đưa vào Cơ sở dữ liệu, không cần phải giữ nó trong bộ nhớ
Thiết lập chủ đề
Thiết lập hàng đợi
Nhớ lại trước đó rằng cách tiếp cận nhanh hơn là chèn càng nhiều bản ghi càng tốt trong một giao dịch?
Batching yêu cầu cấu trúc dữ liệu để giữ dữ liệu hàng tạm thời. Sự lựa chọn tốt nhất ở đây là một hàng đợi có các tính năng hữu ích cho loại nhiệm vụ này
- Hàng đợi bị khóa bất cứ khi nào sửa đổi diễn ra trên chúng. Nếu chúng tôi lấy một giá trị ra khỏi hàng đợi, không có luồng nào khác có thể đồng thời sửa đổi hàng đợi. Thuộc tính này được gọi là "thread safe" và giải quyết một vấn đề gọi là điều kiện chủng tộc
- Hàng đợi được tối ưu hóa để lấy dữ liệu từ đầu và thêm dữ liệu vào cuối, tôi. e. , chúng tôi không cần truy cập các giá trị ở bất kỳ vị trí nào khác trong hàng đợi
tảicsv. py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 242
Là một lưu ý phụ, mẫu được sử dụng ở đây được gọi là nhà sản xuất-người tiêu dùng
Làm việc với chủ đề
Cách đơn giản nhất để sử dụng Chủ đề là đồng thời. mô-đun tương lai, một phần của thư viện chuẩn Python. Mô-đun trừu tượng hóa nhiều chi tiết của việc sử dụng Chủ đề
Cách sử dụng Chủ đề được khuyến nghị là khởi tạo một
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2401 bên trong trình quản lý bối cảnh
Bên trong trình quản lý bối cảnh, công việc cho Chủ đề cần được lên lịch
Lập lịch trình xảy ra bằng cách tạo một đối tượng
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2402 bằng phương thức
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2403 thêm nó vào danh sách
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2404. Mỗi tương lai được thông qua một định nghĩa hàm [có thể gọi được] và bất kỳ đối số bắt buộc nào
Tại thời điểm này, công việc được lên lịch nhưng không được thực hiện
Đối tượng Tương lai là gì?
Một Tương lai là một đại diện của một số công việc sẽ xảy ra trong tương lai. Trong bối cảnh này, tương lai là một cuộc gọi đến phương thức
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2405 với một số dữ liệu hàng và hàng đợi hàng loạt
Để thực hiện công việc theo lịch trình, chúng tôi sử dụng phương thức,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2406, lấy một danh sách [có thể lặp lại] các hợp đồng tương lai và đưa ra kết quả của chúng khi chúng hoàn thành
tảicsv. py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 249
Đoạn mã trên được chuyển thể từ cuốn sách Fluent Python của Luciano Ramalho. Cuốn sách có hai chương xuất sắc về đồng thời với
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
3 và 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2408
Xử lý từng hàng
Hàm
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2405 thực hiện một số việc
- Lấy một hàng và thêm hàng vào hàng đợi hàng loạt
- Sau đó, kiểm tra xem hàng đợi hàng loạt đã đầy chưa. Nếu đúng như vậy, hàng đợi hàng loạt sẽ được chuyển đến một trong các hàm
6 để chèn vào Cơ sở dữ liệufrom concurrent import futures import csv import queue import sqlactions MULTI_ROW_INSERT_LIMIT = 1000 WORKERS = 6 def read_csv[csv_file]: with open[csv_file, encoding="utf-8", newline=""] as in_file: reader = csv.reader[in_file, delimiter="|"] next[reader] # Header row for row in reader: yield row def process_row[row, batch, table_name, conn_params]: batch.put[row] if batch.full[]: sqlactions.multi_row_insert[batch, table_name, conn_params] return batch def load_csv[csv_file, table_def, conn_params]: # Optional, drops table if it exists before creating sqlactions.make_table[table_def, conn_params] batch = queue.Queue[MULTI_ROW_INSERT_LIMIT] with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor: todo = [] for row in read_csv[csv_file]: future = executor.submit[ process_row, row, batch, table_def["name"], conn_params ] todo.append[future] for future in futures.as_completed[todo]: result = future.result[] # Handle left overs if not result.empty[]: sqlactions.multi_row_insert[result, table_def["name"], conn_params] if __name__ == "__main__": table_def = { "name": "dummy_data", "columns": { "id": "INTEGER", "job": "VARCHAR[100]", "company": "VARCHAR[100]", "name": "VARCHAR[100]", "sex": "CHAR", "mail": "VARCHAR[100]", "birthdate": "DATE", }, } conn_params = { "server": "localhost", "database": "TutorialDB", "user": "yourUserName", "tds_version": "7.4", "password": "yourStrong[!]Password", "port": 1433, "driver": "FreeTDS", } load_csv["dummy_data.csv", table_def, conn_params]
- Cuối cùng, nó trả về hàng đợi. Chúng tôi có thể xem đến cuối tệp CSV và thấy hàng đợi chưa đầy. Trả lại hàng đợi đầy một phần nghĩa là chúng ta có thể chèn riêng các hàng còn lại
tảicsv. py
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] name = "Some new city" country_code = 'PSE' district = 'Someyork' population = 10008 sql = "insert into city VALUES[null, '%s', '%s', '%s', %d]" % \ [name, country_code , district, population] number_of_rows = cursor.execute[sql] db.commit[] db.close[]4
sqlactions
Tại thời điểm này, đã đến lúc giới thiệu mô-đun
from concurrent import futures
import csv
import queue
import sqlactions
MULTI_ROW_INSERT_LIMIT = 1000
WORKERS = 6
def read_csv[csv_file]:
with open[csv_file, encoding="utf-8", newline=""] as in_file:
reader = csv.reader[in_file, delimiter="|"]
next[reader] # Header row
for row in reader:
yield row
def process_row[row, batch, table_name, conn_params]:
batch.put[row]
if batch.full[]:
sqlactions.multi_row_insert[batch, table_name, conn_params]
return batch
def load_csv[csv_file, table_def, conn_params]:
# Optional, drops table if it exists before creating
sqlactions.make_table[table_def, conn_params]
batch = queue.Queue[MULTI_ROW_INSERT_LIMIT]
with futures.ThreadPoolExecutor[max_workers=WORKERS] as executor:
todo = []
for row in read_csv[csv_file]:
future = executor.submit[
process_row, row, batch, table_def["name"], conn_params
]
todo.append[future]
for future in futures.as_completed[todo]:
result = future.result[]
# Handle left overs
if not result.empty[]:
sqlactions.multi_row_insert[result, table_def["name"], conn_params]
if __name__ == "__main__":
table_def = {
"name": "dummy_data",
"columns": {
"id": "INTEGER",
"job": "VARCHAR[100]",
"company": "VARCHAR[100]",
"name": "VARCHAR[100]",
"sex": "CHAR",
"mail": "VARCHAR[100]",
"birthdate": "DATE",
},
}
conn_params = {
"server": "localhost",
"database": "TutorialDB",
"user": "yourUserName",
"tds_version": "7.4",
"password": "yourStrong[!]Password",
"port": 1433,
"driver": "FreeTDS",
}
load_csv["dummy_data.csv", table_def, conn_params]
6Module là tập hợp các hàm thực hiện các thao tác trên Cơ sở dữ liệu. Mô-đun sử dụng thư viện xây dựng truy vấn PyPika để xây dựng các truy vấn SQL và kết hợp chúng với dữ liệu
sqlactions. py
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] name = "Some new city" country_code = 'PSE' district = 'Someyork' population = 10008 sql = "insert into city VALUES[null, '%s', '%s', '%s', %d]" % \ [name, country_code , district, population] number_of_rows = cursor.execute[sql] db.commit[] db.close[]6
Chèn từng đợt
Chức năng thiết yếu ở đây là
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2422. Việc chèn từng lô diễn ra bằng cách xây dựng một câu lệnh
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]7 và thực hiện nó trên Cơ sở dữ liệu
sqlactions. py
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] name = "Some new city" country_code = 'PSE' district = 'Someyork' population = 10008 sql = "insert into city VALUES[null, '%s', '%s', '%s', %d]" % \ [name, country_code , district, population] number_of_rows = cursor.execute[sql] db.commit[] db.close[]8
Mã bắt đầu xây dựng Chèn nhiều hàng bằng cách lặp qua hàng đợi hàng loạt, loại bỏ từng hàng khỏi hàng đợi. Dữ liệu hàng được chuyển đổi thành một bộ và được thêm vào danh sách
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2424
Việc xóa các mục ở phía trước hàng đợi sẽ tạo ra các vị trí ở cuối được lấp đầy bởi các luồng đang chờ
Với dữ liệu hàng đã được chuẩn bị, nó sẽ được kết hợp thành Chèn nhiều hàng thông qua hàm PyPika
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2425, hàm này lấy các bộ dữ liệu làm dữ liệu hàng
Cuối cùng, câu lệnh được thực thi và sử dụng liên kết kết nối Cơ sở dữ liệu và giao dịch với một loạt bản ghi
Tại sao xây dựng một truy vấn?
Bạn có thể thắc mắc tại sao chúng tôi đang xây dựng SQL thô mà không sử dụng truy vấn được tham số hóa? . Trong SQL Server, giới hạn này là 2100
Trong một truy vấn được tham số hóa, mọi phần dữ liệu đều là một tham số và giảm đáng kể lượng dữ liệu được cam kết trong một giao dịch
Kết hợp SQL thô và dữ liệu
Cần phải chỉ ra rằng việc xây dựng SQL thô và kết nối với dữ liệu là một điều tối kỵ về bảo mật. Xây dựng SQL thô có thể khiến Cơ sở dữ liệu đích dễ bị tấn công SQL injection
Bạn sẽ cần đánh giá xem phương pháp này có phù hợp với mình hay không và có khả năng thực hiện các bước để bảo vệ chống lại điều này
Dọn dẹp lần cuối
Bước cuối cùng trong quy trình là cam kết bất kỳ hàng nào còn lại
Bởi vì các bản ghi được chèn vào khi hàng đợi lô đầy, chúng tôi có thể đến cuối tập dữ liệu và thấy hàng đợi chưa đầy
Hàng đợi đầy một phần được chuyển đến hàm
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 2426 để đảm bảo các bản ghi còn lại được tải
tảicsv. py
from __future__ import print_function import MySQLdb as my db = my.connect[host="127.0.0.1", user="root", passwd="", db="world" ] cursor = db.cursor[] sql = "insert into city VALUES[null, 'Mars City', 'MAC', 'MARC', 1233]" number_of_rows = cursor.execute[sql] db.commit[] # you need to call commit[] method to save # your changes to the database db.close[]2
Một từ về ngoại lệ
Mã mà chúng tôi đã xem qua cố tình bỏ qua các ngoại lệ để chúng tôi có thể tập trung vào logic và quy trình
Tải một lượng lớn dữ liệu vào Cơ sở dữ liệu có thể mất một khoảng thời gian không nhỏ
Bạn nên cân nhắc việc xử lý các ngoại lệ theo cách không làm giảm tải và buộc bạn phải bắt đầu lại—ví dụ: ghi lại bất kỳ dữ liệu nào khiến thao tác chèn không thành công, tiếp tục và sau đó xử lý các ngoại lệ đó một cách riêng biệt
Phần kết luận
Phù. Ở đó chúng tôi có nó
Hy vọng rằng bài viết này đã cung cấp cho bạn ý tưởng hay về các kỹ thuật bạn có thể sử dụng để tải các tập dữ liệu lớn vào Cơ sở dữ liệu của mình một cách nhanh chóng
Có một vài thứ khác mà chúng tôi chưa thử, Chèn bảng dẫn xuất, đọc dữ liệu theo khối lớn hơn hoặc sử dụng asyncio. Tôi sẽ để lại những điều đó để bạn xem xét