Chuyển đổi xml sang python sàn gỗ

Trên Đám mây Amazon Web Services [AWS], AWS Glue là dịch vụ trích xuất, chuyển đổi và tải [ETL] được quản lý toàn phần. AWS Glue giúp phân loại dữ liệu của bạn, làm sạch, bổ sung dữ liệu và di chuyển dữ liệu một cách đáng tin cậy giữa các kho lưu trữ dữ liệu và luồng dữ liệu khác nhau với chi phí hiệu quả

Mẫu này cung cấp các loại công việc khác nhau trong AWS Glue và sử dụng ba tập lệnh khác nhau để minh họa các công việc ETL tác giả

Bạn có thể sử dụng AWS Glue để viết các lệnh ETL trong môi trường trình bao Python. Bạn cũng có thể tạo cả công việc ETL hàng loạt và trực tuyến bằng cách sử dụng Python [PySpark] hoặc Scala trong môi trường Apache Spark được quản lý. Để giúp bạn bắt đầu với việc soạn thảo các công việc ETL, mẫu này tập trung vào các công việc ETL hàng loạt bằng cách sử dụng Python shell, PySpark và Scala. Công việc trình bao Python dành cho khối lượng công việc yêu cầu sức mạnh tính toán thấp hơn. Môi trường Apache Spark được quản lý dành cho khối lượng công việc yêu cầu sức mạnh tính toán cao

Apache Parquet được xây dựng để hỗ trợ các sơ đồ nén và mã hóa hiệu quả. Nó có thể tăng tốc khối lượng công việc phân tích của bạn vì nó lưu trữ dữ liệu theo kiểu cột. Chuyển đổi dữ liệu sang Parquet có thể giúp bạn tiết kiệm cả không gian lưu trữ, chi phí và thời gian trong thời gian dài hơn. Để tìm hiểu thêm về Parquet, hãy xem bài đăng trên blog Apache Parquet. Cách trở thành anh hùng với định dạng dữ liệu cột nguồn mở trên đám mây Google, Azure và Amazon

Điều kiện tiên quyết và hạn chế

điều kiện tiên quyết

  • Vai trò AWS Identity and Access Management [IAM]

Ngành kiến ​​​​trúc

Ngăn xếp công nghệ mục tiêu

  • Keo AWS

  • Dịch vụ lưu trữ đơn giản của Amazon [Amazon S3]

  • Sàn gỗ Apache

Tự động hóa và quy mô

Quy trình làm việc AWS Glue hỗ trợ tự động hóa hoàn toàn quy trình ETL

Bạn có thể thay đổi số lượng đơn vị xử lý dữ liệu [DPU] hoặc loại worker để chia tỷ lệ theo chiều ngang và chiều dọc

Công cụ

  • Amazon S3 – Amazon Simple Storage Service [Amazon S3] cung cấp dung lượng lưu trữ cho internet. Bạn có thể sử dụng Amazon S3 để lưu trữ và truy xuất lượng dữ liệu bất kỳ vào bất kỳ lúc nào, từ bất kỳ đâu trên web

  • AWS Glue – AWS Glue là một dịch vụ ETL được quản lý hoàn toàn để phân loại, làm sạch, bổ sung và di chuyển dữ liệu của bạn giữa các kho dữ liệu và luồng dữ liệu khác nhau

Cấu hình

Sau đây là các cài đặt để định cấu hình sức mạnh tính toán của AWS Glue ETL. Để giảm chi phí, hãy sử dụng cài đặt tối thiểu khi bạn chạy khối lượng công việc được cung cấp trong mẫu này.  

  • Vỏ Python – Bạn có thể sử dụng 1 DPU để sử dụng 16 GB bộ nhớ hoặc 0. 0625 DPU để sử dụng 1 GB bộ nhớ. Mẫu này sử dụng 0. 0625 DPU, là mặc định trong bảng điều khiển AWS Glue

  • Python hoặc Scala cho Spark – Nếu bạn chọn loại công việc liên quan đến Spark trong bảng điều khiển, thì AWS Glue theo mặc định sử dụng 10 công nhân và G. loại công nhân 1X. Mẫu này sử dụng hai công nhân, đây là số lượng tối thiểu được phép, với loại công nhân tiêu chuẩn, đủ và tiết kiệm chi phí

