Luồng NodeJS

Lập trình không đồng bộ nhất trong Node. js được thực hiện bằng lời hứa, nhưng nếu bạn cần tách dữ liệu và miền thời gian thì sao? . Lời hứa sẽ không hoạt động trong những tình huống này và các luồng là giải pháp thay thế ưu tiên. Chúng tôi sẽ mô tả một số luồng chi tiết và xem xét các ví dụ cụ thể. Đến cuối bài viết này, bạn sẽ biết tại sao và cách sử dụng chúng

Một số ứng dụng phổ biến của dòng là

  • Xử lý dữ liệu thời gian thực như cảm biến IoT
  • ghi nhật ký
  • Nền tảng quảng cáo thời gian thực
  • (Trực tiếp) truyền phát video
  • Phát thanh
  • Xử lý các tệp lớn
  • Di chuyển dữ liệu lớn (cơ sở)

Ảnh của Martin Adams trên Bapt

Luồng đã là một tính năng của Node. js kể từ đầu pre-1. 0 phiên bản và những lời hứa trước. Là một mô hình, nó không phải là duy nhất đối với Node. js. Các toán tử stdin và đường ống đầu cuối (Linux) là một dạng truyền dữ liệu, cũng như các ổ cắm mạng. Nút. js cung cấp một sự trừu tượng tốt đẹp trên các mô hình hiện có này

Các luồng chính xác như tên của nó. Dữ liệu sẽ truyền hoặc chuyển từ ứng dụng/luồng/ổ cắm này sang ứng dụng/luồng/ổ cắm tiếp theo. Dữ liệu được xử lý khi nhận được — theo khối. Các luồng có thể được đọc hoặc ghi vào nhiều nguồn dữ liệu và phần chìm khác nhau, bao gồm các tệp, ổ cắm mạng và thiết bị xuất chuẩn/thiết bị xuất chuẩn. Các luồng có thể được nối với nhau để đầu ra của luồng này trở thành đầu vào của luồng khác. Điều này có thể được sử dụng để tạo các đường ống xử lý dữ liệu phức tạp

Tại thời điểm này, có hai tiêu chuẩn cạnh tranh cho luồng trong JavaScript

WebStream là một API gốc tương đối mới cho phép bạn truy cập trực tiếp và thao tác với luồng phương tiện của trình duyệt (thường là từ webcam hoặc micrô)

Nút. mặt khác, các luồng js đã tồn tại lâu hơn nhiều. Nút. js đã có những cải tiến gần đây đối với API luồng để dễ sử dụng hơn và giống như một cấu trúc chức năng hơn. Chúng tôi sẽ tập trung vào Nút. js trong bài viết này

từ https. // nhà phát triển. mozilla. org/en-US/docs/Web/API/Streams_API

Luồng là các đối tượng mở rộng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
1. Có ba cấu trúc luồng cơ bản

  • const handler = (req, res, next)=>{
    S3.getObject(params).createReadStream()
    .pipe(res)
    }
    2luồng. Một nhà sản xuất dữ liệu có thể được đọc từ
  • dòng
    const handler = (req, res, next)=>{
    S3.getObject(params).createReadStream()
    .pipe(res)
    }
    3. Một bồn chứa dữ liệu. Chúng ta có thể ghi dữ liệu vào luồng này
  • const handler = (req, res, next)=>{
    S3.getObject(params).createReadStream()
    .pipe(res)
    }
    4luồng. Luồng vừa có thể đọc vừa có thể ghi với chức năng chuyển đổi dữ liệu

Có thể tự mở rộng các đối tượng luồng này để tạo các nguồn và phần chìm mới, mặc dù việc triển khai các luồng Chuyển đổi phổ biến hơn

Kể từ nút. js v12, có thể tạo một

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
2 từ bất kỳ lần lặp (không đồng bộ) nào, giúp sử dụng các luồng cho lập trình chức năng dễ dàng hơn nhiều

Theo mặc định, các luồng hoạt động ở chế độ Bộ đệm hoặc chế độ chuỗi, nhưng chúng có thể tùy chọn hoạt động ở chế độ objectMode

Bây giờ chúng ta đã biết luồng là gì, hãy tìm hiểu sâu hơn về một số trường hợp sử dụng và lợi thế

Sử dụng bộ nhớ thấp hơn

Xử lý dữ liệu sẽ giảm yêu cầu bộ nhớ khi nó đến và bộ thu gom rác phải làm ít hơn, dẫn đến máy chủ nhanh hơn và chi phí cơ sở hạ tầng thấp hơn. Cách thức hoạt động của nó sẽ rõ ràng hơn với ví dụ sau

