ஸ்ட்ரீம்கள் என்றால் என்ன?
Node.js இல், ஸ்ட்ரீம்கள் என்பது தரவின் தொகுப்புகள் ஆகும், அவை ஒரே நேரத்தில் முழுமையாக கிடைக்காமல் இருக்கலாம் மற்றும் நினைவகத்தில் பொருந்த வேண்டியதில்லை.
ஒரு இடத்திலிருந்து மற்றொரு இடத்திற்கு தரவை நகர்த்தும் கன்வேயர் பெல்ட்களாக அவற்றை நினைத்துப் பாருங்கள், முழுத் தரவுத் தொகுப்புக்காகக் காத்திருக்காமல் ஒவ்வொரு துண்டும் வரும் போதே வேலை செய்ய உங்களை அனுமதிக்கிறது.
ஸ்ட்ரீம்கள் Node.js இன் மிகவும் சக்திவாய்ந்த அம்சங்களில் ஒன்றாகும் மற்றும் பரவலாகப் பயன்படுத்தப்படுகின்றன:
கோப்பு முறைமை செயல்பாடுகள்
கோப்புகளைப் படித்தல்/எழுதுதல்
HTTP கோரிக்கைகள் மற்றும் பதில்கள்
வலை கோரிக்கைகள் மற்றும் பதில்களை கையாளுதல்
தரவு சுருக்கம் மற்றும் விரிவாக்கம்
தரவை சுருக்குதல் மற்றும் விரிவுபடுத்துதல்
தரவுத்தள செயல்பாடுகள்
தரவுத்தள செயல்பாடுகளை கையாளுதல்
நிகழ்நேர தரவு செயலாக்கம்
நிகழ்நேர தரவு பரிமாற்றம் மற்றும் செயலாக்கம்
ஸ்ட்ரீம்களுடன் தொடங்குதல்
ஸ்ட்ரீம்கள் Node.js இல் தரவை திறம்பட கையாளுவதற்கான அடிப்படை கருத்துக்களில் ஒன்றாகும்.
அவை எல்லாவற்றையும் ஒரே நேரத்தில் நினைவகத்தில் ஏற்றுவதற்குப் பதிலாக, தரவு கிடைக்கும் போது அதைத் துண்டுகளாக செயலாக்க உங்களை அனுமதிக்கின்றன.
அடிப்படை ஸ்ட்ரீம் எடுத்துக்காட்டு
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
writableStream.on('finish', () => {
console.log('File copy completed!');
});
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
writableStream.on('error', (err) => {
console.error('Error writing file:', err);
});
ஏன் ஸ்ட்ரீம்களைப் பயன்படுத்த வேண்டும்?
ஸ்ட்ரீம்களைப் பயன்படுத்த பல நன்மைகள் உள்ளன:
ஸ்ட்ரீம்களின் நன்மைகள்:
512MB RAM உள்ள ஒரு சேவையகத்தில் 1GB கோப்பைப் படிப்பதை கற்பனை செய்து பாருங்கள்:
- ஸ்ட்ரீம்கள் இல்லாமல்: முழு கோப்பையும் நினைவகத்தில் ஏற்ற முயற்சிக்கும் போது செயல்முறை கிராஷ் ஆகும்
- ஸ்ட்ரீம்களுடன்: நீங்கள் கோப்பை சிறிய துண்டுகளாக (எ.கா., ஒரு நேரத்தில் 64KB) செயலாக்குகிறீர்கள்
முக்கிய ஸ்ட்ரீம் வகைகள்
Node.js நான்கு அடிப்படை வகை ஸ்ட்ரீம்களை வழங்குகிறது, ஒவ்வொன்றும் தரவு கையாளுதலில் ஒரு குறிப்பிட்ட நோக்கத்திற்கு சேவை செய்கின்றன:
| ஸ்ட்ரீம் வகை | விளக்கம் | பொதுவான எடுத்துக்காட்டுகள் |
|---|---|---|
| Readable | தரவைப் படிக்கக்கூடிய ஸ்ட்ரீம்கள் (தரவு மூலம்) | fs.createReadStream(), HTTP பதில்கள், process.stdin |
| Writable | தரவை எழுதக்கூடிய ஸ்ட்ரீம்கள் (தரவு இலக்கு) | fs.createWriteStream(), HTTP கோரிக்கைகள், process.stdout |
| Duplex | Readable மற்றும் Writable இரண்டுமான ஸ்ட்ரீம்கள் | TCP சாக்கெட்டுகள், Zlib ஸ்ட்ரீம்கள் |
| Transform | தரவு எழுதப்பட்டு படிக்கப்படும் போது அதை மாற்றியமைக்கக்கூடிய Duplex ஸ்ட்ரீம்கள் | Zlib ஸ்ட்ரீம்கள், crypto ஸ்ட்ரீம்கள் |
குறிப்பு:
Node.js இல் உள்ள அனைத்து ஸ்ட்ரீம்களும் EventEmitter இன் நிகழ்வுகளாகும், அதாவது அவை கேட்கக்கூடிய மற்றும் கையாளக்கூடிய நிகழ்வுகளை உமிழ்கின்றன.
Readable ஸ்ட்ரீம்கள்
Readable ஸ்ட்ரீம்கள் ஒரு மூலத்திலிருந்து தரவைப் படிக்க உங்களை அனுமதிக்கின்றன. எடுத்துக்காட்டுகள்:
- ஒரு கோப்பிலிருந்து படித்தல்
- கிளையன்டில் HTTP பதில்கள்
- சேவையகத்தில் HTTP கோரிக்கைகள்
- process.stdin
Readable ஸ்ட்ரீம் உருவாக்குதல்
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Events for readable streams
readableStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
console.log(chunk);
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
readableStream.on('error', (err) => {
console.error('Error reading from stream:', err);
});
படிக்கும் முறைகள்
Readable ஸ்ட்ரீம்கள் இரண்டு முறைகளில் ஒன்றில் செயல்படுகின்றன:
- Flowing Mode: தரவு மூலத்திலிருந்து விரைவாகப் படிக்கப்பட்டு நிகழ்வுகளைப் பயன்படுத்தி உங்கள் பயன்பாட்டிற்கு வழங்கப்படுகிறது
- Paused Mode: ஸ்ட்ரீமிலிருந்து தரவின் துண்டுகளைப் பெற stream.read() ஐ வெளிப்படையாக அழைக்க வேண்டும்
const fs = require('fs');
// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Manually consume the stream using read()
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(`Read ${chunk.length} bytes of data.`);
console.log(chunk);
}
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
Writable ஸ்ட்ரீம்கள்
Writable ஸ்ட்ரீம்கள் ஒரு இலக்குக்கு தரவை எழுத உங்களை அனுமதிக்கின்றன. எடுத்துக்காட்டுகள்:
- ஒரு கோப்பில் எழுதுதல்
- கிளையன்டில் HTTP கோரிக்கைகள்
- சேவையகத்தில் HTTP பதில்கள்
- process.stdout
Writable ஸ்ட்ரீம் உருவாக்குதல்
const fs = require('fs');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');
// End the stream
writableStream.end();
// Events for writable streams
writableStream.on('finish', () => {
console.log('All data has been written to the file.');
});
writableStream.on('error', (err) => {
console.error('Error writing to stream:', err);
});
பின்னழுத்தத்தைக் கையாளுதல்
ஒரு ஸ்ட்ரீமில் எழுதும் போது, தரவு செயலாக்கப்படுவதை விட வேகமாக எழுதப்பட்டால், பின்னழுத்தம் ஏற்படுகிறது.
write() முறை தொடர்ந்து எழுதுவது பாதுகாப்பானதா என்பதைக் குறிக்கும் பூலியன் மதிப்பை வழங்குகிறது.
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
function writeData() {
let i = 100;
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time, close the stream
writableStream.write('Last chunk!\n');
writableStream.end();
} else {
// Continue writing data
const data = `Data chunk ${i}\n`;
// Write and check if we should continue
ok = writableStream.write(data);
}
}
while (i > 0 && ok);
if (i > 0) {
// We need to wait for the drain event before writing more
writableStream.once('drain', write);
}
}
write();
}
writeData();
writableStream.on('finish', () => {
console.log('All data written successfully.');
});
Pipe
pipe() முறை ஒரு readable ஸ்ட்ரீமை writable ஸ்ட்ரீமுடன் இணைக்கிறது, தரவின் ஓட்டத்தை தானாக மேலாண்மை செய்கிறது மற்றும் பின்னழுத்தத்தைக் கையாளுகிறது.
இது ஸ்ட்ரீம்களை நுகர எளிதான வழியாகும்.
const fs = require('fs');
// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
readableStream.on('error', (err) => {
console.error('Read error:', err);
});
writableStream.on('error', (err) => {
console.error('Write error:', err);
});
writableStream.on('finish', () => {
console.log('File copy completed!');
});
சங்கிலி Pipes
நீங்கள் pipe() ஐப் பயன்படுத்தி பல ஸ்ட்ரீம்களை ஒன்றாக இணைக்கலாம்.
இது transform ஸ்ட்ரீம்களுடன் பணிபுரியும் போது குறிப்பாக பயனுள்ளதாக இருக்கும்.
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
.pipe(zlib.createGzip()) // Compress the data
.pipe(fs.createWriteStream('destination.txt.gz'))
.on('finish', () => {
console.log('File compressed successfully!');
});
குறிப்பு:
pipe() முறை இலக்கு ஸ்ட்ரீமை வழங்குகிறது, இது சங்கிலியை இயக்கும்.
Duplex மற்றும் Transform ஸ்ட்ரீம்கள்
Duplex ஸ்ட்ரீம்கள்
Duplex ஸ்ட்ரீம்கள் இரண்டும் readable மற்றும் writable ஆகும், இருவழி குழாய் போன்றது.
ஒரு TCP சாக்கெட் duplex ஸ்ட்ரீமின் நல்ல எடுத்துக்காட்டாகும்.
const net = require('net');
// Create a TCP server
const server = net.createServer((socket) => {
// 'socket' is a duplex stream
// Handle incoming data (readable side)
socket.on('data', (data) => {
console.log('Received:', data.toString());
// Echo back (writable side)
socket.write(`Echo: ${data}`);
});
socket.on('end', () => {
console.log('Client disconnected');
});
});
server.listen(8080, () => {
console.log('Server listening on port 8080');
});
// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
console.log('Connected to server');
client.write('Hello from client!');
});
client.on('data', (data) => {
console.log('Server says:', data.toString());
client.end(); // Close the connection
});
*/
Transform ஸ்ட்ரீம்கள்
Transform ஸ்ட்ரீம்கள் duplex ஸ்ட்ரீம்கள் ஆகும், அவை அதன் வழியாக செல்லும் போது தரவை மாற்ற முடியும்.
அவை குழாய்களில் தரவை செயலாக்குவதற்கு சிறந்தவை.
const { Transform } = require('stream');
const fs = require('fs');
// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Transform the chunk to uppercase
const upperChunk = chunk.toString().toUpperCase();
// Push the transformed data
this.push(upperChunk);
// Signal that we're done with this chunk
callback();
}
}
// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');
// Pipe the data through our transform stream
readableStream
.pipe(uppercaseTransform)
.pipe(writableStream)
.on('finish', () => {
console.log('Transformation completed!');
});
ஸ்ட்ரீம் நிகழ்வுகள்
அனைத்து ஸ்ட்ரீம்களும் EventEmitter இன் நிகழ்வுகளாகும் மற்றும் பல நிகழ்வுகளை உமிழ்கின்றன:
Readable ஸ்ட்ரீம் நிகழ்வுகள்
- data: ஸ்ட்ரீமில் படிக்க தரவு கிடைக்கும் போது உமிழப்படுகிறது
- end: நுகரப்பட இன்னும் தரவு இல்லாத போது உமிழப்படுகிறது
- error: படிக்கும் போது பிழை ஏற்பட்டால் உமிழப்படுகிறது
- close: ஸ்ட்ரீமின் அடிப்படை வளம் மூடப்பட்டால் உமிழப்படுகிறது
- readable: தரவு படிக்க கிடைக்கும் போது உமிழப்படுகிறது
Writable ஸ்ட்ரீம் நிகழ்வுகள்
- drain: write() முறை false ஐ வழங்கிய பிறகு ஸ்ட்ரீம் மேலும் தரவை ஏற்க தயாராக இருக்கும் போது உமிழப்படுகிறது
- finish: அனைத்து தரவும் அடிப்படை அமைப்புக்கு ப்ளஷ் செய்யப்பட்டால் உமிழப்படுகிறது
- error: எழுதும் போது பிழை ஏற்பட்டால் உமிழப்படுகிறது
- close: ஸ்ட்ரீமின் அடிப்படை வளம் மூடப்பட்டால் உமிழப்படுகிறது
- pipe: readable ஸ்ட்ரீமில் pipe() முறை அழைக்கப்படும் போது உமிழப்படுகிறது
- unpipe: readable ஸ்ட்ரீமில் unpipe() முறை அழைக்கப்படும் போது உமிழப்படுகிறது
stream.pipeline() முறை
pipeline() செயல்பாடு (Node.js v10.0.0 முதல் கிடைக்கும்) ஸ்ட்ரீம்களை ஒன்றாக இணைக்க மிகவும் உறுதியான வழியாகும், குறிப்பாக பிழை கையாளுதலுக்கு.
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline that handles errors properly
pipeline(
fs.createReadStream('source.txt'),
zlib.createGzip(),
fs.createWriteStream('destination.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded!');
}
}
);
குறிப்பு:
pipeline() அவற்றில் ஏதேனும் பிழை ஏற்பட்டால் அனைத்து ஸ்ட்ரீம்களையும் சரியாக சுத்தம் செய்யும், சாத்தியமான நினைவக கசிவுகளைத் தடுக்கும்.
Object Mode ஸ்ட்ரீம்கள்
இயல்பாக, ஸ்ட்ரீம்கள் சரங்கள் மற்றும் Buffer பொருள்களுடன் வேலை செய்கின்றன.
இருப்பினும், ஸ்ட்ரீம்கள் JavaScript பொருள்களுடன் வேலை செய்ய 'object mode' க்கு அமைக்கப்படலாம்.
const { Readable, Writable, Transform } = require('stream');
// Create a readable stream in object mode
const objectReadable = new Readable({
objectMode: true,
read() {} // Implementation required but can be no-op
});
// Create a transform stream in object mode
const objectTransform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Add a property to the object
chunk.transformed = true;
chunk.timestamp = new Date();
this.push(chunk);
callback();
}
});
// Create a writable stream in object mode
const objectWritable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log('Received object:', chunk);
callback();
}
});
// Connect the streams
objectReadable
.pipe(objectTransform)
.pipe(objectWritable);
// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data
மேம்பட்ட ஸ்ட்ரீம் முறைகள்
1. pipeline() உடன் பிழை கையாளுதல்
pipeline() முறை ஸ்ட்ரீம் சங்கிலிகளில் பிழைகளைக் கையாள பரிந்துரைக்கப்படும் வழியாகும்:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
2. Object Mode ஸ்ட்ரீம்கள்
ஸ்ட்ரீம்கள் சரங்கள் மற்றும் buffers மட்டுமல்ல JavaScript பொருள்களுடன் வேலை செய்ய முடியும்:
const { Readable } = require('stream');
// Create a readable stream in object mode
const objectStream = new Readable({
objectMode: true,
read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
console.log('Received:', obj);
});
நடைமுறை எடுத்துக்காட்டுகள்
HTTP ஸ்ட்ரீமிங்
ஸ்ட்ரீம்கள் HTTP கோரிக்கைகள் மற்றும் பதில்களில் பரவலாகப் பயன்படுத்தப்படுகின்றன.
const http = require('http');
const fs = require('fs');
// Create an HTTP server
const server = http.createServer((req, res) => {
// Handle different routes
if (req.url === '/') {
// Send a simple response
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('Stream Demo
Try streaming a file or streaming a video.
');
}
else if (req.url === '/file') {
// Stream a large text file
res.writeHead(200, { 'Content-Type': 'text/plain' });
const fileStream = fs.createReadStream('largefile.txt', 'utf8');
// Pipe the file to the response (handles backpressure automatically)
fileStream.pipe(res);
// Handle errors
fileStream.on('error', (err) => {
console.error('File stream error:', err);
res.statusCode = 500;
res.end('Server Error');
});
}
else if (req.url === '/video') {
// Stream a video file with proper headers
const videoPath = 'video.mp4';
const stat = fs.statSync(videoPath);
const fileSize = stat.size;
const range = req.headers.range;
if (range) {
// Handle range requests for video seeking
const parts = range.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunksize = (end - start) + 1;
const videoStream = fs.createReadStream(videoPath, { start, end });
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunksize,
'Content-Type': 'video/mp4'
});
videoStream.pipe(res);
} else {
// No range header, send entire video
res.writeHead(200, {
'Content-Length': fileSize,
'Content-Type': 'video/mp4'
});
fs.createReadStream(videoPath).pipe(res);
}
} else {
// 404 Not Found
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('Not Found');
}
});
// Start the server
server.listen(8080, () => {
console.log('Server running at http://localhost:8080/');
});
பெரிய CSV கோப்புகளை செயலாக்குதல்
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser
// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
objectMode: true,
transform(row, encoding, callback) {
// Only pass through rows that meet our criteria
if (parseInt(row.age) > 18) {
// Modify the row
row.isAdult = 'Yes';
// Push the transformed row
this.push(row);
}
callback();
}
});
// Create a writable stream for the results
const results = [];
const writeToArray = new Transform({
objectMode: true,
transform(row, encoding, callback) {
results.push(row);
callback();
}
});
// Create the processing pipeline
fs.createReadStream('people.csv')
.pipe(csv())
.pipe(filterTransform)
.pipe(writeToArray)
.on('finish', () => {
console.log(`Processed ${results.length} records:`);
console.log(results);
})
.on('error', (err) => {
console.error('Error processing CSV:', err);
});
சிறந்த நடைமுறைகள்
எச்சரிக்கை:
ஸ்ட்ரீம்களை தவறாக கையாளுதல் நினைவக கசிவுகள் மற்றும் செயல்திறன் சிக்கல்களுக்கு வழிவகுக்கும்.
பிழைகளை எப்போதும் கையாளுங்கள் மற்றும் ஸ்ட்ரீம்களை சரியாக முடிக்கவும்.
சுருக்கம்
ஸ்ட்ரீம்கள் Node.js இல் ஒரு அடிப்படை கருத்தாகும், இது திறந்த தரவு கையாளுதலுக்கு அனுமதிக்கிறது. அவை:
- எல்லாவற்றையும் நினைவகத்தில் ஏற்றாமல் துண்டு துண்டாக தரவை செயலாக்குகின்றன
- பெரிய தரவுத் தொகுப்புகளுக்கு சிறந்த நினைவக திறனை வழங்குகின்றன
- அனைத்து தரவும் கிடைப்பதற்கு முன்பே செயலாக்கத்தைத் தொடங்க அனுமதிக்கின்றன
- சக்திவாய்ந்த தரவு செயலாக்க குழாய்களை இயக்குகின்றன
- Core Node.js APIகளில் பரவலாகப் பயன்படுத்தப்படுகின்றன