Bảng sau hiển thị các loại công nhân AWS Glue khác nhau cho môi trường Apache Spark. Bởi vì Python shell job không sử dụng môi trường Apache Spark để chạy Python nên nó không được đưa vào bảng

Tiêu chuẩn

G. 1X

G. 2X

vCPU

4

4

8

Kỉ niệm

16 GB

16 GB

32 GB

Dung lượng đĩa

50 GB

64 GB

128 GB

Giám đốc điều hành cho mỗi công nhân

2

1

1

Mã số

Khi tạo tác vụ AWS Glue, bạn có thể sử dụng vai trò IAM được đính kèm hoặc vai trò hiện có

Vỏ Python keo AWS

Mã Python sử dụng thư viện Pandas và PyArrow để chuyển đổi dữ liệu sang Parquet. Thư viện Pandas đã có sẵn. Thư viện PyArrow được tải xuống khi bạn chạy mẫu, vì đây là lần chạy một lần. Bạn có thể sử dụng tệp bánh xe để chuyển đổi PyArrow thành thư viện và cung cấp tệp dưới dạng gói thư viện. Để biết thêm thông tin về các tệp bánh xe đóng gói, hãy xem Cung cấp thư viện Python của riêng bạn

from io import BytesIO
import pandas as pd
import boto3
import os
import io
import site
from importlib import reload
from setuptools.command import easy_install
install_path = os.environ['GLUE_INSTALLATION']
easy_install.main[ ["--install-dir", install_path, "pyarrow"] ]
reload[site]
import pyarrow


input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"


input_bucket = input_loc.split['/', 1][0]
object_key = input_loc.split['/', 1][1]


output_loc_bucket = output_loc.split['/', 1][0]
output_loc_prefix = output_loc.split['/', 1][1] 


s3 = boto3.client['s3']
obj = s3.get_object[Bucket=input_bucket, Key=object_key]
df = pd.read_csv[io.BytesIO[obj['Body'].read[]]]


parquet_buffer = BytesIO[]
s3_resource = boto3.resource['s3']
df.to_parquet[parquet_buffer, index=False] 
s3_resource.Object[output_loc_bucket, output_loc_prefix +  'data' + '.parquet'].put[Body=parquet_buffer.getvalue[]] 

Công việc AWS Glue Spark với Python

Để sử dụng loại công việc AWS Glue Spark với Python, hãy chọn Spark làm loại công việc. Chọn tia lửa 2. 4, Python 3 với thời gian khởi động công việc được cải thiện [Phiên bản keo 2. 0] dưới dạng phiên bản Keo AWS

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

Công việc AWS Glue Spark với Scala

Để sử dụng loại công việc AWS Glue Spark với Scala, hãy chọn Spark làm loại công việc. Chọn tia lửa 2. 4, Scala 2 với thời gian khởi động công việc được cải thiện [Glue Version 2. 0] dưới dạng phiên bản Keo AWS. Để tiết kiệm dung lượng lưu trữ, mẫu AWS Glue with Scala sau đây cũng sử dụng tính năng applyMapping để chuyển đổi các loại dữ liệu

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._


object GlueScalaApp {
  def main[sysArgs: Array[String]] {
    
    @transient val spark: SparkContext = SparkContext.getOrCreate[]
    val glueContext: GlueContext = new GlueContext[spark]

    val inputLoc = "s3://bucket-name/prefix/sample_data.csv"
    val outputLoc = "s3://bucket-name/prefix/"

    val readCSV = glueContext.getSource["csv", JsonOptions[Map["paths" -> Set[inputLoc]]]].getDynamicFrame[]

    val applyMapping = readCSV.applyMapping[mappings = Seq[["_c0", "string", "date", "string"], ["_c1", "string", "sales", "long"],
    ["_c2", "string", "profit", "double"]], caseSensitive = false]

    val formatPartition = applyMapping.toDF[].coalesce[1]

    val dynamicFrame = DynamicFrame[formatPartition, glueContext]

    val dataSink = glueContext.getSinkWithFormat[
        connectionType = "s3", 
        options = JsonOptions[Map["path" -> outputLoc ]],
        transformationContext = "dataSink", format = "parquet"].writeDynamicFrame[dynamicFrame]
  }
}

