প্রবাহ কি?
Node.js , , .
এগুলিকে কনভেয়র বেল্ট হিসাবে ভাবুন যা ডেটা এক জায়গা থেকে অন্য জায়গায় স্থানান্তরিত করে, আপনাকে সম্পূর্ণ ডেটা সেটের জন্য অপেক্ষা করার পরিবর্তে এটি আসার সাথে সাথে প্রতিটি অংশে কাজ করার অনুমতি দেয়।
স্ট্রিমগুলি Node.js এর সবচেয়ে শক্তিশালী বৈশিষ্ট্যগুলির মধ্যে একটি এবং ব্যাপকভাবে ব্যবহৃত হয়:
ফাইল সিস্টেম অপারেশন
ফাইল পড়া/লেখা
HTTP অনুরোধ এবং প্রতিক্রিয়া
ওয়েব অনুরোধ এবং প্রতিক্রিয়া পরিচালনা করা
ডেটা সংকোচন এবং সম্প্রসারণ
সঙ্কুচিত করুন এবং ডেটা প্রসারিত করুন
ডাটাবেস অপারেশন
ডাটাবেস অপারেশন পরিচালনা করা
রিয়েল-টাইম ডেটা প্রসেসিং
রিয়েল-টাইম ডেটা স্থানান্তর এবং প্রক্রিয়াকরণ
স্ট্রিম দিয়ে শুরু করা
স্ট্রিমগুলি Node.js-এ ডেটার দক্ষ ম্যানিপুলেশনের জন্য মৌলিক ধারণাগুলির মধ্যে একটি।
তারা আপনাকে একবারে সবকিছু মেমরিতে লোড করার পরিবর্তে উপলভ্য হওয়ার সাথে সাথে খণ্ডে ডেটা প্রক্রিয়া করার অনুমতি দেয়।
বেসিক স্ট্রিম উদাহরণ
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
writableStream.on('finish', () => {
console.log('File copy completed!');
});
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
writableStream.on('error', (err) => {
console.error('Error writing file:', err);
});
কেন স্ট্রিম ব্যবহার করবেন?
স্ট্রিম ব্যবহার করার অনেক সুবিধা আছে:
প্রবাহের সুবিধা:
512MB RAM সহ একটি সার্ভারে একটি 1GB ফাইল পড়ার কল্পনা করুন:
- স্ট্রীম ছাড়া:পুরো ফাইলটি মেমরিতে লোড করার চেষ্টা করার সময় প্রক্রিয়াটি ক্র্যাশ হয়ে যায়
- স্ট্রিম সহ:আপনি ফাইলটি ছোট অংশে প্রক্রিয়া করুন (যেমন, একবারে 64KB)।
প্রধান স্ট্রিম প্রকার
Node.js , :
| স্ট্রিম টাইপ | ব্যাখ্যা | সাধারণ উদাহরণ |
|---|---|---|
| Readable | যে স্ট্রিমগুলি থেকে ডেটা পড়া যায় (ডেটা দ্বারা) | fs.createReadStream(), HTTP , process.stdin |
| Writable | যে স্ট্রীমগুলিতে ডেটা লেখা যেতে পারে (ডেটা গন্তব্য) | fs.createWriteStream(), HTTP , process.stdout |
| Duplex | স্ট্রীম দুটিই পঠনযোগ্য এবং লেখার যোগ্য | TCP সকেট, Zlib স্ট্রীম |
| Transform | ডুপ্লেক্স স্ট্রীম যেখানে ডেটা লেখা ও পড়ার সাথে সাথে বিপরীত করা যেতে পারে | Zlib স্ট্রীম, ক্রিপ্টো স্ট্রীম |
দ্রষ্টব্য:
Node.js EventEmitter , .
পঠনযোগ্য প্রবাহ
পঠনযোগ্য স্ট্রীম আপনাকে একটি উৎস থেকে ডেটা পড়ার অনুমতি দেয়। উদাহরণ:
- একটি ফাইল থেকে পড়া
- ক্লায়েন্টে HTTP প্রতিক্রিয়া
- সার্ভারে HTTP অনুরোধ
- process.stdin
একটি পঠনযোগ্য স্ট্রীম তৈরি করা হচ্ছে
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Events for readable streams
readableStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
console.log(chunk);
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
readableStream.on('error', (err) => {
console.error('Error reading from stream:', err);
});
পড়ার পদ্ধতি
পঠনযোগ্য স্ট্রীম দুটি উপায়ে কাজ করে:
- Flowing Mode:তথ্য উৎস থেকে দ্রুত পড়া হয় এবং ইভেন্ট ব্যবহার করে আপনার অ্যাপ্লিকেশনে পরিবেশন করা হয়
- Paused Mode:স্ট্রীম থেকে ডেটার খণ্ডগুলি পেতে আপনাকে অবশ্যই স্পষ্টভাবে stream.read() কল করতে হবে
const fs = require('fs');
// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Manually consume the stream using read()
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(`Read ${chunk.length} bytes of data.`);
console.log(chunk);
}
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
লেখার যোগ্য স্ট্রীম
লেখার যোগ্য স্ট্রিমগুলি আপনাকে একটি গন্তব্যে ডেটা লিখতে দেয়। উদাহরণ:
- একটি ফাইলে লেখা
- ক্লায়েন্টে HTTP অনুরোধ
- সার্ভারে HTTP প্রতিক্রিয়া
- process.stdout
একটি লিখনযোগ্য স্ট্রীম তৈরি করা হচ্ছে
const fs = require('fs');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');
// End the stream
writableStream.end();
// Events for writable streams
writableStream.on('finish', () => {
console.log('All data has been written to the file.');
});
writableStream.on('error', (err) => {
console.error('Error writing to stream:', err);
});
পিছনের চাপ হ্যান্ডলিং
একটি স্ট্রীমে লেখার সময়, ডেটা প্রক্রিয়া করার চেয়ে দ্রুত লেখা হলে ল্যাগ হয়।
write() পদ্ধতিটি লেখা চালিয়ে যাওয়া নিরাপদ কিনা তা নির্দেশ করে একটি বুলিয়ান মান প্রদান করে।
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
function writeData() {
let i = 100;
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time, close the stream
writableStream.write('Last chunk!\n');
writableStream.end();
} else {
// Continue writing data
const data = `Data chunk ${i}\n`;
// Write and check if we should continue
ok = writableStream.write(data);
}
}
while (i > 0 && ok);
if (i > 0) {
// We need to wait for the drain event before writing more
writableStream.once('drain', write);
}
}
write();
}
writeData();
writableStream.on('finish', () => {
console.log('All data written successfully.');
});
Pipe
পাইপ() পদ্ধতি একটি পঠনযোগ্য স্ট্রীমকে একটি লেখার যোগ্য স্ট্রিমের সাথে সংযুক্ত করে, স্বয়ংক্রিয়ভাবে ডেটার প্রবাহ পরিচালনা করে এবং ব্যাকপ্রেশার পরিচালনা করে।
এটি স্ট্রিম গ্রাস করার একটি সহজ উপায়।
const fs = require('fs');
// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
readableStream.on('error', (err) => {
console.error('Read error:', err);
});
writableStream.on('error', (err) => {
console.error('Write error:', err);
});
writableStream.on('finish', () => {
console.log('File copy completed!');
});
চেইন পাইপ
আপনি পাইপ() ব্যবহার করে একসাথে একাধিক স্ট্রীমে যোগ দিতে পারেন।
ট্রান্সফর্ম স্ট্রিমগুলির সাথে কাজ করার সময় এটি বিশেষভাবে কার্যকর।
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
.pipe(zlib.createGzip()) // Compress the data
.pipe(fs.createWriteStream('destination.txt.gz'))
.on('finish', () => {
console.log('File compressed successfully!');
});
দ্রষ্টব্য:
পাইপ() পদ্ধতি গন্তব্য স্ট্রীম ফেরত দেয়, যা চেইন চালায়।
ডুপ্লেক্স এবং ট্রান্সফর্ম স্ট্রীম
ডুপ্লেক্স স্ট্রীম
ডুপ্লেক্স স্ট্রীম দুটি পঠনযোগ্য এবং লেখার যোগ্য, একটি দ্বিমুখী পাইপের মতো।
একটি টিসিপি সকেট একটি ডুপ্লেক্স স্ট্রিমের একটি ভাল উদাহরণ।
const net = require('net');
// Create a TCP server
const server = net.createServer((socket) => {
// 'socket' is a duplex stream
// Handle incoming data (readable side)
socket.on('data', (data) => {
console.log('Received:', data.toString());
// Echo back (writable side)
socket.write(`Echo: ${data}`);
});
socket.on('end', () => {
console.log('Client disconnected');
});
});
server.listen(8080, () => {
console.log('Server listening on port 8080');
});
// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
console.log('Connected to server');
client.write('Hello from client!');
});
client.on('data', (data) => {
console.log('Server says:', data.toString());
client.end(); // Close the connection
});
*/
স্ট্রীম রূপান্তর
ট্রান্সফর্ম স্ট্রীম হল ডুপ্লেক্স স্ট্রীম যেগুলো ডেটার মধ্য দিয়ে যাওয়ার সাথে সাথে রূপান্তর করতে পারে।
তারা পাইপ উপর তথ্য প্রক্রিয়াকরণের জন্য আদর্শ.
const { Transform } = require('stream');
const fs = require('fs');
// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Transform the chunk to uppercase
const upperChunk = chunk.toString().toUpperCase();
// Push the transformed data
this.push(upperChunk);
// Signal that we're done with this chunk
callback();
}
}
// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');
// Pipe the data through our transform stream
readableStream
.pipe(uppercaseTransform)
.pipe(writableStream)
.on('finish', () => {
console.log('Transformation completed!');
});
স্ট্রিম ইভেন্ট
সমস্ত স্ট্রীম ইভেন্ট ইমিটারের উদাহরণ এবং একাধিক ইভেন্ট নির্গত করে:
পঠনযোগ্য স্ট্রিম ইভেন্ট
- data:স্ট্রীমে পড়ার জন্য ডেটা উপলব্ধ হলে নির্গত হয়
- end:নির্গত হয় যখন এখনও কোন ডেটা ব্যবহার করা হয় না
- error:পড়ার সময় কোনো ত্রুটি ঘটলে নির্গত হয়
- close:প্রবাহের অন্তর্নিহিত সংস্থান বন্ধ থাকলে নির্গত হয়
- readable:ডেটা পড়ার জন্য উপলব্ধ হলে নির্গত হয়
লেখার যোগ্য স্ট্রিম ইভেন্ট
- drain:write() পদ্ধতি মিথ্যা ফেরত দেওয়ার পরে স্ট্রীম আরও ডেটা গ্রহণ করার জন্য প্রস্তুত হলে নির্গত হয়
- finish:বেস সিস্টেমে ফ্লাশ করার সময় সমস্ত ডেটা নির্গত হয়
- error:লেখার সময় কোনো ত্রুটি ঘটলে নির্গত হয়
- close:প্রবাহের অন্তর্নিহিত সংস্থান বন্ধ থাকলে নির্গত হয়
- pipe:যখন পাইপ() পদ্ধতিটি পাঠযোগ্য প্রবাহে কল করা হয় তখন নির্গত হয়
- unpipe:একটি পঠনযোগ্য স্ট্রীমে unpipe() পদ্ধতি কল করা হলে নির্গত হয়
stream.pipeline() পদ্ধতি
পাইপলাইন() ফাংশন (Node.js v10.0.0 থেকে উপলব্ধ) একটি আরও শক্তিশালী উপায় যা একসাথে চেইন স্ট্রিম, বিশেষত ত্রুটি পরিচালনার জন্য।
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline that handles errors properly
pipeline(
fs.createReadStream('source.txt'),
zlib.createGzip(),
fs.createWriteStream('destination.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded!');
}
}
);
দ্রষ্টব্য:
পাইপলাইন() সঠিকভাবে সমস্ত স্ট্রীম পরিষ্কার করে যদি তাদের মধ্যে কোনো ত্রুটি থাকে, সম্ভাব্য মেমরি লিক প্রতিরোধ করে।
অবজেক্ট মোড স্ট্রীম
ডিফল্টরূপে, স্ট্রিমগুলি স্ট্রিং এবং বাফার বস্তুর সাথে কাজ করে।
যাইহোক, জাভাস্ক্রিপ্ট অবজেক্টের সাথে কাজ করার জন্য স্ট্রীমগুলিকে 'অবজেক্ট মোডে' সেট করা যেতে পারে।
const { Readable, Writable, Transform } = require('stream');
// Create a readable stream in object mode
const objectReadable = new Readable({
objectMode: true,
read() {} // Implementation required but can be no-op
});
// Create a transform stream in object mode
const objectTransform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Add a property to the object
chunk.transformed = true;
chunk.timestamp = new Date();
this.push(chunk);
callback();
}
});
// Create a writable stream in object mode
const objectWritable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log('Received object:', chunk);
callback();
}
});
// Connect the streams
objectReadable
.pipe(objectTransform)
.pipe(objectWritable);
// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data
উন্নত স্ট্রীম মোড
1. পাইপলাইনের সাহায্যে ত্রুটি হ্যান্ডলিং()
পাইপলাইন() পদ্ধতি হল স্ট্রিম চেইনে ত্রুটিগুলি পরিচালনা করার প্রস্তাবিত উপায়:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
2. অবজেক্ট মোড স্ট্রীম
স্ট্রিমগুলি জাভাস্ক্রিপ্ট অবজেক্টের সাথে কাজ করতে পারে, শুধু স্ট্রিং এবং বাফার নয়:
const { Readable } = require('stream');
// Create a readable stream in object mode
const objectStream = new Readable({
objectMode: true,
read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
console.log('Received:', obj);
});
ব্যবহারিক উদাহরণ
HTTP স্ট্রিমিং
HTTP অনুরোধ এবং প্রতিক্রিয়াগুলিতে স্ট্রিমগুলি ব্যাপকভাবে ব্যবহৃত হয়।
const http = require('http');
const fs = require('fs');
// Create an HTTP server
const server = http.createServer((req, res) => {
// Handle different routes
if (req.url === '/') {
// Send a simple response
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('Stream Demo
Try streaming a file or streaming a video.
');
}
else if (req.url === '/file') {
// Stream a large text file
res.writeHead(200, { 'Content-Type': 'text/plain' });
const fileStream = fs.createReadStream('largefile.txt', 'utf8');
// Pipe the file to the response (handles backpressure automatically)
fileStream.pipe(res);
// Handle errors
fileStream.on('error', (err) => {
console.error('File stream error:', err);
res.statusCode = 500;
res.end('Server Error');
});
}
else if (req.url === '/video') {
// Stream a video file with proper headers
const videoPath = 'video.mp4';
const stat = fs.statSync(videoPath);
const fileSize = stat.size;
const range = req.headers.range;
if (range) {
// Handle range requests for video seeking
const parts = range.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunksize = (end - start) + 1;
const videoStream = fs.createReadStream(videoPath, { start, end });
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunksize,
'Content-Type': 'video/mp4'
});
videoStream.pipe(res);
} else {
// No range header, send entire video
res.writeHead(200, {
'Content-Length': fileSize,
'Content-Type': 'video/mp4'
});
fs.createReadStream(videoPath).pipe(res);
}
} else {
// 404 Not Found
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('Not Found');
}
});
// Start the server
server.listen(8080, () => {
console.log('Server running at http://localhost:8080/');
});
বড় CSV ফাইল প্রসেস করা হচ্ছে
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser
// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
objectMode: true,
transform(row, encoding, callback) {
// Only pass through rows that meet our criteria
if (parseInt(row.age) > 18) {
// Modify the row
row.isAdult = 'Yes';
// Push the transformed row
this.push(row);
}
callback();
}
});
// Create a writable stream for the results
const results = [];
const writeToArray = new Transform({
objectMode: true,
transform(row, encoding, callback) {
results.push(row);
callback();
}
});
// Create the processing pipeline
fs.createReadStream('people.csv')
.pipe(csv())
.pipe(filterTransform)
.pipe(writeToArray)
.on('finish', () => {
console.log(`Processed ${results.length} records:`);
console.log(results);
})
.on('error', (err) => {
console.error('Error processing CSV:', err);
});
সর্বোত্তম অনুশীলন
সতর্কতা:
স্ট্রীমগুলিকে ভুলভাবে পরিচালনা করার ফলে মেমরি লিক এবং কর্মক্ষমতা সমস্যা হতে পারে।
সর্বদা ত্রুটিগুলি পরিচালনা করুন এবং সঠিকভাবে স্ট্রিমগুলি বন্ধ করুন৷
সারাংশ
স্ট্রিমগুলি হল Node.js-এর একটি মৌলিক ধারণা যা খোলা ডেটা ম্যানিপুলেশনের অনুমতি দেয়। তারা হল:
- তারা মেমরিতে সবকিছু লোড না করেই ডেটা টুকরো টুকরো প্রক্রিয়া করে
- বড় ডেটা সেটের জন্য আরও ভাল মেমরির ক্ষমতা প্রদান করুন
- সমস্ত ডেটা উপলব্ধ হওয়ার আগে প্রক্রিয়াকরণ শুরু করার অনুমতি দিন
- শক্তিশালী ডেটা প্রসেসিং পাইপলাইন চালান
- Core Node.js API ব্যাপকভাবে ব্যবহৃত হয়