Giả sử chúng tôi tải xuống một tệp từ S3, lưu trữ tệp đó trong một biến cục bộ và gửi tệp đó cho khách hàng, một trường hợp sử dụng phổ biến

const handler = (req, res, next)=>{
let data = await S3.getObject(params).promise()
res.send(data);
}

Hãy tưởng tượng tệp có kích thước 1 Mb; . js phải chỉ định 1 Mb bộ nhớ đó cho tệp trong khi tệp đang tải xuống và khi hoàn tất, hãy gửi tệp đó qua ổ cắm phản hồi đang chờ. Khi tệp đã được gửi, nó phải giải phóng bộ nhớ. Thật lãng phí khi gán và dọn dẹp bộ nhớ khi dữ liệu đến trong các khối nhỏ hơn mà chúng tôi có thể gửi ngay lập tức

Ví dụ thứ hai này cho thấy cách chúng ta có thể truyền trực tiếp dữ liệu từ S3 đến ổ cắm phản hồi theo từng đoạn. Chúng tôi bỏ qua biến cục bộ, ít bộ nhớ hơn sẽ được sử dụng và thời gian cho byte đầu tiên sẽ giảm

Đối với một yêu cầu, sự khác biệt là nhỏ, nhưng trên một máy chủ bận rộn, những sự khác biệt này sẽ tăng lên nhanh chóng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
Áp suất ngược

Hãy tưởng tượng một tình huống trong đó chúng ta có một nhà sản xuất nhanh và một người tiêu dùng chậm, giả sử, một chiếc điện thoại di động có kết nối không ổn định với tư cách là khách hàng

Với một nhà sản xuất và người tiêu dùng tách rời truyền thống, mức sử dụng bộ nhớ trên máy chủ sẽ tăng lên do kết nối điện thoại không thể xử lý dữ liệu đủ nhanh. Đường ống xuôi dòng sẽ cung cấp áp suất ngược khi chúng tôi sử dụng các luồng đường ống để tránh sự cố này. Mọi kết nối ngược dòng sẽ được điều chỉnh dựa trên thông lượng của liên kết chậm nhất trong chuỗi. Các luồng có thể được định cấu hình với giá trị hình mờ cao để đặt bộ đệm/bộ đệm bên trong

Chuyển đổi dữ liệu

Một ví dụ cơ bản trong Node. js sẽ chuyển từ luồng tệp sang luồng đầu ra của máy chủ. Nó làm chính xác những gì nó trông giống như. đọc một tệp và ghi vào luồng phản hồi trong khi đọc

const {createReadStream} = require('fs');