sử thi

Nhiệm vụMô tảKỹ năng bắt buộcTải dữ liệu lên bộ chứa S3 mới hoặc hiện có

Tạo hoặc sử dụng bộ chứa S3 hiện có trong tài khoản của bạn. Tải lên sample_data. csv từ phần Tệp đính kèm và lưu ý bộ chứa S3 và vị trí tiền tố

AWS chung

Mô tả nhiệm vụ Yêu cầu kỹ năngTạo công việc AWS Glue

Trong phần ETL của bảng điều khiển AWS Glue, hãy thêm một công việc AWS Glue. Chọn loại công việc phù hợp, phiên bản AWS Glue, loại DPU/Công nhân tương ứng và số lượng công nhân. Để biết chi tiết, xem phần Cấu hình

Nhà phát triển, đám mây hoặc dữ liệuThay đổi vị trí đầu vào và đầu ra

Sao chép mã tương ứng với công việc Keo của bạn và thay đổi vị trí đầu vào và đầu ra mà bạn đã lưu ý trong Tải lên dữ liệu sử thi

Nhà phát triển, đám mây hoặc dữ liệuChạy công việc ETL

Chạy công việc của bạn và kiểm tra đầu ra. Lưu ý dung lượng đã giảm so với tệp gốc

Nhà phát triển, đám mây hoặc dữ liệu

Người giới thiệu

Hướng dẫn và video

Thông tin thêm

cấu hình thông số

Bạn có thể sử dụng các đoạn mã sau để đặt tham số cho công việc ETL của mình. AWS Glue sử dụng bốn tên đối số trong nội bộ.  

  • --conf

  • import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.transforms import *
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.utils import getResolvedOptions
    from awsglue.job import Job
    
    
    sc = SparkContext[]
    glueContext = GlueContext[sc]
    spark = glueContext.spark_session
    job = Job[glueContext]
    
    input_loc = "bucket-name/prefix/sample_data.csv"
    output_loc = "bucket-name/prefix/"
    
    inputDyf = glueContext.create_dynamic_frame_from_options[\
        connection_type = "s3", \
        connection_options = { 
            "paths": [input_loc]}, \
        format = "csv",
        format_options={
            "withHeader": True,
            "separator": ","
        }]
    
    
    outputDF = glueContext.write_dynamic_frame.from_options[\
        frame = inputDyf, \
        connection_type = "s3", \
        connection_options = {"path": output_loc \
            }, format = "parquet"]    
    
    
    0

  • import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.transforms import *
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.utils import getResolvedOptions
    from awsglue.job import Job
    
    
    sc = SparkContext[]
    glueContext = GlueContext[sc]
    spark = glueContext.spark_session
    job = Job[glueContext]
    
    input_loc = "bucket-name/prefix/sample_data.csv"
    output_loc = "bucket-name/prefix/"
    
    inputDyf = glueContext.create_dynamic_frame_from_options[\
        connection_type = "s3", \
        connection_options = { 
            "paths": [input_loc]}, \
        format = "csv",
        format_options={
            "withHeader": True,
            "separator": ","
        }]
    
    
    outputDF = glueContext.write_dynamic_frame.from_options[\
        frame = inputDyf, \
        connection_type = "s3", \
        connection_options = {"path": output_loc \
            }, format = "parquet"]    
    
    
    1

  • import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.transforms import *
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.utils import getResolvedOptions
    from awsglue.job import Job
    
    
    sc = SparkContext[]
    glueContext = GlueContext[sc]
    spark = glueContext.spark_session
    job = Job[glueContext]
    
    input_loc = "bucket-name/prefix/sample_data.csv"
    output_loc = "bucket-name/prefix/"
    
    inputDyf = glueContext.create_dynamic_frame_from_options[\
        connection_type = "s3", \
        connection_options = { 
            "paths": [input_loc]}, \
        format = "csv",
        format_options={
            "withHeader": True,
            "separator": ","
        }]
    
    
    outputDF = glueContext.write_dynamic_frame.from_options[\
        frame = inputDyf, \
        connection_type = "s3", \
        connection_options = {"path": output_loc \
            }, format = "parquet"]    
    
    
    2

