Trên Đám mây Amazon Web Services [AWS], AWS Glue là dịch vụ trích xuất, chuyển đổi và tải [ETL] được quản lý toàn phần. AWS Glue giúp phân loại dữ liệu của bạn, làm sạch, bổ sung dữ liệu và di chuyển dữ liệu một cách đáng tin cậy giữa các kho lưu trữ dữ liệu và luồng dữ liệu khác nhau với chi phí hiệu quả
Mẫu này cung cấp các loại công việc khác nhau trong AWS Glue và sử dụng ba tập lệnh khác nhau để minh họa các công việc ETL tác giả
Bạn có thể sử dụng AWS Glue để viết các lệnh ETL trong môi trường trình bao Python. Bạn cũng có thể tạo cả công việc ETL hàng loạt và trực tuyến bằng cách sử dụng Python [PySpark] hoặc Scala trong môi trường Apache Spark được quản lý. Để giúp bạn bắt đầu với việc soạn thảo các công việc ETL, mẫu này tập trung vào các công việc ETL hàng loạt bằng cách sử dụng Python shell, PySpark và Scala. Công việc trình bao Python dành cho khối lượng công việc yêu cầu sức mạnh tính toán thấp hơn. Môi trường Apache Spark được quản lý dành cho khối lượng công việc yêu cầu sức mạnh tính toán cao
Apache Parquet được xây dựng để hỗ trợ các sơ đồ nén và mã hóa hiệu quả. Nó có thể tăng tốc khối lượng công việc phân tích của bạn vì nó lưu trữ dữ liệu theo kiểu cột. Chuyển đổi dữ liệu sang Parquet có thể giúp bạn tiết kiệm cả không gian lưu trữ, chi phí và thời gian trong thời gian dài hơn. Để tìm hiểu thêm về Parquet, hãy xem bài đăng trên blog Apache Parquet. Cách trở thành anh hùng với định dạng dữ liệu cột nguồn mở trên đám mây Google, Azure và Amazon
Điều kiện tiên quyết và hạn chế
điều kiện tiên quyết
Vai trò AWS Identity and Access Management [IAM]
Ngành kiến trúc
Ngăn xếp công nghệ mục tiêu
Keo AWS
Dịch vụ lưu trữ đơn giản của Amazon [Amazon S3]
Sàn gỗ Apache
Tự động hóa và quy mô
Quy trình làm việc AWS Glue hỗ trợ tự động hóa hoàn toàn quy trình ETL
Bạn có thể thay đổi số lượng đơn vị xử lý dữ liệu [DPU] hoặc loại worker để chia tỷ lệ theo chiều ngang và chiều dọc
Công cụ
Amazon S3 – Amazon Simple Storage Service [Amazon S3] cung cấp dung lượng lưu trữ cho internet. Bạn có thể sử dụng Amazon S3 để lưu trữ và truy xuất lượng dữ liệu bất kỳ vào bất kỳ lúc nào, từ bất kỳ đâu trên web
AWS Glue – AWS Glue là một dịch vụ ETL được quản lý hoàn toàn để phân loại, làm sạch, bổ sung và di chuyển dữ liệu của bạn giữa các kho dữ liệu và luồng dữ liệu khác nhau
Cấu hình
Sau đây là các cài đặt để định cấu hình sức mạnh tính toán của AWS Glue ETL. Để giảm chi phí, hãy sử dụng cài đặt tối thiểu khi bạn chạy khối lượng công việc được cung cấp trong mẫu này.
Vỏ Python – Bạn có thể sử dụng 1 DPU để sử dụng 16 GB bộ nhớ hoặc 0. 0625 DPU để sử dụng 1 GB bộ nhớ. Mẫu này sử dụng 0. 0625 DPU, là mặc định trong bảng điều khiển AWS Glue
Python hoặc Scala cho Spark – Nếu bạn chọn loại công việc liên quan đến Spark trong bảng điều khiển, thì AWS Glue theo mặc định sử dụng 10 công nhân và G. loại công nhân 1X. Mẫu này sử dụng hai công nhân, đây là số lượng tối thiểu được phép, với loại công nhân tiêu chuẩn, đủ và tiết kiệm chi phí
Bảng sau hiển thị các loại công nhân AWS Glue khác nhau cho môi trường Apache Spark. Bởi vì Python shell job không sử dụng môi trường Apache Spark để chạy Python nên nó không được đưa vào bảng
Tiêu chuẩn
G. 1X
G. 2X
vCPU
4
4
8
Kỉ niệm
16 GB
16 GB
32 GB
Dung lượng đĩa
50 GB
64 GB
128 GB
Giám đốc điều hành cho mỗi công nhân
2
1
1
Mã số
Khi tạo tác vụ AWS Glue, bạn có thể sử dụng vai trò IAM được đính kèm hoặc vai trò hiện có
Vỏ Python keo AWS
Mã Python sử dụng thư viện Pandas và PyArrow để chuyển đổi dữ liệu sang Parquet. Thư viện Pandas đã có sẵn. Thư viện PyArrow được tải xuống khi bạn chạy mẫu, vì đây là lần chạy một lần. Bạn có thể sử dụng tệp bánh xe để chuyển đổi PyArrow thành thư viện và cung cấp tệp dưới dạng gói thư viện. Để biết thêm thông tin về các tệp bánh xe đóng gói, hãy xem Cung cấp thư viện Python của riêng bạn
from io import BytesIO
import pandas as pd
import boto3
import os
import io
import site
from importlib import reload
from setuptools.command import easy_install
install_path = os.environ['GLUE_INSTALLATION']
easy_install.main[ ["--install-dir", install_path, "pyarrow"] ]
reload[site]
import pyarrow
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
input_bucket = input_loc.split['/', 1][0]
object_key = input_loc.split['/', 1][1]
output_loc_bucket = output_loc.split['/', 1][0]
output_loc_prefix = output_loc.split['/', 1][1]
s3 = boto3.client['s3']
obj = s3.get_object[Bucket=input_bucket, Key=object_key]
df = pd.read_csv[io.BytesIO[obj['Body'].read[]]]
parquet_buffer = BytesIO[]
s3_resource = boto3.resource['s3']
df.to_parquet[parquet_buffer, index=False]
s3_resource.Object[output_loc_bucket, output_loc_prefix + 'data' + '.parquet'].put[Body=parquet_buffer.getvalue[]]
Công việc AWS Glue Spark với Python
Để sử dụng loại công việc AWS Glue Spark với Python, hãy chọn Spark làm loại công việc. Chọn tia lửa 2. 4, Python 3 với thời gian khởi động công việc được cải thiện [Phiên bản keo 2. 0] dưới dạng phiên bản Keo AWS
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
Công việc AWS Glue Spark với Scala
Để sử dụng loại công việc AWS Glue Spark với Scala, hãy chọn Spark làm loại công việc. Chọn tia lửa 2. 4, Scala 2 với thời gian khởi động công việc được cải thiện [Glue Version 2. 0] dưới dạng phiên bản Keo AWS. Để tiết kiệm dung lượng lưu trữ, mẫu AWS Glue with Scala sau đây cũng sử dụng tính năng applyMapping để chuyển đổi các loại dữ liệu
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
object GlueScalaApp {
def main[sysArgs: Array[String]] {
@transient val spark: SparkContext = SparkContext.getOrCreate[]
val glueContext: GlueContext = new GlueContext[spark]
val inputLoc = "s3://bucket-name/prefix/sample_data.csv"
val outputLoc = "s3://bucket-name/prefix/"
val readCSV = glueContext.getSource["csv", JsonOptions[Map["paths" -> Set[inputLoc]]]].getDynamicFrame[]
val applyMapping = readCSV.applyMapping[mappings = Seq[["_c0", "string", "date", "string"], ["_c1", "string", "sales", "long"],
["_c2", "string", "profit", "double"]], caseSensitive = false]
val formatPartition = applyMapping.toDF[].coalesce[1]
val dynamicFrame = DynamicFrame[formatPartition, glueContext]
val dataSink = glueContext.getSinkWithFormat[
connectionType = "s3",
options = JsonOptions[Map["path" -> outputLoc ]],
transformationContext = "dataSink", format = "parquet"].writeDynamicFrame[dynamicFrame]
}
}
sử thi
Nhiệm vụMô tảKỹ năng bắt buộcTải dữ liệu lên bộ chứa S3 mới hoặc hiện có
Tạo hoặc sử dụng bộ chứa S3 hiện có trong tài khoản của bạn. Tải lên sample_data. csv từ phần Tệp đính kèm và lưu ý bộ chứa S3 và vị trí tiền tố
AWS chungMô tả nhiệm vụ Yêu cầu kỹ năngTạo công việc AWS Glue
Trong phần ETL của bảng điều khiển AWS Glue, hãy thêm một công việc AWS Glue. Chọn loại công việc phù hợp, phiên bản AWS Glue, loại DPU/Công nhân tương ứng và số lượng công nhân. Để biết chi tiết, xem phần Cấu hình
Sao chép mã tương ứng với công việc Keo của bạn và thay đổi vị trí đầu vào và đầu ra mà bạn đã lưu ý trong Tải lên dữ liệu sử thi
Nhà phát triển, đám mây hoặc dữ liệuChạy công việc ETLChạy công việc của bạn và kiểm tra đầu ra. Lưu ý dung lượng đã giảm so với tệp gốc
Nhà phát triển, đám mây hoặc dữ liệuNgười giới thiệu
Hướng dẫn và video
Thông tin thêm
cấu hình thông số
Bạn có thể sử dụng các đoạn mã sau để đặt tham số cho công việc ETL của mình. AWS Glue sử dụng bốn tên đối số trong nội bộ.
--conf
0import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext[] glueContext = GlueContext[sc] spark = glueContext.spark_session job = Job[glueContext] input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options[\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }] outputDF = glueContext.write_dynamic_frame.from_options[\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet"]
1import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext[] glueContext = GlueContext[sc] spark = glueContext.spark_session job = Job[glueContext] input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options[\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }] outputDF = glueContext.write_dynamic_frame.from_options[\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet"]
2import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext[] glueContext = GlueContext[sc] spark = glueContext.spark_session job = Job[glueContext] input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options[\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }] outputDF = glueContext.write_dynamic_frame.from_options[\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet"]
Tham số
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
2 phải được nhập rõ ràng trên bảng điều khiển AWS Glue. Chọn Công việc, Chỉnh sửa Công việc, Cấu hình bảo mật, thư viện tập lệnh và thông số công việc [tùy chọn]. Nhập import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
2 làm khóa và cung cấp giá trị. Bạn cũng có thể sử dụng Giao diện dòng lệnh AWS [AWS CLI] hoặc AWS Glue API để đặt tham số này. Tham số import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
2 được Spark sử dụng và không cần thiết trong công việc môi trường trình bao PythonBạn phải thêm
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
3 trước mỗi tên tham số; . Ví dụ: đối với các đoạn mã sau, tham số vị trí phải được gọi bởi import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
4 và import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]
input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"
inputDyf = glueContext.create_dynamic_frame_from_options[\
connection_type = "s3", \
connection_options = {
"paths": [input_loc]}, \
format = "csv",
format_options={
"withHeader": True,
"separator": ","
}]
outputDF = glueContext.write_dynamic_frame.from_options[\
frame = inputDyf, \
connection_type = "s3", \
connection_options = {"path": output_loc \
}, format = "parquet"]
5Vỏ Python keo AWS
from awsglue.utils import getResolvedOptions
args = getResolvedOptions[sys.argv, ["input_loc", "output_loc"]]
AWS Keo Python
from awsglue.utils import getResolvedOptions
args = getResolvedOptions[sys.argv, ["JOB_NAME", "input_loc", "output_loc"]]
Keo AWS Scala
________số 8tệp đính kèm
Để truy cập nội dung bổ sung được liên kết với tài liệu này, hãy giải nén tệp sau. tập tin đính kèm. khóa kéo