handler(req, res, next){
createReadStream('myFile.txt).pipe(res)
}

Nhưng nếu chúng ta muốn làm gì đó với nội dung tệp trong khi đọc thì sao?

________số 8

Xin lưu ý rằng chúng ta có thể tạo các luồng của riêng mình bằng hàm tạo tốc ký ở trên hoặc mở rộng lớp

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
4 (hoặc có thể đọc/ghi) và thêm các phương thức và chức năng của riêng chúng ta

Vấn đề với ví dụ trên là các lỗi không được lan truyền một cách độc đáo và phải được xử lý ở mỗi lệnh gọi đường ống, điều này đưa chúng ta đến đường ống dữ liệu

Đường ống dữ liệu

Phương pháp đường ống xử lý và dọn dẹp lỗi. Sử dụng đường dẫn, chúng tôi không phải chuyển đường dẫn thủ công từ một

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
2 đến
const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
3, chúng tôi cũng không phải bắt lỗi cho từng luồng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
2

Một đường dẫn trả về một luồng có thể ghi được dẫn đến một luồng ________ 03. Nó nhận một cuộc gọi lại làm đối số cuối cùng (mặc dù có phiên bản dựa trên lời hứa). Gọi lại được sử dụng để xử lý lỗi. Cần phải đề cập rằng khi xảy ra lỗi trong đường ống, tất cả các luồng sẽ bị hủy, bao gồm cả luồng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
3 cuối cùng

Khi sử dụng Phản hồi của máy chủ làm luồng cuối cùng trong quy trình, bạn sẽ không thể gửi thông báo lỗi tới máy khách khi xảy ra lỗi do phản hồi bị hủy trong trường hợp xảy ra lỗi. Giải pháp đơn giản là chuyển luồng đường ống sang luồng phản hồi

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
5

Trong ví dụ này, luồng phản hồi sẽ không bị hủy trong trường hợp xảy ra lỗi trong thao tác tiếp tục. Nó sẽ sẵn sàng để xử lý phản hồi cho khách hàng một cách duyên dáng

Trong ví dụ trên, chúng ta thấy việc sử dụng phương thức đã hoàn thành, giải quyết một lời hứa khi luồng trong đối số được xử lý xong

Sử dụng trong các ứng dụng và tiện ích dòng lệnh

Nút. js’

const {createReadStream} = require('fs');

handler(req, res, next){
createReadStream('myFile.txt).pipe(res)
}
1 là luồng có thể ghi và
const {createReadStream} = require('fs');

handler(req, res, next){
createReadStream('myFile.txt).pipe(res)
}
2 là luồng có thể đọc, có nghĩa là chúng ta có thể chuyển tất cả các loại dữ liệu đến và từ một Nút. quy trình js

Ví dụ, hãy xem trình thay đổi kích thước hình ảnh sáu dòng này

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
8

Mà chúng ta có thể gọi bằng cách sử dụng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
9

Nó nhận một đối số là đường dẫn đến một hình ảnh hiện có, sau đó tạo một luồng có thể đọc được, chuyển nó thành một phiên bản sắc nét, sau đó ghi hình ảnh đã thay đổi kích thước vào luồng đầu ra

Lệnh dòng lệnh cho biết cách chúng ta ghi đầu ra thô vào một tệp bằng cách sử dụng phần

const {createReadStream} = require('fs');

handler(req, res, next){
createReadStream('myFile.txt).pipe(res)
}
3

Bản đồ/Giảm/Lọc

trong nút. js 17, một số tính năng thử nghiệm đã được giới thiệu giúp luồng hoạt động hiệu quả hơn. Với các phương pháp này, có thể thay thế RXjs bằng Node gốc. mã js

Ví dụ này từ Node. tài liệu js cho thấy cách các luồng có thể trở thành một phần mạnh mẽ của vành đai tiện ích ngoài việc truyền dữ liệu đơn thuần

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
1

Với chức năng bản đồ, chúng ta không còn phải tạo các luồng biến đổi của riêng mình và mã của chúng ta sẽ trở nên ngắn gọn hơn nhiều

Hãy xem tài liệu và xin lưu ý rằng một số hàm này trả về một lời hứa và một số trả về một

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
2

Xử lý dữ liệu song song

Nhưng thật tuyệt làm sao khi có một tùy chọn đồng thời cho các chức năng truyền phát này? . Nếu chúng tôi có các tác vụ nặng I/O mà chúng tôi muốn chia nhỏ hoặc các tính toán sử dụng nhiều CPU mà chúng tôi muốn chạy trong web worker, điều này cho phép chúng tôi quyết định có bao nhiêu tác vụ sẽ chạy song song

Đây là một tính năng mà tôi rất hào hứng. Với các tính năng chức năng mới, bạn có thể xác định đồng thời bạn muốn sử dụng. Giả sử bạn phải thực hiện 10 yêu cầu HTTP và 10 tệp ghi vào một đường dẫn nhất định. Vì các thao tác này rất khác nhau nên bạn có thể muốn chỉ định nhiều hoặc ít luồng hơn cho mỗi thao tác

Phân tích cú pháp
const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
3CSV

Nút. js có một mô-đun gọi là readline, có thể xử lý một dòng

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
2 theo dòng. Nó có thể được sử dụng để đọc đầu vào dòng lệnh trong ứng dụng đầu cuối hoặc để phân tích cú pháp CSV từ một luồng, như trong ví dụ bên dưới. Trước đây tôi đã sử dụng điều này để nhập các tập dữ liệu lớn vào cơ sở dữ liệu, trong đó các tệp CSV đến từ một nhóm S3

Các tệp được truyền trực tuyến từ S3 bằng mô-đun đường đọc để chia tệp thành các bản ghi. Nhiều luồng chuyển đổi được khử trùng và xử lý dữ liệu trước khi chúng được truyền trực tiếp vào cơ sở dữ liệu Postgres. Nó hoạt động rất tốt và chúng tôi có thể giảm thời gian di chuyển cơ sở dữ liệu này từ nhiều ngày xuống chỉ còn vài giờ

Vâng, đây là một tập dữ liệu khá lớn

const handler = (req, res, next)=>{
S3.getObject(params).createReadStream()
.pipe(res)
}
0

Tôi hy vọng bạn đã hiểu hơn một chút về những gì luồng có thể làm. Vui lòng đọc tài liệu để biết thêm các thủ thuật hữu ích. Chủ đề này đủ lớn để viết cả một cuốn sách về. Hãy cho tôi biết trong các nhận xét về cách bạn sử dụng luồng trong ứng dụng của mình

Nếu bạn thích những gì bạn đọc, hãy cân nhắc tham gia Medium và đọc nhiều bài viết khác. Một phần phí của bạn sẽ hỗ trợ các tác giả như tôi. Nhấn vào đây để tham gia