Tham số

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

2 phải được nhập rõ ràng trên bảng điều khiển AWS Glue. Chọn Công việc, Chỉnh sửa Công việc, Cấu hình bảo mật, thư viện tập lệnh và thông số công việc [tùy chọn]. Nhập
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

2 làm khóa và cung cấp giá trị. Bạn cũng có thể sử dụng Giao diện dòng lệnh AWS [AWS CLI] hoặc AWS Glue API để đặt tham số này. Tham số
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

2 được Spark sử dụng và không cần thiết trong công việc môi trường trình bao Python

Bạn phải thêm

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

3 trước mỗi tên tham số; . Ví dụ: đối với các đoạn mã sau, tham số vị trí phải được gọi bởi
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

4 và
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.job import Job


sc = SparkContext[]
glueContext = GlueContext[sc]
spark = glueContext.spark_session
job = Job[glueContext]

input_loc = "bucket-name/prefix/sample_data.csv"
output_loc = "bucket-name/prefix/"

inputDyf = glueContext.create_dynamic_frame_from_options[\
    connection_type = "s3", \
    connection_options = { 
        "paths": [input_loc]}, \
    format = "csv",
    format_options={
        "withHeader": True,
        "separator": ","
    }]


outputDF = glueContext.write_dynamic_frame.from_options[\
    frame = inputDyf, \
    connection_type = "s3", \
    connection_options = {"path": output_loc \
        }, format = "parquet"]    

5

Vỏ Python keo AWS

from awsglue.utils import getResolvedOptions

args = getResolvedOptions[sys.argv, ["input_loc", "output_loc"]]

AWS Keo Python

from awsglue.utils import getResolvedOptions

args = getResolvedOptions[sys.argv, ["JOB_NAME", "input_loc", "output_loc"]]

Keo AWS Scala

________số 8

tệp đính kèm

Để truy cập nội dung bổ sung được liên kết với tài liệu này, hãy giải nén tệp sau. tập tin đính kèm. khóa kéo

Chúng tôi có thể chuyển đổi XML thành DataFrame trong python không?

xml” bắt đầu từ gốc của cây, cụ thể là phần tử chúng ta có thể lặp qua từng nút của cây, điều đó có nghĩa là chúng ta sẽ lấy từng phần tử sinh viên và lấy thuộc tính name và tất cả các phần tử con của nó để xây dựng khung dữ liệu của chúng ta.

Gấu trúc có thể đọc các tệp XML không?

Thư viện phân tích dữ liệu Pandas cung cấp chức năng đọc/ghi dữ liệu cho hầu hết các loại tệp. Ví dụ: nó bao gồm read_csv[] và to_csv[] để tương tác với tệp CSV. Tuy nhiên, Pandas không bao gồm bất kỳ phương thức nào để đọc và ghi tệp XML .

Có thể chuyển đổi một. Tệp XML vào khung dữ liệu bằng python nếu vậy nên sử dụng thư viện và phương thức nào?

Chúng tôi sẽ trích xuất dữ liệu từ tệp XML bằng thư viện này và sau đó chúng tôi sẽ chuyển đổi dữ liệu được trích xuất thành Dataframe. Để chuyển đổi thành Dataframes, chúng ta cần cài đặt thư viện của gấu trúc .

Parquet có cần lược đồ không?

Bạn sẽ luôn cần một lược đồ cho tệp Parquet vì chúng ở dạng nhị phân và nếu không có lược đồ thì trình đọc không thể giải tuần tự hóa.

Chủ Đề